QLearner
import numpy as np
import random as rand
import datetime
class QLearner(object):

    def __init__(self,
                 num_states=100,
                 num_actions=4,
                 alpha=0.2,
                 gamma=0.9,
                 rar=0.5,
                 radr=0.99,
                 dyna=0,
                 verbose=False):
        # Initialize fields.
        self.num_states = num_states
        self.num_actions = num_actions
        self.alpha = alpha
        self.gamma = gamma
        self.rar = rar
        self.radr = radr
        self.dyna = dyna
        self.verbose = verbose
        # Initial state and action.
        self.s = 0
        self.a = 0
        # Initialize Q with values ~ Uniform[-1, 1].
        self.Q = np.random.rand(num_states, num_actions) * 2 - 1
        # If Dyna is enabled, initialize the transition count matrix Tc,
        # the transition probability matrix T, and the reward matrix R.
        if self.dyna != 0:
            self.Tc = 0.00001 * np.ones((num_states, num_actions, num_states))
            self.T = self.Tc / self.Tc.sum(axis=2, keepdims=True)
            self.R = -1.0 * np.ones((num_states, num_actions))
    def querysetstate(self, s):
        """
        @summary: Update the state without updating the Q-table
        @param s: The new state
        @returns: The selected action
        """
        self.s = s
        # Draw the action from state s that maximizes the quality Q.
        action = np.argmax(self.Q[s, :])
        if self.verbose:
            print("s =", s, "a =", action)
        return action
    def query(self, s_prime, r):
        """
        @summary: Update the Q table and return an action
        @param s_prime: The new state
        @param r: The reward for the previous action
        @returns: The selected action
        """
        # Update Q[s, a] <- (1 - alpha) * Q[s, a] + alpha * (r + gamma * max_a'(Q[s', a']))
        self.Q[self.s, self.a] = (1 - self.alpha) * self.Q[self.s, self.a] + \
            self.alpha * (r + self.gamma * np.max(self.Q[s_prime, :]))
        # If Dyna is enabled, update the transition and reward models.
        if self.dyna != 0:
            # Increment the count of the observed transition.
            self.Tc[self.s, self.a, s_prime] += 1
            # Normalize the counts to produce the transition probabilities.
            self.T[self.s, self.a, :] = self.Tc[self.s, self.a, :] / self.Tc[self.s, self.a, :].sum()
            # Update rewards: R[s, a] <- (1 - alpha) * R[s, a] + alpha * r
            self.R[self.s, self.a] = (1 - self.alpha) * self.R[self.s, self.a] + \
                self.alpha * r
            # Now run the Dyna planning updates.
            self._run_dyna()
        # Choose a random action with probability rar; otherwise take the
        # action that maximizes Q for the new state.
        if rand.random() > self.rar:
            action = np.argmax(self.Q[s_prime, :])
        else:
            action = rand.randrange(self.num_actions)
        # Decay rar.
        self.rar *= self.radr
        if self.verbose:
            print("s =", s_prime, "a =", action, "r =", r)
        self.s = s_prime
        self.a = action
        return action
    def _run_dyna(self):
        # Pre-generate random state and action samples to speed up the
        # hallucinated (simulated) experience.
        s_samples = np.random.randint(0, self.num_states, self.dyna)
        a_samples = np.random.randint(0, self.num_actions, self.dyna)
        # For each sample...
        for i in range(self.dyna):
            s = s_samples[i]
            a = a_samples[i]
            # Simulate the action with the transition model and land on an s_prime.
            s_prime = np.argmax(np.random.multinomial(1, self.T[s, a, :]))
            # Look up the expected reward of the simulated action.
            r = self.R[s, a]
            # Update Q with the hallucinated experience.
            self.Q[s, a] = (1 - self.alpha) * self.Q[s, a] + \
                self.alpha * (r + self.gamma * np.max(self.Q[s_prime, :]))
if __name__ == "__main__":
    rand.seed(1035715)
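    # Illustrative usage sketch (an assumption, not part of the original gist):
    # run the learner on a hypothetical 10-state corridor where action 1 moves
    # right, action 0 moves left, and reaching state 9 pays a reward of +1.
    # The environment and hyperparameters below are made up for demonstration.
    learner = QLearner(num_states=10, num_actions=2, alpha=0.2, gamma=0.9,
                       rar=0.5, radr=0.99, dyna=50, verbose=False)
    for episode in range(100):
        s = 0
        a = learner.querysetstate(s)
        for step in range(50):
            # Step the toy environment: move left or right, clamped to [0, 9].
            s = max(0, min(9, s + (1 if a == 1 else -1)))
            r = 1.0 if s == 9 else -0.01
            a = learner.query(s, r)
            if s == 9:
                break
    # After training, the greedy policy at state 0 should favor moving right.
    print("Greedy action from state 0:", np.argmax(learner.Q[0, :]))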