References: building a DQN with Keras, plus 莫凡's git repo.
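The project consists of three files: RL_brain_diy.py, the DQN agent built on tf.keras; run_this.py, the training loop; and maze_env.py, the Tkinter maze environment from the tutorial code. First, the agent: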
```python
# RL_brain_diy.py
from tensorflow import keras
import numpy as np


class DeepQNetwork:
    def __init__(
            self,
            n_actions,
            n_features,
            is_train=True,
            load_model=True,
            learning_rate=.01,
            reward_decay=0.9,
            e_greedy=0.9,
            replace_target_iter=300,
            memory_size=500,
            batch_size=32,
            # set to 1e-3 for the first (training) run, None afterwards
            e_greedy_increment=None,
            output_graph=False,
            first_layer_neurno=8,
            second_layer_neurno=1):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay
        self.replace_target_iter = replace_target_iter
        self.learn_step_counter = 1
        self.memory_size = memory_size
        # one row per transition (s, a, r, s_): n_features + 1 + 1 + n_features
        self.memory = np.zeros((memory_size, 2 + 2 * n_features))
        self.memory_count = 0
        self.batch_size = batch_size
        self.epsilon_max = e_greedy
        # anneal epsilon up from 0 while training; otherwise act greedily at once
        self.epsilon = 0 if e_greedy_increment is not None else e_greedy
        self.epsilon_increment = e_greedy_increment
        print(self.epsilon, self.epsilon_increment)
        self.output_graph = output_graph
        self.output_1 = first_layer_neurno
        self.output_2 = second_layer_neurno
        self.is_train = is_train
        self._build_net()
        if load_model:
            self.load_model()

    def _build_net(self):
        # eval network: the one that is actually trained
        self.model_eval = keras.Sequential([
            keras.layers.Dense(self.output_1, input_shape=(self.n_features,),
                               activation='relu'),
            keras.layers.Dense(self.n_actions),
        ])
        # note: the learning rate is hard-coded here; self.lr is not actually used
        self.model_eval.compile(loss='mse',
                                optimizer=keras.optimizers.Adam(learning_rate=3e-3))

        # target network: only receives copied weights, so it is never compiled
        self.model_target = keras.Sequential([
            keras.layers.Dense(self.output_1, input_shape=(self.n_features,),
                               activation='relu'),
            keras.layers.Dense(self.n_actions),
        ])

    def store_transition(self, observation, action, reward, observation_):
        # ring buffer: overwrite the oldest row once the memory is full
        index = self.memory_count % self.memory_size
        self.memory[index] = np.hstack((observation, action, reward, observation_))
        self.memory_count += 1

    def choose_action(self, observation):
        if np.random.uniform() < self.epsilon:
            observation = observation[np.newaxis, :]
            actions_value = self.model_eval.predict(observation)
            # if the maximum occurs more than once, pick one of them at random
            action = np.random.choice(
                np.where(actions_value == np.max(actions_value))[1])
            print(actions_value, action, observation)
        else:
            action = np.random.randint(0, self.n_actions)
            print('random>>', action, observation)
        return action

    def learn(self):
        # periodically copy the eval weights into the target network
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.model_target.set_weights(self.model_eval.get_weights())

        # sample a batch (with replacement) from the filled part of the memory
        sample_index = np.random.choice(
            min(self.memory_count, self.memory_size), self.batch_size)
        batch = self.memory[sample_index]
        o_s = batch[:, :self.n_features]
        a_s = batch[:, self.n_features].astype(int)
        r_s = batch[:, 1 + self.n_features]
        o_s_ = batch[:, -self.n_features:]

        q_next = self.model_target.predict(o_s_, batch_size=self.batch_size)
        q_eval = self.model_eval.predict(o_s, batch_size=self.batch_size)
        # copy, so actions that were not taken keep their predictions (zero error)
        q_target = q_eval.copy()
        q_target[range(self.batch_size), a_s] = r_s + self.gamma * np.max(q_next, axis=1)
        self.model_eval.train_on_batch(o_s, q_target)

        if self.epsilon < self.epsilon_max:
            self.epsilon += self.epsilon_increment
        self.learn_step_counter += 1

    def plot_cost(self):
        pass

    def save_model(self):
        self.model_eval.save_weights('model_eval.h5')
        self.model_target.save_weights('model_target.h5')

    def load_model(self):
        try:
            self.model_eval.load_weights('model_eval.h5')
            self.model_target.load_weights('model_target.h5')
        except Exception:
            print('No saved model found, starting from scratch')
```
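The target construction in learn() is the standard DQN update: for each sampled transition (s, a, r, s'), only the entry for the action actually taken is pushed toward r + γ·max Q_target(s', ·), while every other entry stays equal to the eval network's own prediction, so those positions contribute zero to the MSE loss. A minimal NumPy sketch of the indexing trick, with made-up numbers and independent of the class above:

```python
import numpy as np

gamma = 0.9
batch_size = 2

# fabricated network outputs for a 2-transition batch with 4 actions
q_eval = np.array([[0.1, 0.5, 0.2, 0.0],
                   [0.3, 0.1, 0.4, 0.2]])   # eval net on s
q_next = np.array([[0.0, 0.7, 0.1, 0.2],
                   [0.5, 0.2, 0.3, 0.1]])   # target net on s_
a_s = np.array([1, 2])                      # actions taken
r_s = np.array([0.0, 1.0])                  # rewards received

q_target = q_eval.copy()
# overwrite only the entries of the actions actually taken
q_target[np.arange(batch_size), a_s] = r_s + gamma * q_next.max(axis=1)
print(q_target)
# row 0, action 1: 0.0 + 0.9 * 0.7 = 0.63; row 1, action 2: 1.0 + 0.9 * 0.5 = 1.45
```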
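Next, the training script. It runs 100 episodes, prints a running success rate, stores every transition, and starts learning after a 20-step warm-up (then every 5 steps). After training it saves the weights and prints the eval network's Q-values over a grid of normalized positions: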
```python
# run_this.py
from dqn.day2.maze_env import Maze
from dqn.day2.RL_brain_diy import DeepQNetwork
import numpy as np


def run_maze():
    step = 0
    success, acc = 0, .01
    for episode in range(100):
        # initial observation
        observation = env.reset()
        if episode > 0:
            acc = success / episode
        print('episode:', episode, 'success:', acc)

        while True:
            # fresh env
            env.render()

            # RL choose action based on observation
            action = RL.choose_action(observation)

            # RL take action and get next observation and reward
            observation_, reward, done = env.step(action)
            if reward == 1:
                success += 1

            RL.store_transition(observation, action, reward, observation_)

            if (step > 20) and (step % 5 == 0):
                RL.learn()

            # swap observation
            observation = observation_

            # break while loop when end of this episode
            if done:
                break
            step += 1

    # end of game
    print('game over')
    env.destroy()


if __name__ == "__main__":
    # maze game
    env = Maze()
    RL = DeepQNetwork(env.n_actions, env.n_features,
                      learning_rate=0.01,
                      reward_decay=0.9,
                      e_greedy=0.9,
                      replace_target_iter=200,
                      memory_size=2000,
                      # output_graph=True
                      )
    env.after(100, run_maze)
    env.mainloop()
    RL.save_model()
    RL.plot_cost()
    # print the learned Q-values over a grid of normalized positions
    for i in range(4):
        for j in range(4):
            x = .25 * (i - 2)
            y = .25 * (j - 2)
            z = RL.model_eval.predict(np.array((x, y)).reshape(1, -1))
            print(x, y, z)
```
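One practical detail, spelled out in the comments of RL_brain_diy.py: on the very first run, pass e_greedy_increment=1e-3 so that epsilon anneals from 0 up to e_greedy while the agent explores; on later runs, when load_model() has restored the saved weights, leave it as None so that epsilon starts at e_greedy and the agent mostly exploits the learned policy.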
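Finally the environment, essentially unchanged from the Morvan tutorial: a 4x4 Tkinter grid with one black penalty cell ("hell") and a yellow goal oval. The observation returned by reset() and step() is the agent-to-goal offset normalized by the board size, which is why n_features is 2: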
```python
# maze_env.py
import numpy as np
import time
import sys
if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

UNIT = 40   # pixels
MAZE_H = 4  # grid height
MAZE_W = 4  # grid width


class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']
        self.n_actions = len(self.action_space)
        self.n_features = 2
        self.title('maze')
        self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_H * UNIT))
        self._build_maze()

    def _build_maze(self):
        self.canvas = tk.Canvas(self, bg='white',
                                height=MAZE_H * UNIT,
                                width=MAZE_W * UNIT)

        # create grids
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)

        # create origin
        origin = np.array([20, 20])

        # hell
        hell1_center = origin + np.array([UNIT * 2, UNIT])
        self.hell1 = self.canvas.create_rectangle(
            hell1_center[0] - 15, hell1_center[1] - 15,
            hell1_center[0] + 15, hell1_center[1] + 15,
            fill='black')

        # create oval (the goal)
        oval_center = origin + UNIT * 2
        self.oval = self.canvas.create_oval(
            oval_center[0] - 15, oval_center[1] - 15,
            oval_center[0] + 15, oval_center[1] + 15,
            fill='yellow')

        # create red rect (the agent)
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')

        # pack all
        self.canvas.pack()

    def reset(self):
        self.update()
        time.sleep(0.1)
        self.canvas.delete(self.rect)
        origin = np.array([20, 20])
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')
        # return observation: normalized offset from agent to goal
        return (np.array(self.canvas.coords(self.rect)[:2]) -
                np.array(self.canvas.coords(self.oval)[:2])) / (MAZE_H * UNIT)

    def step(self, action):
        s = self.canvas.coords(self.rect)
        base_action = np.array([0, 0])
        if action == 0:    # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:  # down
            if s[1] < (MAZE_H - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 2:  # left
            if s[0] > UNIT:
                base_action[0] -= UNIT
        elif action == 3:  # right
            if s[0] < (MAZE_W - 1) * UNIT:
                base_action[0] += UNIT

        self.canvas.move(self.rect, base_action[0], base_action[1])  # move agent

        next_coords = self.canvas.coords(self.rect)  # next state

        # reward function
        if next_coords == self.canvas.coords(self.oval):
            reward = 1
            done = True
        elif next_coords in [self.canvas.coords(self.hell1)]:
            reward = -1
            done = True
        else:
            reward = 0
            done = False
        s_ = (np.array(next_coords[:2]) -
              np.array(self.canvas.coords(self.oval)[:2])) / (MAZE_H * UNIT)
        return s_, reward, done

    def render(self):
        # time.sleep(0.01)
        self.update()
```
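A quick, hypothetical smoke test of the environment's API, assuming maze_env.py is importable directly (the scripts above use the dqn.day2 package path) and a display is available for Tkinter:

```python
# hypothetical smoke test, not part of the original project
from maze_env import Maze

env = Maze()
obs = env.reset()                  # normalized offset from agent to goal
print('reset ->', obs)
obs_, reward, done = env.step(3)   # action 3 moves the agent right
print('step  ->', obs_, reward, done)
env.destroy()
```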
Finally

That is all of 优雅荷花's recently collected notes on solving a simple maze with a DQN. For more on the topic, search 靠谱客 for related articles.

This content was provided by netizens or collected from the web, and is for learning and reference only; copyright remains with the original authors.