Welcome, guest | Sign In | My Account | Store | Cart
"""
sax2dom_chunker.py    version 1.1

A SAX handler that takes a set of element paths and
creates a series of DOM chunks matching the element paths
for individual processing.  Designed for Python 2.2. or greater.

Copyright 2004 Fourthought Inc, USA.
This work is licensed under Creative Commons Attribution 1.0
For details: http://creativecommons.org/licenses/by/1.0/
"""

from xml import sax
from xml.dom import XML_NAMESPACE, XMLNS_NAMESPACE, EMPTY_NAMESPACE
import xml.dom.minidom

DUMMY_DOCELEM = u'dummy'
START_STATE = 0
TOP = -1

class _state_machine:
    """
    A simple state machine specialized for DOM chunking from SAX
    A state is "live" when it represents the successful completion
    of a path.
    This is generally a signal to the handler using this state machine
    to start creating the DOM fragment from the subset of SAX
    events until we transit to a non-live state
    """
    def __init__(self, trim_to_paths):
        if not trim_to_paths:
            self.event = self.event_nop
            self.is_live = self.is_live_nop
            return
        self._state_table = {START_STATE: {}}
        self._live_states = []
        #Use the given trim paths to generate a state table
        newest_state = START_STATE
        for path in trim_to_paths:
            last_state = START_STATE
            for segment in path:
                start_event = (1, segment[0], segment[1])
                end_event = (0, segment[0], segment[1])
                if self._state_table[last_state].has_key(start_event):
                    top_state = \
                        self._state_table[last_state][start_event]
                else:
                    newest_state += 1
                    top_state = newest_state
                    self._state_table[top_state] = {}
                self._state_table[last_state][start_event] = \
                    top_state
                self._state_table[top_state][end_event] = \
                    last_state
                last_state = top_state
            self._live_states.append(top_state)
        self._state = START_STATE
        self.chunk_completed = 0
        return

    def event(self, is_start, ns, local):
        """
        Register an event and effect ant state transitions
        found in the state table
        """
        #We only have a chunk ready for the handler in
        #the explicit case below
        self.chunk_completed = 0
        lookup_from = self._state_table[self._state]
        if lookup_from.has_key((is_start, ns, local)):
             new_state = lookup_from[(is_start, ns, local)]
             #If we have completed a chunk, we set a flag for
             #The chunker
             if (self._state in self._live_states and
                 new_state not in self._live_states):
                 self.chunk_completed = 1
             self._state = new_state
        return self._state

    def is_live(self):
        """
        1 if the curent state is considered live, else 0
        """
        return self._state in self._live_states

    def event_nop(self, is_start, ns, local):
        pass

    def is_live_nop(self):
        return 1


class sax2dom_chunker(sax.ContentHandler):
    """
    Note: ignores nodes prior to the document element, such as PIs and
    text nodes
    This filter is only designed to work if you set features
    sax.handler.feature_namespaces
    and
    sax.handler.feature_namespace_prefixes
    to 1 on the parser you use.  It will not work on drivers that
    do not support these features.  The default drv_expat works fine
    in this case, but of course has but very limited DTD processing.
    It also collapses CDATA sections into plain text

    trim_to_paths - a list of lists of tuples.  Each tuple is of
        the form (namespace, local-name), providing one segment
        in a path of contained elements
        [
          [ (None, u'monty'), (None, u'python') ],
          [ (None, u'monty'), (None, u'spam'), ('urn:dummy', u'eggs') ]
        ]
        If None (the default, a DOM node will be created representing
        the entire tree.

    chunk_consumer - a callable object taking a DOM node.  It will be
        invoked as each DOM chunk is prepared.
    
    domimpl - DOM implemention to build, e.g. mindom (the default)
        or cDomlette or pxdom (if you have the right third-party
        packages installed).
    
    owner_doc - for advanced uses, if you want to use an existing
        DOM document object as the owner of all created nodes.
    """
    def __init__(self,
                 trim_to_paths=None,
                 chunk_consumer=None,
                 domimpl=xml.dom.minidom.getDOMImplementation(),
                 owner_doc=None
                 ):
        self._impl = domimpl
        if owner_doc:
            self._owner_doc = owner_doc
        else:
            dt = self._impl.createDocumentType(DUMMY_DOCELEM, None, u'')
            self._owner_doc = self._impl.createDocument(
                DUMMY_DOCELEM, DUMMY_DOCELEM, dt)
        #Create a docfrag to hold all the generated nodes.
        root_node = self._owner_doc.createDocumentFragment()
        self._nodeStack = [ root_node ]
        self.state_machine = _state_machine(trim_to_paths)
        self._chunk_consumer = chunk_consumer
        return

    def get_root_node(self):
        """
        Only useful if the user does not register trim paths
        If so, then after SAX processing the user can call this
        method to retrieve resulting DOm representing the entire
        document
        """
        return self._nodeStack[0]

    #Overridden DocumentHandler methods
    def startElementNS(self, name, qname, attribs):
        self.state_machine.event(1, name[0], name[1])
        if not self.state_machine.is_live():
            return
        (ns, local) = name
        new_element = self._owner_doc.createElementNS(ns, qname or local)

        for ((attr_ns, lname), value) in attribs.items():
            if attr_ns is not None:
                attr_qname = attribs.getQNameByName((attr_ns, lname))
            else:
                attr_qname = lname
            attr = self._owner_doc.createAttributeNS(
                attr_ns, attr_qname)
            attr_qname = attribs.getQNameByName((attr_ns, lname))
            attr.value = value
            new_element.setAttributeNodeNS(attr)

        self._nodeStack.append(new_element)
        return

    def endElementNS(self, name, qname):
        self.state_machine.event(0, name[0], name[1])
        if not self.state_machine.is_live():
            if (self._chunk_consumer and
                self.state_machine.chunk_completed):
                #Complete the element being closed because it
                #Is the last bit of a DOM to be fed to the consumer
                new_element = self._nodeStack[TOP]
                del self._nodeStack[TOP]
                self._nodeStack[TOP].appendChild(new_element)
                #Feed the consumer
                self._chunk_consumer(self._nodeStack[0])
                #Start all over with a new doc frag so the old
                #One's memory can be reclaimed
                root_node = self._owner_doc.createDocumentFragment()
                self._nodeStack = [ root_node ]
            return
        new_element = self._nodeStack[TOP]
        del self._nodeStack[TOP]
        self._nodeStack[TOP].appendChild(new_element)
        return

    def processingInstruction(self, target, data):
        if self.state_machine.is_live():
            pi = self._owner_doc.createProcessingInstruction(
                target, data)
            self._nodeStack[TOP].appendChild(pi)
        return

    #Overridden LexicalHandler methods
    def comment(self, text):
        if self.state_machine.is_live():
            new_comment = self._owner_doc.createComment(text)
            self._nodeStack[TOP].appendChild(new_comment)
        return

    def characters(self, chars):
        if self.state_machine.is_live():
            new_text = self._owner_doc.createTextNode(chars)
            self._nodeStack[TOP].appendChild(new_text)
        return

History