This module is similar to pulldom in that it takes a stream of SAX objects and breaks it down into chunks of DOM. The differences are that it works with any DOM implementation meeting the Python DOM conventions, and that it uses simple pattern expressions to declaratively set how the DOM chunks are partitioned, rather than requiring the user to write procedural code for this purpose. This is an updated/fixed version of code that appeared in an XML.com column.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 | """
sax2dom_chunker.py    version 1.1
A SAX handler that takes a set of element paths and
creates a series of DOM chunks matching the element paths
for individual processing.  Designed for Python 2.2. or greater.
Copyright 2004 Fourthought Inc, USA.
This work is licensed under Creative Commons Attribution 1.0
For details: http://creativecommons.org/licenses/by/1.0/
"""
from xml import sax
from xml.dom import XML_NAMESPACE, XMLNS_NAMESPACE, EMPTY_NAMESPACE
import xml.dom.minidom
DUMMY_DOCELEM = u'dummy'
START_STATE = 0
TOP = -1
class _state_machine:
    """
    A simple state machine specialized for DOM chunking from SAX
    A state is "live" when it represents the successful completion
    of a path.
    This is generally a signal to the handler using this state machine
    to start creating the DOM fragment from the subset of SAX
    events until we transit to a non-live state
    """
    def __init__(self, trim_to_paths):
        if not trim_to_paths:
            self.event = self.event_nop
            self.is_live = self.is_live_nop
            return
        self._state_table = {START_STATE: {}}
        self._live_states = []
        #Use the given trim paths to generate a state table
        newest_state = START_STATE
        for path in trim_to_paths:
            last_state = START_STATE
            for segment in path:
                start_event = (1, segment[0], segment[1])
                end_event = (0, segment[0], segment[1])
                if self._state_table[last_state].has_key(start_event):
                    top_state = \
                        self._state_table[last_state][start_event]
                else:
                    newest_state += 1
                    top_state = newest_state
                    self._state_table[top_state] = {}
                self._state_table[last_state][start_event] = \
                    top_state
                self._state_table[top_state][end_event] = \
                    last_state
                last_state = top_state
            self._live_states.append(top_state)
        self._state = START_STATE
        self.chunk_completed = 0
        return
    def event(self, is_start, ns, local):
        """
        Register an event and effect ant state transitions
        found in the state table
        """
        #We only have a chunk ready for the handler in
        #the explicit case below
        self.chunk_completed = 0
        lookup_from = self._state_table[self._state]
        if lookup_from.has_key((is_start, ns, local)):
             new_state = lookup_from[(is_start, ns, local)]
             #If we have completed a chunk, we set a flag for
             #The chunker
             if (self._state in self._live_states and
                 new_state not in self._live_states):
                 self.chunk_completed = 1
             self._state = new_state
        return self._state
    def is_live(self):
        """
        1 if the curent state is considered live, else 0
        """
        return self._state in self._live_states
    def event_nop(self, is_start, ns, local):
        pass
    def is_live_nop(self):
        return 1
class sax2dom_chunker(sax.ContentHandler):
    """
    Note: ignores nodes prior to the document element, such as PIs and
    text nodes
    This filter is only designed to work if you set features
    sax.handler.feature_namespaces
    and
    sax.handler.feature_namespace_prefixes
    to 1 on the parser you use.  It will not work on drivers that
    do not support these features.  The default drv_expat works fine
    in this case, but of course has but very limited DTD processing.
    It also collapses CDATA sections into plain text
    trim_to_paths - a list of lists of tuples.  Each tuple is of
        the form (namespace, local-name), providing one segment
        in a path of contained elements
        [
          [ (None, u'monty'), (None, u'python') ],
          [ (None, u'monty'), (None, u'spam'), ('urn:dummy', u'eggs') ]
        ]
        If None (the default, a DOM node will be created representing
        the entire tree.
    chunk_consumer - a callable object taking a DOM node.  It will be
        invoked as each DOM chunk is prepared.
    
    domimpl - DOM implemention to build, e.g. mindom (the default)
        or cDomlette or pxdom (if you have the right third-party
        packages installed).
    
    owner_doc - for advanced uses, if you want to use an existing
        DOM document object as the owner of all created nodes.
    """
    def __init__(self,
                 trim_to_paths=None,
                 chunk_consumer=None,
                 domimpl=xml.dom.minidom.getDOMImplementation(),
                 owner_doc=None
                 ):
        self._impl = domimpl
        if owner_doc:
            self._owner_doc = owner_doc
        else:
            dt = self._impl.createDocumentType(DUMMY_DOCELEM, None, u'')
            self._owner_doc = self._impl.createDocument(
                DUMMY_DOCELEM, DUMMY_DOCELEM, dt)
        #Create a docfrag to hold all the generated nodes.
        root_node = self._owner_doc.createDocumentFragment()
        self._nodeStack = [ root_node ]
        self.state_machine = _state_machine(trim_to_paths)
        self._chunk_consumer = chunk_consumer
        return
    def get_root_node(self):
        """
        Only useful if the user does not register trim paths
        If so, then after SAX processing the user can call this
        method to retrieve resulting DOm representing the entire
        document
        """
        return self._nodeStack[0]
    #Overridden DocumentHandler methods
    def startElementNS(self, name, qname, attribs):
        self.state_machine.event(1, name[0], name[1])
        if not self.state_machine.is_live():
            return
        (ns, local) = name
        new_element = self._owner_doc.createElementNS(ns, qname or local)
        for ((attr_ns, lname), value) in attribs.items():
            if attr_ns is not None:
                attr_qname = attribs.getQNameByName((attr_ns, lname))
            else:
                attr_qname = lname
            attr = self._owner_doc.createAttributeNS(
                attr_ns, attr_qname)
            attr_qname = attribs.getQNameByName((attr_ns, lname))
            attr.value = value
            new_element.setAttributeNodeNS(attr)
        self._nodeStack.append(new_element)
        return
    def endElementNS(self, name, qname):
        self.state_machine.event(0, name[0], name[1])
        if not self.state_machine.is_live():
            if (self._chunk_consumer and
                self.state_machine.chunk_completed):
                #Complete the element being closed because it
                #Is the last bit of a DOM to be fed to the consumer
                new_element = self._nodeStack[TOP]
                del self._nodeStack[TOP]
                self._nodeStack[TOP].appendChild(new_element)
                #Feed the consumer
                self._chunk_consumer(self._nodeStack[0])
                #Start all over with a new doc frag so the old
                #One's memory can be reclaimed
                root_node = self._owner_doc.createDocumentFragment()
                self._nodeStack = [ root_node ]
            return
        new_element = self._nodeStack[TOP]
        del self._nodeStack[TOP]
        self._nodeStack[TOP].appendChild(new_element)
        return
    def processingInstruction(self, target, data):
        if self.state_machine.is_live():
            pi = self._owner_doc.createProcessingInstruction(
                target, data)
            self._nodeStack[TOP].appendChild(pi)
        return
    #Overridden LexicalHandler methods
    def comment(self, text):
        if self.state_machine.is_live():
            new_comment = self._owner_doc.createComment(text)
            self._nodeStack[TOP].appendChild(new_comment)
        return
    def characters(self, chars):
        if self.state_machine.is_live():
            new_text = self._owner_doc.createTextNode(chars)
            self._nodeStack[TOP].appendChild(new_text)
        return
 | 
This code is just a fix and update to listing 2 in my article "Decomposition, Process, Recomposition" ( http://www.xml.com/pub/a/2004/07/28/py-xml.html ), so read that article for fuller description and usage examples. The code in that article required the user to set up namespace prefix reporting in SAX, but this only works if PyXML is installed. This code has that limitation removed. If you're trying this code with the listings (3 and 4) in the article and you do not have PyXML installed, just delete the line:
parser.setFeature(sax.handler.feature_namespace_prefixes, 1)
In the set-up code.

 Download
Download Copy to clipboard
Copy to clipboard

Updated version of code in Amara XML Toolkit. This class, with some improvements and fixes, is now part of Amara's saxtools:
http://www.xml.com/pub/a/2005/01/19/amara.html
--Uche