作为刚入门强化学习的小白,最近几天在写一些基础的代码,使用DQN训练CartPole问题。
DQN是2013年DeepMind提出来的使用Q-learning与神经网络相结合的方法,其实和Q-learning的思想相同,只不过是计算的时候使用神经网络计算Q值。Q-learning简要说一下,就是使用函数逼近的方法,在选择动作时使用epsilon-greedy的方法,在更新Q函数的时候使用Qmax。
这份代码参考了:
https://blog.csdn.net/xinbolai1993/article/details/78280732 博主讲的很详细,感谢~
再插一句,初始化参数和调参也是一个技术活,会影响到更新的速度,我一开始b初始化为0.01,参数更新步长为0.001,结果得2000个episode才能训练完(Average Reward=200),但是我把b初始化为0.1,更新步长为0.005,平均600个episode就能训练好。
全部代码如下:
import tensorflow as tf
import numpy as np
from collections import deque
import random
import gym
EPISODE = 10000
class DeepQNetwork:
learning_rate = 0.001
gamma = 0.9
action_list = None
# 执行步数
step_index = 0
# epsilon的范围
initial_epsilon = 0.5
final_epsilon = 0.01
explore = 10000
# 经验回放存储
memory_size = 10000
BATCH = 32
# 神经网络
state_input = None
Q_val = None
y_input = None
optimizer = None
cost = 0
session = tf.Session()
cost_history = []
def __init__(self, env):
self.replay_memory_store = deque()
self.state_dim = env.observation_space.shape[0]
self.action_dim = env.action_space.n
self.action_list = np.identity(self.action_dim)
self.epsilon = self.initial_epsilon # epsilon_greedy-policy
self.create_network()
self.create_training_method()
self.session = tf.InteractiveSession()
self.session.run(tf.global_variables_initializer())
def print_loss(self):
import matplotlib.pyplot as plt
print(len(self.cost_history))
plt.plot(np.arange(len(self.cost_history)), self.cost_history)
plt.ylabel('Cost')
plt.xlabel('step')
plt.show()
def create_network(self):
self.state_input = tf.placeholder(shape=[None, self.state_dim], dtype=tf.float32)
# 第一层
neuro_layer_1 = 20
w1 = tf.Variable(tf.random_normal([self.state_dim, neuro_layer_1]))
b1 = tf.Variable(tf.zeros([neuro_layer_1]) + 0.1)
l1 = tf.nn.relu(tf.matmul(self.state_input, w1) + b1)
# 第二层
w2 = tf.Variable(tf.random_normal([neuro_layer_1, self.action_dim]))
b2 = tf.Variable(tf.zeros([self.action_dim]) + 0.1)
# 输出层
self.Q_val = tf.matmul(l1, w2) + b2
def egreedy_action(self, state):
self.epsilon -= (0.5 - 0.01) / 10000
Q_val_output = self.session.run(self.Q_val, feed_dict={self.state_input: [state]})[0]
if random.random() <= self.epsilon:
return random.randint(0, self.action_dim - 1) # 左闭右闭区间,np.random.randint为左闭右开区间
else:
return np.argmax(Q_val_output)
def max_action(self, state):
Q_val_output = self.session.run(self.Q_val, feed_dict={self.state_input: [state]})[0]
action = np.argmax(Q_val_output)
return action
def create_training_method(self):
self.action_input = tf.placeholder(shape=[None, self.action_dim], dtype=tf.float32)
self.y_input = tf.placeholder(shape=[None], dtype=tf.float32) # ???是[None]吗?
Q_action = tf.reduce_sum(tf.multiply(self.Q_val, self.action_input), reduction_indices=1)
self.loss = tf.reduce_mean(tf.square(self.y_input - Q_action))
self.optimizer = tf.train.AdamOptimizer(0.0005).minimize(self.loss)
def perceive(self, state, action, reward, next_state, done):
cur_action = self.action_list[action:action + 1]
self.replay_memory_store.append((state, cur_action[0], reward, next_state, done))
if len(self.replay_memory_store) > self.memory_size:
self.replay_memory_store.popleft()
if len(self.replay_memory_store) > self.BATCH:
self.train_Q_network()
def train_Q_network(self):
self.step_index += 1
# obtain random mini-batch from replay memory
mini_batch = random.sample(self.replay_memory_store, self.BATCH)
state_batch = [data[0] for data in mini_batch]
action_batch = [data[1] for data in mini_batch]
reward_batch = [data[2] for data in mini_batch]
next_state_batch = [data[3] for data in mini_batch]
# calculate y
y_batch = []
Q_val_batch = self.session.run(self.Q_val, feed_dict={self.state_input: next_state_batch}) # 预估下一个状态的Q值
for i in range(0, self.BATCH):
done = mini_batch[i][4]
if done:
y_batch.append(reward_batch[i])
else:
y_batch.append(reward_batch[i] + self.gamma * np.max(Q_val_batch[i])) # 选择最优的Q函数进行更新
_, self.cost = self.session.run([self.optimizer, self.loss], feed_dict={
self.y_input: y_batch,
self.state_input: state_batch,
self.action_input: action_batch
})
TEST = 10
STEP = 300
def main():
env = gym.make('CartPole-v0')
agent = DeepQNetwork(env)
for i in range(EPISODE):
# if i % 50 == 0:
# env.render()
# print(i)
state = env.reset()
for step in range(STEP):
# env.render()
action = agent.egreedy_action(state)
next_state, reward, done, _ = env.step(action)
agent.perceive(state, action, reward, next_state, done)
state = next_state
if done:
break
if i % 100 == 0:
total_reward = 0
for _ in range(TEST):
state = env.reset()
for _ in range(STEP):
env.render()
action = agent.max_action(state) # direct action for test
state, reward, done, _ = env.step(action)
total_reward += reward
if done:
break
ave_reward = total_reward / TEST
print('episode: ', i, 'Evaluation Average Reward:', ave_reward)
if ave_reward >= 200:
break
# agent.print_loss()
if __name__ == '__main__':
main()
训练结果如下: