
@lmclupr
Created May 29, 2017 15:03
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import gym\n",
"import os\n",
"import sys\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from gym import wrappers\n",
"from datetime import datetime\n",
"from sklearn.preprocessing import StandardScaler\n",
"from keras.models import Sequential\n",
"from keras.layers import Dense, Dropout, Activation\n",
"from keras.layers import Embedding\n",
"from keras.optimizers import SGD, RMSprop, Adam, Adamax"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def plot_running_avg(totalrewards):\n",
" N = len(totalrewards)\n",
" running_avg = np.empty(N)\n",
" for t in range(N):\n",
" running_avg[t] = totalrewards[max(0, t-100):(t+1)].mean()\n",
" plt.plot(running_avg)\n",
" plt.title(\"Running Average\")\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"env = gym.make('LunarLander-v2')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# build a set of samples so we can get a scaler fitted.\n",
"observation_samples = []\n",
"\n",
"# play a bunch of games randomly and collect observations\n",
"for n in range(1000):\n",
" observation = env.reset()\n",
" observation_samples.append(observation)\n",
" done = False\n",
" while not done:\n",
" action = np.random.randint(0, env.action_space.n)\n",
" observation, reward, done, _ = env.step(action)\n",
" observation_samples.append(observation)\n",
" \n",
"observation_samples = np.array(observation_samples)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"env = wrappers.Monitor(env, 'monitor-folder', force=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"# Create scaler and fit\n",
"scaler = StandardScaler()\n",
"scaler.fit(observation_samples)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Using the excellent Keras to build standard feedforward neural network.\n",
"# single output node, linear activation on the output\n",
"# To keep things simple, one NN is created per action. So\n",
"# in this problem, 4 independant neural networks are create\n",
"# Admax optimizer is the most efficient one, using default parameters.\n",
"\n",
"def create_nn():\n",
" model = Sequential()\n",
" model.add(Dense(128, init='lecun_uniform', input_shape=(8,)))\n",
" model.add(Activation('relu'))\n",
"# model.add(Dropout(0.3)) #I'm not using dropout, but maybe you wanna give it a try?\n",
"\n",
" model.add(Dense(256, init='lecun_uniform'))\n",
" model.add(Activation('tanh'))\n",
"# model.add(Dropout(0.5))\n",
"\n",
" model.add(Dense(1, init='lecun_uniform'))\n",
" model.add(Activation('linear')) #linear output so we can have range of real-valued outputs\n",
"\n",
"# rms = RMSprop(lr=0.005)\n",
"# sgd = SGD(lr=0.1, decay=0.0, momentum=0.0, nesterov=False)\n",
"# try \"adam\"\n",
"# adam = Adam(lr=0.0005)\n",
" adamax = Adamax() #Adamax(lr=0.001)\n",
" model.compile(loss='mse', optimizer=adamax)\n",
"# model.summary()\n",
" return model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Holds one nn for each action\n",
"class Model:\n",
" def __init__(self, env, scaler):\n",
" self.env = env\n",
" self.scaler = scaler\n",
" self.models = []\n",
" for i in range(env.action_space.n):\n",
" model = create_nn() # one nn per action\n",
" self.models.append(model) \n",
"\n",
" def predict(self, s):\n",
" X = self.scaler.transform(np.atleast_2d(s))\n",
" return np.array([m.predict(np.array(X), verbose=0)[0] for m in self.models])\n",
"\n",
" def update(self, s, a, G):\n",
" X = self.scaler.transform(np.atleast_2d(s))\n",
" self.models[a].fit(np.array(X), np.array([G]), nb_epoch=1, verbose=0)\n",
"\n",
" def sample_action(self, s, eps):\n",
" if np.random.random() < eps:\n",
" return self.env.action_space.sample()\n",
" else:\n",
" return np.argmax(self.predict(s))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def play_one(env, model, eps, gamma):\n",
" observation = env.reset()\n",
" done = False\n",
" full_reward_received = False\n",
" totalreward = 0\n",
" iters = 0\n",
" while not done:\n",
" action = model.sample_action(observation, eps)\n",
" prev_observation = observation\n",
" observation, reward, done, info = env.step(action)\n",
" \n",
" # update the model\n",
" # standard Q learning TD(0)\n",
" next = model.predict(observation)\n",
" G = reward + gamma*np.max(next)\n",
" model.update(prev_observation, action, G)\n",
" totalreward += reward\n",
" iters += 1\n",
" \n",
" return totalreward, iters\n"
]
},
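{
"cell_type": "markdown",
"metadata": {},
"source": [
"The per-step target computed in `play_one` is the standard TD(0) Q-learning target (this cell only restates what the code above does; $\\hat{Q}_{a}$ denotes the network for action $a$):\n",
"\n",
"$$G = r + \\gamma \\max_{a'} \\hat{Q}_{a'}(s')$$\n",
"\n",
"Only the network for the action actually taken is then fitted toward this single-sample target $G$."
]
},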
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [],
"source": [
"model = Model(env, scaler)\n",
"gamma = 0.99\n",
"\n",
"N = 8010\n",
"totalrewards = np.empty(N)\n",
"costs = np.empty(N)\n",
"for n in range(N):\n",
" eps = 1.0/np.sqrt(n+1)\n",
" totalreward, iters = play_one(env, model, eps, gamma)\n",
" totalrewards[n] = totalreward\n",
" if n % 100 == 0:\n",
" print(\"episode:\", n, \"iters\", iters, \"total reward:\", totalreward, \"eps:\", eps, \"avg reward (last 100):\", totalrewards[max(0, n-100):(n+1)].mean())\n",
" if totalrewards[max(0, n-100):(n+1)].mean() >= 200:\n",
" break\n",
"\n",
"print(\"avg reward for last 100 episodes:\", totalrewards[-100:].mean())\n",
"print(\"total steps:\", totalrewards.sum())\n",
"\n",
"plt.plot(totalrewards)\n",
"plt.title(\"Rewards\")\n",
"plt.show()\n",
"\n",
"plot_running_avg(totalrewards)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"env.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda env:tensorflow]",
"language": "python",
"name": "conda-env-tensorflow-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
lmclupr commented May 29, 2017

Used the TD(0) method to solve LunarLander-v2. Used 4 individual feedforward neural networks (one per action) for function approximation, on a Keras and TensorFlow stack.
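
For reference, the whole method boils down to this per-step update (a condensed sketch of the `play_one` and `Model.update` cells above; the names mirror the notebook):

    next_q = model.predict(observation)        # Q-values for s' from the 4 per-action networks
    G = reward + gamma * np.max(next_q)        # TD(0) target
    model.update(prev_observation, action, G)  # fit only the taken action's network toward G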

@pablocastilla

Clean and concise, superb!
