Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python
# -*- coding: utf-8 -*-

__author__ = "James Eric Pruitt"
__all__ = [ "serialize", "deserialize" ]
__version__ = "2009.11.04"

import collections

# Two-way lookup for the supported container types: each type maps to its
# name and each name maps back to its type.
itertable = {}
for t in (set, frozenset, list, tuple, dict):
    itertable.update({t: t.__name__, t.__name__: t})

# Leaf (non-container) types that can be serialized. Every type maps to a
# triple of:
#   - a function turning a value into its text form,
#   - a function turning that text back into a value,
#   - the label that identifies the type.
supporteddict = {
    int: (repr, int, "int"),
    long: (repr, long, "long"),
    bool: (repr, lambda s: s == "True", "bool"),
    # repr() does not always wrap a complex number in parentheses:
    # repr(3+4j) is '(3+4j)' but repr(4j) is '4j'. Strip the parentheses
    # only when they are present instead of always dropping the first and
    # last characters, which would turn '4j' into an empty string.
    complex: (repr, lambda s: complex(s.strip("()")), "complex"),
    float: (repr, float, "float"),

    # The escape codecs encode newlines as backslash sequences, so the
    # text form of a string always fits on one line.
    str: (
        lambda s: s.encode("string-escape"),
        lambda s: s.decode("string-escape"), "str"),
    unicode: (
        lambda s: s.encode("unicode-escape"),
        lambda s: s.decode("unicode-escape"), "unicode"),

    type(None): (repr, lambda s: None, "None"), # None is a special case;
}                                               # type(None) != None

# Reverse lookup built from supporteddict: type label -> function that turns
# the serialized text back into a value.
supporteddictinv = dict(
    (entry[2], entry[1]) for entry in supporteddict.values())

def serialize(root):
    """
    Serializes some of the fundamental data types in Python.

    Serialization function designed to not possess the same security flaws
    as the cPickle and pickle modules. At present, the following data types
    are supported:

        set, frozenset, list, tuple, dict, int, long, bool, complex, float,
        None, str, unicode

    To convert the serialized object back into a Python object, pass the text
    through the deserialize function.

    The result is a newline separated string holding four lines per element:
    the element id, the id of its parent, a type label ('C' for containers)
    and the payload (the container type name or the text form of the value).

    Raises KeyError if a value of an unsupported type is encountered, and
    ValueError if a container directly or indirectly contains itself.

    >>> deserialize(serialize((1, 2, 3+4j, ['this', 'is', 'a', 'list'])))
    (1, 2, (3+4j), ['this', 'is', 'a', 'list'])
    """
    # Every stack entry is (id of the parent, iterable to walk, ids of the
    # containers that enclose the iterable). The root is wrapped in a 1-tuple
    # under the virtual parent id 0 so it is handled like any other element.
    stack = collections.deque([ (0, (root,), frozenset()) ])
    lintree, eid = collections.deque(), 0
    while stack:
        uid, focus, ancestors = stack.pop()
        isdict = hasattr(focus, "keys") # Support for dictionaries
        for element in focus:
            eid += 1
            if isdict:
                # A dictionary item is stored as a (key, value) tuple.
                lintree.appendleft((eid, uid, 'C', "tuple"))
                stack.append((eid, (element, focus[element]), ancestors))
            elif hasattr(element, "__iter__"):
                # Walking into a container that is already being walked
                # further up the same branch would never terminate.
                if id(element) in ancestors:
                    raise ValueError(
                        "Cannot serialize a container that contains itself.")
                lintree.appendleft((eid, uid, 'C', itertable[type(element)]))
                stack.append(
                    (eid, element, ancestors | frozenset([id(element)])))
            else:
                elementtype = type(element)
                serializefunc, _, label = supporteddict[elementtype]
                lintree.appendleft((eid, uid, label, serializefunc(element)))

    # Entries were added on the left, so every element is written before the
    # container that holds it.
    return '\n'.join(str(element) for entry in lintree for element in entry)

def deserialize(text):
    """
    Deserializes data generated by the serialize function.

    The text is read as consecutive groups of four lines: element id, parent
    id, type label and payload. The label 'C' marks a container whose
    payload is the name of the container type; any other label selects the
    function that converts the payload back into a value.

    Malformed text is not reported specially; it surfaces as the underlying
    ValueError, KeyError or IndexError.

    >>> deserialize(serialize((1, 2, 3+4j, ['this', 'is', 'a', 'list'])))
    (1, 2, (3+4j), ['this', 'is', 'a', 'list'])
    """
    # Rebuilt children, grouped by the id of their parent. Id 0 is the
    # virtual parent that holds the root object.
    nodaldict = { 0: collections.deque() }
    text = text.split('\n')
    # The children of a container are expected to appear before the container
    # itself, so they are already collected under its id when it is reached.
    # The "- 1" means a single extra trailing line does not start a record.
    for quartet in xrange(0, len(text) - 1, 4):
        eid, pid = int(text[quartet]), int(text[quartet+1])
        moniker = text[quartet+2]
        if moniker == 'C':
            encapsulator = itertable[text[quartet+3]]
            # A container without collected children is an empty container.
            appendage = encapsulator(nodaldict.get(eid, collections.deque()))
        else:
            deserializer = supporteddictinv[moniker]
            appendage = deserializer(text[quartet+3])
        # appendleft restores the original order, because the records list
        # the children of a container in reverse.
        nodaldict.setdefault(pid, collections.deque()).appendleft(appendage)

    return nodaldict[0].pop()

def test(supressoutput = False):
    """
    Round-trips a set of sample values through serialize and deserialize.

    Unless supressoutput is true, the original and the returned value are
    printed for every sample. ValueError is raised as soon as a returned
    value differs from the original.
    """
    nested = [
        (None, True, False),
        (1, 12341234123412341234123412341234, 0.5),
        0.12341234123412341234,
        u'This is\nan\tUnicode string\u0A0D\N{HANGUL SYLLABLE BYENH}',
        set(['A', 'B', 'D']),
        frozenset([1, '9', -0.12341234123412341234+1j, 'Y']),
    ]
    mapping = {
        'a': (1, "Waka"),
        ('X', 'K'): u'CD',
        u'y': {1: 'O', 2: ('T', 'wo')},
    }
    testvectors = [
        nested,
        (),
        [],
        set(),
        frozenset(),
        'Element that is not nested.',
        mapping,
    ]

    # Self-referential structures (for example two dictionaries that hold
    # each other) are not supported, so none are part of the samples.
    for root in testvectors:
        inverse = deserialize(serialize(root))
        if not supressoutput:
            # Joining with a space reproduces the separator that print
            # places between two comma separated items.
            print(" ".join(("Expected: ", repr(root))))
            print(" ".join(("Returned: ", repr(inverse))))
        if inverse != root:
            raise ValueError("The test failed.")

if __name__ == '__main__':
    # Executed as a script: run the self-test, printing every sample.
    print("Running test...")
    test()
    print("Test passed.")

History

  • revision 19 (14 years ago)
  • previous revisions are not available