General state machine mechanism plus a specialized version, LineStateMachine, for processing text files by using regular expressions to determine state transitions.
"""
LineStateMachine.py

General state machine mechanism plus a specialized version,
LineStateMachine, for processing text files based on regular expression
line matching.

Implementing a specific LineStateMachine
========================================
* Analyze states necessary for state machine.
* Create states as subclasses of LineState.
    - Set ID for state.
    - Determine regular expressions that signal transitions.
    - Set TRANSITIONS list for list of regexes and corresponding next states.
    - Order TRANSITIONS so that more general regexes are tested later, e.g.
      testing (.*) first will prevent any other transitions from being tested.
    - Write handler to extract info from cargo.match_object for current line,
      then call LineState.handle() to continue processing text.
* Subclass LineStateCargo to contain information to be shared and accumulated
  between states over course of text processing.
* Subclass LineStateMachine.
    - Add states to initializer.
    - Start LineStateMachine by calling run() with start state and cargo
      containing text file pointer.

See ReportStateMachine example below.

Extensively rewritten from sample code in "Text Processing in Python" by
David Mertz, pp. 274-280, which can be found at http://gnosis.cx/TPiP/chap4.txt.

Jack Trainor 2009
"""
import string
import re

# ======================================================================
# StateMachine layer
# ======================================================================
END_STATE_ID = "END"
ERROR_STATE_ID = "ERROR"
NONE_STATE_ID = ""


class Cargo(object):
    """ Instances of Cargo and its subclasses contain information to be shared
    and accumulated between states and returned to state machine caller. """
    def __init__(self):
        self.error_msg = ""


class State(object):
    """ States are instances based on class variables and a handler function.
    The state handler runs until it hits transition to the next state. """
    ID = NONE_STATE_ID
    TRANSITIONS = []

    def handle(self, cargo):
        return NONE_STATE_ID, cargo


class StateMachine(object):
    """ Minimal state machine that runs based on state ids and state dictionary. """
    def __init__(self):
        self.states = {}

    def add_state(self, state):
        self.states[state.ID] = state

    def run(self, start_state_id, cargo):
        # Keep dispatching until a handler returns an empty state id.
        state_id = start_state_id
        while state_id:
            state = self.states.get(state_id, None)
            if state:
                state_id, cargo = state.handle(cargo)
            else:
                raise RuntimeError("%s state does not exist." % state_id)
        return cargo

# ======================================================================
# LineStateMachine layer
# ======================================================================
class LineStateCargo(Cargo):
    """ Cargo specific to LineStates. """
    def __init__(self, fp):
        Cargo.__init__(self)
        self.fp = fp
        # match_object is saved for next state to extract text information.
        self.match_object = None
        # buffer holds unmatched lines accumulated in state for possible use.
        self.buffer = []


class LineStateTransition(object):
    def __init__(self, match_re, next_state_id):
        self.match_re = match_re
        self.next_state_id = next_state_id


class LineState(State):
    """ A LineState reads lines until the file ends or a line matches a
    compiled regular expression in its transition list, then it returns the
    next state id of that transition with the current cargo. Lines that
    don't match a transition are accumulated into cargo.buffer to be used by
    the current state or the next state. """
    ID = NONE_STATE_ID
    TRANSITIONS = []

    def handle(self, cargo):
        cargo.buffer = []
        next_state_id = NONE_STATE_ID
        while not next_state_id:
            line = cargo.fp.readline()
            if line:
                # First matching transition wins, so order matters.
                for transition in self.TRANSITIONS:
                    cargo.match_object = transition.match_re.match(line)
                    if cargo.match_object:
                        next_state_id = transition.next_state_id
                        break
            else:
                # Empty string from readline() means end of file.
                next_state_id = END_STATE_ID
            if not next_state_id:
                cargo.buffer.append(line)
        return next_state_id, cargo


class EndState(State):
    ID = END_STATE_ID

    def handle(self, cargo):
        cargo.fp.close()
        print "Normal termination"
        return NONE_STATE_ID, cargo


class ErrorState(State):
    ID = ERROR_STATE_ID

    def handle(self, cargo):
        cargo.fp.close()
        print "Error:", cargo.error_msg
        return NONE_STATE_ID, cargo


class LineStateMachine(StateMachine):
    def __init__(self):
        StateMachine.__init__(self)
        self.add_state(EndState())
        self.add_state(ErrorState())

# ======================================================================
# ReportStateMachine layer
# ======================================================================
START_STATE_ID = "START"
COMPANY_STATE_ID = "COMPANY"
ORDER_STATE_ID = "ORDER"
COMMENT_START_STATE_ID = "COMMENT_START"
COMMENT_END_STATE_ID = "COMMENT_END"

COMPANY_RE = re.compile(r"^>> (.*)\s*$")
ORDER_RE = re.compile(r"^\s*(\w+)\s+(\d+)([Kk]*).*$")
COMMENT_START_RE = re.compile(r"^\*(.*)$")
COMMENT_END_RE = re.compile(r"^(.*)\*$")


class ReportStateCargo(LineStateCargo):
    def __init__(self, fp):
        LineStateCargo.__init__(self, fp)
        self.company = ""
        self.amount = 0


class StartState(LineState):
    ID = START_STATE_ID
    TRANSITIONS = [
        LineStateTransition(COMPANY_RE, COMPANY_STATE_ID),
        LineStateTransition(COMMENT_START_RE, COMMENT_START_STATE_ID)
    ]

    def handle(self, cargo):
        return LineState.handle(self, cargo)


class CompanyState(LineState):
    ID = COMPANY_STATE_ID
    TRANSITIONS = [
        LineStateTransition(COMPANY_RE, COMPANY_STATE_ID),
        LineStateTransition(ORDER_RE, ORDER_STATE_ID),
        LineStateTransition(COMMENT_START_RE, COMMENT_START_STATE_ID)
    ]

    def handle(self, cargo):
        cargo.company = cargo.match_object.group(1)
        cargo.amount = 0
        return LineState.handle(self, cargo)


class OrderState(LineState):
    ID = ORDER_STATE_ID
    TRANSITIONS = [
        LineStateTransition(COMPANY_RE, COMPANY_STATE_ID),
        LineStateTransition(ORDER_RE, ORDER_STATE_ID),
        LineStateTransition(COMMENT_START_RE, COMMENT_START_STATE_ID)
    ]

    def handle(self, cargo):
        product = cargo.match_object.group(1)
        quantity = get_quantity(cargo.match_object.group(2) + cargo.match_object.group(3))
        price = get_product_price(cargo.company, product)
        cargo.amount += quantity * price
        next_state_id, cargo = LineState.handle(self, cargo)
        if next_state_id != ORDER_STATE_ID:
            print_invoice(cargo.company, cargo.amount)
        return next_state_id, cargo


class CommentStartState(LineState):
    ID = COMMENT_START_STATE_ID
    TRANSITIONS = [
        LineStateTransition(COMMENT_END_RE, COMMENT_END_STATE_ID)
    ]

    def handle(self, cargo):
        comment_line = cargo.match_object.group(1)
        if comment_line and comment_line[-1] == "*":
            next_state_id = COMMENT_END_STATE_ID
        else:
            next_state_id, cargo = LineState.handle(self, cargo)
        return next_state_id, cargo


class CommentEndState(LineState):
    ID = COMMENT_END_STATE_ID
    TRANSITIONS = [
        LineStateTransition(COMPANY_RE, COMPANY_STATE_ID),
        LineStateTransition(ORDER_RE, ORDER_STATE_ID),
        LineStateTransition(COMMENT_START_RE, COMMENT_START_STATE_ID)
    ]

    def handle(self, cargo):
        comment_line = cargo.match_object.group(1)
        return LineState.handle(self, cargo)


class ReportStateMachine(LineStateMachine):
    def __init__(self):
        LineStateMachine.__init__(self)
        self.add_state(StartState())
        self.add_state(CompanyState())
        self.add_state(OrderState())
        self.add_state(CommentStartState())
        self.add_state(CommentEndState())

# ======================================================================
# Report utility functions
# ======================================================================
# Discount consists of dollar requirement and a percentage reduction.
# Each buyer can have an ascending series of discounts; the highest
# one applicable to a month is used.
discount_schedules = {
    "STANDARD"  : [(5000,10),(10000,20),(15000,30),(20000,40)],
    "ACME"      : [(1000,10),(5000,15),(10000,30),(20000,40)],
    "MEGAMART"  : [(2000,10),(5000,20),(10000,25),(30000,50)],
    "BAGOBOLTS" : [(2500,10),(5000,15),(10000,25),(30000,50)],
}

item_prices = {
    "STANDARD"  : {'widgets':1.0, 'whatzits':0.9, 'doodads':1.1,
                   'dingdongs':1.3, 'flazs':0.7},
    "ACME"      : {'widgets':0.9, 'whatzits':0.9, 'doodads':1.0,
                   'dingdongs':0.9, 'flazs':0.6},
    "MEGAMART"  : {'widgets':1.0, 'whatzits':0.8, 'doodads':1.0,
                   'dingdongs':1.2, 'flazs':0.7},
    "BAGOBOLTS" : {'widgets':0.8, 'whatzits':0.9, 'doodads':1.1,
                   'dingdongs':1.3, 'flazs':0.5},
}

discount_types = {
    "STANDARD" : "standard",
    "ACME"     : "negotiated",
    "MEGAMART" : "negotiated"
}


def get_company_id(company):
    if company.upper().find("ACME") >= 0:
        return "ACME"
    elif company.upper().find("MEGAMART") >= 0:
        return "MEGAMART"
    else:
        return "STANDARD"


def get_discount_schedule(company):
    return discount_schedules.get(get_company_id(company), discount_schedules["STANDARD"])


def get_product_price(company, product):
    prices = item_prices.get(get_company_id(company), item_prices["STANDARD"])
    return prices[product]


def get_discount_type(company):
    return discount_types.get(get_company_id(company), discount_types["STANDARD"])


def get_discounted_amount(company, amount):
    # The schedule is ascending, so the last threshold reached wins.
    multiplier = 1.0
    for threshold, percent in get_discount_schedule(company):
        if amount >= threshold:
            multiplier = 1 - float(percent)/100
    return amount * multiplier


def get_quantity(quantity):
    # "10k" -> "10000" -> 10000
    quantity = quantity.upper().replace('K', '000')
    quantity = int(quantity)
    return quantity


def print_invoice(company, amount):
    print "Company name:", company, "(%s discounts)" % get_discount_type(company)
    print "Invoice total: $", get_discounted_amount(company, amount), '\n'

# ======================================================================
# ReportStateMachine test
# ======================================================================
REPORT = """MONTHLY REPORT -- April 2002
===================================================================
Rules:
- Each buyer has price schedule for each item (func of quantity).
- Each buyer has a discount schedule based on dollar totals.
- Discounts are per-order (i.e. contiguous block)
- Buyer listing starts with line containing ">>", then buyer name.
- Item quantities have name-whitespace-number, one per line.
- Comment sections begin with line starting with an asterisk,
and ends with first line that ends with an asterisk.
>> Acme Purchasing
widgets 100
whatzits 1000
doodads 5000
dingdongs 20
* Note to Donald: The best contact for Acme is Debbie Franklin, at
* 413-555-0001. Fallback is Sue Fong (call switchboard). *
>> Megamart
doodads 10k
whatzits 5k
>> Fly-by-Night Sellers
widgets 500
whatzits 4
flazs 1000
* Note to Harry: Have Sales contact FbN for negotiations *
*
Known buyers:
>> Acme
>> Megamart
>> Standard (default discounts)
*
*** LATE ADDITIONS ***
>> Acme Purchasing
widgets 500 (rush shipment)**
"""

def test():
    import StringIO
    fp = StringIO.StringIO(REPORT)
    cargo = ReportStateMachine().run(START_STATE_ID, ReportStateCargo(fp))


if __name__ == "__main__":
    test()
I wanted to improve my code from recipe http://code.activestate.com/recipes/574430/ by stripping StateMachine to the essentials, and by making the cargo and states into objects to be extended by the usual subclassing, instead of the ad hoc approach in the original Mertz code.
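The core layer can be tried on its own, without any text processing. The following is a minimal sketch condensed from the listing above, written so that it should run under both Python 2 and Python 3. The names CountCargo, CountState and DoneState are hypothetical and exist only for this illustration.

```python
class Cargo(object):
    """Shared data passed from state to state."""
    def __init__(self):
        self.error_msg = ""


class State(object):
    ID = ""

    def handle(self, cargo):
        # Returning an empty id stops the machine.
        return "", cargo


class StateMachine(object):
    def __init__(self):
        self.states = {}

    def add_state(self, state):
        self.states[state.ID] = state

    def run(self, start_state_id, cargo):
        state_id = start_state_id
        while state_id:
            state = self.states.get(state_id)
            if state is None:
                raise RuntimeError("%s state does not exist." % state_id)
            state_id, cargo = state.handle(cargo)
        return cargo


# Hypothetical example: count down from cargo.n, recording each value.
class CountCargo(Cargo):
    def __init__(self, n):
        Cargo.__init__(self)
        self.n = n
        self.visited = []


class CountState(State):
    ID = "COUNT"

    def handle(self, cargo):
        cargo.visited.append(cargo.n)
        cargo.n -= 1
        next_id = "COUNT" if cargo.n > 0 else "DONE"
        return next_id, cargo


class DoneState(State):
    ID = "DONE"

    def handle(self, cargo):
        return "", cargo


machine = StateMachine()
machine.add_state(CountState())
machine.add_state(DoneState())
result = machine.run("COUNT", CountCargo(3))
print(result.visited)
```

The cargo subclass carries all of the accumulated data, so the states themselves stay stateless and can be shared between runs.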
The result is a simple layered pattern in which state machines for text processing can be easily implemented by choosing states and writing regular expressions that define transitions to the next states.
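As an illustration of that workflow, here is a small sketch that applies the same idea to a different, made-up input: an INI-like text with "[section]" headers and "key = value" lines. It is a simplified restatement rather than the recipe's own classes: there is no cargo.buffer and no End or Error state, and end of input simply stops the machine. All names in it (Transition, SECTION_RE, ITEM_RE, ConfigCargo, parse) are assumptions made for this example. It should run under both Python 2 and Python 3.

```python
import io
import re

SECTION_RE = re.compile(r"^\[(\w+)\]\s*$")
ITEM_RE = re.compile(r"^(\w+)\s*=\s*(.*?)\s*$")


class Transition(object):
    def __init__(self, match_re, next_state_id):
        self.match_re = match_re
        self.next_state_id = next_state_id


class LineState(object):
    ID = ""
    TRANSITIONS = []

    def handle(self, cargo):
        # Read lines until one matches a transition or the input ends.
        while True:
            line = cargo.fp.readline()
            if not line:
                return "", cargo  # end of input stops the machine
            for transition in self.TRANSITIONS:
                cargo.match_object = transition.match_re.match(line)
                if cargo.match_object:
                    return transition.next_state_id, cargo


SECTION_AND_ITEM = [Transition(SECTION_RE, "SECTION"),
                    Transition(ITEM_RE, "ITEM")]


class StartState(LineState):
    ID = "START"
    TRANSITIONS = [Transition(SECTION_RE, "SECTION")]


class SectionState(LineState):
    ID = "SECTION"
    TRANSITIONS = SECTION_AND_ITEM

    def handle(self, cargo):
        # The match that led here is the section header line.
        cargo.current = cargo.match_object.group(1)
        cargo.sections[cargo.current] = {}
        return LineState.handle(self, cargo)


class ItemState(LineState):
    ID = "ITEM"
    TRANSITIONS = SECTION_AND_ITEM

    def handle(self, cargo):
        key, value = cargo.match_object.group(1, 2)
        cargo.sections[cargo.current][key] = value
        return LineState.handle(self, cargo)


class ConfigCargo(object):
    def __init__(self, fp):
        self.fp = fp
        self.match_object = None
        self.current = None
        self.sections = {}


def parse(text):
    states = dict((s.ID, s) for s in (StartState(), SectionState(), ItemState()))
    cargo = ConfigCargo(io.StringIO(text))
    state_id = "START"
    while state_id:
        state_id, cargo = states[state_id].handle(cargo)
    return cargo.sections


TEXT = u"""ignored preamble
[server]
host = example.org
port = 8080
a stray line
[client]
retries = 3
"""
print(parse(TEXT))
```

Each handler first consumes the match that caused the transition into it, then delegates to LineState.handle() to find the next transition. Lines that match nothing, such as the preamble and the stray line, are skipped.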