This may not be useful to many people, but this is a very simple finite state machine. It is defined as a class so that you can have multiple state machines running at the same time - all with different state/transition lists.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | #! /usr/bin/env python
""" Generic finite state machine class
Initialise the class with a list of tuples - or by adding transitions
Tony Flury - November 2012
Released under an MIT License - free to use so long as the author and other contributers are credited.
"""
class fsm(object):
""" A simple to use finite state machine class.
Allows definition of multiple states, condition functions from state to state and optional callbacks
"""
def __init__(self, states=[]):
self._states=states
self.currentState = None
def start(self,startState=None):
""" Start the finite state machine
"""
if not startState or not (startState in [x[0] for x in self._states]):
raise ValueError("Not a valid start state")
self.currentState = startState
def stop(self):
""" Stop the finite state machine
"""
# Bug fix 15 Dec 2012 - self.currentState should be reset, not startState - Identified by Holger Waldmann
self.currentState = None
def addTransition(self,fromState, toState, condition, callback=None):
""" Add a state transition to the list, order is irellevant, loops are undetected
Can only add a transition if the state machine isn't started.
"""
if not self.currentState:
raise ValueError("StateMachine already Started - cannot add new transitions")
# add a transition to the state table
self._states.append( (fromState, toState,condition, callback))
def event(self, value):
""" Trigger a transition - return a tuple (<new_state>, <changed>)
Raise an exception if no valid transition exists.
Callee needs to determine if the value will be consumed or re-used
"""
if not self.currentState:
raise ValueError("StateMachine not Started - cannot process event")
# get a list of transitions which are valid
self.nextStates = [ x for x in self._states\
if x[0] == self.currentState \
and (x[2]==True or (callable(x[2]) and x[2](value))) ]
if not self.nextStates:
raise ValueError("No Transition defined from state {0} with value '{1}'".format(self.currentState, value))
elif len(self.nextStates) > 1:
raise ValueError("Ambiguous transitions from state {0} with value '{1}' -> New states defined {2}".format(self.currentState, value, [x[0] for x in self.nextStates]))
else:
if len(self.nextStates[0]) == 4:
current, next, condition, callback = self.nextStates[0]
else:
current, next, condition = self.nextStates[0]
callback = None
self.currentState, changed = (next,True) \
if self.currentState != next else (next, False)
# Execute the callback if defined
if callable(callback):
callback(self, value)
return self.currentState, changed
def CurrentState(self):
""" Return the current State of the finite State machine
"""
return self.currentState
# -------------------------------------------------------------------------------------------------
# Example classes to demonstrate the use of the Finite State Machine Class
# They implement a simple lexical tokeniser.
# These classes are not neccesary for the FSM class to work.
# -------------------------------------------------------------------------------------------------
# Simple storage object for each token
class token(object):
def __init__(self, type):
self.tokenType = type
self.tokenText = ""
def addCharacter(self, char):
self.tokenText += char
def __repr__(self):
return "{0}<{1}>".format(self.tokenType, self.tokenText)
# Token list object - demonstrating the definition of state machine callbacks
class tokenList(object):
def __init__(self):
self.tokenList = []
self.currentToken = None
def StartToken(self, fss, value):
self.currentToken = token(fss.CurrentState())
self.currentToken.addCharacter(value)
def addCharacter(self, fss, value):
self.currentToken.addCharacter(value)
def EndToken(self, fss, value):
self.tokenList.append(self.currentToken)
self.currentToken = None
# Example code - showing population of the state machine in the constructor
# the Machine could also be constructed by multiple calls to addTransition method
# Example code is a simple tokeniser
# Machine transitions back to the Start state whenever the end of a token is detected
if __name__ == "__main__":
t = tokenList()
fs = fsm( [ ("Start","Start",lambda x: x.isspace() ),
("Start","Identifier",str.isalpha, t.StartToken ),
("Identifier","Identifier", str.isalnum, t.addCharacter ),
("Identifier","Start",lambda x: not x.isalnum(), t.EndToken ),
("Start","Operator", lambda x: x in "=+*/-()", t.StartToken ),
("Operator","Start", True, t.EndToken),
("Start","Number",str.isdigit, t.StartToken ),
("Number","Number",lambda x: x.isdigit() or x == ".", t.addCharacter ),
("Number","Start",lambda x: not x.isdigit() and x != ".", t.EndToken ),
("Start","StartQuote",lambda x: x == "\'"),
("StartQuote","String", lambda x: x != "\'", t.StartToken),
("String","String",lambda x: x != "\'", t.addCharacter ),
("String","EndQuote", lambda x: x == "\'", t.EndToken ),
("EndQuote","Start", True ) ] )
fs.start("Start")
a = " x123=MyString+123.65-'hello'*value"
c = 0
while c < len(a):
ret = fs.event(a[c])
# Make sure a transition back to start (from something else) does not consume the character.
if ret[0] != "Start" or (ret[0] == "Start" and ret[1] == False):
c += 1
ret = fs.event("")
print t.tokenList
|
http://en.wikipedia.org/wiki/Finite-state_machine
The Finite State Machine class keeps track of the current state, and the list of valid state transitions.
You define each transition by specifying :
- FromState - the starting state for this transition
- ToState - the end state for this transition
- condition - a callable which when it returns True means this transition is valid
- callback - an optional callable function which is invoked when this transition is executed.
There is no validation on these values, and it is perfectly legal to build a state transition where FromState and ToState are the same state, or to build a State machine with no end state - both of these are illustrated in the example.
State names are case sensitive.
The key part of the Finite State Machine is the event method - which takes a value (of any type), and tries to find a single valid transition in the defined transition list -i.e. one which has a FromState equal to the current state, and where the condition callable is invoked with the value as an argument will return True.
Generally the condition for a transition is a callable - i.e. a predefined function or method, or as illustrated a lambda function. The condition callable is passed the value as a single argument. :
condition( value )
It is also possible to define the condition for a transition as True (i.e. not a callable) - this is illustrated in the example code - in this case the transition always happens regardless of the value passed to the event method. Defining condition in this way has it's uses - see in the EndQuote to Start state tranistion in the exmaple.
The callback can be defined which is invoked on each transition - callback has to be defined with the following function prototype :
callback( fsm, value )
Where fsm is the finite state machine instance invoking the callback, and value is the same value passed to the fsm.event method. The callback function is not expected to return anything. In the example code the callbacks are defined as methods on a instance of the TokenList class, but they could be just as easily defined as simple functions.
The event method will raise an exception if there is no valid transition for the value provided, or if the value could result in more than one transition.
The event method returns a tuple containing the new state and an indication if the state has changed - again useful if you need to resubmit the same event.
There is some simple protection built into the machine - for instance you can't modify the machine once it has started.
By using data to define the Finite State Machine it avoids the need to hard code the details. Logic errors are still possible - for instance - spelling/typing mistakes can easily result in a sate machine which is broken - i.e. states which are impossible to get to - or leave. Care should be taken when defining the states.
The fsm class is not currently re-entrant (i.e. don't call any fss method from the callback function) or be shared between threads - although there is no reason each thread couldn't have a fss instance to itself.
Edited 15 Dec : Corrected a bug in the fsm.stop method
I do like the idea of using callables to define the conditions.
IMHO line 27 should read:
Holger : well spotted. INow corrected