Welcome, guest | Sign In | My Account | Store | Cart
#! /usr/bin/env python

""" Generic finite state machine class
    Initialise the class with a list of tuples - or by adding transitions
    Tony Flury - November 2012
    Released under an MIT License - free to use so long as the author and other contributors are credited.
"""

class fsm(object):
    """ A simple to use finite state machine class.

        The machine is driven by a table of transitions. Each transition is a
        tuple of the form (fromState, toState, condition) or
        (fromState, toState, condition, callback) where:

          * condition is either True (the transition is always taken) or a
            callable that is passed the event value and returns a truthy
            result when the transition applies.
          * callback is an optional callable, invoked as callback(machine, value)
            after the machine has moved to toState.

        None is reserved to mean "machine not started" and therefore cannot be
        used as a state.
    """
    def __init__(self, states=None):
        """ Create the machine, optionally with an initial iterable of
            transition tuples.
        """
        # A None default (instead of []) stops separate machines from sharing
        # one transition table, and copying means addTransition never mutates
        # the list owned by the caller.
        self._states = [] if states is None else list(states)
        self.currentState = None
        # Transitions matched by the most recent call to event().
        self.nextStates = []

    def start(self, startState=None):
        """ Start the finite state machine in startState.
            Raises ValueError if startState is None or is not the source
            state of at least one transition.
        """
        if startState is None or startState not in (t[0] for t in self._states):
            raise ValueError("Not a valid start state")
        self.currentState = startState

    def stop(self):
        """ Stop the finite state machine
        """
        # Reset the current state (not the start state) - fix identified by Holger Waldmann
        self.currentState = None

    def addTransition(self, fromState, toState, condition, callback=None):
        """ Add a state transition to the list, order is irrelevant, loops are undetected
            Can only add a transition if the state machine isn't started.
            Raises ValueError if the machine is currently started.
        """
        # The guard must fire when the machine IS running; a current state of
        # None means it has not been started (or has been stopped).
        if self.currentState is not None:
            raise ValueError("StateMachine already Started - cannot add new transitions")

        # add a transition to the state table
        self._states.append((fromState, toState, condition, callback))

    def event(self, value):
        """ Trigger a transition - return a tuple (<new_state>, <changed>)
            <changed> is True when the new state differs from the previous one.
            Raises ValueError if the machine is not started, if no valid
            transition exists, or if more than one transition matches.
            Caller needs to determine if the value will be consumed or re-used
        """
        if self.currentState is None:
            raise ValueError("StateMachine not Started - cannot process event")

        # get a list of transitions which are valid from the current state.
        # '== True' (rather than 'is True') is kept deliberately so conditions
        # that merely compare equal to True still count as unconditional.
        self.nextStates = [t for t in self._states
                           if t[0] == self.currentState
                           and (t[2] == True or (callable(t[2]) and t[2](value)))]

        if not self.nextStates:
            raise ValueError("No Transition defined from state {0} with value '{1}'".format(self.currentState, value))
        if len(self.nextStates) > 1:
            # Report the competing destination states (index 1); index 0 is
            # always just the current state.
            raise ValueError("Ambiguous transitions from state {0} with value '{1}' ->  New states defined {2}".format(self.currentState, value, [t[1] for t in self.nextStates]))

        transition = self.nextStates[0]
        newState = transition[1]
        # The callback slot is optional: three-element transitions have none.
        callback = transition[3] if len(transition) > 3 else None

        changed = self.currentState != newState
        self.currentState = newState

        # Execute the callback if defined - it runs after the state change, so
        # it observes the new state through CurrentState()
        if callable(callback):
            callback(self, value)

        return self.currentState, changed

    def CurrentState(self):
        """ Return the current State of the finite State machine
            (None when the machine is not started).
        """
        return self.currentState


# -------------------------------------------------------------------------------------------------
# Example classes to demonstrate the use of the Finite State Machine Class
# They implement a simple lexical tokeniser.
# These classes are not necessary for the FSM class to work.
# -------------------------------------------------------------------------------------------------

