Last active
May 23, 2022 07:55
-
-
Save jxskiss/01816eec9a2b64bae341f4d07f58646e to your computer and use it in GitHub Desktop.
Use "fysom" state machine as global machine
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import models | |
from fysom_singleton import * | |
class ItemStatus(object):
    """
    Status values an item can hold, plus the state-machine configuration
    (``SM_STATES`` / ``SM_TRANSITIONS``) built from them.

    A transition is either the shorthand ``[trigger, source, dest]`` or a
    dict with ``trigger``, ``source``, ``dest`` and optional ``conditions``.
    Each condition names a method to call on the item under the key
    ``'true'`` (or ``'false'``); an optional ``'else'`` key names the state
    to go to instead when the condition is not met.
    """

    NEW = 'new'
    NEED_INFO = 'need_info'
    REVIEWING = 'reviewing'
    REDOING = 'redoing'
    CONFLICT = 'conflict'
    VERIFIED = 'verified'
    DELETED = 'deleted'

    SM_STATES = [
        NEW,
        NEED_INFO,
        REVIEWING,
        REDOING,
        CONFLICT,
        VERIFIED,
        DELETED,
    ]

    SM_TRANSITIONS = [
        # shorthand form: trigger, source, destination
        ['sm_prepare_new', NEW, NEED_INFO],
        dict(
            trigger='sm_commit_review',
            source=NEED_INFO,
            dest=REVIEWING,
            conditions=[
                {'true': 'check_review_ready'},
            ],
        ),
        dict(
            trigger='sm_done_verified',
            source=[REVIEWING, REDOING],
            dest=VERIFIED,
            conditions=[
                {'true': 'check_required_fields'},
                {'true': 'check_barcodes_valid'},
                # a failed conflict check reroutes to CONFLICT instead of
                # cancelling the transition
                {'true': 'check_no_conflict', 'else': CONFLICT},
            ],
        ),
        ['sm_mark_conflict', [REVIEWING, REDOING], CONFLICT],
        ['sm_revert_verified', [VERIFIED, CONFLICT], REDOING],
        ['sm_require_info', [REVIEWING, REDOING], NEED_INFO],
        dict(
            trigger='sm_mark_deleted',
            source=[NEW, NEED_INFO, REVIEWING, REDOING, CONFLICT, VERIFIED],
            dest=DELETED,
        ),
        ['sm_revert_deleted', DELETED, REDOING],
        # '=' keeps the item in whatever state it is currently in
        dict(
            trigger='sm_update',
            source=[NEW, NEED_INFO, REVIEWING, REDOING, VERIFIED],
            dest='=',
        ),
    ]
class ItemMachineMixin(object):
    """
    Mixin that attaches the shared item state machine to a class and hosts
    the condition / callback methods the machine looks up on each object.

    The machine is a single class-level instance; every call receives the
    object being processed. Attribute names starting with ``sm_`` that are
    not found on the instance are forwarded to the machine with the
    instance already bound, so ``obj.sm_prepare_new()`` is equivalent to
    ``obj.sm.sm_prepare_new(obj)``.
    """

    sm = StateMachine(
        state_field='status',
        states=ItemStatus.SM_STATES,
        transitions=ItemStatus.SM_TRANSITIONS,
    )

    def __getattr__(self, item):
        """
        Proxy ``sm_*`` attribute lookups to the state machine.

        ``__getattr__`` is only consulted after normal attribute lookup has
        failed, so regular attributes and methods are never intercepted.

        Raises:
            AttributeError: if ``item`` is not an ``sm_*`` name, or the
                machine has no attribute of that name.
        """
        if item.startswith('sm_'):
            from functools import partial
            return partial(getattr(self.sm, item), self)
        raise AttributeError(
            '%r object has no attribute %r' % (type(self).__name__, item))

    def check_review_ready(self):
        """
        Condition for ``sm_commit_review``.

        Placeholder: returns None, which the state machine does not accept
        as a passing 'true' condition (it requires exactly ``True``).
        """
        pass

    def on_enter_verified(self, event):
        """Callback run after the item has entered the 'verified' state."""
        pass

    # other conditions and callbacks
    # ...
class Item(ItemMachineMixin, models.Model):
    """Django model whose ``status`` field is driven by the state machine."""

    # Holds one of the ItemStatus values; defaults to ItemStatus.NEW.
    status = models.CharField(default=ItemStatus.NEW, max_length=16)
    # many other fields
# Using the state machine somewhere: fire the event through the shared
# machine, passing the object being processed.
obj = Item()
obj.sm.sm_prepare_new(obj)

