Last active
October 26, 2022 05:58
-
-
Save rlkelly/b56e23e02564728510464c186a56cc50 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import time | |
from uuid import uuid4 | |
# In-memory stand-in for a persistent store: maps an Effect's uuid to the
# result its function produced, so a later pass can replay the stored value
# instead of calling the function again.
DB = dict()
def fish():
    """Block for a random interval below three seconds, then report an outcome.

    Returns:
        'CATCH_FISH' when a second random draw is above 0.5, otherwise
        'IS_NIGHT'.
    """
    delay = random.random() * 3
    time.sleep(delay)
    draw = random.random()
    if draw > 0.5:
        return 'CATCH_FISH'
    return 'IS_NIGHT'
def weigh_fish():
    """Return whether a fresh random draw exceeds 10.

    NOTE(review): random.random() yields values in [0.0, 1.0), so this
    comparison is never true and the function always returns False. The
    threshold looks unintended -- confirm the intended scale before changing it.
    """
    draw = random.random()
    return draw > 10
class RESTART:
    """Step that asks the machine to start over, bounded by a retry budget."""

    def __init__(self, retries):
        # `iterations` starts at zero and is incremented by the state machine
        # each time this step is reached; it is compared against `retries`.
        self.retries, self.iterations = retries, 0
class Execute:
    """Step pairing a callable with the state name to adopt once it has run."""

    def __init__(self, state, func):
        # state: value the machine's state is set to after `func` is called.
        # func: zero-argument callable invoked for its side effect.
        self.state, self.func = state, func
class Effect:
    """Named step whose function result becomes the machine state.

    Each Effect carries its own uuid string, which the state machine uses as
    the key under which the function's result is stored in ``DB``.

    Args:
        name: Human-readable label for the step.
        func: Zero-argument callable producing the new state.
        retries: Retry budget stored on the instance (default 0).
    """

    def __init__(self, name, func, retries=0):
        self.name = name
        self.func = func
        # _assign_uuid now returns the generated id. Previously it returned
        # None, so this assignment overwrote the fresh uuid with None and all
        # Effects collided on the same DB key.
        self.uuid = self._assign_uuid()
        self.retries = retries
        # Number of times `func` has actually been invoked for this step.
        self.iteration = 0

    def _assign_uuid(self):
        """Generate a uuid4 string, store it on ``self.uuid`` and return it."""
        self.uuid = str(uuid4())
        return self.uuid
class Branch:
    """Conditional step: a predicate plus the two step lists it selects between."""

    def __init__(self, cond, left, right):
        # cond: callable given the current machine state; a truthy result
        #       selects `left`, a falsy one selects `right`.
        self.cond = cond
        self.left, self.right = left, right
class WaitFor:
    """Step that holds the machine until an external value matches `state`."""

    def __init__(self, state, do):
        # state: value the machine's `wait_val` must equal to pass this step.
        # do: callable stored alongside the step; nothing in the visible
        #     machine code invokes it.
        self.state, self.do = state, do
class GOTO:
    """Plain record holding a target state and a step index."""

    def __init__(self, state, index):
        self.state, self.index = state, index
class Final:
    """Terminal step; `state` names the outcome the machine finished with."""

    def __init__(self, state):
        # Label of the final outcome, e.g. printed by the runner on completion.
        self.state = state
