pystatemachine

State machines in Python using generators

Primary language: Python. License: BSD 2-Clause "Simplified" (BSD-2-Clause).

A state machine module based on generator functions

Introduction

Yet another module for building state machines with generators. It is intended to be a lightweight collection of useful functions for modeling state machines, small enough to be dropped into your project's package.

I wanted something that would be easy to integrate into applications, imposing as few of the module's conventions as possible on client code. I also wanted an implementation that supports hierarchical and sequential composition of state machines.

Why use generators? Because they are suspendable functions: they retain state and support entry/exit actions without requiring an object-oriented abstraction. An application is free to use this module's functions within classes, but it does not (and should not) have to.
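The point about retained state and entry/exit actions can be seen without this module at all. The following is a minimal sketch in plain Python 3; the names `counting_state` and `log` are made up for illustration and are not part of the module. Code before the loop runs on entry, local variables persist between events, and the `finally` block runs when the generator is closed.

```python
def counting_state(log):
    """A generator used as a state: entry code, event loop, exit code."""
    log.append("enter")            # entry action: runs on the first next()
    count = 0                      # local state survives between events
    try:
        while True:
            event = yield count    # suspend until the next event is sent in
            count += 1
            log.append("event %s (#%d)" % (event, count))
    finally:
        log.append("exit")         # exit action: runs when close() is called


log = []
state = counting_state(log)
next(state)          # enter the state; runs up to the first yield
state.send("a")      # deliver two events
state.send("b")
state.close()        # leave the state; triggers the finally block
print(log)
# -> ['enter', 'event a (#1)', 'event b (#2)', 'exit']
```

No class is needed to hold `count`: the suspended generator frame keeps it alive between calls.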

The state_machine_from_class() function can be used to create a state machine instance from a class encapsulating the state factory and state transition functions.


Overview

States and state machines are generator functions. A state is not usually used directly; it is handled by a state machine such as the one implemented in state_machine(). A state machine is itself a state: it handles state transitions and dispatches execution to the currently active state. This is what allows states to be nested.

The skeleton_state() below shows a skeleton implementation of a typical state. A state accepts 3 parameters: an application-specific context, a unique identifier for the state in the transition table of the parent state machine, and an event. The identifier is passed as a vector (state_id_vec) with one entry per nesting level; its last element is the state's own ID. Every yield statement in a state must yield these same 3 parameters and update them with the value returned by the yield, like this: ctx, state_id_vec, evt = yield (ctx, state_id_vec, evt).

>>> def skeleton_state(ctx, state_id_vec, evt):
...     # code executed when entering the state
...     try:
...         while True:
...             # return control to the parent state machine
...             ctx, state_id_vec, evt = yield ctx, state_id_vec, evt
...             #
...             # code that executes every time an event is received
...             #
...     finally:
...         # code executed when exiting the state
...         pass

The state_machine() generator accepts an application specific context which will be passed to each state, a state factory function called each time an instance of a state is created (this occurs right before entering the state) and a transition function called to retrieve the unique ID of the next state given the current state ID and the event. The transition function is first called with a (state_id, event) tuple of (None, None) in order to retrieve the initial state ID. When the transition function returns None the state machine has reached its final state and stops.
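The lifecycle described above (ask the transition function for the initial state, create each state right before entering it, stop when the transition function returns None) can be sketched in a few lines of plain Python 3. This is an illustrative driver, not the module's state_machine() implementation: `drive`, `leaf_state` and the transition table are hypothetical, and state IDs are plain strings here rather than ID vectors.

```python
def leaf_state(ctx, state_id, evt):
    """A state following the three-value yield protocol."""
    ctx.append("enter %s" % state_id)
    try:
        while True:
            ctx, state_id, evt = yield ctx, state_id, evt
            ctx.append("%s got %s" % (state_id, evt))
    finally:
        ctx.append("exit %s" % state_id)


def transition(ctx, key):
    """Table lookup; unknown (state, event) pairs keep the current state."""
    table = {(None, None): "idle", ("idle", "go"): "busy", ("busy", "stop"): None}
    return table.get(key, key[0])


def drive(ctx, state_factory, transition, events):
    """Hypothetical driver, NOT the module's state_machine()."""
    state_id = transition(ctx, (None, None))       # ask for the initial state
    state = state_factory(ctx, state_id, None)     # create it right before entry
    next(state)                                    # run its entry code
    for evt in events:
        next_id = transition(ctx, (state_id, evt))
        if next_id == state_id:
            state.send((ctx, state_id, evt))       # current state handles the event
            continue
        state.close()                              # run the exit code
        if next_id is None:                        # final state reached: stop
            return
        state_id = next_id
        state = state_factory(ctx, state_id, evt)
        next(state)
    state.close()


trace = []  # the context doubles as a log in this sketch
drive(trace, leaf_state, transition, ["ping", "go", "ping", "stop", "ignored"])
print(trace)
```

The trailing "ignored" event is never delivered, because the transition function returned None for ("busy", "stop") and the driver stopped.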

run_sm() is used to run a state machine to completion. The optional callback function parameter is invoked for each event, and the optional val parameter is useful for resuming execution of a state machine. The callback function must return a tuple containing the context, state ID and event parameters; the values returned can differ from the values it was passed. The callback function can raise StopIteration() to indicate that run_sm() should exit. run_sm() returns the last valid (context, state ID, event) tuple.
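To make the callback contract concrete, here is a minimal sketch of such a run loop in plain Python 3. It is an assumption about the general shape, not the module's run_sm(): `run_with_callback`, `echo_machine` and `feed_from_context` are hypothetical names, the stand-in machine only echoes what it is sent, and resuming via val is left out.

```python
def echo_machine(ctx):
    """Stand-in for a state machine: stops once it receives the 'quit' event."""
    val = (ctx, ("only",), None)
    while val[2] != "quit":
        val = yield val


def run_with_callback(machine, callback):
    """Hypothetical run loop, NOT the module's run_sm()."""
    last = None
    try:
        val = next(machine)
        while True:
            last = val                               # remember the last valid tuple
            val = machine.send(callback(machine, val))
    except StopIteration:
        # Raised by the callback (no more events) or by a finished machine.
        return last


def feed_from_context(machine, val):
    """Callback: pop the next event from the context list, or stop the run."""
    ctx, state_id, evt = val
    if not ctx:
        raise StopIteration()
    return ctx, state_id, ctx.pop(0)


result = run_with_callback(echo_machine(["a", "b"]), feed_from_context)
print(result)
# -> ([], ('only',), 'b')
```

The callback is an ordinary function, so raising StopIteration from it is a plain exception that the run loop can catch.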

state_machine_from_class() returns a state machine instance initialized using the state_factory and transition methods of the class.

Tutorial

Let's start by looking at a simple usage example. An example state is defined as follows:

>>> import smachine as smm
>>> def state_ex2(ctx, state_id_vec, evt):
...     print "Entering state:", state_id_vec[-1]
...     try:
...         while True:
...             ctx, state_id_vec, evt = yield ctx, state_id_vec, evt
...             print "Received event", evt, "while in", state_id_vec[-1]
...     finally:
...         print "Exiting state:", state_id_vec[-1]

The state factory function always returns an instance of state_ex2. The transition function moves to the next state (states are identified by increasing integers) when the n event (or any event other than x and e) is received, stays in the current state when the x event is received, and terminates when the e event is received.

>>> s_f = lambda c,n,e: state_ex2(c,n,e)
>>> def t_f(c, t):
...     if t[0] is None:
...         return 1
...     if t[1] == 'x':
...         return t[0]
...     elif t[1] == 'e':
...         return None
...     else:
...         return t[0] + 1
...

The state machine is created by passing a null context

>>> sm = smm.state_machine(s_f, t_f)(None)

and then run using the xnxnxne sequence of events

>>> l = list(smm.iter_sm(sm, iter('xn'*3+'e')))
Entering state: 1
Received event x while in 1
Exiting state: 1
Entering state: 2
Received event x while in 2
Exiting state: 2
Entering state: 3
Received event x while in 3
Exiting state: 3
Entering state: 4
Exiting state: 4
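The transition function can also be exercised on its own, which helps when checking a table before wiring it into a state machine. The sketch below restates t_f in plain Python 3 and folds it over the same xnxnxne event sequence; `trace_states` is a hypothetical helper, not part of the module.

```python
def t_f(c, t):
    state_id, evt = t
    if state_id is None:
        return 1              # initial state
    if evt == 'x':
        return state_id       # stay in the current state
    elif evt == 'e':
        return None           # terminate
    return state_id + 1       # any other event advances to the next state


def trace_states(transition, events):
    """Hypothetical helper: list the state IDs the transition function visits."""
    state_id = transition(None, (None, None))
    visited = [state_id]
    for evt in events:
        state_id = transition(None, (state_id, evt))
        visited.append(state_id)
        if state_id is None:  # final state reached
            break
    return visited


visited = trace_states(t_f, 'xn' * 3 + 'e')
print(visited)
# -> [1, 1, 2, 2, 3, 3, 4, None]
```

The repeated IDs correspond to the x events that were handled without leaving the state, and the trailing None marks termination on e.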

Define a list of states and some events we want to occur in sequence. The test event does not affect the state, while up and down change the state to the next one up or down, respectively.

>>> states = ['freezing', 'cold', 'cool', 'warm', 'hot']
>>> event_list = ['up', 'test', 'up', 'up', 'down']

A state implementation that simply hands control back to the state machine on every event (the transitions themselves are printed by transition() further down)

>>> def state_ex1(ctx, state_id_vec, evt):
...     while True:
...         ctx, state_id_vec, evt = yield ctx, state_id_vec, evt
...

Wrap callback(), state_factory() and transition() in a class to show how to use state_machine_from_class()

>>> class StateMachineClassEx1(object):
...     @staticmethod
...     def callback(sm, val):
...         # Consume the first event and make it the current event
...         ctx, state_id_vec, evt = val
...         if len(ctx) > 0:
...             return ctx, state_id_vec, ctx.pop(0)
...         else:
...             raise StopIteration()
...
...     @staticmethod
...     def state_factory(ctx, state_id_vec, evt):
...         # Return the same implementation for every state
...         return state_ex1(ctx, state_id_vec, evt)
...
...     @staticmethod
...     def transition(ctx, t):
...         state_id, evt = t
...         if t == (None, None):
...             # return the initial state
...             idx = states.index('cool')
...         elif evt == 'test':
...             # the 'test' event does not cause a state change
...             idx = states.index(state_id)
...         else:
...             # pick the next state from the list
...             idx = states.index(state_id)
...             if evt == 'up':
...                 idx = min(idx+1, len(states)-1)
...             else:
...                 idx = max(idx-1, 0)
...         print (state_id, evt), '->', states[idx]
...         return states[idx]
...

The event_list is passed as 'context' for the state machine

>>> sm = smm.state_machine_from_class(StateMachineClassEx1)(event_list)
>>> end_state = smm.run_sm(sm, StateMachineClassEx1.callback)
(None, None) -> cool
('cool', 'up') -> warm
('warm', 'test') -> warm
('warm', 'up') -> hot
('hot', 'up') -> hot
('hot', 'down') -> warm
>>> end_state
([], ('warm',), 'down')


>>> class SimpleSM1(object):
...     @staticmethod
...     def state(ctx, state_id_vec, evt):
...         while True:
...             ctx, state_id_vec, evt = yield ctx, state_id_vec, evt
...     @staticmethod
...     def state_factory(ctx, name, evt):
...         # Return the same implementation for every state
...         return SimpleSM1.state(ctx, name, evt)
...     @staticmethod
...     def transition(ctx, t):
...         s, e = t
...         tt = {
...             (None, None): 's1',
...             ('s1', 'n'): 's2',
...             ('s2', 'n'): None,
...         }
...         return tt.get(t, 's1')
...
>>> m1 = smm.state_machine_from_class(SimpleSM1)
>>> m2 = smm.state_machine_from_class(SimpleSM1)
>>> m3 = smm.state_machine_from_class(SimpleSM1)
>>> tt = {
...     (None, None): 'm1',
...     ('m1', None): 'm2',
...     ('m2', None): 'm3',
...     ('m3', None): None,
... }
>>> s = {
...     'm1': m1,
...     'm2': m2,
...     'm3': m3,
... }
>>> s_f = lambda c,n,e: s[n[-1]](c, n, e)
>>> t_f = lambda c,t: tt.get(t, t[0])
>>> sm = smm.state_machine(s_f, t_f)(None)
>>> for val in smm.iter_sm(sm, iter('n'*20)):
...     print (val[1], val[2])
...
(('m1', 's1'), None)
(('m1', 's2'), 'n')
(('m2', 's1'), None)
(('m2', 's2'), 'n')
(('m3', 's1'), None)
(('m3', 's2'), 'n')

>>> tt = {
...     (None, None): 'closed',
...     ('closed', 'open'): 'opened',
...     ('opened', 'close'): 'closed',
...     ('closed', 'lock'): 'locked',
...     ('locked', 'unlock'): 'closed',
... }
>>> s_f = lambda c,n,e: state_ex1(c, n, e)
>>> t_f = lambda c,t: tt.get(t, t[0])
>>> def print_transitions(iter):
...     old_s = None
...     while True:
...         c, s, e = iter.next()
...         print (old_s, e), '->', s[-1]
...         old_s = s[-1]
...         yield c, s, e
...
>>> e = ['lock', 'open', 'unlock', 'open', 'close']
>>> sm = smm.state_machine(s_f, t_f)(None)
>>> l = [val for val in print_transitions(smm.iter_sm(sm, iter(e)))]
(None, None) -> closed
('closed', 'lock') -> locked
('locked', 'open') -> locked
('locked', 'unlock') -> closed
('closed', 'open') -> opened
('opened', 'close') -> closed
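The door example relies on the tt.get(t, t[0]) idiom: any (state, event) pair without a table entry silently leaves the state unchanged. The plain Python 3 sketch below makes those fall-through cases visible for the same table and event list; `ignored_events` is a hypothetical helper introduced only for illustration.

```python
# Transition table copied from the door example.
tt = {
    (None, None): 'closed',
    ('closed', 'open'): 'opened',
    ('opened', 'close'): 'closed',
    ('closed', 'lock'): 'locked',
    ('locked', 'unlock'): 'closed',
}


def t_f(ctx, t):
    # Fall back to the current state when (state, event) has no entry.
    return tt.get(t, t[0])


def ignored_events(events):
    """Hypothetical helper: collect (state, event) pairs that hit the default."""
    state = t_f(None, (None, None))
    ignored = []
    for evt in events:
        if (state, evt) not in tt:
            ignored.append((state, evt))
        state = t_f(None, (state, evt))
    return ignored, state


ignored, final_state = ignored_events(['lock', 'open', 'unlock', 'open', 'close'])
print(ignored, final_state)
# -> [('locked', 'open')] closed
```

Trying to open a locked door is the only event that falls through, which matches the ('locked', 'open') -> locked line in the output above.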

# Using run_sm()

>>> def cb(sm, val):
...     ctx, state_id, evt = val
...     print (ctx['last_state'], evt), '->', state_id[-1]
...     ctx['last_state'] = state_id[-1]
...     return ctx, state_id, ctx['evt_src'].next()
...
>>> ctx = dict([('evt_src', iter(e)), ('last_state', None)])
>>> sm = smm.state_machine(s_f, t_f)(ctx)
>>> val = smm.run_sm(sm, cb)
(None, None) -> closed
('closed', 'lock') -> locked
('locked', 'open') -> locked
('locked', 'unlock') -> closed
('closed', 'open') -> opened
('opened', 'close') -> closed

# Hierarchical state machine example: pocket calculator

>>> import operator
>>> import sys
>>> def display(str_or_int):
...     sys.stdout.write(' ' + str(str_or_int))
...
>>> class PocketCalcInnerSM(object):
...
...     tt = {
...         ('on', None): 'op_nd1',
...         ('op_nd1', '*/+-'): 'op_tor',
...         ('op_tor', '0123456789.'): 'op_nd2',
...         ('op_nd2', '*/+-'): 'op_tor',
...         ('op_nd2', '='): 'result',
...         ('result', '*/+-'): 'op_tor',
...         ('result', '0123456789.'): 'op_nd1',
...     }
...
...     @classmethod
...     def transition(cls, c, t):
...         s_id = cls.tt.get(t, None)
...         if s_id is None:
...             # no exact match: treat the event part of each key as a set of characters
...             for k,v in cls.tt.items():
...                 if k[0] == t[0] and t[1] in k[1]:
...                     s_id = v
...         if s_id is None:
...             return t[0]
...         return s_id
...
...     @classmethod
...     def state_factory(cls, c, n, e):
...         return getattr(cls, n[-1])(c,n,e)
...
...     @staticmethod
...     def op_nd1(c, s, e):
...         if e is not None:
...             c['op_nd1'] = int(e)
...         else:
...             c['op_nd1'] = 0
...         while True:
...             display(c['op_nd1'])
...             c, s, e = yield c, s, e
...             c['op_nd1'] = c['op_nd1']*10 + int(e)
...
...     @staticmethod
...     def op_nd2(c, s, e):
...         c['op_nd2'] = int(e)
...         while True:
...             display(c['op_nd2'])
...             c, s, e = yield c, s, e
...             c['op_nd2'] = c['op_nd2']*10 + int(e)
...
...     @staticmethod
...     def op_tor(c, s, e):
...         while True:
...             if c['op_tor'] is not None:
...                 c['op_nd1'] = c['op_tor'](c['op_nd1'], c['op_nd2'])
...                 display(c['op_nd1'])
...             display(e)
...             if e == '+':
...                 c['op_tor'] = operator.add
...             elif e == '-':
...                 c['op_tor'] = operator.sub
...             elif e == '/':
...                 c['op_tor'] = operator.div
...             elif e == '*':
...                 c['op_tor'] = operator.mul
...             else:
...                 raise StopIteration
...             c, s, e = yield c, s, e
...
...     @staticmethod
...     def result(c, s, e):
...         c['op_nd1'] = c['op_tor'](c['op_nd1'], c['op_nd2'])
...         c['op_tor'] = None
...         while True:
...             display(c['op_nd1'])
...             c, s, e = yield c, s, e
...
...
>>> class PocketCalcOuterSM(object):
...
...     tt = {
...         (None, None): 'off',
...         ('off', 'p-on'): 'reset',
...         ('on', 'p-on'): 'reset',
...         ('reset', None): 'on',
...         ('on', 'p-off'): 'off',
...     }
...
...     @classmethod
...     def transition(cls, c, t):
...         return cls.tt.get(t, t[0])
...
...     @classmethod
...     def state_factory(cls, c, n, e):
...         if n[-1] == 'on':
...             return smm.state_machine_from_class(PocketCalcInnerSM)(c,n,e)
...         return getattr(cls, n[-1])(c,n,e)
...
...     @staticmethod
...     def off(c, s, e):
...         while True:
...             c, s, e = yield c, s, e
...
...     @staticmethod
...     def reset(c, s, e):
...         c['op_nd1'] = 0
...         c['op_nd2'] = 0
...         c['op_tor'] = None
...         return
...         # yield statement is necessary to make this a generator
...         c, s, e = yield c, s, e
...
>>> ctx = dict()
>>> pc_sm = smm.state_machine_from_class(PocketCalcOuterSM)(ctx)
>>> e = ['p-on', '2', '+', '3', '=', '-', '1', '=', 'p-on', 'p-off']
>>> l = [(dict(val[0]),)+val[1:] for val in smm.iter_sm(pc_sm, iter(e))]
 0 2 + 3 5 - 1 4 0
>>> e = ['p-on', '2', '*', '3', '/', '2', '+', '1', '3', '=', 'p-off']
>>> l = [val for val in smm.iter_sm(pc_sm, iter(e), val = l[-1])]
 0 2 * 3 6 / 2 3 + 1 13 16
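The inner calculator machine's transition() treats the event part of a table key as a set of characters, so one entry can cover all digits or all operators. The following plain Python 3 sketch isolates that lookup. `lookup`, `DIGITS` and `OPERATORS` are hypothetical names, the operator class is assumed to include `*`, and the explicit None checks are an addition of this sketch rather than something the example above does.

```python
DIGITS = '0123456789.'
OPERATORS = '*/+-'   # assumption: the operator class includes '*'

table = {
    ('on', None): 'op_nd1',
    ('op_nd1', OPERATORS): 'op_tor',
    ('op_tor', DIGITS): 'op_nd2',
    ('op_nd2', OPERATORS): 'op_tor',
    ('op_nd2', '='): 'result',
    ('result', OPERATORS): 'op_tor',
    ('result', DIGITS): 'op_nd1',
}


def lookup(table, state_id, evt):
    """Hypothetical helper: exact match first, then event character classes."""
    exact = table.get((state_id, evt))
    if exact is not None:
        return exact
    if evt is not None:
        for (key_state, key_events), target in table.items():
            # `evt in key_events` is a substring test, which is adequate for
            # the single-character events used by the calculator.
            if key_state == state_id and key_events is not None and evt in key_events:
                return target
    return state_id   # no match: stay in the current state


print(lookup(table, 'op_nd1', '+'))   # operator after the first operand
print(lookup(table, 'op_tor', '7'))   # digit after an operator
print(lookup(table, 'op_nd1', '5'))   # further digit: no entry, state unchanged
```

Staying in op_nd1 on a further digit is what lets the operand states accumulate multi-digit numbers such as the 13 in the second run.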