# Simple storage object for each token
class token(object):
    """ Record pairing a token's type with the text collected for it.
    """
    def __init__(self, type):
        # 'type' shadows the builtin, but it is the public parameter name and
        # so is left unchanged.
        self.tokenText = ""
        self.tokenType = type

    def addCharacter(self, char):
        """ Append char to the text gathered so far.
        """
        self.tokenText = self.tokenText + char

    def __repr__(self):
        """ Render the token as <type><<text>>, e.g. Identifier<x123>.
        """
        fields = (self.tokenType, self.tokenText)
        return "{0}<{1}>".format(*fields)


# Token list object - demonstrating the definition of state machine callbacks
class tokenList(object):
    """ Accumulates completed tokens; its methods have the
        callback(machine, value) signature the state machine calls.
    """
    def __init__(self):
        # No token is in progress until StartToken is called.
        self.currentToken = None
        self.tokenList = []

    def StartToken(self, fss, value):
        """ Begin a new token typed after the machine's current state and
            seed it with value.
        """
        opened = token(fss.CurrentState())
        self.currentToken = opened
        opened.addCharacter(value)

    def addCharacter(self, fss, value):
        """ Extend the token in progress with value.
        """
        inProgress = self.currentToken
        inProgress.addCharacter(value)

    def EndToken(self, fss, value):
        """ Store the token in progress in the list and clear it; value is
            not used.
        """
        finished = self.currentToken
        self.tokenList.append(finished)
        self.currentToken = None


# Example code - showing population of the state machine in the constructor
# the Machine could also be constructed by multiple calls to addTransition method
# Example code is a simple tokeniser 
# Machine transitions back to the Start state whenever the end of a token is detected
if __name__ == "__main__":
    # Demo: tokenise a small expression. The token list object supplies the
    # callbacks that the state machine invokes on each transition.
    t = tokenList()

    # Each entry is (fromState, toState, condition[, callback]); a condition of
    # True means the transition is taken for any value.
    fs = fsm( [ ("Start","Start",lambda x: x.isspace() ),
                ("Start","Identifier",str.isalpha, t.StartToken ),
                ("Identifier","Identifier", str.isalnum, t.addCharacter ),
                ("Identifier","Start",lambda x: not x.isalnum(), t.EndToken ),
                ("Start","Operator", lambda x: x in "=+*/-()", t.StartToken ),
                ("Operator","Start", True, t.EndToken),
                ("Start","Number",str.isdigit, t.StartToken ),
                ("Number","Number",lambda x: x.isdigit() or x == ".", t.addCharacter ),
                ("Number","Start",lambda x: not x.isdigit() and x != ".", t.EndToken ),
                ("Start","StartQuote",lambda x: x == "'"),
                ("StartQuote","String", lambda x: x != "'", t.StartToken),
                ("String","String",lambda x: x != "'", t.addCharacter ),
                ("String","EndQuote", lambda x: x == "'", t.EndToken ),
                ("EndQuote","Start", True ) ] )

    fs.start("Start")

    a = "    x123=MyString+123.65-'hello'*value"
    c = 0
    while c < len(a):
        state, changed = fs.event(a[c])
        # A transition back to Start from another state only ends the token; it
        # does not consume the character, so the same character is fed again.
        if state != "Start" or not changed:
            c += 1
    # Feed an empty value to flush the token still in progress at end of input.
    ret = fs.event("")

    # Single-argument call form prints identically on Python 2 and Python 3.
    print(t.tokenList)

Diff to Previous Revision

--- revision 3 2012-12-15 19:22:28
+++ revision 4 2012-12-15 19:24:18
@@ -24,7 +24,7 @@
     def stop(self):
         """ Stop the finite state machine
         """
-        # Bug fix 15 Dec 2012 - self.currentState should be reset, not startState
+        # Bug fix 15 Dec 2012 - self.currentState should be reset, not startState - Identified by Holger Waldmann
         self.currentState = None
     
     def addTransition(self,fromState, toState, condition, callback=None):

History