# The `obj.sm.sm_prepare_new(obj)` form looks ugly. With the `sm_*`
# attribute proxy on ItemMachineMixin in place, the same event can be
# fired directly on the instance. A fresh item is used here because `obj`
# has already left the NEW state, the only state `sm_prepare_new` is valid
# from.
other = Item()
other.sm_prepare_new()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding:utf-8 -*- | |
# | |
# fysom - pYthOn Finite State Machine - this is a port of Jake | |
# Gordon's javascript-state-machine to python | |
# https://github.com/jakesgordon/javascript-state-machine | |
# | |
# Modified from "fysom" by wsh <[email protected]> | |
# | |
""" | |
Modified from https://github.com/mriehl/fysom

Main differences from the original fysom:
- the StateMachine class in this module is meant to be used as a
  singleton; the object being processed is passed to the related methods
  as an argument
- customized for Django model integration
- this module supports fewer features than the original fysom:
    * no initial state or final state support
    * no callbacks support
- this module implements conditions and conditional transitions

NOTE: this module is not heavily tested, especially for edge cases.
""" | |
class StateMachineError(Exception):
    """Base class for every error raised by the state machine."""


class TransitionInvalidError(StateMachineError):
    """An event was fired from a state it is not registered for."""


class TransitionCanceledError(StateMachineError):
    """A condition or an ``on_before_<event>`` handler vetoed the event."""
