#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "James Eric Pruitt"
__all__ = [ "serialize", "deserialize" ]
__version__ = "2009.11.04"
import collections
# Two-way lookup table for the supported container types: a type maps to its
# name (itertable[list] == "list") and a name maps back to its type
# (itertable["list"] is list).
itertable = {}
for t in (set, frozenset, list, tuple, dict):
    itertable.update({t: t.__name__, t.__name__: t})
# Maps every supported scalar type to a (serializer, deserializer, label)
# triple.  The serializer turns a value into text, the deserializer turns that
# text back into a value, and the label names the type in the serialized
# output.
supporteddict = {
    int: (repr, int, "int"),
    long: (repr, long, "long"),
    bool: (repr, lambda s: s == "True", "bool"),
    # repr() does not always wrap a complex number in parentheses: it gives
    # '(3+4j)' but '4j'.  Strip optional parentheses rather than blindly
    # dropping the first and last characters, which turned '4j' into ''.
    complex: (repr, lambda s: complex(s.strip("()")), "complex"),
    float: (repr, float, "float"),
    # The escape codecs encode newlines, so a string always occupies a
    # single line of the newline-separated output.
    str: (
        lambda s: s.encode("string-escape"),
        lambda s: s.decode("string-escape"), "str"),
    unicode: (
        lambda s: s.encode("unicode-escape"),
        lambda s: s.decode("unicode-escape"), "unicode"),
    # None is a special case: the key is type(None), since type(None) != None.
    type(None): (repr, lambda s: None, "None"),
}

# Inverted dictionary: maps a type label to the matching deserializer.
supporteddictinv = dict(
    (name, func) for (_, func, name) in supporteddict.itervalues())
def serialize(root):
    """
    Serializes some of the fundamental data types in Python.

    Serialization function designed to not possess the same security flaws
    as the cPickle and pickle modules. At present, the following data types
    are supported:

        set, frozenset, list, tuple, dict, int, long, bool, complex, float,
        None, str, unicode

    The result is a newline-separated string in which every element takes up
    four lines: its id, the id of its parent, a type label ('C' for a
    container) and a payload (the container type name or the serialized
    value). A dictionary is written as a container of (key, value) tuples.

    Raises KeyError when an element has a type that is not supported and
    ValueError when a container directly or indirectly contains itself.

    To convert the serialized object back into a Python object, pass the text
    through the deserialize function.

    >>> deserialize(serialize((1, 2, 3+4j, ['this', 'is', 'a', 'list'])))
    (1, 2, (3+4j), ['this', 'is', 'a', 'list'])
    """
    # Every stack entry is (parent id, iterable of children, ids of the
    # containers that enclose those children).  The root is wrapped in a
    # one-element tuple whose parent id is 0.
    stack = collections.deque([(0, (root,), frozenset())])
    lintree, eid = collections.deque(), 0
    while stack:
        uid, focus, ancestors = stack.pop()
        ismapping = hasattr(focus, "keys")  # Support for dictionaries
        for element in focus:
            eid += 1
            if ismapping:
                # Each dictionary item becomes a (key, value) tuple.
                lintree.appendleft((eid, uid, 'C', "tuple"))
                stack.append((eid, (element, focus[element]), ancestors))
            elif hasattr(element, "__iter__"):
                # Without this check a self-referential container would
                # keep the stack growing forever.
                if id(element) in ancestors:
                    raise ValueError(
                        "Cannot serialize a container that contains itself.")
                lintree.appendleft((eid, uid, 'C', itertable[type(element)]))
                stack.append(
                    (eid, element, ancestors | frozenset([id(element)])))
            else:
                serializefunc, _, label = supporteddict[type(element)]
                lintree.appendleft((eid, uid, label, serializefunc(element)))
    return '\n'.join(str(element) for entry in lintree for element in entry)
def deserialize(text):
    """
    Deserializes data generated by the serialize function.

    The text is read as consecutive groups of four lines: element id, parent
    id, type label and payload. The label 'C' marks a container whose payload
    is the container type name; any other label selects the function used to
    turn the payload back into a value.

    >>> deserialize(serialize((1, 2, 3+4j, ['this', 'is', 'a', 'list'])))
    (1, 2, (3+4j), ['this', 'is', 'a', 'list'])
    """
    # Maps a parent id to the children collected for it so far.
    nodaldict = { 0: collections.deque() }
    text = text.split('\n')
    for quartet in xrange(0, len(text) - 1, 4):
        eid, pid = int(text[quartet]), int(text[quartet+1])
        moniker = text[quartet+2]
        if moniker == 'C':
            # Build the container from the children gathered under its id;
            # a container without any children is empty.
            encapsulator = itertable[text[quartet+3]]
            appendage = encapsulator(nodaldict.get(eid, collections.deque()))
        else:
            deserializer = supporteddictinv[moniker]
            appendage = deserializer(text[quartet+3])
        nodaldict.setdefault(pid, collections.deque()).appendleft(appendage)
    # Whatever was collected under parent id 0 is the deserialized object.
    return nodaldict[0].pop()
def test(supressoutput = False):
    """
    Round-trips a set of sample objects through serialize and deserialize.

    Unless supressoutput is true, the expected and the returned value are
    printed for every sample. ValueError is raised as soon as a round trip
    does not give back an object equal to the original.
    """
    samples = [
        [(None, True, False),
         # The second number is too large for a plain int, so it is a long.
         (1, 12341234123412341234123412341234, 0.5),
         0.12341234123412341234,
         u'This is\nan\tUnicode string\u0A0D\N{HANGUL SYLLABLE BYENH}',
         set(['A', 'B', 'D']),
         frozenset([1, '9', -0.12341234123412341234+1j, 'Y'])],
        (),
        [],
        set(),
        frozenset(),
        'Element that is not nested.',
        {'a': (1, "Waka"), ('X', 'K'): u'CD', u'y': {1: 'O', 2: ('T', 'wo')}},
    ]
    # Recursion is not yet properly supported, so structures that refer back
    # to themselves are left out of the samples:
    #x = {'a': None, 'z': None}
    #y = {'x': x}
    #x['y'] = y
    #samples.extend([x,y])
    for original in samples:
        roundtripped = deserialize(serialize(original))
        if not supressoutput:
            print(" ".join(["Expected: ", repr(original)]))
            print(" ".join(["Returned: ", repr(roundtripped)]))
        if roundtripped != original:
            raise ValueError("The test failed.")
if __name__ == '__main__':
    # Run the round-trip self test when the module is executed as a script.
    print("Running test...")
    test()
    print("Test passed.")