PyTorch DQN implementation
""" | |
DQN in PyTorch | |
""" | |
import argparse
import torch
import torch.nn
import numpy as np
import random
import gym
from collections import namedtuple
from collections import deque
from typing import List, Tuple

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--gamma",
                    type=float,
                    default=0.99,
                    help="Discount rate for Q_target")
parser.add_argument("--env",
                    type=str,
                    default="CartPole-v0",
                    help="Gym environment name")
parser.add_argument("--n-episode",
                    type=int,
                    default=1000,
                    help="Number of episodes to run")
parser.add_argument("--batch-size",
                    type=int,
                    default=64,
                    help="Mini-batch size")
parser.add_argument("--hidden-dim",
                    type=int,
                    default=12,
                    help="Hidden dimension")
parser.add_argument("--capacity",
                    type=int,
                    default=50000,
                    help="Replay memory capacity")
parser.add_argument("--max-episode",
                    type=int,
                    default=50,
                    help="ε-greedy target episode (eps will be lowest at this episode)")
parser.add_argument("--min-eps",
                    type=float,
                    default=0.01,
                    help="Min epsilon")
FLAGS = parser.parse_args()
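
# Example invocation (not part of the original gist; "dqn.py" is a placeholder
# for whatever name this file is saved under). Every flag has a default, so the
# script can also be run with no arguments, and `python dqn.py --help` prints
# each flag together with its default value:
#
#     python dqn.py --env CartPole-v0 --n-episode 1000 --batch-size 64 --hidden-dim 12
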

class DQN(torch.nn.Module):

    def __init__(self, input_dim: int, output_dim: int, hidden_dim: int) -> None:
        """DQN Network

        Args:
            input_dim (int): `state` dimension.
                `state` is a 2-D tensor of shape (n, input_dim)
            output_dim (int): Number of actions.
                Q_value is a 2-D tensor of shape (n, output_dim)
            hidden_dim (int): Hidden dimension in fc layer
        """
        super(DQN, self).__init__()

        self.layer1 = torch.nn.Sequential(
            torch.nn.Linear(input_dim, hidden_dim),
            torch.nn.BatchNorm1d(hidden_dim),
            torch.nn.PReLU()
        )

        self.layer2 = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim, hidden_dim),
            torch.nn.BatchNorm1d(hidden_dim),
            torch.nn.PReLU()
        )

        self.final = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Returns a Q_value

        Args:
            x (torch.Tensor): `State` 2-D tensor of shape (n, input_dim)

        Returns:
            torch.Tensor: Q_value, 2-D tensor of shape (n, output_dim)
        """
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.final(x)

        return x
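
# Quick shape check for the network above (illustrative only, not part of the
# original gist). BatchNorm1d needs either a batch of size > 1 or eval mode,
# hence the `.eval()` call for a single-sample forward pass:
#
#     net = DQN(input_dim=4, output_dim=2, hidden_dim=12)
#     net.eval()
#     x = torch.autograd.Variable(torch.randn(1, 4))  # Variable wrapper, as used elsewhere here
#     q_values = net(x)                               # -> 2-D tensor of shape (1, 2)
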

Transition = namedtuple("Transition",
                        field_names=["state", "action", "reward", "next_state", "done"])


class ReplayMemory(object):

    def __init__(self, capacity: int) -> None:
        """Replay memory class

        Args:
            capacity (int): Max size of this memory
        """
        self.capacity = capacity
        self.cursor = 0
        self.memory = []

    def push(self,
             state: np.ndarray,
             action: int,
             reward: int,
             next_state: np.ndarray,
             done: bool) -> None:
        """Creates a `Transition` and inserts it

        Args:
            state (np.ndarray): 1-D tensor of shape (input_dim,)
            action (int): action index (0 <= action < output_dim)
            reward (int): reward value
            next_state (np.ndarray): 1-D tensor of shape (input_dim,)
            done (bool): whether this state was the last step
        """
        if len(self) < self.capacity:
            self.memory.append(None)

        self.memory[self.cursor] = Transition(state, action, reward, next_state, done)
        self.cursor = (self.cursor + 1) % self.capacity

    def pop(self, batch_size: int) -> List[Transition]:
        """Returns a randomly sampled minibatch of `Transition`

        Args:
            batch_size (int): Size of the mini-batch

        Returns:
            List[Transition]: Minibatch of `Transition`
        """
        return random.sample(self.memory, batch_size)

    def __len__(self) -> int:
        """Returns the current length"""
        return len(self.memory)
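
# Example usage of the replay memory (illustrative only; the dummy transitions
# below are not part of the original gist):
#
#     memory = ReplayMemory(capacity=100)
#     memory.push(np.zeros(4), 0, 1, np.ones(4), False)
#     memory.push(np.ones(4), 1, 1, np.zeros(4), True)
#     batch = memory.pop(2)   # list of 2 randomly sampled `Transition`s
#     len(memory)             # -> 2
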

class Agent(object):

    def __init__(self, input_dim: int, output_dim: int, hidden_dim: int) -> None:
        """Agent class that chooses actions and trains

        Args:
            input_dim (int): input dimension
            output_dim (int): output dimension
            hidden_dim (int): hidden dimension
        """
        self.dqn = DQN(input_dim, output_dim, hidden_dim)
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.loss_fn = torch.nn.MSELoss()
        self.optim = torch.optim.Adam(self.dqn.parameters())

    def _to_variable(self, x: np.ndarray) -> torch.Tensor:
        """torch.Variable syntax helper

        Args:
            x (np.ndarray): 2-D tensor of shape (n, input_dim)

        Returns:
            torch.Tensor: torch variable
        """
        return torch.autograd.Variable(torch.Tensor(x))

    def get_action(self, states: np.ndarray, eps: float) -> int:
        """Returns an action

        Args:
            states (np.ndarray): 2-D tensor of shape (n, input_dim)
            eps (float): ε-greedy probability for exploration

        Returns:
            int: action index
        """
        if np.random.rand() < eps:
            return np.random.choice(self.output_dim)
        else:
            self.dqn.train(mode=False)
            scores = self.get_Q(states)
            _, argmax = torch.max(scores.data, 1)
            return int(argmax.numpy())

    def get_Q(self, states: np.ndarray) -> torch.FloatTensor:
        """Returns `Q-value`

        Args:
            states (np.ndarray): 2-D Tensor of shape (n, input_dim)

        Returns:
            torch.FloatTensor: 2-D Tensor of shape (n, output_dim)
        """
        states = self._to_variable(states.reshape(-1, self.input_dim))
        self.dqn.train(mode=False)
        return self.dqn(states)

    def train(self, Q_pred: torch.FloatTensor, Q_true: torch.FloatTensor) -> float:
        """Computes `loss` and backpropagates

        Args:
            Q_pred (torch.FloatTensor): Predicted value by the network,
                2-D Tensor of shape (n, output_dim)
            Q_true (torch.FloatTensor): Target value obtained from the game,
                2-D Tensor of shape (n, output_dim)

        Returns:
            float: loss value
        """
        self.dqn.train(mode=True)
        self.optim.zero_grad()
        loss = self.loss_fn(Q_pred, Q_true)
        loss.backward()
        self.optim.step()

        return loss

def train_helper(agent: Agent, minibatch: List[Transition], gamma: float) -> float:
    """Prepares a minibatch and trains on it

    Args:
        agent (Agent): Agent with a `train(Q_pred, Q_true)` method
        minibatch (List[Transition]): Minibatch of `Transition`
        gamma (float): Discount rate of Q_target

    Returns:
        float: Loss value
    """
    states = np.vstack([x.state for x in minibatch])
    actions = np.array([x.action for x in minibatch])
    rewards = np.array([x.reward for x in minibatch])
    next_states = np.vstack([x.next_state for x in minibatch])
    done = np.array([x.done for x in minibatch])

    Q_predict = agent.get_Q(states)
    Q_target = Q_predict.clone().data.numpy()
    Q_target[np.arange(len(Q_target)), actions] = rewards + gamma * np.max(
        agent.get_Q(next_states).data.numpy(), axis=1) * ~done
    Q_target = agent._to_variable(Q_target)

    return agent.train(Q_predict, Q_target)
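
# For reference (comment added, not part of the original gist): the vectorized
# assignment above implements the standard Q-learning target
#
#     Q_target[i, a_i] = r_i + gamma * max_a Q(s'_i, a)   if not done_i
#     Q_target[i, a_i] = r_i                              if done_i
#
# because `~done` is 0 for terminal transitions, which drops the bootstrapped
# term. Every other entry of Q_target is left equal to Q_predict, so only the
# action actually taken contributes to the MSE loss and its gradient.
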

def play_episode(env: gym.Env,
                 agent: Agent,
                 replay_memory: ReplayMemory,
                 eps: float,
                 batch_size: int) -> int:
    """Plays an episode and trains

    Args:
        env (gym.Env): gym environment (CartPole-v0)
        agent (Agent): agent will train and get actions
        replay_memory (ReplayMemory): trajectory is saved here
        eps (float): ε-greedy probability for exploration
        batch_size (int): batch size

    Returns:
        int: reward earned in this episode
    """
    s = env.reset()
    done = False
    total_reward = 0

    while not done:
        a = agent.get_action(s, eps)
        s2, r, done, info = env.step(a)

        total_reward += r

        if done:
            r = -1
        replay_memory.push(s, a, r, s2, done)

        if len(replay_memory) > batch_size:
            minibatch = replay_memory.pop(batch_size)
            train_helper(agent, minibatch, FLAGS.gamma)

        s = s2

    return total_reward

def get_env_dim(env: gym.Env) -> Tuple[int, int]:
    """Returns input_dim & output_dim

    Args:
        env (gym.Env): gym Environment (CartPole-v0)

    Returns:
        Tuple[int, int]: input_dim, output_dim
    """
    input_dim = env.observation_space.shape[0]
    output_dim = env.action_space.n

    return input_dim, output_dim

def epsilon_annealing(episode: int, max_episode: int, min_eps: float) -> float:
    """Returns the ε value for ε-greedy exploration

    1.0---|\
          | \
          |  \
    min_e +---+------->
              |
              max_episode

    Args:
        episode (int): Current episode (0 <= episode)
        max_episode (int): After `max_episode`, ε will be `min_eps`
        min_eps (float): ε will never go below this value

    Returns:
        float: ε value
    """
    slope = (min_eps - 1.0) / max_episode
    return max(slope * episode + 1.0, min_eps)
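
# Worked example (not part of the original gist), using the default flags
# max_episode=50 and min_eps=0.01:
#
#     epsilon_annealing(0, 50, 0.01)    # -> 1.0
#     epsilon_annealing(25, 50, 0.01)   # -> ~0.505
#     epsilon_annealing(50, 50, 0.01)   # -> 0.01 (and stays at 0.01 afterwards)
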

def main():
    """Main"""
    try:
        env = gym.make(FLAGS.env)
        env = gym.wrappers.Monitor(env, directory="monitors", force=True)
        rewards = deque(maxlen=100)
        input_dim, output_dim = get_env_dim(env)
        agent = Agent(input_dim, output_dim, FLAGS.hidden_dim)
        replay_memory = ReplayMemory(FLAGS.capacity)

        for i in range(FLAGS.n_episode):
            eps = epsilon_annealing(i, FLAGS.max_episode, FLAGS.min_eps)
            r = play_episode(env, agent, replay_memory, eps, FLAGS.batch_size)
            print("[Episode: {:5}] Reward: {:5} ε-greedy: {:5.2f}".format(i + 1, r, eps))

            rewards.append(r)

            if len(rewards) == rewards.maxlen:
                if np.mean(rewards) >= 200:
                    print("Game cleared in {} episodes with average reward {}".format(i + 1, np.mean(rewards)))
                    break
    finally:
        env.close()


if __name__ == '__main__':
    main()
Hi @kkweon. I'm working on an OpenAI gym environment and would like to include an example agent implementation in my repo. Would it be OK if I modify and redistribute this code? If so, under what license?