
Basic Concepts and Algorithm Flow of DQN (with PyTorch Code)

❀ DQN Algorithm Principles

DQN (Deep Q Network) is essentially still the Q-learning algorithm. Its core idea remains making the estimated Q value $Q_{\text{estimate}}$ as close as possible to the "real" Q value $Q_{\text{target}}$, in other words, making the Q value predicted in the current state as close as possible to the Q value derived from experience. In the rest of this article $Q_{\text{target}}$ is also called the TD target.
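Concretely, for a transition from state $s$, taking action $a$, receiving reward $r$ and landing in state $s'$, the two quantities are (standard Q-learning/DQN notation, written out here for clarity):

$$Q_{\text{estimate}} = Q(s, a), \qquad Q_{\text{target}} = r + \gamma \max_{a'} Q(s', a'),$$

and DQN trains the network to minimize $\big(Q_{\text{target}} - Q_{\text{estimate}}\big)^{2}$, where $\gamma$ is the discount factor (reward_decay in the code below).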

Let's briefly review the DQN algorithm and its core idea.

Compared with the Q-table form, DQN uses a neural network to learn the Q values.

The neural network can be understood as just a function approximator; the network itself is not the essence of DQN. It can be designed as an MLP, a CNN, and so on. What makes DQN clever are tricks such as the two networks and experience replay.
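As a minimal sketch (class name and layer sizes here are illustrative, not the Network class defined later in RL_brain.py), an MLP Q network in PyTorch could look like this:

import torch.nn as nn

class QNet(nn.Module):
    def __init__(self, n_features, n_actions, n_hidden=32):
        super().__init__()
        # map a state vector to one Q value per action
        self.net = nn.Sequential(
            nn.Linear(n_features, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_actions),
        )

    def forward(self, s):
        return self.net(s)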

Below are some of the tricks used by the DQN algorithm. The goal is to help readers distinguish the roles of the two networks, clarify the essence of concepts such as experience replay, and explain how they are used when training the network.

Trick 1: Two Networks

DQN uses two neural networks: the evaluate network (the Q-value network) and the target network. The two networks have exactly the same structure.

  • The evaluate network computes the Q values used for action selection and for the Q-value update; gradient descent and backpropagation are applied to the evaluate network only.
  • The target network computes the Q value of the next state inside the TD target; its parameters are updated by copying the evaluate network's parameters.

The purpose of the target network is to keep the target values stable, so the evaluate network is not chasing a target that changes with every gradient step; this improves training stability and convergence speed.

A point that is easy to confuse: gradients update only the evaluate network's parameters, never the target network's; every fixed number of steps the evaluate network's parameters are copied into the target network. Accordingly, the optimizer must be constructed with the evaluate network's parameters.
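A minimal sketch of this setup (QNet and the numbers are illustrative, reusing the sketch above; RL_brain.py below does the same thing with its own Network class):

import torch

eval_net = QNet(n_features=2, n_actions=4)        # trained by gradient descent
target_net = QNet(n_features=2, n_actions=4)      # never trained directly
target_net.load_state_dict(eval_net.state_dict()) # start from the same weights

# the optimizer only sees the evaluate network's parameters
optimizer = torch.optim.Adam(eval_net.parameters(), lr=0.01)

# every replace_target_iter learning steps, copy eval -> target:
# target_net.load_state_dict(eval_net.state_dict())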

Trick 2: Basic Framework

The algorithm consists of two parts, policy selection and policy evaluation, which are the two basic modules of a reinforcement learning algorithm; working through the logic from these two angles makes it easier to understand. In the policy-selection part, actions are chosen with an epsilon-greedy policy; in the policy-evaluation part, a greedy policy is used (the max over next-state Q values in the TD target).
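A sketch of epsilon-greedy action selection (the function name and arguments are illustrative; note that, following the convention of the code below, epsilon is the probability of exploiting):

import numpy as np
import torch

def choose_action(eval_net, observation, epsilon, n_actions):
    # with probability epsilon act greedily w.r.t. the evaluate network's Q values,
    # otherwise explore with a uniformly random action
    if np.random.uniform() < epsilon:
        s = torch.FloatTensor(observation[np.newaxis, :])
        q_values = eval_net(s)
        return int(q_values.argmax(dim=1).item())
    return np.random.randint(0, n_actions)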

Trick 3: Experience Replay

DQN maintains a fixed-size memory (replay buffer) to record experience. Each piece of experience is one observation, or transition, represented as $[s, a, r, s']$: the current state → the action taken in that state → the reward obtained → the next state transitioned to.

At the beginning the memory contains no experience and the evaluate network is not trained; only after a certain amount of experience has accumulated does training of the evaluate network begin. The experience in the memory can be the agent's own history (collected with epsilon-greedy), or experience learned from others. When training the evaluate network, a batch of experience of size batch_size is randomly sampled (important: randomly!) from the memory and fed to the evaluate network.

Storing experience in a memory and feeding randomly sampled experience to the evaluate network breaks the correlation between adjacent training samples. Think about it: the loop state → action → reward → next state is strongly correlated, and training the evaluate network on consecutive adjacent samples leads to overfitting and poor generalization, whereas experience replay makes the training samples more independent of each other. A small sketch of such a buffer follows.
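A minimal replay-buffer sketch using a NumPy array (the class name is illustrative; RL_brain.py below implements the same idea with a pandas DataFrame):

import numpy as np

class ReplayBuffer:
    """Fixed-size memory of transitions [s, a, r, s']."""
    def __init__(self, memory_size, n_features):
        self.memory = np.zeros((memory_size, n_features * 2 + 2))
        self.memory_size = memory_size
        self.counter = 0

    def store(self, s, a, r, s_):
        transition = np.hstack((s, [a, r], s_))
        index = self.counter % self.memory_size  # overwrite the oldest entry when full
        self.memory[index, :] = transition
        self.counter += 1

    def sample(self, batch_size):
        # sample uniformly at random to break correlation between adjacent samples
        upper = min(self.counter, self.memory_size)
        idx = np.random.choice(upper, size=batch_size)
        return self.memory[idx, :]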

❀ Algorithm Flow

Each episode follows the flow sketched below.

Within it, choose_action, store_transition, and learn are independent function modules; their internal logic follows the tricks described above (epsilon-greedy selection, writing transitions into the memory, and sampling a random batch to update the evaluate network). A compact sketch of how they fit into the training loop is given below.
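A compact sketch of the training loop (a simplified view of run_this.py below; env is the Maze environment and RL is the DeepQNetwork agent):

step = 0                                                  # step counter shared across episodes
for episode in range(200):
    observation = env.reset()
    while True:
        env.render()
        action = RL.choose_action(observation)            # epsilon-greedy on the evaluate network
        observation_, reward, done = env.step(action)     # act in the environment
        RL.store_transition(observation, action, reward, observation_)  # fill the memory
        if (step > 200) and (step % 5 == 0):              # learn only after enough experience, every 5 steps
            RL.learn()                                    # sample a batch and update the evaluate network
        observation = observation_
        if done:                                          # end of this episode
            break
        step += 1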

❀ PyTorch Version of the Code

I implemented the DQN algorithm in PyTorch on a simple maze game. The Maze game comes from the Morvan Python (莫烦Python) tutorial; the code is my own modified version and is posted on GitHub:

ningmengzhihe/DQN_base: DQN algorithm by Pytorch - a simple maze game https://github.com/ningmengzhihe/DQN_base

(1) Environment code: maze_env.py


import numpy as np
import time
import sys
if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

UNIT = 40   # pixels
MAZE_H = 4  # grid height
MAZE_W = 4  # grid width


class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']
        self.n_actions = len(self.action_space)
        self.n_features = 2
        self.title('maze')
        self.geometry('{0}x{1}'.format(MAZE_W * UNIT, MAZE_H * UNIT))
        self._build_maze()

    def _build_maze(self):
        self.canvas = tk.Canvas(self, bg='white',
                                height=MAZE_H * UNIT,
                                width=MAZE_W * UNIT)

        # create grids
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)

        # create origin
        origin = np.array([20, 20])

        # hell
        hell1_center = origin + np.array([UNIT * 2, UNIT])
        self.hell1 = self.canvas.create_rectangle(
            hell1_center[0] - 15, hell1_center[1] - 15,
            hell1_center[0] + 15, hell1_center[1] + 15,
            fill='black')
        # hell
        # hell2_center = origin + np.array([UNIT, UNIT * 2])
        # self.hell2 = self.canvas.create_rectangle(
        #     hell2_center[0] - 15, hell2_center[1] - 15,
        #     hell2_center[0] + 15, hell2_center[1] + 15,
        #     fill='black')

        # create oval
        oval_center = origin + UNIT * 2
        self.oval = self.canvas.create_oval(
            oval_center[0] - 15, oval_center[1] - 15,
            oval_center[0] + 15, oval_center[1] + 15,
            fill='yellow')

        # create red rect
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')

        # pack all
        self.canvas.pack()

    def reset(self):
        self.update()
        time.sleep(0.1)
        self.canvas.delete(self.rect)
        origin = np.array([20, 20])
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')
        # return observation
        return (np.array(self.canvas.coords(self.rect)[:2]) -
                np.array(self.canvas.coords(self.oval)[:2])) / (MAZE_H * UNIT)

    def step(self, action):
        s = self.canvas.coords(self.rect)
        base_action = np.array([0, 0])
        if action == 0:     # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:   # down
            if s[1] < (MAZE_H - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 2:   # right
            if s[0] < (MAZE_W - 1) * UNIT:
                base_action[0] += UNIT
        elif action == 3:   # left
            if s[0] > UNIT:
                base_action[0] -= UNIT

        self.canvas.move(self.rect, base_action[0], base_action[1])  # move agent

        next_coords = self.canvas.coords(self.rect)  # next state

        # reward function
        if next_coords == self.canvas.coords(self.oval):
            reward = 1
            done = True
        elif next_coords in [self.canvas.coords(self.hell1)]:
            reward = -1
            done = True
        else:
            reward = 0
            done = False
        s_ = (np.array(next_coords[:2]) -
              np.array(self.canvas.coords(self.oval)[:2])) / (MAZE_H * UNIT)
        return s_, reward, done

    def render(self):
        # time.sleep(0.01)
        self.update()

(2) DQN algorithm code, including the network definition and the Q-value update: RL_brain.py

"""
Deep Q Network off-policy
"""
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

np.random.seed(42)
torch.manual_seed(2)


class Network(nn.Module):
    """Network Structure"""
    def __init__(self, n_features, n_actions, n_neuron=10):
        super(Network, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(in_features=n_features, out_features=n_neuron, bias=True),
            nn.ReLU(),  # activation between the layers; the output layer stays linear so Q values may be negative
            nn.Linear(in_features=n_neuron, out_features=n_actions, bias=True)
        )

    def forward(self, s):
        """
        :param s: state
        :return: Q value for every action
        """
        q = self.net(s)
        return q


class DeepQNetwork(nn.Module):
    """Q Learning Algorithm"""
    def __init__(self,
                 n_actions,
                 n_features,
                 learning_rate=0.01,
                 reward_decay=0.9,
                 e_greedy=0.9,
                 replace_target_iter=300,
                 memory_size=500,
                 batch_size=32,
                 e_greedy_increment=None):
        super(DeepQNetwork, self).__init__()
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = e_greedy
        self.replace_target_iter = replace_target_iter
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.epsilon_increment = e_greedy_increment
        self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max

        # total learning step
        self.learn_step_counter = 0

        # initialize zero memory [s, a, r, s_]
        # a pd.DataFrame is used as the memory:
        # the number of rows is the memory size, i.e. the number of transitions;
        # the number of columns is the length of one transition [s, a, r, s_],
        # where a and r are single numbers and s and s_ each have length n_features
        self.memory = pd.DataFrame(np.zeros((self.memory_size, self.n_features * 2 + 2)))

        # build two networks: eval_net and target_net
        self.eval_net = Network(n_features=self.n_features, n_actions=self.n_actions)
        self.target_net = Network(n_features=self.n_features, n_actions=self.n_actions)

        self.loss_function = nn.MSELoss()
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=self.lr)

        # record the loss of every learning step
        self.cost_his = []

    def store_transition(self, s, a, r, s_):
        if not hasattr(self, 'memory_counter'):
            # hasattr checks whether the object already has the attribute
            self.memory_counter = 0
        transition = np.hstack((s, [a, r], s_))
        # replace the old memory with new memory
        index = self.memory_counter % self.memory_size
        self.memory.iloc[index, :] = transition
        self.memory_counter += 1

    def choose_action(self, observation):
        observation = observation[np.newaxis, :]
        if np.random.uniform() < self.epsilon:
            # forward feed the observation and get q value for every action
            s = torch.FloatTensor(observation)
            actions_value = self.eval_net(s)
            action = np.argmax(actions_value.detach().numpy())
        else:
            action = np.random.randint(0, self.n_actions)
        return action

    def _replace_target_params(self):
        # copy the evaluate network's parameters into the target network
        self.target_net.load_state_dict(self.eval_net.state_dict())

    def learn(self):
        # check to replace target parameters
        if self.learn_step_counter % self.replace_target_iter == 0:
            self._replace_target_params()
            print('\ntarget params replaced\n')

        # sample batch memory from all memory
        batch_memory = self.memory.sample(self.batch_size) \
            if self.memory_counter > self.memory_size \
            else self.memory.iloc[:self.memory_counter].sample(self.batch_size, replace=True)

        # run the networks
        s = torch.FloatTensor(batch_memory.iloc[:, :self.n_features].values)
        s_ = torch.FloatTensor(batch_memory.iloc[:, -self.n_features:].values)
        q_eval = self.eval_net(s)
        with torch.no_grad():
            q_next = self.target_net(s_)  # next-state Q values from the target network, no gradient

        # change q_target w.r.t q_eval's action
        q_target = q_eval.clone().detach()  # detach so gradients only flow through q_eval
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        eval_act_index = batch_memory.iloc[:, self.n_features].values.astype(int)
        reward = batch_memory.iloc[:, self.n_features + 1].values
        q_target[batch_index, eval_act_index] = \
            torch.FloatTensor(reward) + self.gamma * q_next.max(dim=1).values

        # train eval network
        loss = self.loss_function(q_target, q_eval)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.cost_his.append(loss.detach().numpy())

        # increasing epsilon
        self.epsilon = self.epsilon + self.epsilon_increment \
            if self.epsilon < self.epsilon_max else self.epsilon_max
        self.learn_step_counter += 1

    def plot_cost(self):
        plt.figure()
        plt.plot(np.arange(len(self.cost_his)), self.cost_his)
        plt.show()

(3) Per-episode training loop: run_this.py

from maze_env import Maze
from RL_brain import DeepQNetwork


def run_maze():
    step = 0  # count steps so that learning starts only after the memory has accumulated some transitions
    for episode in range(200):
        # initial observation
        observation = env.reset()

        while True:
            # refresh env
            env.render()

            # RL choose action based on observation
            action = RL.choose_action(observation)

            # RL take action and get next observation and reward
            observation_, reward, done = env.step(action)

            # !! store transition
            RL.store_transition(observation, action, reward, observation_)

            # after more than 200 transitions, learn once every 5 steps
            if (step > 200) and (step % 5 == 0):
                RL.learn()

            # swap observation
            observation = observation_

            # break while loop when end of this episode
            if done:
                break
            step += 1

    # end of game
    print("game over")
    env.destroy()


if __name__ == "__main__":
    # maze game
    env = Maze()
    RL = DeepQNetwork(env.n_actions, env.n_features,
                      learning_rate=0.01,
                      reward_decay=0.9,
                      e_greedy=0.9,
                      replace_target_iter=200,
                      memory_size=2000)
    env.after(100, run_maze)
    env.mainloop()
    RL.plot_cost()

❀ References

https://zhuanlan.zhihu.com/p/614697168
This reference clearly explains the two Q networks; its PyTorch code is worth consulting.

https://www.bilibili.com/video/BV13W411Y75P?p=14&vd_source=1565223f5f03f44f5674538ab582448c
Morvan Python's DQN tutorial on Bilibili.