import time
from collections import deque, namedtuple
import gym
import numpy as np
import PIL.Image
import tensorflow as tf
import utils
from pyvirtualdisplay import Display
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers import Adam
Hyperparameters
MEMORY_SIZE = 100_000     # size of memory buffer
GAMMA = 0.995             # discount factor
ALPHA = 1e-3              # learning rate
NUM_STEPS_FOR_UPDATE = 4  # perform a learning update every C time steps
Lunar Landing
We will be using OpenAI's Gym Library. The Gym library provides a wide variety of environments for reinforcement learning. To put it simply, an environment represents a problem or task to be solved. We will try to solve the Lunar Lander environment using reinforcement learning.
- The goal of the Lunar Lander environment is to land the lunar lander safely on the landing pad on the surface of the moon.
- The landing pad is designated by two flag poles and its center is at coordinates (0,0), but the lander is also allowed to land outside of the landing pad.
- The lander starts at the top center of the environment with a random initial force applied to its center of mass and has infinite fuel.
- The environment is considered solved if you get 200 points.
Action Space
The agent has four discrete actions available:
- Do nothing.
- Fire right engine.
- Fire main engine.
- Fire left engine.
Each action has a corresponding numerical value:
Do nothing = 0
Fire right engine = 1
Fire main engine = 2
Fire left engine = 3
Observation Space
The agent's observation space consists of a state vector with 8 variables:
- Its (x, y) coordinates. The landing pad is always at coordinates (0,0).
- Its linear velocities (ẋ, ẏ).
- Its angle θ.
- Its angular velocity θ̇.
- Two booleans, l and r, that represent whether each leg is in contact with the ground or not.
Rewards
After every step, a reward is granted. The total reward of an episode is the sum of the rewards for all the steps within that episode.
For each step, the reward:
- is increased/decreased the closer/further the lander is to the landing pad.
- is increased/decreased the slower/faster the lander is moving.
- is decreased the more the lander is tilted (angle not horizontal).
- is increased by 10 points for each leg that is in contact with the ground.
- is decreased by 0.03 points each frame a side engine is firing.
- is decreased by 0.3 points each frame the main engine is firing.
The episode receives an additional reward of -100 or +100 points for crashing or landing safely respectively.
Episode Termination
An episode ends (i.e. the environment enters a terminal state) if:
- The lunar lander crashes (i.e. if the body of the lunar lander comes in contact with the surface of the moon).
- The absolute value of the lander's x-coordinate is greater than 1 (i.e. it goes beyond the left or right border).
You can check out the OpenAI Gym documentation for a full description of the environment.
Setup
# Set up a virtual display to render the Lunar Lander environment.
Display(visible=0, size=(840, 480)).start();
# Set the random seed for TensorFlow
tf.random.set_seed(utils.SEED)
Load Environment
We start by loading the LunarLander-v2 environment from the gym library by using the .make() method. LunarLander-v2 is the latest version of the Lunar Lander environment and you can read about its version history in the OpenAI Gym documentation.

env = gym.make('LunarLander-v2')
Reset to Initial State
Once we load the environment we use the .reset() method to reset the environment to the initial state. The lander starts at the top center of the environment and we can render the first frame of the environment by using the .render() method.

env.reset()
PIL.Image.fromarray(env.render(mode='rgb_array'))
Get the size of the state vector and the number of valid actions.
state_size = env.observation_space.shape
num_actions = env.action_space.n
print('State Shape:', state_size)
print('Number of actions:', num_actions)
State Shape: (8,)
Number of actions: 4
Interact with Environment
In the standard "agent-environment loop" formalism, an agent interacts with the environment in discrete time steps t = 0, 1, 2, .... At each time step t, the agent uses a policy π to select an action At based on its observation of the environment's state St. The agent receives a numerical reward Rt and, on the next time step, moves to a new state St+1.
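As a quick illustration (not part of the original notebook), the sketch below maps this formalism onto Gym's API, assuming the same LunarLander-v2 environment and the four-value .step() interface used throughout this notebook; the actions are chosen at random purely to show the loop structure.

import gym

# Minimal agent-environment loop: observe S_t, choose A_t, receive R_t, move to S_{t+1}.
env = gym.make('LunarLander-v2')
state = env.reset()                      # initial state S_0
total_reward, done, t = 0.0, False, 0

while not done:
    action = env.action_space.sample()   # stand-in for a policy π selecting A_t
    next_state, reward, done, info = env.step(action)
    total_reward += reward               # the episode's total reward is the sum of the R_t
    state = next_state                   # the next time step starts from S_{t+1}
    t += 1

print(f"Episode finished after {t} time steps with total reward {total_reward:.1f}")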
Exploring the Environment's Dynamics
In OpenAI's Gym environments, we use the .step() method to run a single time step of the environment's dynamics. In the version of gym that we are using, the .step() method accepts an action and returns four values:

- observation (object): an environment-specific object representing your observation of the environment. In the Lunar Lander environment this corresponds to a numpy array containing the positions and velocities of the lander as described in the Observation Space section above.
- reward (float): amount of reward returned as a result of taking the given action. In the Lunar Lander environment this corresponds to a float of type numpy.float64 as described in the Rewards section above.
- done (boolean): when done is True, it indicates the episode has terminated and it's time to reset the environment.
- info (dictionary): diagnostic information useful for debugging. We won't be using this variable in this notebook but it is shown here for completeness.
To begin an episode, we need to reset the environment to an initial state. We do this by using the .reset() method.
# Reset the environment and get the initial state.
current_state = env.reset()
Once the environment is reset, the agent can start taking actions in the environment by using the .step() method. Note that the agent can only take one action per time step.
In the cell below you can select different actions and see how the returned values change depending on the action taken. Remember that in this environment the agent has four discrete actions available and we specify them in code by using their corresponding numerical value:
Do nothing = 0
Fire right engine = 1
Fire main engine = 2
Fire left engine = 3
# Select an action
action = 0

# Run a single time step of the environment's dynamics with the given action.
next_state, reward, done, _ = env.step(action)

# Display table with values.
utils.display_table(current_state, action, next_state, reward, done)

# Replace the `current_state` with the state after the action is taken
current_state = next_state
Deep Q-Learning
In cases where both the state and action space are discrete, we can estimate the action-value function iteratively by using the Bellman equation:

$$Q_{i+1}(s,a) = E\left[R + \gamma \max_{a'} Q_i(s',a')\right]$$
This iterative method converges to the optimal action-value function Q*(s,a) as i → ∞. This means that the agent just needs to gradually explore the state-action space and keep updating the estimate of Q(s,a) until it converges to the optimal action-value function Q*(s,a). However, in cases where the state space is continuous it becomes practically impossible to explore the entire state-action space. Consequently, this also makes it practically impossible to gradually estimate Q(s,a) until it converges to Q*(s,a).
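For concreteness, here is a minimal sketch (not from the notebook) of what this iterative update looks like in the tabular, fully discrete case; the table sizes, step size, and the example transition are made up for illustration.

import numpy as np

# Toy tabular setting: a table of Q(s,a) estimates that is nudged toward the
# Bellman target R + γ max_a' Q_i(s',a') after each observed transition.
n_states, n_actions = 16, 4          # hypothetical sizes for a small discrete problem
gamma, step_size = 0.995, 0.1        # discount factor and update step size
Q = np.zeros((n_states, n_actions))

def bellman_update(Q, s, a, r, s_next, terminal):
    """Apply one iterative update for the transition (s, a, r, s')."""
    target = r if terminal else r + gamma * np.max(Q[s_next])
    Q[s, a] += step_size * (target - Q[s, a])
    return Q

# Example: a single made-up transition from state 3 to state 7 with reward 1.0
Q = bellman_update(Q, s=3, a=2, r=1.0, s_next=7, terminal=False)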
In Deep Q-Learning, we solve this problem by using a neural network to estimate the action-value function Q(s,a) ≈ Q*(s,a). We call this neural network a Q-Network and it can be trained by adjusting its weights at each iteration to minimize the mean-squared error in the Bellman equation.
Unfortunately, using neural networks in reinforcement learning to estimate action-value functions has proven to be highly unstable. Luckily, there are a couple of techniques that can be employed to avoid instabilities. These techniques consist of using a Target Network and Experience Replay. We will explore these two techniques in the following sections.
Target Network
We can train the Q-Network by adjusting its weights at each iteration to minimize the mean-squared error in the Bellman equation, where the target values are given by:

$$y = R + \gamma \max_{a'} Q(s',a';w)$$
where w are the weights of the Q-Network. This means that we are adjusting the weights w at each iteration to minimize the following error:

$$R + \gamma \max_{a'} Q(s',a';w) - Q(s,a;w)$$
Notice that this forms a problem because the y target is changing on every iteration. Having a constantly moving target can lead to oscillations and instabilities. To avoid this, we can create a separate neural network for generating the y targets. We call this separate neural network the target Q̂-Network and it will have the same architecture as the original Q-Network. By using the target Q̂-Network, the above error becomes:

$$R + \gamma \max_{a'} \hat{Q}(s',a';w^-) - Q(s,a;w)$$
where w⁻ and w are the weights of the target Q̂-Network and Q-Network, respectively.
In practice, we will use the following algorithm: every C time steps we will use the Q̂-Network to generate the y targets and update the weights of the target Q̂-Network using the weights of the Q-Network. We will update the weights w⁻ of the target Q̂-Network using a soft update. This means that we will update the weights w⁻ using the following rule:

$$w^- \leftarrow \tau w + (1 - \tau)\, w^-$$
where τ ≪ 1. By using the soft update, we are ensuring that the target values, y, change slowly, which greatly improves the stability of our learning algorithm.
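The soft update itself takes only a few lines. The sketch below shows one way it could be written; it is only an illustration — the course's actual utils.update_target_network implementation is not shown in this notebook, and TAU is a hypothetical constant.

TAU = 1e-3  # soft update rate τ << 1 (hypothetical value for illustration)

def soft_update(q_network, target_q_network, tau=TAU):
    # w_target <- τ * w + (1 - τ) * w_target, applied weight tensor by weight tensor
    new_weights = [tau * w + (1.0 - tau) * w_target
                   for w, w_target in zip(q_network.get_weights(),
                                          target_q_network.get_weights())]
    target_q_network.set_weights(new_weights)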
Create NN
We will create the Q and target Q̂ networks and set the optimizer. Remember that the Deep Q-Network (DQN) is a neural network that approximates the action-value function Q(s,a) ≈ Q*(s,a). It does this by learning how to map states to Q values.
To solve the Lunar Lander environment, we are going to employ a DQN with the following architecture:
- An Input layer that takes state_size as input.
- A Dense layer with 64 units and a relu activation function.
- A Dense layer with 64 units and a relu activation function.
- A Dense layer with num_actions units and a linear activation function. This will be the output layer of our network.
In the cell below you should create the Q-Network and the target Q̂-Network using the model architecture described above. Remember that both the Q-Network and the target Q̂-Network have the same architecture.

Lastly, you should set Adam as the optimizer with a learning rate equal to ALPHA. Recall that ALPHA was defined in the Hyperparameters section. We should note that for this exercise you should use the already imported packages:
# Create the Q-Network
q_network = Sequential([
    Input(shape=state_size),
    Dense(units=64, activation='relu'),
    Dense(units=64, activation='relu'),
    Dense(units=num_actions, activation='linear'),
])

# Create the target Q^-Network
target_q_network = Sequential([
    Input(shape=state_size),
    Dense(units=64, activation='relu'),
    Dense(units=64, activation='relu'),
    Dense(units=num_actions, activation='linear'),
])
optimizer = Adam(learning_rate=ALPHA)
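If you want to double-check that both models match the architecture described above, you can print a standard Keras layer summary (this step is optional and not required by the notebook):

# Print the layer-by-layer architecture of both networks
q_network.summary()
target_q_network.summary()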
Experience Replay
When an agent interacts with the environment, the states, actions, and rewards the agent experiences are sequential by nature. If the agent tries to learn from these consecutive experiences it can run into problems due to the strong correlations between them. To avoid this, we employ a technique known as Experience Replay to generate uncorrelated experiences for training our agent. Experience replay consists of storing the agent's experiences (i.e. the states, actions, and rewards the agent receives) in a memory buffer and then sampling a random mini-batch of experiences from the buffer to do the learning. The experience tuples (St, At, Rt, St+1) will be added to the memory buffer at each time step as the agent interacts with the environment.
For convenience, we will store the experiences as named tuples.
By using experience replay we avoid problematic correlations, oscillations and instabilities. In addition, experience replay also allows the agent to potentially use the same experience in multiple weight updates, which increases data efficiency.
# Store experiences as named tuples
experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
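For reference, sampling a random mini-batch from such a buffer could look like the sketch below. This is only an illustration of the idea — the utils.get_experiences helper used later in this notebook may differ in detail, and MINIBATCH_SIZE is a hypothetical constant.

import random
import numpy as np
import tensorflow as tf

MINIBATCH_SIZE = 64  # hypothetical mini-batch size

def sample_experiences(memory_buffer):
    # Draw a random (and therefore largely uncorrelated) mini-batch of experience tuples
    batch = random.sample(memory_buffer, k=MINIBATCH_SIZE)
    states      = tf.convert_to_tensor(np.array([e.state for e in batch]), dtype=tf.float32)
    actions     = tf.convert_to_tensor(np.array([e.action for e in batch]), dtype=tf.float32)
    rewards     = tf.convert_to_tensor(np.array([e.reward for e in batch]), dtype=tf.float32)
    next_states = tf.convert_to_tensor(np.array([e.next_state for e in batch]), dtype=tf.float32)
    done_vals   = tf.convert_to_tensor(np.array([float(e.done) for e in batch]), dtype=tf.float32)
    return (states, actions, rewards, next_states, done_vals)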
Q-Learning
Now that we know all the techniques that we are going to use, we can put them together to arrive at the Deep Q-Learning Algorithm With Experience Replay.
Loss Function
We will now implement line 12 of the algorithm outlined in Fig 3 above and compute the loss between the y targets and the Q(s,a) values. In the cell below, complete the compute_loss function by setting the y targets equal to:

$$y_j = \begin{cases} R_j & \text{if episode terminates at step } j+1 \\ R_j + \gamma \max_{a'} \hat{Q}(s_{j+1},a') & \text{otherwise} \end{cases}$$
Here are a couple of things to note:

- The compute_loss function takes in a mini-batch of experience tuples. This mini-batch of experience tuples is unpacked to extract the states, actions, rewards, next_states, and done_vals. You should keep in mind that these variables are TensorFlow Tensors whose size will depend on the mini-batch size. For example, if the mini-batch size is 64 then both rewards and done_vals will be TensorFlow Tensors with 64 elements.
- Using if/else statements to set the y targets will not work when the variables are tensors with many elements. However, notice that you can use the done_vals to implement the above in a single line of code. To do this, recall that the done variable is a Boolean variable that takes the value True when an episode terminates at step j+1 and it is False otherwise. Taking into account that a Boolean value of True has the numerical value of 1 and a Boolean value of False has the numerical value of 0, you can use the factor (1 - done_vals) to implement the above in a single line of code. Here's a hint: notice that (1 - done_vals) has a value of 0 when done_vals is True and a value of 1 when done_vals is False, as illustrated in the toy example below.
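As a tiny, hand-made illustration (the numbers below are made up), the (1 - done_vals) factor selects between the two cases in a single vectorized line:

import tensorflow as tf

# Toy values for three transitions; only the last one terminates its episode.
rewards   = tf.constant([1.0, -0.5, 100.0])
max_qsa   = tf.constant([2.0,  3.0,   4.0])   # max_a' Q^(s',a') from the target network
done_vals = tf.constant([0.0,  0.0,   1.0])   # 1.0 where the episode ended
gamma = 0.995

# y = R + γ max_a' Q^(s',a') for non-terminal steps, and y = R for terminal steps
y_targets = rewards + (gamma * max_qsa * (1 - done_vals))
print(y_targets.numpy())  # approximately [2.99, 2.485, 100.0]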
Lastly, compute the loss by calculating the Mean-Squared Error (MSE) between the y_targets and the q_values. To calculate the mean-squared error you should use the already imported package MSE:
def compute_loss(experiences, gamma, q_network, target_q_network):
    """
    Calculates the loss.

    Args:
        experiences: (tuple) tuple of ["state", "action", "reward", "next_state", "done"] namedtuples
        gamma: (float) The discount factor.
        q_network: (tf.keras.Sequential) Keras model for predicting the q_values
        target_q_network: (tf.keras.Sequential) Keras model for predicting the targets

    Returns:
        loss: (TensorFlow Tensor(shape=(), dtype=float32)) the Mean-Squared Error between
              the y targets and the Q(s,a) values.
    """
    # Unpack the mini-batch of experience tuples
    states, actions, rewards, next_states, done_vals = experiences

    # Compute max Q^(s,a) for the next states using the target network
    max_qsa = tf.reduce_max(target_q_network(next_states), axis=-1)

    # Set y = R if the episode terminates, otherwise set y = R + γ max Q^(s,a).
    y_targets = rewards + (gamma * max_qsa * (1 - done_vals))

    # Get the q_values for the actions that were actually taken, to match y_targets
    q_values = q_network(states)
    q_values = tf.gather_nd(q_values, tf.stack([tf.range(q_values.shape[0]),
                                                tf.cast(actions, tf.int32)], axis=1))

    # Compute the loss
    loss = MSE(y_targets, q_values)

    return loss
Update Network Weights
We will use the agent_learn function below to implement lines 12-14 of the algorithm outlined in Fig 3. The agent_learn function will update the weights of the Q and target Q̂ networks using a custom training loop. Because we are using a custom training loop we need to retrieve the gradients via a tf.GradientTape instance, and then call optimizer.apply_gradients() to update the weights of our Q-Network. Note that we are also using the @tf.function decorator to increase performance. Without this decorator our training will take twice as long. If you would like to know more about how to increase performance with @tf.function, take a look at the TensorFlow documentation.

The last line of this function updates the weights of the target Q̂-Network using a soft update. If you want to know how this is implemented in code we encourage you to take a look at the utils.update_target_network function in the utils module.
@tf.function
def agent_learn(experiences, gamma):
    """
    Updates the weights of the Q networks.

    Args:
        experiences: (tuple) tuple of ["state", "action", "reward", "next_state", "done"] namedtuples
        gamma: (float) The discount factor.
    """
    # Calculate the loss
    with tf.GradientTape() as tape:
        loss = compute_loss(experiences, gamma, q_network, target_q_network)

    # Get the gradients of the loss with respect to the weights.
    gradients = tape.gradient(loss, q_network.trainable_variables)

    # Update the weights of the q_network.
    optimizer.apply_gradients(zip(gradients, q_network.trainable_variables))

    # Update the weights of the target q_network using a soft update.
    utils.update_target_network(q_network, target_q_network)
Train the Agent
We are now ready to train our agent to solve the Lunar Lander environment. In the cell below we will implement the algorithm in Fig 3 line by line (note that the same algorithm is included below for easy reference, so you won't have to scroll up and down the notebook):
- Line 1: We initialize the memory_buffer with a capacity of N = MEMORY_SIZE. Notice that we are using a deque as the data structure for our memory_buffer.
- Line 2: We skip this line since we already initialized the q_network in Exercise 1.
- Line 3: We initialize the target_q_network by setting its weights to be equal to those of the q_network.
- Line 4: We start the outer loop. Notice that we have set M = num_episodes = 2000. This number is reasonable because the agent should be able to solve the Lunar Lander environment in less than 2000 episodes using this notebook's default parameters.
- Line 5: We use the .reset() method to reset the environment to the initial state and get the initial state.
- Line 6: We start the inner loop. Notice that we have set T = max_num_timesteps = 1000. This means that the episode will automatically terminate if the episode hasn't terminated after 1000 time steps.
- Line 7: The agent observes the current state and chooses an action using an ε-greedy policy. Our agent starts out using a value of ε = epsilon = 1 which yields an ε-greedy policy that is equivalent to the equiprobable random policy. This means that at the beginning of our training, the agent is just going to take random actions regardless of the observed state. As training progresses we will decrease the value of ε slowly towards a minimum value using a given ε-decay rate. We want this minimum value to be close to zero because a value of ε = 0 will yield an ε-greedy policy that is equivalent to the greedy policy. This means that towards the end of training, the agent will lean towards selecting the action that it believes (based on its past experiences) will maximize Q(s,a). We will set the minimum ε value to be 0.01 and not exactly 0 because we always want to keep a little bit of exploration during training. If you want to know how this is implemented in code we encourage you to take a look at the utils.get_action function in the utils module; a simplified sketch is also included right after this list.
- Line 8: We use the .step() method to take the given action in the environment and get the reward and the next_state.
- Line 9: We store the experience(state, action, reward, next_state, done) tuple in our memory_buffer. Notice that we also store the done variable so that we can keep track of when an episode terminates. This allowed us to set the y targets in Exercise 2.
- Line 10: We check if the conditions are met to perform a learning update. We do this by using our custom utils.check_update_conditions function. This function checks if C = NUM_STEPS_FOR_UPDATE = 4 time steps have occurred and if our memory_buffer has enough experience tuples to fill a mini-batch. For example, if the mini-batch size is 64, then our memory_buffer should have more than 64 experience tuples in order to pass the latter condition. If the conditions are met, then the utils.check_update_conditions function will return a value of True, otherwise it will return a value of False.
- Lines 11-14: If the update variable is True then we perform a learning update. The learning update consists of sampling a random mini-batch of experience tuples from our memory_buffer, setting the y targets, performing gradient descent, and updating the weights of the networks. We will use the agent_learn function we defined in the Update Network Weights section to perform the latter three.
- Line 15: At the end of each iteration of the inner loop we set next_state as our new state so that the loop can start again from this new state. In addition, we check if the episode has reached a terminal state (i.e. we check if done = True). If a terminal state has been reached, then we break out of the inner loop.
- Line 16: At the end of each iteration of the outer loop we update the value of ε, and check if the environment has been solved. We consider that the environment has been solved if the agent receives an average of 200 points in the last 100 episodes. If the environment has not been solved we continue the outer loop and start a new episode.
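For reference, the ε-greedy action selection (line 7) and the ε decay (line 16) could be implemented along the lines of the sketch below. The real utils.get_action and utils.get_new_eps helpers live in the utils module and may differ in detail; E_DECAY and E_MIN are hypothetical constants used here only for illustration.

import random
import numpy as np

E_DECAY = 0.995  # hypothetical ε-decay rate per episode
E_MIN = 0.01     # minimum ε value, so some exploration always remains

def epsilon_greedy_action(q_values, epsilon):
    # With probability ε take a random action, otherwise take the greedy action.
    if random.random() > epsilon:
        return int(np.argmax(q_values.numpy()[0]))   # greedy: argmax_a Q(s,a)
    return random.choice(np.arange(4))               # random action (Lunar Lander has 4 actions)

def decayed_epsilon(epsilon):
    # Slowly decay ε towards E_MIN.
    return max(E_MIN, E_DECAY * epsilon)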
Finally, we wanted to note that we have included some extra variables to keep track of the total number of points the agent received in each episode. This will help us determine if the agent has solved the environment and it will also allow us to see how our agent performed during training. We also use the time module to measure how long the training takes.
start = time.time()

num_episodes = 2000
max_num_timesteps = 1000

total_point_history = []

num_p_av = 100    # number of total points to use for averaging
epsilon = 1.0     # initial ε value for ε-greedy policy

# Create a memory buffer D with capacity N
memory_buffer = deque(maxlen=MEMORY_SIZE)

# Set the target network weights equal to the Q-Network weights
target_q_network.set_weights(q_network.get_weights())

for i in range(num_episodes):

    # Reset the environment to the initial state and get the initial state
    state = env.reset()
    total_points = 0

    for t in range(max_num_timesteps):

        # From the current state S choose an action A using an ε-greedy policy
        state_qn = np.expand_dims(state, axis=0)  # state needs to be the right shape for the q_network
        q_values = q_network(state_qn)
        action = utils.get_action(q_values, epsilon)

        # Take action A and receive reward R and the next state S'
        next_state, reward, done, _ = env.step(action)

        # Store experience tuple (S,A,R,S') in the memory buffer.
        # We store the done variable as well for convenience.
        memory_buffer.append(experience(state, action, reward, next_state, done))

        # Only update the network every NUM_STEPS_FOR_UPDATE time steps.
        update = utils.check_update_conditions(t, NUM_STEPS_FOR_UPDATE, memory_buffer)

        if update:
            # Sample random mini-batch of experience tuples (S,A,R,S') from D
            experiences = utils.get_experiences(memory_buffer)

            # Set the y targets, perform a gradient descent step,
            # and update the network weights.
            agent_learn(experiences, GAMMA)

        state = next_state.copy()
        total_points += reward

        if done:
            break

    total_point_history.append(total_points)
    av_latest_points = np.mean(total_point_history[-num_p_av:])

    # Update the ε value
    epsilon = utils.get_new_eps(epsilon)

    print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}", end="")

    if (i+1) % num_p_av == 0:
        print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}")

    # We will consider that the environment is solved if we get an
    # average of 200 points in the last 100 episodes.
    if av_latest_points >= 200.0:
        print(f"\n\nEnvironment solved in {i+1} episodes!")
        q_network.save('lunar_lander_model.h5')
        break

tot_time = time.time() - start

print(f"\nTotal Runtime: {tot_time:.2f} s ({(tot_time/60):.2f} min)")
We can plot the total point history along with the moving average to see how our agent improved during training. If you want to know about the different plotting options available in the utils.plot_history function, we encourage you to take a look at the utils module.
# Plot the total point history along with the moving average
utils.plot_history(total_point_history)
Trained Agent in Action
Now that we have trained our agent, we can see it in action. We will use the utils.create_video function to create a video of our agent interacting with the environment using the trained Q-Network. The utils.create_video function uses the imageio library to create the video. This library produces some warnings that can be distracting, so, to suppress them, we run the code below.
# Suppress warnings from imageio
import logging
logging.getLogger().setLevel(logging.ERROR)
Create Video
In the cell below we create a video of our agent interacting with the Lunar Lander environment using the trained q_network. The video is saved to the videos folder with the given filename. We use the utils.embed_mp4 function to embed the video in the Jupyter Notebook so that we can see it here directly without having to download it.
We should note that since the lunar lander starts with a random initial force applied to its center of mass, every time you run the cell below you will see a different video. If the agent was trained properly, it should be able to land the lunar lander in the landing pad every time, regardless of the initial force applied to its center of mass.
= "./videos/lunar_lander.mp4"
filename
utils.create_video(filename, env, q_network) utils.embed_mp4(filename)