一、PG回顾
1、对于离散动作,策略搜索使用神经网络来参数化随机策略中的动作概率,网络的输入是智能体的当前状态,网络输出为当前所有动作的概率,该网络是一种分类网络。网络训练使用数据为一个episode数据(s,a,r). 参考https://blog.csdn.net/weixin_40493501/article/details/110384894
2、对于连续性动作来说,一般使用随机高斯策略,网络的输入是智能体当前状态,网络的输出的高斯策略的均值和标准差,网络是一个拟合网络。
无论是连续动作还是离散动作,在使用PG时,必须先弄清下面公式【主要推导上一个文章已给出】,离散动作和连续动作最大的不同就在于。
在离散动作的网络设计中,输入是状态,输出可以看做是每个动作的概率,与动作标签进行对比,尽量使动作概率的输出接近标签。
在连续动作中,将看成是动作的分布,例如高斯分布
二、连续动作PG算法网络
输入层为状态,连续动作空间的输出层不再是动作,二是动作描述的一种分布参数,利用输出的参数可以得到动作的分布,可以根据分布来选择动作。
【需要补充具体的计算过程】
三、代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203import tensorflow as tf import numpy as np import gym import matplotlib.pyplot as plt RENDER = False ''' 连续动作 使用的是高斯策略,用神经网络参数化高斯分布的均值和方差 状态空间为 2 动作空间为 1 其取值范围在[-2,2]之间 注意其误差的构建 ''' #利用当前策略进行采样,产生数据 class Sample(): def __init__(self,env, policy_net): self.env = env self.policy_net=policy_net self.gamma = 0.95 def sample_episodes(self, num_episodes): #产生num_episodes条轨迹 batch_obs=[] batch_actions=[] batch_rs =[] for i in range(num_episodes): observation = self.env.reset() #将一个episode的回报存储起来 reward_episode = [] while True: # if RENDER:self.env.render() #根据策略网络产生一个动作 state = np.reshape(observation,[1,3]) action = self.policy_net.choose_action(state) observation_, reward, done, info = self.env.step(action) # print("observation",observation_) batch_obs.append(np.reshape(observation,[1,3])[0,:]) # print('observation', np.reshape(observation,[1,3])[0,:]) batch_actions.append(action) reward_episode.append((reward+8)/8) #一个episode结束 if done: #处理回报函数 reward_sum = 0 discouted_sum_reward = np.zeros_like(reward_episode) for t in reversed(range(0, len(reward_episode))): reward_sum = reward_sum*self.gamma + reward_episode[t] discouted_sum_reward[t] = reward_sum #归一化处理 discouted_sum_reward -= np.mean(discouted_sum_reward) discouted_sum_reward/= np.std(discouted_sum_reward) #将归一化的数据存储到批回报中 for t in range(len(reward_episode)): batch_rs.append(discouted_sum_reward[t]) # print(discouted_sum_reward[t]) break #智能体往前推进一步 observation = observation_ #reshape 观测和回报 batch_obs = np.reshape(batch_obs, [len(batch_obs), self.policy_net.n_features]) batch_actions = np.reshape(batch_actions,[len(batch_actions),1]) batch_rs = np.reshape(batch_rs,[len(batch_rs),1]) return batch_obs, batch_actions,batch_rs #定义策略网络 class Policy_Net(): def __init__(self, env, action_bound, lr = 0.0001, model_file=None): self.learning_rate = lr #输入特征的维数 self.n_features = env.observation_space.shape[0] #输出动作空间的维数 self.n_actions = 1 # 注意动作维度只有一个 #1.1 输入层 self.obs = tf.placeholder(tf.float32, shape=[None, self.n_features]) #1.2.第一层隐含层 self.f1 = tf.layers.dense(inputs=self.obs, units=200, activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1), bias_initializer=tf.constant_initializer(0.1)) #1.3 第二层,均值,需要注意的是激活函数为tanh,使得输出在-1~+1 mu = tf.layers.dense(inputs=self.f1, units=self.n_actions, activation=tf.nn.tanh, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1), bias_initializer=tf.constant_initializer(0.1)) #1.3 第二层,标准差 sigma = tf.layers.dense(inputs=self.f1, units=self.n_actions, activation=tf.nn.softplus, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1), bias_initializer=tf.constant_initializer(0.1)) #均值乘以2,使得均值取值范围在(-2,2) self.mu = 2*mu self.sigma =sigma # 定义带参数的正态分布 self.normal_dist = tf.contrib.distributions.Normal(self.mu, self.sigma) #根据正态分布采样一个动作 self.action = tf.clip_by_value(self.normal_dist.sample(1), action_bound[0],action_bound[1]) #1.5 当前动作 self.current_act = tf.placeholder(tf.float32, [None,1]) self.current_reward = tf.placeholder(tf.float32, [None,1]) #TODO 2. 构建损失函数 log_prob = self.normal_dist.log_prob(self.current_act) self.loss = tf.reduce_mean(log_prob*self.current_reward+0.01*self.normal_dist.entropy()) #3. 定义一个优化器 self.train_op = tf.train.AdamOptimizer(self.learning_rate).minimize(-self.loss) #4. tf工程 self.sess = tf.Session() #5. 初始化图中的变量 self.sess.run(tf.global_variables_initializer()) #6.定义保存和恢复模型 self.saver = tf.train.Saver() if model_file is not None: self.restore_model(model_file) #依概率选择动作 def choose_action(self, state): action = self.sess.run(self.action, {self.obs:state}) return action[0] #定义训练 def train_step(self, state_batch, label_batch, reward_batch): loss, _ =self.sess.run([self.loss, self.train_op], feed_dict={self.obs:state_batch, self.current_act:label_batch, self.current_reward:reward_batch}) return loss #定义存储模型函数 def save_model(self, model_path): self.saver.save(self.sess, model_path) #定义恢复模型函数 def restore_model(self, model_path): self.saver.restore(self.sess, model_path) def policy_train(env, brain, training_num): reward_sum = 0 reward_sum_line = [] training_time = [] brain = brain env = env for i in range(training_num): temp = 0 sampler = Sample(env, brain) # 采样1个episode train_obs, train_actions, train_rs = sampler.sample_episodes(1) brain.train_step(train_obs, train_actions, train_rs) if i == 0: reward_sum = policy_test(env, brain,RENDER,1)[0] else: reward_sum = 0.95 * reward_sum + 0.05 * policy_test(env, brain,RENDER,1)[0] # print(policy_test(env, brain)) reward_sum_line.append(reward_sum) training_time.append(i) print("training episodes is %d,trained reward_sum is %f" % (i, reward_sum)) if reward_sum > -200: break brain.save_model('./Pendulum_models/current_bset_pg_pendulum') plt.figure(1) plt.plot(training_time, reward_sum_line) plt.xlabel("training number") plt.ylabel("score") # plt.show() def policy_test(env, policy,RENDER,test_number): test_reward = [] for i in range(test_number): observation = env.reset() reward_sum = 0 # 将一个episode的回报存储起来 while True: if RENDER: env.render() # 根据策略网络产生一个动作 state = np.reshape(observation, [1, 3]) action = policy.choose_action(state) observation_, reward, done, info = env.step(action) reward_sum+=reward if done: break observation = observation_ test_reward.append(reward_sum) return test_reward return reward_sum if __name__=='__main__': #构建单摆类 env_name = 'Pendulum-v0' env = gym.make(env_name) env.unwrapped env.seed(1) #定义力矩取值区间 action_bound = [-env.action_space.high, env.action_space.high] #实例化一个策略网络 brain = Policy_Net(env,action_bound) training_num = 200 #训练策略网络 policy_train(env, brain, training_num) #测试训练好的策略网络 test_reward_sum = policy_test(env, brain, False, 20) plt.figure(2) plt.title("Test") plt.plot(test_reward_sum) plt.xlabel("Testing number") plt.ylabel("score") plt.show() plt.show()
四、训练结果
一共训练200个episodes,训练完成后在进行测试20个episode(一个episode即为一条轨迹)
测试结果:
问题:
1、从训练的回报函数来看,网络并没有收敛,并且不是训练次数的问题,测试的结果波动较大,算法没有收敛是算法本身的缺陷还是实现过程出错?
2、从实验画面显示来看,倒立摆没有立起来,如何在不改变算法的前提下来改进训练效果,即如何调节PG算法收敛
继续学习中。。。。。。
最后
以上就是失眠绿草最近收集整理的关于02 强化学习——策略梯度法(PG)(连续动作)一、PG回顾二、连续动作PG算法网络三、代码实现四、训练结果的全部内容,更多相关02内容请搜索靠谱客的其他文章。
发表评论 取消回复