Solution of the OpenAI gym environment "CartPole-v0" (https://gym.openai.com/envs/CartPole-v0) using DQN and PyTorch. It is a slightly modified version of the PyTorch DQN tutorial from http://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html. For results see: https://gym.openai.com/evaluations/eval_KYrmuUX4TWGOWYsJl8i6Kg.
# Solution of the OpenAI gym environment "CartPole-v0" (https://gym.openai.com/envs/CartPole-v0) using DQN and PyTorch.
# It is a slightly modified version of the PyTorch DQN tutorial from
# http://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html.
# The main difference is that it does not take the rendered screen as input but simply uses the
# observation values from the environment.

import gym
from gym import wrappers
import random
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
import matplotlib.pyplot as plt

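# Note: this script was written against the legacy (pre-0.4) PyTorch API (Variable, volatile=True,
# torch.cuda.*Tensor aliases) and the old gym interface (wrappers.Monitor, 4-tuple step(),
# render(close=True)); it may need adjustments on current versions of PyTorch and gym.
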
# hyper parameters
EPISODES = 600  # number of episodes
EPS_START = 0.9  # e-greedy threshold start value
EPS_END = 0.01  # e-greedy threshold end value
EPS_DECAY = 200  # e-greedy threshold decay
GAMMA = 0.80  # Q-learning discount factor
LR = 0.0005  # NN optimizer learning rate
HIDDEN_LAYER = 24  # NN hidden layer size
BATCH_SIZE = 128  # Q-learning batch size

# if gpu is to be used
use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
Tensor = FloatTensor

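# Bounded FIFO replay buffer: stores (state, action, next_state, reward) transitions and drops
# the oldest one once capacity is exceeded, so learning always samples from recent experience.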
class ReplayMemory:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []

    def push(self, transition):
        self.memory.append(transition)
        if len(self.memory) > self.capacity:
            del self.memory[0]

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

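# Q-network: maps the 4-dimensional CartPole observation (cart position, cart velocity,
# pole angle, pole angular velocity) to a Q-value for each of the 2 discrete actions.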
class Network(nn.Module):
    def __init__(self):
        nn.Module.__init__(self)
        self.l1 = nn.Linear(4, HIDDEN_LAYER)
        self.l2 = nn.Linear(HIDDEN_LAYER, 16)
        self.l3 = nn.Linear(16, 2)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = self.l3(x)
        return x

env = gym.make('CartPole-v0')
env = wrappers.Monitor(env, './tmp/cartpole-v0-1', force=True)

model = Network()
if use_cuda:
    model.cuda()
memory = ReplayMemory(10000)
optimizer = optim.Adam(model.parameters(), LR)
steps_done = 0
episode_durations = []

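# Epsilon-greedy action selection: with probability eps_threshold pick a random action,
# otherwise pick the action with the highest predicted Q-value. The threshold decays
# exponentially from EPS_START towards EPS_END, with EPS_DECAY controlling the decay rate in steps.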
def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        return model(Variable(state, volatile=True).type(FloatTensor)).data.max(1)[1].view(1, 1)
    else:
        return LongTensor([[random.randrange(2)]])

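# Runs a single episode: chooses actions epsilon-greedily, stores each transition in replay
# memory, and performs one learning step per environment step.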
def run_episode(e, environment):
    state = environment.reset()
    steps = 0
    while True:
        steps += 1
        environment.render()
        action = select_action(FloatTensor([state]))
        next_state, reward, done, _ = environment.step(action[0, 0])

        # zero the reward when the episode ends early (the pole fell or the cart left the track
        # before the 200-step limit), instead of the default +1 gym gives for every step
        if done and steps < 200:
            reward = 0

        memory.push((FloatTensor([state]),
                     action,  # action is already a tensor
                     FloatTensor([next_state]),
                     FloatTensor([reward])))

        learn()

        state = next_state

        if done:
            print("{2} Episode {0} finished after {1} steps"
                  .format(e, steps, '\033[92m' if steps >= 195 else '\033[99m'))
            episode_durations.append(steps)
            plot_durations()
            break

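# One DQN optimization step on a random minibatch from replay memory. Targets are computed as
# reward + GAMMA * max_a Q(next_state, a); note that the same network provides both the current
# and the target Q-value estimates (no separate target network is used in this gist).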
def learn():
    if len(memory) < BATCH_SIZE:
        return

    # random transition batch is taken from experience replay memory
    transitions = memory.sample(BATCH_SIZE)
    batch_state, batch_action, batch_next_state, batch_reward = zip(*transitions)

    batch_state = Variable(torch.cat(batch_state))
    batch_action = Variable(torch.cat(batch_action))
    batch_reward = Variable(torch.cat(batch_reward))
    batch_next_state = Variable(torch.cat(batch_next_state))

    # current Q values are estimated by the NN for the actions that were actually taken
    current_q_values = model(batch_state).gather(1, batch_action)
    # expected Q values are estimated from the actions that give the maximum Q value in the next state
    max_next_q_values = model(batch_next_state).detach().max(1)[0]
    expected_q_values = batch_reward + (GAMMA * max_next_q_values)

    # loss is measured from the error between the current and newly expected Q values
    loss = F.smooth_l1_loss(current_q_values, expected_q_values)

    # backpropagation of loss to NN
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

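# Plots the per-episode durations and, once at least 100 episodes are available, their 100-episode
# running mean (CartPole-v0 counts as solved at an average of 195+ over 100 consecutive episodes).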
def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.FloatTensor(episode_durations)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated

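# Train for EPISODES episodes, then close the environment and keep the final plot on screen.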
for e in range(EPISODES):
    run_episode(e, env)

print('Complete')
env.render(close=True)
env.close()
plt.ioff()
plt.show()