SARSA on-policy learning
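The script below trains a tabular agent on a small, randomly generated grid "road": the start is the first drivable tile of the top row, the goal is the last drivable tile of the bottom row, and rewards are -1 per move, +1 for reaching the goal and -5 for leaving the road. After every transition it applies the update Q(s, a) <- Q(s, a) + alpha * (r + gamma * Q(s', a') - Q(s, a)), where a' is the action actually taken next (SARSA) or the greedy action when the game is created with is_q_learning=True (Q-learning). The step size is alpha = 1 / N(s, a) and exploration is epsilon-greedy with epsilon = N0 / (N0 + N(s)).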
#!/usr/bin/python2
import numpy as np
import matplotlib
import math
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.mlab as mlab
from matplotlib.patches import Circle, Wedge, Polygon
from matplotlib.collections import PatchCollection
import matplotlib.patches as patches

"""
1: right
2: left
3: up
4: down
5: right-up
6: left-up
7: right-down
8: left-down
"""

ALLOWED_DIRECTIONS = [1, 2, 3, 4, 5, 6, 7, 8]
ALLOWED_VELOCITY_VALUE = [1, 2, 3, 4, 5]
HEIGHT = 5
WIDTH = 10
MAX_VELOCITY = 5
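
# The "road" is a HEIGHT x WIDTH grid. Each row gets one contiguous segment
# of drivable tiles (marked 1 in GridRoad.allowed_location); everything
# outside that segment is off-road.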
def create_number_tiles_row():
    start = int(math.ceil(np.random.uniform(1, int(WIDTH / 2))))
    end = int(math.ceil(np.random.uniform(start, WIDTH + 1)))
    return start, end

def get_next_position(x1, y1, v, d):
    if d == 1:
        # right
        x1 = x1 + v
    elif d == 2:
        # left
        x1 = x1 - v
    elif d == 3:
        # up
        y1 = y1 - v
    elif d == 4:
        # down
        y1 = y1 + v
    elif d == 5:
        # right-up
        x1 = x1 + v
        y1 = y1 - v
    elif d == 6:
        # left-up
        x1 = x1 - v
        y1 = y1 - v
    elif d == 7:
        # right-down
        x1 = x1 + v
        y1 = y1 + v
    else:
        # left-down
        x1 = x1 - v
        y1 = y1 + v
    return x1, y1
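
# State, Action and StateAction are hashable so they can be used directly as
# keys of the tabular action-value map (GridRoad.value_map) and of the visit
# counters.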
class State(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __hash__(self):
        return hash((self.x, self.y))

    def __eq__(self, other):
        return (self.x == other.x) and (self.y == other.y)

    def __str__(self):
        return str((self.x, self.y))

class Action(object):
    def __init__(self, velocity, direction):
        self.velocity = velocity
        self.direction = direction

    def __eq__(self, other):
        return (self.velocity == other.velocity) and (self.direction == other.direction)

    def __hash__(self):
        return hash((self.velocity, self.direction))

    def __str__(self):
        return str((self.velocity, self.direction))

class StateAction(object):
    def __init__(self, state, action):
        self.state = state
        self.action = action

    def __eq__(self, other):
        return self.state == other.state and self.action == other.action

    def __hash__(self):
        return hash((self.state, self.action))

    def __str__(self):
        return str((str(self.state), str(self.action)))
class GridRoad(object):
    def __init__(self, width=WIDTH, height=HEIGHT):
        self.width = width
        self.height = height
        self.value_map = {}
        self.allowed_location = np.zeros((height, width))
        self.state_action_visited = {}
        self.state_visited = np.zeros((height, width))
        self.N0 = 500
        for i in range(height):
            start, end = create_number_tiles_row()
            self.allowed_location[i][int(start): int(end)] = 1
        print self.allowed_location
        self.initialized()

    def is_allowed_poisition(self, position):
        x, y = position
        if x > self.height - 1 or y > self.width - 1 or x < 0 or y < 0:
            return False
        return self.allowed_location[x][y]

    def initialized(self):
        for i in range(self.height):
            for j in range(self.width):
                for v in ALLOWED_VELOCITY_VALUE:
                    for d in ALLOWED_DIRECTIONS:
                        state = State(i, j)
                        action = Action(v, d)
                        state_action = StateAction(state, action)
                        if self.allowed_location[i][j] == 0:
                            self.value_map[state_action] = -np.inf
                        else:
                            self.value_map[state_action] = 0
                        self.state_action_visited[state_action] = 0

    def select_start_end(self):
        for i in range(self.width):
            if self.is_allowed_poisition((0, i)):
                start = (0, i)
                break
        for i in range(self.width - 1, -1, -1):
            if self.is_allowed_poisition((self.height - 1, i)):
                end = (self.height - 1, i)
                break
        return start, end
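
    # Exploration decays with experience: epsilon(s) = N0 / (N0 + N(s)),
    # where N(s) counts visits to state s, so heavily visited states are
    # explored less.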
    def get_epsilon(self, position):
        x, y = position
        return self.N0 / float(self.N0 + (self.state_visited[x][y]))

    def get_allowed_velocity(self, current_action=None):
        if not current_action:
            return [1]
        v = current_action.velocity
        allowed_velocity = [v, v - 1, v + 1]
        return allowed_velocity
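
    # A move of velocity v in direction d is legal only if every cell stepped
    # through on the way, including the landing cell, stays on the road.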
    def get_intermediate_positions_allowed(self, current_position, current_action):
        position = current_position
        x, y = position
        v, d = current_action.velocity, current_action.direction
        x, y = get_next_position(x, y, 1, d)
        x1, y1 = get_next_position(x, y, v, d)
        while not (x, y) == (x1, y1):
            if not self.is_allowed_poisition((x, y)):
                return False
            x, y = get_next_position(x, y, 1, d)
        return True

    def get_greedy_action(self, position, current_action):
        x, y = position
        state = State(x, y)
        max_action_val = -np.inf
        max_action = None
        allowed_velocity = self.get_allowed_velocity(current_action=current_action)
        for v in allowed_velocity:
            if v > MAX_VELOCITY or v < 1:
                continue
            for d in ALLOWED_DIRECTIONS:
                action = Action(v, d)
                if not self.get_intermediate_positions_allowed(position, action):
                    continue
                state_action = StateAction(state, action)
                val = self.value_map[state_action]
                if max_action_val <= val:
                    max_action = action
                    max_action_val = val
        return max_action
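
    # Behaviour policy: epsilon-greedy over the currently legal actions.
    # When is_greedy is set (evaluation runs), or when the *next* action is
    # being chosen for a Q-learning target (is_next_action and is_q_learning),
    # the greedy action is returned instead of sampling.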
    def select_action(self, position, current_action=None, is_greedy=False, is_q_learning=False, is_next_action=False):
        epsilon = self.get_epsilon(position)
        actions = []
        allowed_velocity = self.get_allowed_velocity(current_action=current_action)
        for v in allowed_velocity:
            if v > MAX_VELOCITY or v < 1:
                continue
            for d in ALLOWED_DIRECTIONS:
                action = Action(v, d)
                if not self.get_intermediate_positions_allowed(position, action):
                    continue
                actions.append(action)
        if len(actions) == 0:
            return None
        action_probability = np.full(len(actions), epsilon / len(actions))
        greedy_action = self.get_greedy_action(position, current_action=current_action)
        if is_greedy or (is_next_action and is_q_learning):
            return greedy_action
        action_probability[actions.index(greedy_action)] = action_probability[actions.index(greedy_action)] + (1 - epsilon)
        return actions[np.random.choice(np.arange(len(action_probability)), p=action_probability)]

    def get_reward(self, position):
        if not self.is_allowed_poisition(position):
            return -5
        return -1
    def take_action(self, position, action):
        x, y = position
        x1, y1 = x, y
        v, d = action.velocity, action.direction
        x1, y1 = get_next_position(x1, y1, v, d)
        is_allowed = self.is_allowed_poisition((x1, y1))
        # Reward is judged on the position the move lands on, so leaving the
        # road earns the -5 penalty instead of the usual -1 step cost.
        reward = self.get_reward((x1, y1))
        return (x1, y1), reward, is_allowed
    def visited_state_action(self, position, action):
        x, y = position
        state = State(x, y)
        self.state_visited[x][y] = self.state_visited[x][y] + 1
        if action is None:
            return
        action_state = StateAction(state, action)
        self.state_action_visited[action_state] = self.state_action_visited[action_state] + 1
    def get_alpha(self, position, action):
        x, y = position
        state = State(x, y)
        state_action = StateAction(state, action)
        # Step size decays as 1 / N(s, a); use float division so it does not
        # collapse to zero under Python 2 integer division.
        return 1.0 / self.state_action_visited[state_action]
    def get_new_val(self, gamma, current_position, current_action, reward, next_position, next_action):
        # Unused placeholder; the update is done in update_state_action_value.
        pass
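
    # Tabular TD update: Q(s, a) <- Q(s, a) + alpha * (r + gamma * Q(s', a') - Q(s, a)).
    # For SARSA, a' is the action the behaviour policy actually picked next;
    # for Q-learning it is the greedy action, so the target becomes
    # r + gamma * max_a Q(s', a).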
    def update_state_action_value(self, gamma, current_position, current_action, reward, next_position, next_action):
        alpha = self.get_alpha(current_position, current_action)
        x, y = current_position
        current_state_action = StateAction(State(x, y), current_action)
        current_val = self.value_map[current_state_action]
        x1, y1 = next_position
        is_allowed = self.is_allowed_poisition(next_position)
        next_val = -np.inf
        if is_allowed:
            next_state_action = StateAction(State(x1, y1), next_action)
            next_val = self.value_map[next_state_action]
        new_val = current_val + alpha * (reward + (gamma * next_val) - current_val)
        if math.isnan(new_val):
            return False
        self.value_map[current_state_action] = new_val
        return True
class Game(object):
    def __init__(self, gamma, no_episode, is_q_learning=False):
        self.gamma = gamma
        self.grid_road = GridRoad()
        self.no_episode = no_episode
        self.start, self.end = self.grid_road.select_start_end()
        self.episode_visits = {}
        self.is_q_learning = is_q_learning

    def plot(self):
        total_plots = len(self.episode_visits.keys())
        # Add one final, purely greedy episode so the learned policy is plotted too.
        self.episode_visits[total_plots] = self.episode(is_greedy=True)
        total_plots = total_plots + 1
        print self.episode_visits.keys()
        cols = 5
        rows = int(math.ceil(total_plots / float(cols)))
        episodes = 0
        fig, axs = plt.subplots(rows, cols)
        plt.figure(1, figsize=(50, 50))
        for i in range(rows):
            for j in range(cols):
                if episodes > total_plots - 1:
                    break
                visited_positions = self.episode_visits[episodes]
                ax = axs[i][j]
                ax.plot(self.start[0], self.start[1], 'x', markersize=20)
                previous_x = self.start[0]
                previous_y = self.start[1]
                for position in visited_positions[1:]:
                    x, y = position
                    ax.arrow(previous_x, previous_y, x - previous_x, y - previous_y, head_width=0.3, head_length=0.3, overhang=-1)
                    ax.plot(x, y, 'o', markersize=5)
                    previous_x = x
                    previous_y = y
                ax.plot(self.end[0], self.end[1], 'x', markersize=20)
                height, width = self.grid_road.allowed_location.shape
                ax.add_patch(
                    patches.Rectangle(
                        (0, 0),   # lower-left corner
                        HEIGHT,   # extent along x (grid rows)
                        WIDTH,    # extent along y (grid columns)
                        alpha=0.1
                    )
                )
                for ii in range(height):
                    for jj in range(width):
                        if self.grid_road.allowed_location[ii][jj] == 0:
                            ax.plot(ii, jj, '^', markersize=5, c='red')
                ax.set_xticks(np.arange(-WIDTH, 2 * WIDTH + 1, 1))
                ax.set_yticks(np.arange(-HEIGHT, 2 * HEIGHT + 1, 1))
                ax.set_title('episode ' + str(episodes * self.PLOT_FREQUENCY + 1))
                ax.set_autoscale_on(False)
                ax.grid()
                episodes = episodes + 1
        print "not visited!!!"
        for i in range(HEIGHT):
            for j in range(WIDTH):
                if self.grid_road.state_visited[i][j] == 0 and self.grid_road.allowed_location[i][j] == 1:
                    print i, j
        plt.show()
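
    # One learning episode. The agent follows the epsilon-greedy policy,
    # updates Q after every transition, and stops on reaching the goal,
    # running out of legal actions, or exceeding a budget of
    # HEIGHT * WIDTH * 2 moves.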
    def episode(self, is_greedy=False):
        moves = 0
        current_position = self.start
        current_action = None
        visited_positions = []
        while moves < HEIGHT * WIDTH * 2:
            moves = moves + 1
            visited_positions.append(current_position)
            previous_action = current_action
            current_action = self.grid_road.select_action(current_position, current_action=current_action, is_greedy=is_greedy, is_q_learning=self.is_q_learning)
            self.grid_road.visited_state_action(current_position, current_action)
            if State(current_position[0], current_position[1]) == State(self.end[0], self.end[1]):
                if is_greedy:
                    print current_position, current_action, next_position, next_action
                print "breaking"
                break
            if current_action is None:
                print "can not process"
                break
            next_position, reward, is_allowed = self.grid_road.take_action(current_position, current_action)
            if State(next_position[0], next_position[1]) == State(self.end[0], self.end[1]):
                reward = 1
            next_action = None
            if is_allowed:
                next_action = self.grid_road.select_action(next_position, current_action=current_action, is_greedy=is_greedy, is_q_learning=self.is_q_learning, is_next_action=True)
            result = self.grid_road.update_state_action_value(self.gamma, current_position, current_action, reward, next_position, next_action)
            if not is_allowed or not result:
                if is_greedy:
                    print current_position, current_action, next_position, next_action
                print "as"
                break
            current_position = next_position
        return visited_positions
    def play(self):
        print self.start, self.end
        self.PLOT_FREQUENCY = int(self.no_episode / 10)
        for i in range(self.no_episode):
            visited_positions = self.episode()
            if i % self.PLOT_FREQUENCY == 0:
                self.episode_visits[int(i / self.PLOT_FREQUENCY)] = visited_positions

game = Game(0.90, 10000, is_q_learning=True)
game.play()
game.plot()

game = Game(0.90, 10000, is_q_learning=False)
game.play()
game.plot()