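"""Tabular RL experiments on cyclic MDPs: discounted Q-learning compared with
the average-reward methods R-learning and Differential Q-learning, plus
discounted/undiscounted value-iteration baselines and a typer-based CLI.
(Summary docstring added for readability; it describes the code below.)"""
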
import dataclasses
from typing import Iterable, Optional, Protocol, Tuple

import numpy as np

Array = np.ndarray


@dataclasses.dataclass(frozen=True)
class Transition:
    state: int
    reward: float


class TabularMDP(Protocol):
    n_states: int
    n_actions: int

    def step(self, action: int) -> Transition:
        pass

    def reset(self, seed: Optional[int] = None) -> int:
        pass


class OneCycleMDP(TabularMDP):
    def __init__(
        self,
        n_states: int,
        noise_prob: float = 0.1,
        seed: int = 0,
    ) -> None:
        self.n_states = n_states
        self.n_actions = 2
        self._current_state = 0
        self._noise_prob = noise_prob
        self._seed = seed
        self._rng = np.random.RandomState(seed)

    def step(self, action: int) -> Transition:
        assert action in [0, 1], f"Invalid Action {action}"
        if self._current_state == self.n_states - 1 and action == 1:
            reward = 1.0
        else:
            reward = 0.0
        perturbation = self._rng.uniform() < self._noise_prob
        if (action == 1) ^ perturbation:
            self._current_state = (self._current_state + 1) % self.n_states
        return Transition(self._current_state, reward)

    def reset(self, seed: Optional[int] = None) -> int:
        if seed is not None:
            self._seed = seed
        self._current_state = 0
        self._rng = np.random.RandomState(self._seed)
        return self._current_state

    def p_and_r(self) -> Tuple[Array, Array]:
        p = np.zeros((self.n_states, self.n_actions, self.n_states))
        for i in range(self.n_states):
            i1 = (i + 1) % self.n_states
            p[i][0][i] = 1.0 - self._noise_prob
            p[i][0][i1] = self._noise_prob
            p[i][1][i] = self._noise_prob
            p[i][1][i1] = 1.0 - self._noise_prob
        r = np.zeros((self.n_states, self.n_actions))
        r[-1][1] = 1.0
        return p, r

    def n_optimal_states(self, pi: Array) -> int:
        if len(pi.shape) == 1:  # Deterministic
            return np.sum(pi == 1)
        elif len(pi.shape) == 2:
            THRESHOLD = 1e-3
            return np.sum((pi[:, 1] + THRESHOLD) > 1.0)
        else:
            raise ValueError(f"Unsupported shape: {pi.shape}")


class TwoCycleMDP(TabularMDP):
    def __init__(
        self,
        n_left_states: int,
        noise_prob: float = 0.1,
        seed: int = 0,
    ) -> None:
        self.n_states = n_left_states * 3
        self.n_actions = 2
        self._starting_state = n_left_states
        self._current_state = n_left_states
        self._noise_prob = noise_prob
        self._seed = seed
        self._rng = np.random.RandomState(seed)

    def step(self, action: int) -> Transition:
        assert action in [0, 1], f"Invalid Action {action}"
        if action == 1 and self._current_state == self.n_states - 1:
            reward = 3.0
        elif action == 0 and self._current_state == 0:
            reward = 1.0
        else:
            reward = 0.0
        goes_right = (action == 1) ^ (self._rng.uniform() < self._noise_prob)
        if self._current_state == self._starting_state:  # Center
            if goes_right:
                self._current_state += 1
            else:
                self._current_state -= 1
        elif self._current_state < self._starting_state:  # Left cycle
            if not goes_right:
                if self._current_state == 0:
                    # Wrap back to the center, consistent with p_and_r
                    self._current_state = self._starting_state
                else:
                    self._current_state -= 1
        else:  # Right cycle
            if goes_right:
                if self._current_state == self.n_states - 1:
                    self._current_state = self._starting_state
                else:
                    self._current_state += 1
        return Transition(self._current_state, reward)

    def reset(self, seed: Optional[int] = None) -> int:
        if seed is not None:
            self._seed = seed
        self._current_state = self._starting_state
        self._rng = np.random.RandomState(self._seed)
        return self._current_state

    def p_and_r(self) -> Tuple[Array, Array]:
        p = np.zeros((self.n_states, self.n_actions, self.n_states))
        # Center state: always moves left or right, never stays
        p[self._starting_state][0][self._starting_state - 1] = 1.0 - self._noise_prob
        p[self._starting_state][1][self._starting_state - 1] = self._noise_prob
        p[self._starting_state][0][self._starting_state + 1] = self._noise_prob
        p[self._starting_state][1][self._starting_state + 1] = 1.0 - self._noise_prob
        for i in range(self._starting_state):  # Left cycle
            i1 = i - 1 if i > 0 else self._starting_state
            p[i][0][i] = self._noise_prob
            p[i][0][i1] = 1.0 - self._noise_prob
            p[i][1][i] = 1.0 - self._noise_prob
            p[i][1][i1] = self._noise_prob
        # Right cycle; start at starting_state + 1 so the center row above
        # is not overwritten
        for i in range(self._starting_state + 1, self.n_states):
            i1 = i + 1 if i < self.n_states - 1 else self._starting_state
            p[i][0][i] = 1.0 - self._noise_prob
            p[i][0][i1] = self._noise_prob
            p[i][1][i] = self._noise_prob
            p[i][1][i1] = 1.0 - self._noise_prob
        r = np.zeros((self.n_states, self.n_actions))
        r[0][0] = 1.0
        r[self.n_states - 1][1] = 3.0
        return p, r

    def n_optimal_states(self, pi: Array) -> int:
        left = pi[: self._starting_state]
        right = pi[self._starting_state :]
        if len(pi.shape) == 1:  # Deterministic
            return np.sum(left == 0) + np.sum(right == 1)
        elif len(pi.shape) == 2:
            THRESHOLD = 1e-3
            return np.sum((left[:, 0] + THRESHOLD) > 1.0) + np.sum(
                (right[:, 1] + THRESHOLD) > 1.0
            )
        else:
            raise ValueError(f"Unsupported shape: {pi.shape}")


def solve_discounted(
    mdp: TabularMDP, gamma: float, threshold: float
) -> Tuple[Array, Array]:
    """Discounted value iteration; returns (v, q)."""
    p, r = mdp.p_and_r()
    v = np.zeros(p.shape[0])
    for _ in range(100000):
        r_plus_gamma_pv = r + gamma * np.einsum("saS,S->sa", p, v)
        v_next = r_plus_gamma_pv.max(axis=1)
        if np.all(np.absolute(v_next - v) < threshold):
            return v_next, r_plus_gamma_pv
        v = v_next
    raise RuntimeError("Value Iteration did not converge >_<")


def solve_undiscounted(mdp: TabularMDP, threshold: float) -> Tuple[Array, Array]:
    """Undiscounted value iteration; stops once the per-iteration value
    increments stabilize. Returns (v, q)."""
    p, r = mdp.p_and_r()
    v = np.zeros(p.shape[0])
    prev_diff = np.ones_like(v) * np.inf
    for _ in range(100000):
        r_plus_pv = r + np.einsum("saS,S->sa", p, v)
        v_next = r_plus_pv.max(axis=1)
        diff = v_next - v
        if np.all(np.absolute(diff - prev_diff) < threshold):
            return v_next, r_plus_pv
        v = v_next
        prev_diff = diff
    raise RuntimeError("Value Iteration did not converge >_<")


def epsilon_greedy(rng: np.random.RandomState, q_value: Array, epsilon: float) -> int:
    if rng.uniform() < epsilon:
        return rng.choice(range(q_value.shape[0]))
    else:
        return np.argmax(q_value)


@dataclasses.dataclass(frozen=True)
class Result:
    rewards: Array
    n_optimal_states: Optional[Array]

    def print_summary(self, n_states: int) -> None:
        import click

        click.secho("=====Result Summary=====", bg="black", fg="white", bold=True)
        n_rewards = self.rewards.shape[0]
        print(f"Return (total) : {np.sum(self.rewards)}")
        print(f"Return (second half) : {np.sum(self.rewards[n_rewards // 2:])}")
        if self.n_optimal_states is not None and 0 < len(self.n_optimal_states):
            (optimal_steps,) = np.where(self.n_optimal_states == n_states)
            if len(optimal_steps) == 0:
                print("Not Optimal >_<")
            else:
                print(f"Became optimal after {optimal_steps[0]} steps")


def plot_result(plots: Iterable[Tuple[str, dict]]) -> None:
    import pandas as pd
    import seaborn as sns
    from matplotlib import pyplot as plt

    for name, data in plots:
        if "." in name:
            raise ValueError("Please pass the filename without file extension")
        plot = sns.lineplot(data=pd.DataFrame(data), ax=plt.figure(name).add_subplot())
        plot.get_figure().savefig(name + ".pdf")


@dataclasses.dataclass(frozen=True)
class QLearningConfig:
    alpha: float
    gamma: float
    epsilon_max: float
    epsilon_min: float
    normalized_update: bool
    seed: int


@dataclasses.dataclass()
class QLearningAgent:
    qvalue: Array
    epsilon: float


def q_learning(
    mdp: TabularMDP,
    config: QLearningConfig,
    n_steps: int,
) -> Tuple[Result, QLearningAgent]:
    state = mdp.reset()
    rng = np.random.RandomState(config.seed)
    initial_q_value = rng.randn(mdp.n_states, mdp.n_actions)
    agent = QLearningAgent(initial_q_value, config.epsilon_max)
    epsilon_delta = (config.epsilon_max - config.epsilon_min) / n_steps
    rewards = []
    n_optimal_states = []
    has_n_optimal_states = hasattr(mdp, "n_optimal_states")
    for _ in range(n_steps):
        action = epsilon_greedy(rng, agent.qvalue[state], agent.epsilon)
        transition = mdp.step(action)
        next_q_max = np.max(agent.qvalue[transition.state])
        td_error = (
            transition.reward + config.gamma * next_q_max - agent.qvalue[state, action]
        )
        # Update the agent
        if config.normalized_update:
            agent.qvalue[state, action] *= 1 - config.alpha
        agent.qvalue[state, action] += config.alpha * td_error
        agent.epsilon -= epsilon_delta
        # Update the state
        state = transition.state
        # Logging
        rewards.append(transition.reward)
        if has_n_optimal_states:
            n_optimal_states.append(mdp.n_optimal_states(agent.qvalue.argmax(axis=1)))
    result = Result(np.array(rewards), np.array(n_optimal_states))
    return result, agent
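

# Illustrative only (not part of the original experiment script): the learning
# functions can also be called directly, e.g.
#   config = QLearningConfig(0.2, 0.99, 0.4, 0.1, True, 0)
#   result, agent = q_learning(OneCycleMDP(10), config, n_steps=10_000)
#   result.print_summary(10)

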
@dataclasses.dataclass(frozen=True)
class RLearningResult(Result):
    rhos: Array


@dataclasses.dataclass(frozen=True)
class RLearningConfig:
    alpha: float
    beta: float
    epsilon_max: float
    epsilon_min: float
    normalized_update: bool
    singh_modification: bool
    seed: int


@dataclasses.dataclass()
class RLearningAgent:
    rvalue: Array
    rho: float
    epsilon: float


def r_learning(
    mdp: TabularMDP,
    config: RLearningConfig,
    n_steps: int,
) -> Tuple[RLearningResult, RLearningAgent]:
    state = mdp.reset()
    rng = np.random.RandomState(config.seed)
    initial_r_value = rng.randn(mdp.n_states, mdp.n_actions)
    agent = RLearningAgent(initial_r_value, 0.0, config.epsilon_max)
    epsilon_delta = (config.epsilon_max - config.epsilon_min) / n_steps
    rewards = []
    n_optimal_states = []
    rhos = []
    has_n_optimal_states = hasattr(mdp, "n_optimal_states")
    for _ in range(n_steps):
        action = epsilon_greedy(rng, agent.rvalue[state], agent.epsilon)
        transition = mdp.step(action)
        next_r_max = np.max(agent.rvalue[transition.state])
        r_td = transition.reward + next_r_max - agent.rho
        if config.singh_modification:
            rho_td = transition.reward + next_r_max - agent.rvalue[state, action]
        else:
            rho_td = transition.reward + next_r_max - np.max(agent.rvalue[state])
        # Update the agent
        if config.normalized_update:
            agent.rvalue[state, action] *= 1 - config.alpha
            agent.rho *= 1 - config.beta
        agent.rvalue[state, action] += config.alpha * r_td
        agent.rho += config.beta * rho_td
        agent.epsilon -= epsilon_delta
        # Update the state
        state = transition.state
        # Logging
        rewards.append(transition.reward)
        rhos.append(agent.rho)
        if has_n_optimal_states:
            n_optimal_states.append(mdp.n_optimal_states(agent.rvalue.argmax(axis=1)))
    result = RLearningResult(
        np.array(rewards),
        np.array(n_optimal_states) if has_n_optimal_states else None,
        np.array(rhos),
    )
    return result, agent


@dataclasses.dataclass(frozen=True)
class DiffQLearningConfig:
    alpha: float
    eta: float
    epsilon_max: float
    epsilon_min: float
    normalized_update: bool
    seed: int


@dataclasses.dataclass()
class DiffQLearningAgent:
    qvalue: Array
    rho: float
    epsilon: float


DiffQLearningResult = RLearningResult


def diffq_learning(
    mdp: TabularMDP,
    config: DiffQLearningConfig,
    n_steps: int,
) -> Tuple[DiffQLearningResult, DiffQLearningAgent]:
    state = mdp.reset()
    rng = np.random.RandomState(config.seed)
    initial_q_value = rng.randn(mdp.n_states, mdp.n_actions)
    agent = DiffQLearningAgent(initial_q_value, 0.0, config.epsilon_max)
    epsilon_delta = (config.epsilon_max - config.epsilon_min) / n_steps
    rewards = []
    n_optimal_states = []
    rhos = []
    has_n_optimal_states = hasattr(mdp, "n_optimal_states")
    alpha_eta = config.alpha * config.eta
    for _ in range(n_steps):
        action = epsilon_greedy(rng, agent.qvalue[state], agent.epsilon)
        transition = mdp.step(action)
        next_q_max = np.max(agent.qvalue[transition.state])
        td_error = (
            transition.reward - agent.rho + next_q_max - agent.qvalue[state, action]
        )
        # Update the agent; rho is updated with the same TD error, scaled by eta
        if config.normalized_update:
            agent.qvalue[state, action] *= 1 - config.alpha
            agent.rho *= 1 - alpha_eta
        agent.qvalue[state, action] += config.alpha * td_error
        agent.rho += alpha_eta * td_error
        agent.epsilon -= epsilon_delta
        # Update the state
        state = transition.state
        # Logging
        rewards.append(transition.reward)
        rhos.append(agent.rho)
        if has_n_optimal_states:
            n_optimal_states.append(mdp.n_optimal_states(agent.qvalue.argmax(axis=1)))
    result = RLearningResult(
        np.array(rewards),
        np.array(n_optimal_states) if has_n_optimal_states else None,
        np.array(rhos),
    )
    return result, agent


if __name__ == "__main__":
    import typer

    MDPS = {
        "one": OneCycleMDP,
        "onecycle": OneCycleMDP,
        "two": TwoCycleMDP,
        "twocycle": TwoCycleMDP,
    }
    app = typer.Typer()

    def normalized(default: bool) -> typer.Option:
        return typer.Option(
            default, "--normalize/--unnormalize", "-N/-UN", is_flag=True
        )

    @app.command()
    def q(
        alpha: float = 0.2,
        gamma: float = 0.99,
        epsilon_max: float = 0.4,
        epsilon_min: float = 0.1,
        seed: int = 0,
        n_states: int = 100,
        n_steps: int = 100000,
        noise_prob: float = 0.1,
        normalized_update: bool = normalized(True),
        mdp: str = "onecycle",
        plot: bool = typer.Option(False, is_flag=True),
    ) -> None:
        config = QLearningConfig(
            alpha,
            gamma,
            epsilon_max,
            epsilon_min,
            normalized_update,
            seed,
        )
        mdp = MDPS[mdp](n_states, noise_prob, seed)
        result, agent = q_learning(mdp, config, n_steps)
        result.print_summary(mdp.n_states)
        if plot:
            data = {
                "Num. optimal states": result.n_optimal_states,
            }
            plot_result([("q-learning-opt", data)])

    @app.command()
    def r(
        alpha: float = 0.2,
        beta: float = 0.05,
        epsilon_max: float = 0.4,
        epsilon_min: float = 0.1,
        seed: int = 0,
        n_states: int = 100,
        n_steps: int = 100000,
        noise_prob: float = 0.1,
        normalized_update: bool = normalized(True),
        singh_modification: bool = typer.Option(False, "-SM", is_flag=True),
        mdp: str = "onecycle",
        plot: bool = typer.Option(False, is_flag=True),
    ) -> None:
        config = RLearningConfig(
            alpha,
            beta,
            epsilon_max,
            epsilon_min,
            normalized_update,
            singh_modification,
            seed,
        )
        mdp = MDPS[mdp](n_states, noise_prob, seed)
        result, agent = r_learning(mdp, config, n_steps)
        result.print_summary(mdp.n_states)
        print(agent.rvalue.argmax(axis=1))
        if plot:
            nopt = {
                "Num. optimal states": result.n_optimal_states,
            }
            rho = {"ρ": result.rhos}
            plot_result([("r-learning-opt", nopt), ("r-learning-ρ", rho)])

    @app.command()
    def diffq(
        alpha: float = 0.2,
        eta: float = 0.25,
        epsilon_max: float = 0.4,
        epsilon_min: float = 0.1,
        seed: int = 0,
        n_states: int = 1000,
        n_steps: int = 100000,
        noise_prob: float = 0.1,
        normalized_update: bool = normalized(False),
        mdp: str = "onecycle",
        plot: bool = typer.Option(False, is_flag=True),
    ) -> None:
        config = DiffQLearningConfig(
            alpha,
            eta,
            epsilon_max,
            epsilon_min,
            normalized_update,
            seed,
        )
        mdp = MDPS[mdp](n_states, noise_prob, seed)
        result, agent = diffq_learning(mdp, config, n_steps)
        result.print_summary(mdp.n_states)
        if plot:
            nopt = {
                "Num. optimal states": result.n_optimal_states,
            }
            rho = {"ρ": result.rhos}
            plot_result([("diffq-learning-opt", nopt), ("diffq-learning-ρ", rho)])

    @app.command()
    def solve(
        discounted: bool = typer.Option(True, "--discounted/--undiscounted"),
        gamma: float = 0.99,
        threshold: float = 1e-4,
        n_states: int = 1000,
        noise_prob: float = 0.1,
    ) -> None:
        mdp = OneCycleMDP(n_states, noise_prob)
        if discounted:
            _, q = solve_discounted(mdp, gamma, threshold)
        else:
            _, q = solve_undiscounted(mdp, threshold)
        print(q)
        print(q.argmax(axis=1))

    app()
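
# Example invocations (assuming the script is saved as, e.g., average_reward.py;
# the filename is not fixed by this gist):
#   python average_reward.py q --mdp onecycle --plot
#   python average_reward.py r --n-states 50 -SM
#   python average_reward.py diffq --mdp twocycle
#   python average_reward.py solve --undiscounted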