#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "James Eric Pruitt"
__all__ = [ "serialize", "deserialize" ]
__version__ = "2009.11.04"

import collections
import sys

_PY2 = sys.version_info[0] == 2

# Two-way lookup for the supported containers: type -> name (used when
# serializing) and name -> type (used when deserializing).
itertable = {}
for t in [set, frozenset, list, tuple, dict]:
    itertable[t] = t.__name__
    itertable[t.__name__] = t

# The wire format labels byte strings "str" and text strings "unicode"
# (the Python 2 type names).  Each interpreter maps its own byte/text types
# onto those labels; every escaped form is pure ASCII and never contains a
# raw newline, which is what lets deserialize() split records on '\n'.
if _PY2:
    _bytestype, _texttype = str, unicode  # noqa: F821 (Python 2 builtin)

    def _escapebytes(data):
        return data.encode("string-escape")

    def _unescapebytes(text):
        return text.decode("string-escape")

    def _escapetext(data):
        return data.encode("unicode-escape")

    def _unescapetext(text):
        return text.decode("unicode-escape")
else:
    _bytestype, _texttype = bytes, str

    def _escapebytes(data):
        # latin-1 maps every byte to the code point of the same value, so
        # unicode-escape then emits \xNN escapes that decode back to bytes.
        return data.decode("latin-1").encode("unicode-escape").decode("ascii")

    def _unescapebytes(text):
        return text.encode("ascii").decode("unicode-escape").encode("latin-1")

    def _escapetext(data):
        return data.encode("unicode-escape").decode("ascii")

    def _unescapetext(text):
        return text.encode("ascii").decode("unicode-escape")


def _parsecomplex(text):
    # repr() wraps a complex number in parentheses only when its real part
    # is non-zero ('(3+4j)' but '4j'), so strip them only if present.
    return complex(text.strip("()"))


def _parselong(text):
    # Python 2 writes long integers with a trailing 'L'.
    return int(text.rstrip("L"))


# type -> (serializer, deserializer, label written to the stream)
supporteddict = {
    int: (repr, int, "int"),
    bool: (repr, lambda s: s == "True", "bool"),
    complex: (repr, _parsecomplex, "complex"),
    float: (repr, float, "float"),
    _bytestype: (_escapebytes, _unescapebytes, "str"),
    _texttype: (_escapetext, _unescapetext, "unicode"),
    type(None): (repr, lambda s: None, "None"),  # None is a special case;
}                                                # type(None) != None
if _PY2:
    supporteddict[long] = (repr, long, "long")  # noqa: F821 (Python 2 builtin)

# inverted dictionary: label -> deserializer
supporteddictinv = dict(
    (name, func) for (_, func, name) in supporteddict.values())
# Python 3 has no separate long type but can still read "long" records.
supporteddictinv.setdefault("long", _parselong)


def serialize(root):
    """
    Serializes some of the fundamental data types in Python.

    Serialization function designed to not possess the same security flaws
    as the cPickle and pickle modules. At present, the following data types
    are supported:

        set, frozenset, list, tuple, dict, int, long, bool, complex, float,
        None, str, unicode

    (On Python 3 the byte-string label "str" is used for bytes and the
    text label "unicode" is used for str.)

    The result is a newline-separated text made of four-line records
    (element id, parent id, type label, payload), children always preceding
    the container that holds them.

    Raises KeyError when an object of an unsupported type is encountered
    and ValueError when a container (directly or indirectly) contains
    itself.

    To convert the serialized object back into a Python object, pass the
    text through the deserialize function.

    >>> deserialize(serialize((1, 2, 3+4j, ['this', 'is', 'a', 'list'])))
    (1, 2, (3+4j), ['this', 'is', 'a', 'list'])
    """
    # Pending work: (parent id, container to walk, ids of the containers
    # that enclose it).  The root is wrapped in a 1-tuple owned by the
    # virtual parent 0 so that it is handled like any other element.
    stack = [(0, (root,), frozenset())]
    lintree, eid = collections.deque(), 0

    while stack:
        uid, focus, ancestors = stack.pop()
        focusisdict = isinstance(focus, dict)
        for element in focus:
            eid += 1
            if focusisdict:
                # A dictionary is stored as a container of (key, value)
                # tuples, which is exactly what dict() accepts back.
                lintree.appendleft((eid, uid, 'C', "tuple"))
                stack.append((eid, (element, focus[element]), ancestors))
                continue

            elementtype = type(element)
            if elementtype in itertable:
                if id(element) in ancestors:
                    raise ValueError(
                        "Cannot serialize a container that contains itself.")
                lintree.appendleft((eid, uid, 'C', itertable[elementtype]))
                stack.append(
                    (eid, element, ancestors | frozenset([id(element)])))
            else:
                serializefunc, _, label = supporteddict[elementtype]
                lintree.appendleft((eid, uid, label, serializefunc(element)))

    return '\n'.join(str(field) for entry in lintree for field in entry)


def deserialize(text):
    """
    Deserializes data generated by the serialize function.

    Only the constructors listed in the module's lookup tables are ever
    called, so no arbitrary code is executed. Malformed input raises
    IndexError (truncated data), KeyError (unknown type label) or
    ValueError (unparsable field).

    >>> deserialize(serialize((1, 2, 3+4j, ['this', 'is', 'a', 'list'])))
    (1, 2, (3+4j), ['this', 'is', 'a', 'list'])
    """
    # parent id -> already rebuilt children, in their original order
    nodaldict = { 0: collections.deque() }
    fields = text.split('\n')

    for offset in range(0, len(fields) - 1, 4):
        eid, pid = int(fields[offset]), int(fields[offset + 1])
        moniker = fields[offset + 2]
        if moniker == 'C':
            # Children are emitted before their container, so everything
            # collected under this id is complete by now.
            encapsulator = itertable[fields[offset + 3]]
            appendage = encapsulator(nodaldict.get(eid, collections.deque()))
        else:
            deserializer = supporteddictinv[moniker]
            appendage = deserializer(fields[offset + 3])
        # Siblings are stored in reverse, so prepending restores the order.
        nodaldict.setdefault(pid, collections.deque()).appendleft(appendage)

    return nodaldict[0].pop()


def test(supressoutput = False):
    """
    Round-trips a set of sample objects through serialize and deserialize.

    Prints the expected and returned values unless supressoutput is true
    and raises ValueError as soon as a check fails.
    """
    testvectors = [
        list(((None, True, False), (1, 12341234123412341234123412341234, 0.5),
        0.12341234123412341234,
        u'This is\nan\tUnicode string\u0A0D\N{HANGUL SYLLABLE BYENH}',
        set(('A','B','D')),
        frozenset(tuple((1, '9', -0.12341234123412341234+1j, 'Y'))))),
        tuple(), list(), set(), frozenset(),
        'Element that is not nested.',
        {'a': (1, "Waka"), ('X', 'K'): u'CD', u'y':{ 1:'O', 2:('T','wo')}},
        [4j, -2.5j]]

    for root in testvectors:
        serialized = serialize(root)
        inverse = deserialize(serialized)
        if not supressoutput:
            print("Expected:  " + repr(root))
            print("Returned:  " + repr(inverse))
        if (inverse != root):
            raise ValueError("The test failed.")

    # Self-referencing structures cannot be represented and must be
    # rejected instead of being walked forever.
    x = {'a': None, 'z': None}
    y = {'x': x}
    x['y'] = y
    for root in (x, y):
        try:
            serialize(root)
        except ValueError:
            continue
        raise ValueError("The test failed.")


if __name__ == '__main__':
    print("Running test...")
    test()
    print("Test passed.")