class StateMachine:
    """Interpreter for a list of step objects (Effect, Branch, WaitFor, ...).

    Subclasses assign ``self.steps`` in their ``__init__``; ``__post_init__``
    is then invoked automatically (see ``__init_subclass__``) to register the
    steps. Effect results are stored in the module-level ``DB`` so that a
    later pass replays them instead of calling the function again.
    """

    steps = []       # step list; subclasses replace this per instance
    state = None     # result of the most recently executed step
    wait_val = None  # set from outside to release a WaitFor with equal state

    def __init_subclass__(cls, **kwargs):
        """Wrap the subclass ``__init__`` so ``__post_init__`` runs after it."""
        def init_decorator(previous_init):
            def new_init(self, *args, **kwargs):
                previous_init(self, *args, **kwargs)
                # Exact type match on purpose: when several classes in an
                # inheritance chain are wrapped, only the most-derived one
                # triggers post-init, so it runs once per instance.
                if type(self) is cls:
                    self.__post_init__()
            return new_init
        cls.__init__ = init_decorator(cls.__init__)

    def __post_init__(self):
        """Register the steps assigned by the subclass constructor."""
        self.steps = self.process(self.steps)

    def process(self, steps):
        """Return a copy of ``steps`` with every Effect registered in ``DB``.

        Each Effect gets a ``None`` entry in ``DB`` (meaning "not run yet");
        Branch steps are rebuilt with both of their step lists processed
        recursively. All other steps are passed through unchanged.
        """
        processed_steps = []
        for step in steps:
            if isinstance(step, Effect):
                DB[step.uuid] = None
                processed_steps.append(step)
            elif isinstance(step, Branch):
                processed_steps.append(
                    Branch(
                        step.cond,
                        self.process(step.left),
                        self.process(step.right),
                    )
                )
            else:
                processed_steps.append(step)
        return processed_steps

    def run(self):
        """Drive ``execute`` until it yields a terminal or waiting response.

        Returns:
            'FAILURE', 'WAITING' or 'DONE', or the Final / WaitFor step that
            stopped the machine. Any other response (``None`` after an
            inconclusive pass, or 'RESTART') triggers another pass.
        """
        while True:
            response = self.execute()
            print('run response', response)
            if response is None:
                continue
            if response in ['FAILURE', 'WAITING', 'DONE']:
                return response
            if isinstance(response, Final):
                # print final state
                print(response.state)
                return response
            if isinstance(response, WaitFor):
                return response

    def execute(self, steps=None):
        """Execute one pass over ``steps`` (default: the machine's own steps).

        Returns:
            'DONE', 'RESTART', 'FAILURE' or 'WAITING' as soon as a step
            produces one of them; the Final step when one is reached; or
            ``None`` when every step ran without a terminal response, so the
            caller decides whether to continue.
        """
        # Explicit None check: an empty branch list must stay empty rather
        # than fall back to the top-level steps as `steps or self.steps` did.
        current = self.steps if steps is None else steps
        while True:
            for step in current:
                response = self.execute_step(step)
                if response in ['DONE', 'RESTART', 'FAILURE', 'WAITING']:
                    return response
                if isinstance(response, RESTART):
                    # For the sake of simplicity, this fully reboots the
                    # database state. Clearing in place resets the shared
                    # module-level dict; a bare `DB = {}` here only rebound
                    # a local name and reset nothing.
                    DB.clear()
                    if response.iterations == response.retries:
                        return 'FAILURE'
                    self.state = None
                    break
                elif isinstance(response, Final):
                    # Propagate the Final itself. Returning `step` handed an
                    # enclosing Branch to outer levels, hiding the outcome.
                    return response
                elif isinstance(response, WaitFor):
                    if self.wait_val != response.state:
                        return 'WAITING'
            else:
                # The pass completed without a terminal response. Return
                # instead of looping so a nested branch list cannot spin
                # forever; run() retries on None.
                return None

    def execute_step(self, step):
        """Execute a single step and return its response.

        * Effect: replay the stored result from ``DB`` if there is one,
          otherwise call ``step.func`` and store the result; the result
          becomes the machine state.
        * Branch: run ``left`` when ``cond(state)`` is truthy, else ``right``.
        * Final: mark the machine 'DONE' and return the step.
        * WaitFor: return the step; the caller compares it with ``wait_val``.
        * Execute: call ``step.func``, adopt ``step.state``, return the step.
        * RESTART: count the attempt; return 'FAILURE' once the retry budget
          is used up, otherwise clear ``DB`` and return 'RESTART'.

        Once the machine state is 'DONE' or 'FAILURE' every call returns that
        state without touching the step. Unknown step types yield ``None``.
        """
        print('execute', step, self.state)
        if self.state in ['DONE', 'FAILURE']:
            return self.state
        if isinstance(step, Effect):
            cached = DB.get(step.uuid)
            if cached is not None:
                self.state = cached
            else:
                self.state = step.func()
                step.iteration += 1
                DB[step.uuid] = self.state
            return self.state
        if isinstance(step, Branch):
            chosen = step.left if step.cond(self.state) else step.right
            return self.execute(chosen)
        if isinstance(step, Final):
            self.state = 'DONE'
            return step
        if isinstance(step, WaitFor):
            return step
        if isinstance(step, Execute):
            step.func()
            self.state = step.state
            return step
        if isinstance(step, RESTART):
            step.iterations += 1
            # `>=` rather than `==` so retries=0 fails immediately instead of
            # restarting without bound; identical for retries >= 1.
            if step.iterations >= step.retries:
                self.state = 'FAILURE'
                return 'FAILURE'
            DB.clear()
            return 'RESTART'
        return None
class FishingStateMachine(StateMachine):
    """ fish for 3 days """

    def __init__(self):
        super().__init__()

        # First step: go fishing; its result drives the branch below.
        cast_line = Effect('FISH', fish, retries=3)

        # Night path: sleep, then ask the machine to start over.
        after_dark = [
            Execute('SLEEP', lambda: time.sleep(3)),
            RESTART(retries=3),
        ]

        # Day path: wait for the external 'CATCH_FISH' signal, then pick an
        # ending at random.
        wait_for_catch = WaitFor('CATCH_FISH', weigh_fish)
        pick_ending = Branch(
            cond=lambda x: random.random() * 10 > 5,
            left=[Final('EAT_FISH')],
            right=[Final('GIVE_UP')],
        )
        daytime = [wait_for_catch, pick_ending]

        self.steps = [
            cast_line,
            Branch(
                cond=lambda x: x == 'IS_NIGHT',
                left=after_dark,
                right=daytime,
            ),
        ]
if __name__ == '__main__':
    # Demo: run the fishing machine once; if it parks on a WaitFor, play the
    # part of the outside world and release it.
    fsm = FishingStateMachine()
    response = fsm.run()
    if response == 'WAITING':
        # Stand-in for an external event arriving a little later and
        # supplying the value the machine is waiting for.
        print('waiting...')
        time.sleep(1)
        fsm.wait_val = 'CATCH_FISH'
        fsm.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment