""" sax2dom_chunker.py version 1.1 A SAX handler that takes a set of element paths and creates a series of DOM chunks matching the element paths for individual processing. Designed for Python 2.2. or greater. Copyright 2004 Fourthought Inc, USA. This work is licensed under Creative Commons Attribution 1.0 For details: http://creativecommons.org/licenses/by/1.0/ """ from xml import sax from xml.dom import XML_NAMESPACE, XMLNS_NAMESPACE, EMPTY_NAMESPACE import xml.dom.minidom DUMMY_DOCELEM = u'dummy' START_STATE = 0 TOP = -1 class _state_machine: """ A simple state machine specialized for DOM chunking from SAX A state is "live" when it represents the successful completion of a path. This is generally a signal to the handler using this state machine to start creating the DOM fragment from the subset of SAX events until we transit to a non-live state """ def __init__(self, trim_to_paths): if not trim_to_paths: self.event = self.event_nop self.is_live = self.is_live_nop return self._state_table = {START_STATE: {}} self._live_states = [] #Use the given trim paths to generate a state table newest_state = START_STATE for path in trim_to_paths: last_state = START_STATE for segment in path: start_event = (1, segment[0], segment[1]) end_event = (0, segment[0], segment[1]) if self._state_table[last_state].has_key(start_event): top_state = \ self._state_table[last_state][start_event] else: newest_state += 1 top_state = newest_state self._state_table[top_state] = {} self._state_table[last_state][start_event] = \ top_state self._state_table[top_state][end_event] = \ last_state last_state = top_state self._live_states.append(top_state) self._state = START_STATE self.chunk_completed = 0 return def event(self, is_start, ns, local): """ Register an event and effect ant state transitions found in the state table """ #We only have a chunk ready for the handler in #the explicit case below self.chunk_completed = 0 lookup_from = self._state_table[self._state] if lookup_from.has_key((is_start, ns, local)): new_state = lookup_from[(is_start, ns, local)] #If we have completed a chunk, we set a flag for #The chunker if (self._state in self._live_states and new_state not in self._live_states): self.chunk_completed = 1 self._state = new_state return self._state def is_live(self): """ 1 if the curent state is considered live, else 0 """ return self._state in self._live_states def event_nop(self, is_start, ns, local): pass def is_live_nop(self): return 1 class sax2dom_chunker(sax.ContentHandler): """ Note: ignores nodes prior to the document element, such as PIs and text nodes This filter is only designed to work if you set features sax.handler.feature_namespaces and sax.handler.feature_namespace_prefixes to 1 on the parser you use. It will not work on drivers that do not support these features. The default drv_expat works fine in this case, but of course has but very limited DTD processing. It also collapses CDATA sections into plain text trim_to_paths - a list of lists of tuples. Each tuple is of the form (namespace, local-name), providing one segment in a path of contained elements [ [ (None, u'monty'), (None, u'python') ], [ (None, u'monty'), (None, u'spam'), ('urn:dummy', u'eggs') ] ] If None (the default, a DOM node will be created representing the entire tree. chunk_consumer - a callable object taking a DOM node. It will be invoked as each DOM chunk is prepared. domimpl - DOM implemention to build, e.g. mindom (the default) or cDomlette or pxdom (if you have the right third-party packages installed). owner_doc - for advanced uses, if you want to use an existing DOM document object as the owner of all created nodes. """ def __init__(self, trim_to_paths=None, chunk_consumer=None, domimpl=xml.dom.minidom.getDOMImplementation(), owner_doc=None ): self._impl = domimpl if owner_doc: self._owner_doc = owner_doc else: dt = self._impl.createDocumentType(DUMMY_DOCELEM, None, u'') self._owner_doc = self._impl.createDocument( DUMMY_DOCELEM, DUMMY_DOCELEM, dt) #Create a docfrag to hold all the generated nodes. root_node = self._owner_doc.createDocumentFragment() self._nodeStack = [ root_node ] self.state_machine = _state_machine(trim_to_paths) self._chunk_consumer = chunk_consumer return def get_root_node(self): """ Only useful if the user does not register trim paths If so, then after SAX processing the user can call this method to retrieve resulting DOm representing the entire document """ return self._nodeStack[0] #Overridden DocumentHandler methods def startElementNS(self, name, qname, attribs): self.state_machine.event(1, name[0], name[1]) if not self.state_machine.is_live(): return (ns, local) = name new_element = self._owner_doc.createElementNS(ns, qname or local) for ((attr_ns, lname), value) in attribs.items(): if attr_ns is not None: attr_qname = attribs.getQNameByName((attr_ns, lname)) else: attr_qname = lname attr = self._owner_doc.createAttributeNS( attr_ns, attr_qname) attr_qname = attribs.getQNameByName((attr_ns, lname)) attr.value = value new_element.setAttributeNodeNS(attr) self._nodeStack.append(new_element) return def endElementNS(self, name, qname): self.state_machine.event(0, name[0], name[1]) if not self.state_machine.is_live(): if (self._chunk_consumer and self.state_machine.chunk_completed): #Complete the element being closed because it #Is the last bit of a DOM to be fed to the consumer new_element = self._nodeStack[TOP] del self._nodeStack[TOP] self._nodeStack[TOP].appendChild(new_element) #Feed the consumer self._chunk_consumer(self._nodeStack[0]) #Start all over with a new doc frag so the old #One's memory can be reclaimed root_node = self._owner_doc.createDocumentFragment() self._nodeStack = [ root_node ] return new_element = self._nodeStack[TOP] del self._nodeStack[TOP] self._nodeStack[TOP].appendChild(new_element) return def processingInstruction(self, target, data): if self.state_machine.is_live(): pi = self._owner_doc.createProcessingInstruction( target, data) self._nodeStack[TOP].appendChild(pi) return #Overridden LexicalHandler methods def comment(self, text): if self.state_machine.is_live(): new_comment = self._owner_doc.createComment(text) self._nodeStack[TOP].appendChild(new_comment) return def characters(self, chars): if self.state_machine.is_live(): new_text = self._owner_doc.createTextNode(chars) self._nodeStack[TOP].appendChild(new_text) return