DDPG code implementation

Code and explanation

1. Hyperparameter settings

import argparse
def _str2bool(value):
	"""Parse a command-line boolean such as 'true'/'false', 'yes'/'no' or '1'/'0'.

	argparse's ``type=bool`` calls ``bool()`` on the raw string, so any
	non-empty value (including 'False') would be parsed as True.
	"""
	if isinstance(value, bool):
		return value
	text = str(value).strip().lower()
	if text in ('true', 't', 'yes', 'y', '1'):
		return True
	if text in ('false', 'f', 'no', 'n', '0', ''):
		return False
	raise argparse.ArgumentTypeError('expected a boolean value, got {!r}'.format(value))


parser = argparse.ArgumentParser()
# Whether to run training
parser.add_argument('--train', dest='train', type=_str2bool, default=True)

parser.add_argument('--random_seed', type=int, default=0)
# Whether to render during training
parser.add_argument('--render', type=_str2bool, default=False)
parser.add_argument('--train_episodes', type=int, default=5000)
parser.add_argument('--test_episodes', type=int, default=10)
# Maximum number of environment steps per episode
parser.add_argument('--max_steps', type=int, default=200)
# Discount factor
parser.add_argument('--gamma', type=float, default=0.9)
# Learning rate of the actor
parser.add_argument('--lr_a', type=float, default=0.001)
# Learning rate of the critic
parser.add_argument('--lr_c', type=float, default=0.002)
parser.add_argument('--batch_size', type=int, default=128)
# Standard deviation, used to add randomness to the selected action
parser.add_argument('--var', type=float, default=2)
# Soft update parameter τ, used when updating the target networks
parser.add_argument('--tau', type=float, default=0.01)
args = parser.parse_args()
# Algorithm name; used together with ENV_ID to build the model / video directory names
ALG_NAME = 'DDPG'
# ENV_ID = 'BipedalWalker-v3'
ENV_ID = 'Pendulum-v0'

2. Implementation of replaybuffer

  • The implementation of ReplayBuffer is the same as the one used for DQN (a link to that code is given at the end of the article).
import random
class ReplayBuffer:
	"""Fixed-capacity ring buffer of (state, action, reward, next_state, done) transitions.

	Once ``capacity`` transitions are stored, each new transition overwrites
	the oldest one.
	"""

	def __init__(self, capacity=10000):
		"""Create an empty buffer holding at most ``capacity`` transitions."""
		self.capacity = capacity
		self.buffer = []
		# Index of the slot the next push() will write to
		self.position = 0

	def push(self, state, action, reward, next_state, done):
		"""Store one transition, overwriting the oldest when the buffer is full."""
		if len(self.buffer) < self.capacity:
			# Still growing: add a placeholder slot so the indexed write below is valid
			self.buffer.append(None)
		self.buffer[self.position] = (state, action, reward, next_state, done)
		self.position = int((self.position + 1) % self.capacity)

	def sample(self, batch_size=None):
		"""Return ``batch_size`` random transitions as five stacked arrays.

		``batch_size`` defaults to ``args.batch_size``, looked up at call time.
		Raises ValueError (from ``random.sample``) when fewer than
		``batch_size`` transitions are stored.
		"""
		if batch_size is None:
			batch_size = args.batch_size
		batch = random.sample(self.buffer, batch_size)
		# zip(*batch) regroups the tuples field by field; np.stack turns each field into one array
		state, action, reward, next_state, done = map(np.stack, zip(*batch))
		return state, action, reward, next_state, done

	def __len__(self):
		"""Number of transitions currently stored."""
		return len(self.buffer)

3. Implementation of agent class

  • Agent class mainly implements 8 methods.
    • __init__: initialize the agent.
    • ema_update: soft update of target network parameters.
    • get_action: select an action.
    • replay: update the parameters of value network and policy network.
    • train: used to collect the parameters required by the training model.
    • test_episode: used to test the model.
    • saveModel: save the model.
    • loadModel: load the model.
Create policy network (actor)
import tensorflow as tf  
import tensorlayer as tl
def get_actor(input_state_shape, name=''):
	"""Build the policy (actor) network: state -> action.

	Two 64-unit ReLU layers feed a tanh output layer of ``self.action_dim``
	units, whose output is multiplied by ``self.action_range``.

	NOTE(review): ``self``, ``W_init`` and ``b_init`` are not parameters here;
	presumably this function is meant to be nested inside the agent's
	``__init__`` so that it can see them - confirm.

	:param input_state_shape: shape passed to the input layer
	:param name: suffix appended to the model name 'Actor'
	:return: a ``tl.models.Model`` mapping the state input to the scaled action
	"""
	init_kwargs = dict(W_init=W_init, b_init=b_init)
	state_in = tl.layers.Input(input_state_shape, name='A_input')
	hidden = tl.layers.Dense(n_units=64, act=tf.nn.relu, name='A_l1', **init_kwargs)(state_in)
	hidden = tl.layers.Dense(n_units=64, act=tf.nn.relu, name='A_l2', **init_kwargs)(hidden)
	# tanh output layer; the Lambda layer below multiplies it by action_range
	unit_action = tl.layers.Dense(n_units=self.action_dim, act=tf.nn.tanh, name='A_a', **init_kwargs)(hidden)
	scaled_action = tl.layers.Lambda(lambda x: self.action_range * x)(unit_action)
	return tl.models.Model(inputs=state_in, outputs=scaled_action, name='Actor' + name)
  • W_init and b_init are the weight and bias initializers.
  • The output must pass through a tanh function, which limits it to [-1, 1]; the result is then scaled to the action range of the specific environment.
Creating value networks
def get_critic(input_state_shape, input_action_shape, name=''):
	"""Build the value (critic) network: (state, action) -> Q-value.

	The state and action inputs are concatenated along dimension 1 and passed
	through two 64-unit ReLU layers and a single linear output unit.

	NOTE(review): ``W_init`` and ``b_init`` are not parameters here; presumably
	this function is meant to be nested inside the agent's ``__init__`` so
	that it can see them - confirm.

	:param input_state_shape: shape passed to the state input layer
	:param input_action_shape: shape passed to the action input layer
	:param name: suffix appended to the model name 'Critic'
	:return: a ``tl.models.Model`` taking ``[state, action]`` and returning one value
	"""
	state_input = tl.layers.Input(input_state_shape, name='C_s_input')
	action_input = tl.layers.Input(input_action_shape, name='C_a_input')
	layer = tl.layers.Concat(1)([state_input, action_input])
	layer = tl.layers.Dense(n_units=64, act=tf.nn.relu, W_init=W_init, b_init=b_init, name='C_l1')(layer)
	layer = tl.layers.Dense(n_units=64, act=tf.nn.relu, W_init=W_init, b_init=b_init, name='C_l2')(layer)
	layer = tl.layers.Dense(n_units=1, W_init=W_init, b_init=b_init, name='C_out')(layer)
	return tl.models.Model(inputs=[state_input, action_input], outputs=layer, name='Critic' + name)
  • tl.layers.Concat(concat_dim) concatenates tensors along a given dimension (concat_dim). Take Pendulum-v0 as an example:
#Insert the output code into the function:
print(tl.layers.Concat(1)([state_input, action_input]))
tf.Tensor([[1. 1. 1.]], shape=(1, 3), dtype=float32)
tf.Tensor([[1.]], shape=(1, 1), dtype=float32)
tf.Tensor([[1. 1. 1. 1.]], shape=(1, 4), dtype=float32)
Copy network parameters
def copy_para(from_model, to_model):
	"""Copy every trainable weight of ``from_model`` into ``to_model``.

	Weights are paired by position in ``trainable_weights``; each one in
	``to_model`` is overwritten in place via its ``assign`` method.
	"""
	for i, j in zip(from_model.trainable_weights, to_model.trainable_weights):
		j.assign(i)
def __init__(self, env):
	"""Initialize the agent: networks, target networks, optimizers and replay buffer.

	:param env: environment whose ``observation_space`` / ``action_space``
		determine the state dimension, action dimension and action range
	"""
	self.env = env
	self.state_dim = env.observation_space.shape[0]
	self.action_dim = env.action_space.shape[0]
	# Upper bound of the action space; get_action clips to [-action_range, action_range]
	self.action_range = env.action_space.high

	# NOTE(review): W_init / b_init are not passed to get_actor / get_critic below;
	# presumably those builders read them from this scope - confirm.
	W_init = tf.random_normal_initializer(mean=0, stddev=0.3)
	b_init = tf.constant_initializer(0.1)

	# Online networks
	self.actor = get_actor([None, self.state_dim])
	self.critic = get_critic([None, self.state_dim], [None, self.action_dim])

	# Target networks start as exact copies of the online networks
	self.actor_target = get_actor([None, self.state_dim], name='_target')
	copy_para(self.actor, self.actor_target)

	self.critic_target = get_critic([None, self.state_dim], [None, self.action_dim], name='_target')
	copy_para(self.critic, self.critic_target)

	self.ema = tf.train.ExponentialMovingAverage(decay=1 - args.tau)  # soft replacement

	self.actor_opt = tf.optimizers.Adam(args.lr_a)
	self.critic_opt = tf.optimizers.Adam(args.lr_c)

	# Replay buffer with its default capacity
	self.replay_buffer = ReplayBuffer()
  • tf.random_normal_initializer: the function returns an initializer that generates tensors with a normal distribution.
    • In the parameters, mean represents the mean and stddev represents the standard deviation.
  • tf.constant_initializer: the function returns an initializer that generates tensors with constant values.
    • The main parameters are value, whose value is Python scalar, value list or tuple, or n-dimensional Numpy array. All elements of the initialization variable will be set to the corresponding value according to the value parameter.
  • tf.train.ExponentialMovingAverage: sliding average interface, used to update (target) model parameters.
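    • shadow_value = (1-τ)·shadow_value + τ·value, i.e. the moving average keeps a fraction (1-τ) of its previous value and moves a fraction τ towards the current parameter value.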
    • decay(=1- τ) It is generally set to a number very close to 1.
    • Moving average can reduce the periodic interference and has a good effect in the case of high fluctuation frequency.
  • Use the ema algorithm to update the target parameter.
def ema_update(self):
	"""Soft-update the target networks from the online networks.

	``self.ema`` maintains a moving average of every online actor and critic
	weight; each target weight is then overwritten with the average of its
	online counterpart.
	"""
	paras = self.actor.trainable_weights + self.critic.trainable_weights
	# Update the moving averages with the current online weights
	self.ema.apply(paras)
	for i, j in zip(self.actor_target.trainable_weights + self.critic_target.trainable_weights, paras):
		# i: target weight, j: matching online weight
		i.assign(self.ema.average(j))
  • actor.trainable_weights and critic.trainable_weights are one-dimensional lists whose elements are the TensorFlow variable objects that hold the parameters.
  • The "+" operation of the two lists can realize splicing:
a = [1,2,3]
b = [4,5,6,7]
# "+" on two lists returns a new list with the elements of a followed by those of b
print(a + b)
#Output: [1, 2, 3, 4, 5, 6, 7]
import numpy as np
def get_action(self, state, greedy=False):
	"""Choose an action for ``state``.

	:param state: a single observation; it is wrapped into a batch of one
	:param greedy: if True, return the actor's output unchanged
	:return: the actor's action when ``greedy``; otherwise a float32 array
		sampled from a normal distribution centred on that action with
		standard deviation ``args.var`` and clipped to
		``[-self.action_range, self.action_range]``
	"""
	batch_of_one = np.array([state])
	mean_action = self.actor(batch_of_one)[0]
	if not greedy:
		# add randomness to action selection for exploration
		noisy_action = np.random.normal(mean_action, args.var)
		bounded_action = np.clip(noisy_action, -self.action_range, self.action_range)
		return bounded_action.astype(np.float32)
	return mean_action
  • The np.random.normal function draws a random number from a normal (Gaussian) distribution.
    • Here, action is the mean of normal distribution and args.var is the standard deviation.
    • Using the np.random.normal function can add randomness to the actions output by the actor.
  • The clip function ensures that the output range is legal.
def replay(self):
	"""Run one training step on a mini-batch sampled from the replay buffer.

	Steps: decay the exploration noise, update the critic with the TD error,
	update the actor by gradient ascent on the critic's Q-value, then
	soft-update both target networks.
	"""
	# Variance decay rate
	args.var *= .9995
	states, actions, rewards, next_states, done = self.replay_buffer.sample(args.batch_size)
	# Add a trailing axis so rewards / done line up with the critic's one-unit output
	rewards = rewards[:, np.newaxis]
	done = done[:, np.newaxis]

	# TD algorithm updates the value network
	with tf.GradientTape() as tape:
		next_actions = self.actor_target(next_states)
		next_q = self.critic_target([next_states, next_actions])
		# Terminal transitions (done == 1) contribute only their immediate reward
		target = rewards + (1 - done) * args.gamma * next_q
		q = self.critic([states, actions])
		td_error = tf.losses.mean_squared_error(target, q)
	critic_grads = tape.gradient(td_error, self.critic.trainable_weights)
	self.critic_opt.apply_gradients(zip(critic_grads, self.critic.trainable_weights))

	# Gradient ascent updates the policy network
	with tf.GradientTape() as tape:
		actions = self.actor(states)
		q = self.critic([states, actions])
		actor_loss = -tf.reduce_mean(q)  # maximize the q
	actor_grads = tape.gradient(actor_loss, self.actor.trainable_weights)
	self.actor_opt.apply_gradients(zip(actor_grads, self.actor.trainable_weights))
	# Update target
	self.ema_update()
  • DPG: $g=\frac{\partial q(s,\pi(s;\theta);w)}{\partial \theta}$
  • actor_loss = -tf.reduce_mean(q)
    • DPG is different from PG: DPG takes the derivative of the Q-function with respect to θ.
    • The reason for adding the "-" sign is that TensorFlow optimizers perform gradient descent on the loss to make it smaller and smaller. Putting a negative sign in front of q and then performing gradient descent on the result is equivalent to performing gradient ascent on q, which makes q larger and larger.
    • reduce_mean averages q over the sampled mini-batch, so the actor loss is a single scalar that estimates the expected Q-value.
import time
def train(self,train_episodes):
	"""Collect experience in ``self.env`` and train the agent.

	Does nothing unless ``args.train`` is truthy. Each episode runs for at most
	``args.max_steps`` steps; every transition is pushed to the replay buffer
	and, once the buffer holds ``args.batch_size`` transitions, one replay()
	step is run per environment step. The model is saved every 100 episodes.

	:param train_episodes: number of episodes to run
	"""
	t0 = time.time()
	if not args.train:
		return

	# Exponentially smoothed episode rewards.
	# NOTE(review): nothing in this function reads the list afterwards;
	# presumably it was meant for plotting - confirm.
	all_episode_reward = []
	for episode in range(train_episodes):
		state = self.env.reset().astype(np.float32)
		episode_reward = 0
		for step in range(args.max_steps):
			if args.render:
				self.env.render()
			# Add exploration noise
			action = self.get_action(state)
			next_state, reward, done, info = self.env.step(action)
			next_state = np.array(next_state, dtype=np.float32)
			# Truthiness test: `done is True` is False for numpy.bool_ values
			done = 1 if done else 0
			self.replay_buffer.push(state, action, reward, next_state, done)

			# Start learning once a full mini-batch can be sampled
			if len(self.replay_buffer) >= args.batch_size:
				self.replay()

			state = next_state
			episode_reward += reward
			if done:
				break

		if episode == 0:
			all_episode_reward.append(episode_reward)
		else:
			all_episode_reward.append(all_episode_reward[-1] * 0.9 + episode_reward * 0.1)
		print(
			'Training  | Episode: {}/{}  | Episode Reward: {:.4f}  | Running Time: {:.4f}'.format(
				episode + 1, train_episodes, episode_reward,
				time.time() - t0
			)
		)
		# Periodic checkpoint
		if episode%100==0:
			self.saveModel()

Convert frame to gif function
from matplotlib import animation  
import matplotlib.pyplot as plt  
def display_frames_as_gif(frames, path):
	"""Save a sequence of image frames as an animated GIF.

	:param frames: non-empty sequence of images accepted by ``plt.imshow``
	:param path: output file path of the GIF
	:raises ValueError: if ``frames`` is empty
	"""
	if len(frames) == 0:
		raise ValueError('frames must contain at least one image')
	patch = plt.imshow(frames[0])

	def animate(i):
		# Show the i-th frame in the existing image artist
		patch.set_data(frames[i])

	anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=5)
	anim.save(path, writer='pillow', fps=30)
  • For more specific usage, see the DQN code at the end of the article.
def test_episode(self, test_episodes):
	"""Run the greedy policy in ``self.env`` and save each episode as a GIF.

	Each episode runs for at most ``args.max_steps`` steps with
	``get_action(..., greedy=True)``. The rendered frames are written to
	``testVideo/<ALG_NAME>_<ENV_ID>/<episode>.gif``.

	:param test_episodes: number of episodes to run
	"""
	t0 = time.time()
	for episode in range(test_episodes):
		state = self.env.reset().astype(np.float32)
		episode_reward = 0
		frames = []
		for step in range(args.max_steps):
			# Capture the current frame for the GIF
			frames.append(self.env.render(mode='rgb_array'))
			next_state, reward, done, info = self.env.step(self.get_action(state, greedy=True))
			next_state = next_state.astype(np.float32)

			state = next_state
			episode_reward += reward
			if done:
				break
		print(
			'Testing  | Episode: {}/{}  | Episode Reward: {:.4f}  | Running Time: {:.4f}'.format(
				episode + 1, test_episodes, episode_reward,
				time.time() - t0
			)
		)
		# Save this game as gif
		dir_path = os.path.join('testVideo', '_'.join([ALG_NAME, ENV_ID]))
		os.makedirs(dir_path, exist_ok=True)
		# os.path.join instead of a hard-coded '\\' so the path is valid on every OS
		display_frames_as_gif(frames, os.path.join(dir_path, str(episode) + ".gif"))
import os
def saveModel(self):
	"""Save the weights of all four networks under ``model/<ALG_NAME>_<ENV_ID>/``.

	The directory is created if it does not exist yet.
	"""
	path = os.path.join('model', '_'.join([ALG_NAME, ENV_ID]))
	os.makedirs(path, exist_ok=True)
	tl.files.save_weights_to_hdf5(os.path.join(path, 'actor.hdf5'), self.actor)
	tl.files.save_weights_to_hdf5(os.path.join(path, 'actor_target.hdf5'), self.actor_target)
	tl.files.save_weights_to_hdf5(os.path.join(path, 'critic.hdf5'), self.critic)
	tl.files.save_weights_to_hdf5(os.path.join(path, 'critic_target.hdf5'), self.critic_target)
	print('Saved weights.')
def loadModel(self):
	"""Load the weights of all four networks from ``model/<ALG_NAME>_<ENV_ID>/``.

	If the directory does not exist, only a hint to train first is printed
	and the networks are left unchanged.
	"""
	path = os.path.join('model', '_'.join([ALG_NAME, ENV_ID]))
	if os.path.exists(path):
		print('Load DDPG Network parameters ...')
		tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'actor.hdf5'), self.actor)
		tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'actor_target.hdf5'), self.actor_target)
		tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'critic.hdf5'), self.critic)
		tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'critic_target.hdf5'), self.critic_target)
		print('Load weights!')
	else:
		print("No model file find, please train model first...")

4. Main program

if __name__ == '__main__':
	# Local import: gym is used below but is not imported anywhere else in this file
	import gym

	env = gym.make(ENV_ID)
	# reproducible
	env.seed(args.random_seed)
	random.seed(args.random_seed)  # ReplayBuffer.sample uses the random module
	np.random.seed(args.random_seed)
	tf.random.set_seed(args.random_seed)

	agent = Agent(env)
	# NOTE(review): the train / test dispatch below is reconstructed from the
	# --train, --train_episodes and --test_episodes options - confirm intent.
	if args.train:
		agent.train(args.train_episodes)
	else:
		agent.loadModel()
		agent.test_episode(args.test_episodes)
	env.close()

Training results

2000 times

