General state machine mechanism plus a specialized version, LineStateMachine, for processing text files based by using regular expressions to determine state transitions.
| """
LineStateMachine.py
General state machine mechanism plus a specialized version,
LineStateMachine, for processing text files based on regular expression
line matching.
Implementing a specific LineStateMachine
========================================
* Analyze states necessary for state machine
* Create states as subclasses of LineState.
- Set ID for state.
- Determine regular expression that signal transitions.
- Set TRANSITIONS list for list of regexes and corresponding next states.
- Order TRANSITIONS so that more general regexes are tested later, e.g.
testing (.*) first will prevent any other transitions from being tested.
- Write handler to extract info cargo match_object for current line,
then call LineState.handle() to continue processing text.
* Subclass LineStateCargo to contain information to be shared and accumulated
between states over course of text processing.
* Subclass LineStateMachine.
- Add states to initializer.
- Start LineStateMachine by calling run() with start state and cargo
containing text file pointer.
See ReportStateMachine example below.
Extensively rewritten from sample code in "Text Processing in Python" by
David Mertz, pp. 274-280, which can be found at http://gnosis.cx/TPiP/chap4.txt.
Jack Trainor 2009
"""
import string
import re
# ======================================================================
# StateMachine layer
# ======================================================================
END_STATE_ID = "END"
ERROR_STATE_ID = "ERROR"
NONE_STATE_ID = ""
class Cargo(object):
""" Instances of Cargo and its subclasses contain information to be shared
and accumulated between states and returned to state machine caller. """
def __init__(self):
self.error_msg = ""
class State(object):
""" States are instances based on class variables and a handler function.
The state handler runs until it hits transition to the next state. """
ID = NONE_STATE_ID
TRANSITIONS = []
def handle(self, cargo):
return NONE_STATE_ID, cargo
class StateMachine(object):
""" Minimal state machine that runs based on state ids and state dictionary. """
def __init__(self):
self.states = {}
def add_state(self, state):
self.states[state.ID] = state
def run(self, start_state_id, cargo):
state_id = start_state_id
while state_id:
state = self.states.get(state_id, None)
if state:
state_id, cargo = state.handle(cargo)
else:
raise RuntimeError, "%s state does not exist." % state_id
return cargo
# ======================================================================
# LineStateMachine layer
# ======================================================================
class LineStateCargo(Cargo):
""" Cargo specific to LineState's. """
def __init__(self, fp):
Cargo.__init__(self)
self.fp = fp
# match_object is saved for next state to extract text information.
self.match_object = None
# buffer are unmatched lines accumulated in state for possible use.
self.buffer = []
class LineStateTransition(object):
def __init__(self, match_re, next_state_id):
self.match_re = match_re
self.next_state_id = next_state_id
class LineState(State):
""" A LineState reads lines until the file ends or the LineState matches
a compiled regex expression in its transition list, then it returns the
next state id of that transition with the current cargo. Lines that
don't match a transition are accumulated into cargo.buffer to be used by
the current state or the next state. """
ID = NONE_STATE_ID
TRANSITIONS = []
def handle(self, cargo):
cargo.buffer = []
next_state_id = NONE_STATE_ID
while not next_state_id:
line = cargo.fp.readline()
if line:
for transition in self.TRANSITIONS:
cargo.match_object = transition.match_re.match(line)
if cargo.match_object:
next_state_id = transition.next_state_id
break
else:
next_state_id = END_STATE_ID
if not next_state_id:
cargo.buffer.append(line)
return next_state_id, cargo
class EndState(State):
ID = END_STATE_ID
def handle(self, cargo):
cargo.fp.close()
print "Normal termination"
return NONE_STATE_ID, cargo
class ErrorState(State):
ID = ERROR_STATE_ID
def handle(self, cargo):
cargo.fp.close()
print "Error:", cargo.error_msg
return NONE_STATE_ID, cargo
class LineStateMachine(StateMachine):
def __init__(self):
StateMachine.__init__(self)
self.add_state(EndState())
self.add_state(ErrorState())
# ======================================================================
# ReportStateMachine layer
# ======================================================================
START_STATE_ID = "START"
COMPANY_STATE_ID = "COMPANY"
ORDER_STATE_ID = "ORDER"
COMMENT_START_STATE_ID = "COMMENT_START"
COMMENT_END_STATE_ID = "COMMENT_END"
COMPANY_RE = re.compile(r"^>> (.*)\s*$")
ORDER_RE = re.compile(r"^\s*(\w+)\s+(\d+)([Kk]*).*$")
COMMENT_START_RE = re.compile(r"^\*(.*)$")
COMMENT_END_RE = re.compile(r"^(.*)\*$")
class ReportStateCargo(LineStateCargo):
def __init__(self, fp):
LineStateCargo.__init__(self, fp)
self.company = ""
self.amount = 0
class StartState(LineState):
ID = START_STATE_ID
TRANSITIONS = [
LineStateTransition(COMPANY_RE, COMPANY_STATE_ID),
LineStateTransition(COMMENT_START_RE, COMMENT_START_STATE_ID)
]
def handle(self, cargo):
return LineState.handle(self, cargo)
class CompanyState(LineState):
ID = COMPANY_STATE_ID
TRANSITIONS = [
LineStateTransition(COMPANY_RE, COMPANY_STATE_ID),
LineStateTransition(ORDER_RE, ORDER_STATE_ID),
LineStateTransition(COMMENT_START_RE, COMMENT_START_STATE_ID)
]
def handle(self, cargo):
cargo.company = cargo.match_object.group(1)
cargo.amount = 0
return LineState.handle(self, cargo)
class OrderState(LineState):
ID = ORDER_STATE_ID
TRANSITIONS = [
LineStateTransition(COMPANY_RE, COMPANY_STATE_ID),
LineStateTransition(ORDER_RE, ORDER_STATE_ID),
LineStateTransition(COMMENT_START_RE, COMMENT_START_STATE_ID)
]
def handle(self, cargo):
product = cargo.match_object.group(1)
quantity = get_quantity(cargo.match_object.group(2) + cargo.match_object.group(3))
price = get_product_price(cargo.company, product)
cargo.amount += quantity * price
next_state_id, cargo = LineState.handle(self, cargo)
if next_state_id != ORDER_STATE_ID:
print_invoice(cargo.company, cargo.amount)
return next_state_id, cargo
class CommentStartState(LineState):
ID = COMMENT_START_STATE_ID
TRANSITIONS = [
LineStateTransition(COMMENT_END_RE, COMMENT_END_STATE_ID)
]
def handle(self, cargo):
comment_line = cargo.match_object.group(1)
if comment_line and comment_line[-1] == "*":
next_state_id = COMMENT_END_STATE_ID
else:
next_state_id, cargo = LineState.handle(self, cargo)
return next_state_id, cargo
class CommentEndState(LineState):
ID = COMMENT_END_STATE_ID
TRANSITIONS = [
LineStateTransition(COMPANY_RE, COMPANY_STATE_ID),
LineStateTransition(ORDER_RE, ORDER_STATE_ID),
LineStateTransition(COMMENT_START_RE, COMMENT_START_STATE_ID)
]
def handle(self, cargo):
comment_line = cargo.match_object.group(1)
return LineState.handle(self, cargo)
class ReportStateMachine(LineStateMachine):
def __init__(self):
LineStateMachine.__init__(self)
self.add_state(StartState())
self.add_state(CompanyState())
self.add_state(OrderState())
self.add_state(CommentStartState())
self.add_state(CommentEndState())
# ======================================================================
# Report utility functions
# ======================================================================
# Discount consists of dollar requirement and a percentage reduction.
# Each buyer can have an ascending series of discounts, the highest
# one applicable to a month is used.
discount_schedules = {
"STANDARD" : [(5000,10),(10000,20),(15000,30),(20000,40)],
"ACME" : [(1000,10),(5000,15),(10000,30),(20000,40)],
"MEGAMART" : [(2000,10),(5000,20),(10000,25),(30000,50)],
"BAGOBOLTS" : [(2500,10),(5000,15),(10000,25),(30000,50)],
}
item_prices = {
"STANDARD" : {'widgets':1.0, 'whatzits':0.9, 'doodads':1.1,
'dingdongs':1.3, 'flazs':0.7},
"ACME" : {'widgets':0.9, 'whatzits':0.9, 'doodads':1.0,
'dingdongs':0.9, 'flazs':0.6},
"MEGAMART" : {'widgets':1.0, 'whatzits':0.8, 'doodads':1.0,
'dingdongs':1.2, 'flazs':0.7},
"BAGOBOLTS" : {'widgets':0.8, 'whatzits':0.9, 'doodads':1.1,
'dingdongs':1.3, 'flazs':0.5},
}
discount_types = {
"STANDARD" : "standard",
"ACME" : "negotiated",
"MEGAMART" : "negotiated"
}
def get_company_id(company):
if company.upper().find("ACME") >= 0:
return "ACME"
elif company.upper().find("MEGAMART") >= 0:
return "MEGAMART"
else:
return "STANDARD"
def get_discount_schedule(company):
return discount_schedules.get(get_company_id(company), discount_schedules["STANDARD"])
def get_product_price(company, product):
prices = item_prices.get(get_company_id(company), item_prices["STANDARD"])
return prices[product]
def get_discount_type(company):
return discount_types.get(get_company_id(company), discount_types["STANDARD"])
def get_discounted_amount(company, amount):
multiplier = 1.0
for threshhold, percent in get_discount_schedule(company):
if amount >= threshhold: multiplier = 1 - float(percent)/100
return amount * multiplier
def get_quantity(quantity):
quantity = string.replace(string.upper(quantity),'K','000')
quantity = int(quantity)
return quantity
def print_invoice(company, amount):
print "Company name:", company, "(%s discounts)" % get_discount_type(company)
print "Invoice total: $", get_discounted_amount(company, amount), '\n'
# ======================================================================
# ReportStateMachine test
# ======================================================================
REPORT = """MONTHLY REPORT -- April 2002
===================================================================
Rules:
- Each buyer has price schedule for each item (func of quantity).
- Each buyer has a discount schedule based on dollar totals.
- Discounts are per-order (i.e. contiguous block)
- Buyer listing starts with line containing ">>", then buyer name.
- Item quantities have name-whitespace-number, one per line.
- Comment sections begin with line starting with an asterisk,
and ends with first line that ends with an asterisk.
>> Acme Purchasing
widgets 100
whatzits 1000
doodads 5000
dingdongs 20
* Note to Donald: The best contact for Acme is Debbie Franklin, at
* 413-555-0001. Fallback is Sue Fong (call switchboard). *
>> Megamart
doodads 10k
whatzits 5k
>> Fly-by-Night Sellers
widgets 500
whatzits 4
flazs 1000
* Note to Harry: Have Sales contact FbN for negotiations *
*
Known buyers:
>> Acme
>> Megamart
>> Standard (default discounts)
*
*** LATE ADDITIONS ***
>> Acme Purchasing
widgets 500 (rush shipment)**
"""
def test():
import StringIO
fp = StringIO.StringIO(REPORT)
cargo = ReportStateMachine().run(START_STATE_ID, ReportStateCargo(fp))
if __name__ == "__main__":
test()
|
I wanted to improve my code from recipe http://code.activestate.com/recipes/574430/ by stripping StateMachine to the essentials, and by making the cargo and states into objects to be extended by the usual subclassing, instead of the ad hoc approach in the original Mertz code.
The result is a simple layered pattern in which state machines for text processing can be easily implemented by choosing states and writing regular expressions that define transitions to the next states.