Created October 20, 2016 20:19
Windy Gridworld SARSA Implementation
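Tabular SARSA on the windy gridworld task (a 10x7 grid with a column-dependent upward wind, fixed start (3, 0) and fixed goal (3, 7), much like Sutton and Barto's windy gridworld example). The agent follows an epsilon-greedy policy and updates its action values online from the on-policy TD target. Running the script trains for 1000 episodes and writes the four plots linked at the top of the file.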
# graph solutions:
# https://dl.dropboxusercontent.com/u/47591917/windy_gride_episode_length_sarsa.png
# https://dl.dropboxusercontent.com/u/47591917/windy_gride_episode_reward_sarsa.png
# https://dl.dropboxusercontent.com/u/47591917/windy_gride_episode_timestamp_sarsa.png
# https://dl.dropboxusercontent.com/u/47591917/windy_gride_path_sarsa.png
import numpy as np
import matplotlib
import math
import pandas as pd
from matplotlib import pyplot as plt
# action encoding: UP/DOWN move along the row (x) axis, RIGHT/LEFT along the column (y) axis
UP = 0
DOWN = 1
RIGHT = 2
LEFT = 3
ALLOWED_ACTION = [UP, DOWN, RIGHT, LEFT]
class State(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __hash__(self):
        return hash((self.x, self.y))

    def __eq__(self, other):
        return (self.x == other.x) and (self.y == other.y)

    def __str__(self):
        return str((self.x, self.y))
class ActionState(object):
    def __init__(self, state, action):
        self.state = state
        self.action = action

    def __hash__(self):
        return hash((self.state.x, self.state.y, self.action))

    def __eq__(self, other):
        return (self.state == other.state) and (self.action == other.action)

    def __str__(self):
        return str((self.state.x, self.state.y, self.action))
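# ValueMap holds the tabular action-value function: a dict mapping every
# ActionState (grid cell + action) to its current Q estimate, plus the
# epsilon-greedy policy and the windy transition dynamics built on top of it.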
class ValueMap(object):
    def __init__(self, width, height, discount_factor=1.0, alpha=0.5, epsilon=0.1):
        self.width = width
        self.height = height
        self.state_action_map = {}
        # upward wind strength for each column of the grid
        self.wind_strength = [0, 0, 0, 1, 1, 1, 2, 2, 1, 0]
        self.discount_factor = discount_factor
        self.alpha = alpha
        self.epsilon = epsilon
        self.initialize()

    def initialize(self):
        # start every state-action value at 0
        for i in range(self.height):
            for j in range(self.width):
                for action in ALLOWED_ACTION:
                    state = State(i, j)
                    action_state = ActionState(state, action)
                    self.state_action_map[action_state] = 0.0
    def get_start_position(self):
        # the task uses a fixed start cell (a random one could be drawn
        # with np.random.randint instead)
        return State(3, 0)

    def get_end_position(self):
        # fixed goal cell
        return State(3, 7)
    def get_greedy_action(self, state):
        # action with the highest current Q value for this state
        max_value = -np.inf
        max_action = None
        for action in ALLOWED_ACTION:
            action_state = ActionState(state, action)
            value = self.state_action_map[action_state]
            if value > max_value:
                max_value = value
                max_action = action
        return max_action
    def _get_next_state(self, current_state, current_action):
        x, y = current_state.x, current_state.y
        new_x, new_y = x, y
        if current_action == UP:
            new_x = x + 1
        elif current_action == DOWN:
            new_x = x - 1
        elif current_action == RIGHT:
            new_y = y + 1
        elif current_action == LEFT:
            new_y = y - 1
        new_y = max(0, new_y)
        new_y = min(self.width - 1, new_y)
        # the wind of the column we start from pushes the agent along the row
        # axis; the usual formulation subtracts the wind, but with the way this
        # grid is oriented it is added instead
        new_x = new_x + self.wind_strength[y]
        new_x = max(0, new_x)
        new_x = min(self.height - 1, new_x)
        return State(new_x, new_y)
    def select_action(self, state, is_greedy=False):
        # epsilon-greedy action selection (or purely greedy when is_greedy is set)
        actions_probability = np.ones(len(ALLOWED_ACTION)) * self.epsilon / len(ALLOWED_ACTION)
        greedy_action = self.get_greedy_action(state)
        if is_greedy:
            return greedy_action
        actions_probability[greedy_action] = actions_probability[greedy_action] + (1 - self.epsilon)
        return np.random.choice(np.arange(len(actions_probability)), p=actions_probability)
    def get_reward(self, state, action, end):
        # -1 for every move, +1 when the move would land on the goal
        next_state = self._get_next_state(state, action)
        if next_state == end:
            return 1
        return -1
    def get_next_state(self, current_state, current_action, end, is_greedy=False):
        # take the action, then choose the follow-up action with the same
        # epsilon-greedy behaviour policy so the update stays on-policy
        next_state = self._get_next_state(current_state, current_action)
        next_action = self.select_action(next_state, is_greedy=is_greedy)
        reward = self.get_reward(next_state, next_action, end)
        return next_state, next_action, reward
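    # The update below is the tabular SARSA rule:
    #   Q(s, a) <- Q(s, a) + alpha * (r + gamma * Q(s', a') - Q(s, a))
    # where (s', a') is the state-action pair actually taken next by the
    # epsilon-greedy behaviour policy.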
    def update_value_map(self, current_state, current_action, next_state, next_action, reward):
        current_state_action = ActionState(current_state, current_action)
        old_value = self.state_action_map[current_state_action]
        next_state_action = ActionState(next_state, next_action)
        next_value = self.state_action_map[next_state_action]
        target = reward + self.discount_factor * next_value
        self.state_action_map[current_state_action] = old_value + self.alpha * (target - old_value)
class Game(object):
    def __init__(self, width, height, no_episodes, discount_factor=1.0, alpha=0.1, epsilon=0.1):
        self.width = width
        self.height = height
        self.value_map = ValueMap(width, height, discount_factor=discount_factor, alpha=alpha, epsilon=epsilon)
        self.no_episodes = no_episodes
        self.start = self.value_map.get_start_position()
        self.end = self.value_map.get_end_position()
        self.action_per_episodes = []
        self.rewards_per_episodes = []
        self.last_episode_actions = []
    def plot(self):
        noshow = True

        # plot the episode length over time
        fig1 = plt.figure(figsize=(10, 5))
        plt.plot(self.action_per_episodes)
        plt.xlabel("Episode")
        plt.ylabel("Episode Length")
        plt.title("Episode Length over Time")
        plt.savefig('windy_gride_episode_length_sarsa.png')
        if noshow:
            plt.close(fig1)
        else:
            plt.show()

        # plot the smoothed episode reward over time
        fig2 = plt.figure(figsize=(10, 5))
        smoothing_window = 10
        rewards_smoothed = pd.Series(self.rewards_per_episodes).rolling(smoothing_window, min_periods=smoothing_window).mean()
        plt.plot(rewards_smoothed)
        plt.xlabel("Episode")
        plt.ylabel("Episode Reward (Smoothed)")
        plt.title("Episode Reward over Time (Smoothed over window size {})".format(smoothing_window))
        plt.savefig('windy_gride_episode_reward_sarsa.png')
        if noshow:
            plt.close(fig2)
        else:
            plt.show()

        # plot episode number against cumulative time steps
        fig3 = plt.figure(figsize=(10, 5))
        plt.plot(np.cumsum(self.action_per_episodes), np.arange(len(self.action_per_episodes)))
        plt.xlabel("Time Steps")
        plt.ylabel("Episode")
        plt.title("Episode per time step")
        plt.savefig('windy_gride_episode_timestamp_sarsa.png')
        if noshow:
            plt.close(fig3)
        else:
            plt.show()

        # plot the path taken in the final (greedy) episode
        plt.plot(self.start.y, self.start.x, 'x', markersize=20)
        previous_x = self.start.y
        previous_y = self.start.x
        for position in self.last_episode_actions[1:]:
            x, y = position.y, position.x
            plt.arrow(previous_x, previous_y, x - previous_x, y - previous_y, head_width=0.3, head_length=0.3, overhang=0)
            plt.plot(x, y, 'o', markersize=5)
            previous_x = x
            previous_y = y
        plt.plot(self.end.y, self.end.x, 'x', markersize=20)
        axes = plt.gca()
        axes.set_xticks(range(-2, self.width + 2))
        axes.set_yticks(range(-2, self.height + 2))
        axes.set_title('path to reach destination')
        plt.grid()
        plt.savefig('windy_gride_path_sarsa.png')
    def play(self):
        for i in range(self.no_episodes):
            # the very last episode is played greedily so its path can be plotted
            is_greedy = i == self.no_episodes - 1
            current_state = self.start
            current_action = self.value_map.select_action(current_state, is_greedy=is_greedy)
            actions = 0
            rewards = 0
            self.last_episode_actions = []
            self.last_episode_actions.append(current_state)
            while not (current_state == self.end):
                if actions > 2000:
                    print("episode %s failed" % str(i))
                    break
                next_state, next_action, reward = self.value_map.get_next_state(current_state, current_action, self.end, is_greedy=is_greedy)
                self.value_map.update_value_map(current_state, current_action, next_state, next_action, reward)
                current_state = next_state
                current_action = next_action
                actions = actions + 1
                rewards = rewards + reward
                self.last_episode_actions.append(current_state)
            self.action_per_episodes.append(actions)
            self.rewards_per_episodes.append(rewards)
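# Train on the 10-column by 7-row windy gridworld for 1000 episodes; the final
# episode is played greedily and its path is saved to windy_gride_path_sarsa.png.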
game = Game(10, 7, 1000)
game.play()
game.plot()
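The hyperparameters can be varied through the same constructor. A minimal sketch with purely illustrative values (the keyword arguments are the ones Game.__init__ already exposes):

# illustrative only: fewer episodes, discounting, and a smaller exploration rate
game = Game(10, 7, 500, discount_factor=0.9, alpha=0.5, epsilon=0.05)
game.play()
game.plot()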