class StateMachine(object):
    """
    Finite state machine meant to be shared by many objects.

    The machine stores no per-object state: the current state is read from
    and written to the attribute named by ``state_field`` on the object
    passed to each call, and conditions and callbacks are looked up as
    methods on that same object.

    Conditions and callbacks execution order:
        conditions
        on_before_<event>
        on_exit_<state>
        <<STATE CHANGE>>
        on_change_state
        on_enter_<state>
        on_after_<event>

    When dest state is the same as the source state:
        conditions
        on_before_<event>
        on_reenter_<state>
        on_after_<event>
    """

    WILDCARD = '*'
    SAME_DEST = '='

    def __init__(self, state_field, states, transitions):
        """
        Build the machine and create one handler attribute per event.

        Args:
            state_field: name of the attribute holding the state on the
                objects handed to this machine.
            states: the known states; stored as ``self.states``.
            transitions: iterable of transitions, each either a
                ``[trigger, source, dest]`` list/tuple or a dict with the
                keys ``trigger``, ``source``, ``dest`` and optionally
                ``conditions``.

        Raises:
            StateMachineError: if the same trigger is configured twice.
        """
        self.state_field = state_field

        # events registry =>
        # {
        #     event_name: {
        #         "source": set(),
        #         "dest": dest,
        #         "conditions": [
        #             {"true/false": "condition_method"},
        #             {"true/false": "condition_method",
        #              "else": conditional_dest},
        #         ],
        #     },
        # }
        self._map = {}
        self.events = []
        self.states = states

        cfg = {
            'transitions': transitions,
        }
        self._build_machine(cfg)

    def is_state(self, obj, state):
        """Return whether ``obj`` is currently in ``state``."""
        return getattr(obj, self.state_field) == state

    def can(self, obj, event):
        """
        Return whether ``event`` is registered and may be fired from the
        current state of ``obj`` (or is registered for the wildcard).
        """
        if event not in self._map:
            return False
        sources = self._map[event]['source']
        return (getattr(obj, self.state_field) in sources
                or self.WILDCARD in sources)

    def cannot(self, obj, event):
        """Return the negation of :meth:`can`."""
        return not self.can(obj, event)

    def _build_machine(self, cfg):
        """Register every configured transition and attach its handler."""
        for trans in cfg.get('transitions', []):
            # Shorthand form: [trigger, source, dest]; tuples are accepted
            # as well as lists.
            if isinstance(trans, (list, tuple)):
                trans = {'trigger': trans[0],
                         'source': trans[1], 'dest': trans[2]}
            self._register_transition(trans)
            event = trans['trigger']
            self.events.append(event)
            setattr(self, event, self._build_event(event))

    def _register_transition(self, trans):
        """Validate one transition dict and store it in the registry."""
        event = trans['trigger']
        if event in self._map:
            raise StateMachineError(
                'Improperly configured transitions, event %s '
                'already registered' % event)

        src = trans['source']
        # A bare string (including the wildcard) is a single source state;
        # wrap it so set() does not split it into characters.
        if src == self.WILDCARD or self._is_base_string(src):
            src = [src]

        entry = {'source': set(src), 'dest': trans['dest']}
        conditions = trans.get('conditions')
        if conditions:
            entry['conditions'] = conditions
        self._map[event] = entry

    def _resolve_dest(self, obj, event, src):
        """
        Run the conditions of ``event`` and return the state to move to.

        Conditions are checked in order. The first one that is not met
        either redirects to its ``'else'`` state (remaining conditions are
        skipped) or, when it has no ``'else'``, cancels the event.

        Raises:
            TransitionCanceledError: a condition without ``'else'`` is not
                met.
        """
        entry = self._map[event]
        dst = entry['dest']
        if dst == self.SAME_DEST:
            dst = src

        for cond in entry.get('conditions', ()):
            target = 'true' if 'true' in cond else 'false'
            if self._check_condition(obj, target, cond[target]):
                continue
            if 'else' in cond:
                return cond['else']
            raise TransitionCanceledError(
                'Cannot trigger event {0} because the {1} '
                'condition returns False'.format(event, cond[target]))
        return dst

    def _build_event(self, event):
        """
        For every event in the state machine, prepares the event handler.

        The returned function takes the object to process followed by any
        extra positional / keyword arguments, which are exposed to the
        callbacks as ``args`` / ``kwargs`` on the event object.
        """

        def fn(obj, *args, **kwargs):
            # On event occurrence, source will always be the current state.
            src = getattr(obj, self.state_field)

            # Check if this event can be triggered in the current state.
            if not self.can(obj, event):
                raise TransitionInvalidError(
                    'event %s inappropriate in current state %s'
                    % (event, src))

            # Check transition conditions first; they may change the dest.
            dst = self._resolve_dest(obj, event, src)

            # Prepares the object with all the meta data to be passed to
            # callbacks.
            e = self._event_obj()
            e.fsm, e.obj, e.event, e.src, e.dst = self, obj, event, src, dst
            e.args, e.kwargs = args, kwargs
            # used to share object saving status between callbacks
            e.obj_has_saved = False

            # Try to trigger the before event, unless it gets canceled.
            if self._before_event(obj, e) is False:
                raise TransitionCanceledError(
                    'Cannot trigger event {0} because the on_before_{0} '
                    'handler returns False'.format(event)
                )

            if src == dst:
                self._reenter_state(obj, e)
                self._after_event(obj, e)
                return

            # Wraps the activities that must constitute a single successful
            # transaction.
            def _trans():
                delattr(obj, '_sm_transition')
                setattr(obj, self.state_field, dst)
                self._change_state(obj, e)
                self._enter_state(obj, e)
                self._after_event(obj, e)

            obj._sm_transition = _trans

            # Hook to perform asynchronous transition: when on_exit_<state>
            # returns False the transition is left pending on
            # obj._sm_transition instead of being run now.
            if self._exit_state(obj, e) is not False:
                obj._sm_transition()

        fn.__name__ = str(event)
        fn.__doc__ = (
            "Event handler for an {event} event. This event can be "
            "fired if the machine is in {states} states.".format(
                event=event,
                states=sorted(self._map[event]['source'], key=str)))

        return fn

    @staticmethod
    def _check_condition(obj, target, func):
        """
        Call the condition method ``func`` on ``obj`` and compare its
        result with ``target``.

        Only the exact values ``True`` / ``False`` satisfy a ``'true'`` /
        ``'false'`` condition; other truthy or falsy results do not.

        Raises:
            AttributeError: if ``obj`` has no attribute named ``func``.
        """
        if not hasattr(obj, func):
            raise AttributeError("obj %s hasn't method %s" % (obj, func))
        r = getattr(obj, func)()
        if target == 'true':
            return r is True
        else:  # false
            return r is False

    @staticmethod
    def _call_hook(obj, fn_name, ev):
        """Call ``obj.<fn_name>(ev)`` if such a hook exists, else None."""
        hook = getattr(obj, fn_name, None)
        if hook is None:
            return None
        return hook(ev)

    @staticmethod
    def _before_event(obj, ev):
        """Run ``on_before_<event>`` on ``obj`` if defined."""
        return StateMachine._call_hook(obj, 'on_before_' + ev.event, ev)

    @staticmethod
    def _after_event(obj, ev):
        """Run ``on_after_<event>`` on ``obj`` if defined."""
        return StateMachine._call_hook(obj, 'on_after_' + ev.event, ev)

    @staticmethod
    def _enter_state(obj, ev):
        """Run ``on_enter_<dst>`` on ``obj`` if defined."""
        return StateMachine._call_hook(obj, 'on_enter_' + ev.dst, ev)

    @staticmethod
    def _exit_state(obj, ev):
        """Run ``on_exit_<src>`` on ``obj`` if defined."""
        return StateMachine._call_hook(obj, 'on_exit_' + ev.src, ev)

    @staticmethod
    def _reenter_state(obj, ev):
        """Run ``on_reenter_<dst>`` on ``obj`` if defined."""
        return StateMachine._call_hook(obj, 'on_reenter_' + ev.dst, ev)

    @staticmethod
    def _change_state(obj, ev):
        """Run ``on_change_state`` on ``obj`` if defined."""
        return StateMachine._call_hook(obj, 'on_change_state', ev)

    def trigger(self, obj, event, *args, **kwargs):
        """
        Triggers the given event on ``obj``.

        The event can be triggered by calling the event handler directly,
        for ex: sm.eat(obj), but this method will come in handy if the
        event is determined dynamically and you have the event name to
        trigger as a string.

        Extra positional and keyword arguments are forwarded to the event
        handler unchanged.

        Raises:
            StateMachineError: if ``event`` is not a registered event.
        """
        # Look the name up in the registry rather than with hasattr() so
        # that ordinary attributes (e.g. 'can') are not mistaken for events.
        if event not in self._map:
            raise StateMachineError(
                "There isn't any event registered as %s" % event)
        return getattr(self, event)(obj, *args, **kwargs)

    @staticmethod
    def _is_base_string(obj):
        """
        Returns if the object is an instance of str.
        """
        return isinstance(obj, str)

    class _event_obj(object):
        """
        Event object.

        Attributes:
            fsm, obj, event, src, dst, args, kwargs, obj_has_saved
        """
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment