DDPG code implementation
Code and explanation
1. Hyperparameter settings
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--train', dest='train', default=True)
parser.add_argument('--random_seed', type=int, default=0)
# Whether to render during training
parser.add_argument('--render', type=bool, default=False)
parser.add_argument('--train_episodes', type=int, default=5000)
parser.add_argument('--test_episodes', type=int, default=10)
parser.add_argument('--max_steps', type=int, default=200)
parser.add_argument('--gamma', type=float, default=0.9)
# Actor learning rate
parser.add_argument('--lr_a', type=float, default=0.001)
# Critic learning rate
parser.add_argument('--lr_c', type=float, default=0.002)
parser.add_argument('--batch_size', type=int, default=128)
# Standard deviation used to add exploration noise to the selected action
parser.add_argument('--var', type=float, default=2)
# Soft-update parameter tau, used when updating the target networks
parser.add_argument('--tau', type=float, default=0.01)
args = parser.parse_args()

ALG_NAME = 'DDPG'
# ENV_ID = 'BipedalWalker-v3'
ENV_ID = 'Pendulum-v0'
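As a quick sanity check (not part of the original script, and the values are purely illustrative), the parser can also be fed an explicit argument list instead of reading sys.argv:

```python
# Hypothetical check of the argument parser defined above; values are made up.
demo_args = parser.parse_args(['--train_episodes', '2000', '--batch_size', '64'])
print(demo_args.train_episodes, demo_args.batch_size, demo_args.gamma)
# -> 2000 64 0.9
```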
2. Implementation of ReplayBuffer
- The ReplayBuffer implementation is the same as the one used for DQN (the full code is given at the end of the article).
import random
import numpy as np


class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = int((self.position + 1) % self.capacity)

    def sample(self, batch_size=args.batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done

    def __len__(self):
        return len(self.buffer)
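A minimal usage sketch of this buffer (the transitions below are dummy Pendulum-v0-shaped values, purely for illustration):

```python
import numpy as np

buffer = ReplayBuffer(capacity=1000)
for _ in range(200):
    state = np.zeros(3, dtype=np.float32)      # a Pendulum-v0 observation has 3 dims
    action = np.zeros(1, dtype=np.float32)     # and the action has 1 dim
    buffer.push(state, action, 0.0, state, 0)  # (s, a, r, s', done)

states, actions, rewards, next_states, dones = buffer.sample(batch_size=64)
print(states.shape, actions.shape, rewards.shape)  # (64, 3) (64, 1) (64,)
```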
3. Implementation of the Agent class
- The Agent class implements eight methods:
- __init__: initialize the agent.
- ema_update: soft-update the target network parameters.
- get_action: select an action.
- replay: update the parameters of the value network (critic) and the policy network (actor).
- train: run the training loop, collecting transitions and training the model.
- test_episode: test the trained model.
- saveModel: save the model.
- loadModel: load the model.
3.1.__init__
Create policy network (actor)
import tensorflow as tf
import tensorlayer as tl


# In the full code this function is defined inside Agent.__init__,
# so self, W_init and b_init are in scope here.
def get_actor(input_state_shape, name=''):
    input_layer = tl.layers.Input(input_state_shape, name='A_input')
    layer = tl.layers.Dense(n_units=64, act=tf.nn.relu, W_init=W_init, b_init=b_init, name='A_l1')(input_layer)
    layer = tl.layers.Dense(n_units=64, act=tf.nn.relu, W_init=W_init, b_init=b_init, name='A_l2')(layer)
    layer = tl.layers.Dense(n_units=self.action_dim, act=tf.nn.tanh, W_init=W_init, b_init=b_init, name='A_a')(layer)
    layer = tl.layers.Lambda(lambda x: self.action_range * x)(layer)
    return tl.models.Model(inputs=input_layer, outputs=layer, name='Actor' + name)
- W_init and b_init are the weight and bias initializers (created in __init__ below).
- The output layer uses a tanh activation to bound the output to [-1, 1]; the result is then multiplied by action_range so it matches the environment's action bounds, as the small sketch below illustrates.
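A small illustration of the bound-then-scale step, assuming Pendulum-v0's action bound of 2.0 (the numbers are made up):

```python
import numpy as np

action_range = np.array([2.0])   # Pendulum-v0: actions live in [-2, 2]
raw_output = np.array([5.0])     # arbitrary pre-activation value
bounded = np.tanh(raw_output)    # squashed into (-1, 1)
scaled = action_range * bounded  # rescaled into (-2, 2)
print(bounded, scaled)           # ~[0.9999] ~[1.9998]
```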
Create the value network (critic)
# Likewise defined inside Agent.__init__ in the full code (W_init and b_init in scope).
def get_critic(input_state_shape, input_action_shape, name=''):
    state_input = tl.layers.Input(input_state_shape, name='C_s_input')
    action_input = tl.layers.Input(input_action_shape, name='C_a_input')
    layer = tl.layers.Concat(1)([state_input, action_input])
    layer = tl.layers.Dense(n_units=64, act=tf.nn.relu, W_init=W_init, b_init=b_init, name='C_l1')(layer)
    layer = tl.layers.Dense(n_units=64, act=tf.nn.relu, W_init=W_init, b_init=b_init, name='C_l2')(layer)
    layer = tl.layers.Dense(n_units=1, W_init=W_init, b_init=b_init, name='C_out')(layer)
    return tl.models.Model(inputs=[state_input, action_input], outputs=layer, name='Critic' + name)
- tl.layers.Concat(concat_dim) concatenates tensors along the given dimension (concat_dim). Taking Pendulum-v0 as an example:
# Insert these print statements into the function:
print(state_input)
print(action_input)
print(tl.layers.Concat(1)([state_input, action_input]))

# Output:
# tf.Tensor([[1. 1. 1.]], shape=(1, 3), dtype=float32)
# tf.Tensor([[1.]], shape=(1, 1), dtype=float32)
# tf.Tensor([[1. 1. 1. 1.]], shape=(1, 4), dtype=float32)
Copy network parameters
def copy_para(from_model, to_model):
    for i, j in zip(from_model.trainable_weights, to_model.trainable_weights):
        j.assign(i)
__init__
def __init__(self, env):
    self.env = env
    self.state_dim = env.observation_space.shape[0]
    self.action_dim = env.action_space.shape[0]
    self.action_range = env.action_space.high

    W_init = tf.random_normal_initializer(mean=0, stddev=0.3)
    b_init = tf.constant_initializer(0.1)

    self.actor = get_actor([None, self.state_dim])
    self.critic = get_critic([None, self.state_dim], [None, self.action_dim])
    self.actor.train()
    self.critic.train()

    self.actor_target = get_actor([None, self.state_dim], name='_target')
    copy_para(self.actor, self.actor_target)
    self.actor_target.eval()

    self.critic_target = get_critic([None, self.state_dim], [None, self.action_dim], name='_target')
    copy_para(self.critic, self.critic_target)
    self.critic_target.eval()

    self.ema = tf.train.ExponentialMovingAverage(decay=1 - args.tau)  # soft replacement

    self.actor_opt = tf.optimizers.Adam(args.lr_a)
    self.critic_opt = tf.optimizers.Adam(args.lr_c)

    self.replay_buffer = ReplayBuffer()
- tf.random_normal_initializer returns an initializer that generates tensors with a normal distribution.
- Its parameters mean and stddev are the mean and standard deviation of that distribution.
- tf.constant_initializer returns an initializer that generates tensors with constant values.
- Its main parameter is value: a Python scalar, a list or tuple of values, or an N-dimensional NumPy array; every element of the initialized variable is set according to it.
- tf.train.ExponentialMovingAverage is an exponential-moving-average (EMA) interface, used here to softly update the target network parameters.
- The soft update is target = (1 − τ)·target + τ·source, i.e. an EMA with decay = 1 − τ.
- decay (= 1 − τ) is usually set very close to 1.
- The moving average filters out high-frequency fluctuations in the online network's parameters, which keeps the targets stable. A numeric sketch of this update follows.
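A minimal numeric sketch of what tf.train.ExponentialMovingAverage does for a single variable (the numbers are made up):

```python
import tensorflow as tf

tau = 0.01
ema = tf.train.ExponentialMovingAverage(decay=1 - tau)

online_weight = tf.Variable(1.0)  # stands in for one actor/critic parameter
ema.apply([online_weight])        # shadow (target) value starts at 1.0

online_weight.assign(2.0)         # pretend the online network was updated
ema.apply([online_weight])        # shadow <- (1 - tau) * shadow + tau * online
print(ema.average(online_weight).numpy())  # ~1.01 = 0.99 * 1.0 + 0.01 * 2.0
```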
3.2.ema_update
- Use the EMA to softly update the target network parameters.
def ema_update(self):
    paras = self.actor.trainable_weights + self.critic.trainable_weights
    self.ema.apply(paras)
    for i, j in zip(self.actor_target.trainable_weights + self.critic_target.trainable_weights, paras):
        i.assign(self.ema.average(j))
- actor.trainable_weights and critic.trainable_weights are flat (one-dimensional) lists; their elements are the tf.Variable objects TensorFlow uses to store parameters.
- The "+" operator concatenates the two lists:
a = [1, 2, 3]
b = [4, 5, 6, 7]
print(a + b)
# Output: [1, 2, 3, 4, 5, 6, 7]
3.3.get_action
import numpy as np


def get_action(self, state, greedy=False):
    action = self.actor(np.array([state]))[0]
    if greedy:
        return action
    # Add randomness to action selection for exploration
    return np.clip(
        np.random.normal(action, args.var), -self.action_range, self.action_range
    ).astype(np.float32)
- np.random.normal draws a random sample from a normal (Gaussian) distribution.
- Here the actor's output action is the mean of that distribution and args.var is its standard deviation.
- Sampling this way adds exploration noise to the actions produced by the actor.
- np.clip keeps the result within the legal action range, as the sketch below illustrates.
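A small sketch of the noisy action selection, using made-up numbers and Pendulum-v0's action bound:

```python
import numpy as np

action = np.array([1.8], dtype=np.float32)  # pretend this is the actor's output (the mean)
var = 2.0                                   # standard deviation of the exploration noise
action_range = np.array([2.0])              # Pendulum-v0 action bound

noisy = np.clip(np.random.normal(action, var), -action_range, action_range)
print(noisy)  # a random value in [-2, 2]; samples outside the bound are clipped
```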
3.4.replay
def replay(self):
    # Decay the exploration noise
    args.var *= .9995
    states, actions, rewards, next_states, done = self.replay_buffer.sample(args.batch_size)
    rewards = rewards[:, np.newaxis]
    done = done[:, np.newaxis]

    # Update the critic (value network) with the TD target
    with tf.GradientTape() as tape:
        next_actions = self.actor_target(next_states)
        next_q = self.critic_target([next_states, next_actions])
        target = rewards + (1 - done) * args.gamma * next_q
        q = self.critic([states, actions])
        td_error = tf.losses.mean_squared_error(target, q)
    critic_grads = tape.gradient(td_error, self.critic.trainable_weights)
    self.critic_opt.apply_gradients(zip(critic_grads, self.critic.trainable_weights))

    # Update the actor (policy network) by gradient ascent on Q
    with tf.GradientTape() as tape:
        actions = self.actor(states)
        q = self.critic([states, actions])
        actor_loss = -tf.reduce_mean(q)  # maximize q
    actor_grads = tape.gradient(actor_loss, self.actor.trainable_weights)
    self.actor_opt.apply_gradients(zip(actor_grads, self.actor.trainable_weights))

    # Soft-update the target networks
    self.ema_update()
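Written out (with $\theta^-$ and $w^-$ denoting the target actor and target critic parameters, and $d$ the done flag), the critic update above minimizes the squared one-step TD error:

$$y_i = r_i + (1 - d_i)\,\gamma\, q\big(s'_i,\ \pi(s'_i;\theta^-);\ w^-\big), \qquad L_{\text{critic}}(w) = \frac{1}{B}\sum_{i=1}^{B}\big(y_i - q(s_i, a_i; w)\big)^2$$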
- DPG: $g = \frac{\partial\, q(s, \pi(s;\theta); w)}{\partial \theta}$
- actor_loss = -tf.reduce_mean(q)
- Unlike the stochastic policy gradient (PG), DPG differentiates the Q-function with respect to θ, through the action π(s;θ).
- The minus sign is needed because TensorFlow optimizers perform gradient descent on a loss; descending on −q is equivalent to ascending on q, i.e. maximizing the Q-value.
- reduce_mean averages the Q-values over the minibatch, so the gradient is estimated from the whole batch rather than a single sample.
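In batch form, the objective being maximized and the loss the code actually minimizes are:

$$J(\theta) \approx \frac{1}{B}\sum_{i=1}^{B} q\big(s_i,\ \pi(s_i;\theta);\ w\big), \qquad L_{\text{actor}}(\theta) = -J(\theta)$$

so gradient descent on $L_{\text{actor}}$ is gradient ascent on this batch estimate of $J$.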
3.5.train
import time


def train(self, train_episodes):
    # Note: env and agent below refer to the globals created in the main program.
    t0 = time.time()
    if args.train:  # train
        self.loadModel()
        all_episode_reward = []
        for episode in range(train_episodes):
            state = env.reset().astype(np.float32)
            episode_reward = 0
            for step in range(args.max_steps):
                if args.render:
                    env.render()
                # Add exploration noise
                action = agent.get_action(state)
                next_state, reward, done, info = env.step(action)
                next_state = np.array(next_state, dtype=np.float32)
                done = 1 if done is True else 0
                self.replay_buffer.push(state, action, reward, next_state, done)
                if len(self.replay_buffer) >= args.batch_size:
                    agent.replay()
                state = next_state
                episode_reward += reward
                if done:
                    break
            # Running reward: exponential smoothing of the episode returns
            if episode == 0:
                all_episode_reward.append(episode_reward)
            else:
                all_episode_reward.append(all_episode_reward[-1] * 0.9 + episode_reward * 0.1)
            print(
                'Training | Episode: {}/{} | Episode Reward: {:.4f} | Running Time: {:.4f}'.format(
                    episode + 1, train_episodes, episode_reward, time.time() - t0
                )
            )
            if episode % 100 == 0:
                self.saveModel()
        plt.plot(all_episode_reward)  # plot the running reward curve
    else:  # test
        self.loadModel()
        self.test_episode(args.test_episodes)
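The running reward that gets plotted is an exponential moving average of the episode returns $R_k$:

$$\bar{R}_k = 0.9\,\bar{R}_{k-1} + 0.1\,R_k, \qquad \bar{R}_0 = R_0$$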
3.6.test_episode
Helper function that converts a list of frames into a gif
from matplotlib import animation
import matplotlib.pyplot as plt


def display_frames_as_gif(frames, path):
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=5)
    anim.save(path, writer='pillow', fps=30)
- For more specific usage, see the DQN code at the end of the article.
test_episode
def test_episode(self, test_episodes):
    t0 = time.time()
    for episode in range(test_episodes):
        state = env.reset().astype(np.float32)
        episode_reward = 0
        frames = []
        for step in range(args.max_steps):
            env.render()
            frames.append(self.env.render(mode='rgb_array'))
            next_state, reward, done, info = env.step(agent.get_action(state, greedy=True))
            next_state = next_state.astype(np.float32)
            state = next_state
            episode_reward += reward
            if done:
                break
        print(
            'Testing | Episode: {}/{} | Episode Reward: {:.4f} | Running Time: {:.4f}'.format(
                episode + 1, test_episodes, episode_reward, time.time() - t0
            )
        )
        # Save this episode as a gif
        dir_path = os.path.join('testVideo', '_'.join([ALG_NAME, ENV_ID]))
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        display_frames_as_gif(frames, os.path.join(dir_path, str(episode) + '.gif'))
3.7.saveModel
import os


def saveModel(self):
    path = os.path.join('model', '_'.join([ALG_NAME, ENV_ID]))
    if not os.path.exists(path):
        os.makedirs(path)
    tl.files.save_weights_to_hdf5(os.path.join(path, 'actor.hdf5'), self.actor)
    tl.files.save_weights_to_hdf5(os.path.join(path, 'actor_target.hdf5'), self.actor_target)
    tl.files.save_weights_to_hdf5(os.path.join(path, 'critic.hdf5'), self.critic)
    tl.files.save_weights_to_hdf5(os.path.join(path, 'critic_target.hdf5'), self.critic_target)
    print('Saved weights.')
3.8.loadModel
def loadModel(self):
    path = os.path.join('model', '_'.join([ALG_NAME, ENV_ID]))
    if os.path.exists(path):
        print('Load DDPG Network parameters ...')
        tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'actor.hdf5'), self.actor)
        tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'actor_target.hdf5'), self.actor_target)
        tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'critic.hdf5'), self.critic)
        tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'critic_target.hdf5'), self.critic_target)
        print('Load weights!')
    else:
        print('No model file found, please train the model first...')
4. Main program
import gym


if __name__ == '__main__':
    env = gym.make(ENV_ID)

    # Reproducibility
    env.seed(args.random_seed)
    np.random.seed(args.random_seed)
    tf.random.set_seed(args.random_seed)

    agent = Agent(env)
    agent.train(train_episodes=args.train_episodes)

    env.close()
Training results
After 2,000 training episodes: