import os
import marshal
import array
from collections import defaultdict

try:
    import cPickle
except ImportError:
    # Python 3 has no cPickle module; keep the historical name bound so the
    # rest of the file can keep calling cPickle.dumps / cPickle.loads.
    import pickle as cPickle


class HuffmanNode(object):
    """A node of a Huffman tree.

    Attributes:
        L, R: left / right child (``None`` when absent).
        p:    parent node (``None`` for the root).
        c:    the symbol stored in a leaf; ``None`` for internal nodes.
        fq:   frequency of the symbol, or the summed frequency of the subtree.

    Set the class attribute ``recurPrint`` to ``True`` to make ``repr()``
    print the whole subtree in pre-order ('#' marks a missing child).
    """

    recurPrint = False

    def __init__(self, ch=None, fq=None, lnode=None, rnode=None, parent=None):
        self.L = lnode
        self.R = rnode
        self.p = parent
        self.c = ch
        self.fq = fq

    def __repr__(self):
        label = '(%s:%d)' % (self.c, self.fq)
        if not HuffmanNode.recurPrint:
            return label
        lnode = '#' if self.L is None else self.L
        rnode = '#' if self.R is None else self.R
        return ''.join((label, str(lnode), str(rnode)))

    def __cmp__(self, other):
        """Python 2 three-way comparison: nodes are ordered by frequency."""
        if not isinstance(other, HuffmanNode):
            # object defines no __cmp__ to delegate to, so let the
            # interpreter fall back to its default comparison instead.
            return NotImplemented
        return (self.fq > other.fq) - (self.fq < other.fq)

    def __lt__(self, other):
        """Frequency ordering for interpreters that ignore ``__cmp__``."""
        if not isinstance(other, HuffmanNode):
            return NotImplemented
        return self.fq < other.fq


def _pop_first_two_nodes(nodes):
    """Remove and return the first two nodes of *nodes*.

    When fewer than two nodes are present the list is left untouched and
    ``(nodes[0], None)`` is returned (IndexError if the list is empty).
    """
    if len(nodes) > 1:
        return nodes.pop(0), nodes.pop(0)
    return nodes[0], None


def _node_frequency(node):
    """Sort key: the frequency of a node."""
    return node.fq


def _build_tree(nodes):
    """Combine leaf *nodes* into a Huffman tree and return its root.

    *nodes* is consumed (sorted and popped in place).  The two least
    frequent nodes are merged repeatedly; the list is re-sorted with a
    stable sort after each merge, so a freshly created parent is placed
    before existing nodes of equal frequency.

    Raises:
        ValueError: if *nodes* is empty.
    """
    if not nodes:
        raise ValueError('cannot build a Huffman tree from zero symbols')
    nodes.sort(key=_node_frequency)
    if len(nodes) == 1:
        # A lone leaf used as the root would receive the empty code and
        # the whole input would encode to zero bits.  Hang it under a
        # one-child root so that it gets the code '0'.
        only = nodes[0]
        root = HuffmanNode(lnode=only, fq=only.fq)
        only.p = root
        return root
    while True:
        first, second = _pop_first_two_nodes(nodes)
        if second is None:
            return first
        parent = HuffmanNode(lnode=first, rnode=second,
                             fq=first.fq + second.fq)
        first.p = parent
        second.p = parent
        nodes.insert(0, parent)
        nodes.sort(key=_node_frequency)


def _gen_huffman_code(node, dict_codes, buffer_stack=None):
    """Fill *dict_codes* with ``symbol -> bit string`` for the tree at *node*.

    Going to a left child appends '0', going to a right child appends '1'.
    *buffer_stack* holds the bits of the path walked so far; it is restored
    to its incoming content before the function returns.
    """
    if buffer_stack is None:
        buffer_stack = []
    if node.L is None and node.R is None:
        dict_codes[node.c] = ''.join(buffer_stack)
        return
    for bit, child in (('0', node.L), ('1', node.R)):
        if child is None:
            # Only the one-child root built for single-symbol input.
            continue
        buffer_stack.append(bit)
        _gen_huffman_code(child, dict_codes, buffer_stack)
        buffer_stack.pop()


def _cal_freq(long_str):
    """Return a ``defaultdict(int)`` mapping each symbol to its count."""
    d = defaultdict(int)
    for c in long_str:
        d[c] += 1
    return d


def _is_existing_path(candidate):
    """Return True if *candidate* names an existing filesystem path.

    os.path.exists can raise TypeError / ValueError (instead of returning
    False) for values containing NUL bytes; such values are not paths.
    """
    try:
        return os.path.exists(candidate)
    except (TypeError, ValueError):
        return False


# Number of code bits packed into each element of the output array.
MAX_BITS = 8


class Encoder(object):
    """Huffman-encode a string or the content of a file.

    Assigning a non-empty value to ``long_str`` (directly, through the
    constructor, or through ``encode``) computes:

        root         -- root HuffmanNode of the code tree
        code_map     -- dict mapping each symbol to its bit string
        array_codes  -- array('B') holding the packed code bits
        code_length  -- number of meaningful bits in ``array_codes``
    """

    def __init__(self, filename_or_long_str=None):
        """Encode immediately when an argument is given.

        An argument naming an existing path is read as a file; any other
        non-empty argument is taken as the data to encode.
        """
        # Make sure write() can always inspect the attribute.
        self._long_str = None
        if not filename_or_long_str:
            return
        if _is_existing_path(filename_or_long_str):
            self.encode(filename_or_long_str)
        else:
            print("[Encoder] take '%s' as a string to be encoded."
                  % filename_or_long_str)
            self.long_str = filename_or_long_str

    @property
    def long_str(self):
        """The data to encode; assigning a non-empty value re-encodes."""
        return self._long_str

    @long_str.setter
    def long_str(self, s):
        self._long_str = s
        if s:
            self.root = self._get_tree_root()
            self.code_map = self._get_code_map()
            self.array_codes, self.code_length = self._encode()

    def _get_tree_root(self):
        """Build the Huffman tree for the current data and return its root."""
        d = _cal_freq(self.long_str)
        leaves = [HuffmanNode(ch=ch, fq=int(fq)) for ch, fq in d.items()]
        return _build_tree(leaves)

    def _get_code_map(self):
        """Return the ``symbol -> bit string`` dict for ``self.root``."""
        a_dict = {}
        _gen_huffman_code(self.root, a_dict)
        return a_dict

    def _encode(self):
        """Pack the codes of all symbols into bytes.

        Returns:
            ``(array_codes, code_length)``: bits are stored most significant
            bit first; the last element is zero-padded on the right, and
            ``code_length`` is the number of bits that are real code bits.
        """
        array_codes = array.array('B')
        code_length = 0
        buff, length = 0, 0
        for ch in self.long_str:
            code = self.code_map[ch]
            for bit in code:
                buff = (buff << 1) | (1 if bit == '1' else 0)
                length += 1
                if length == MAX_BITS:
                    array_codes.append(buff)
                    buff, length = 0, 0
            code_length += len(code)
        if length != 0:
            # Left-align the trailing partial group of bits.
            array_codes.append(buff << (MAX_BITS - length))
        return array_codes, code_length

    def encode(self, filename):
        """Read *filename* in binary mode and encode its content."""
        with open(filename, 'rb') as fp:
            self.long_str = fp.read()

    def write(self, filename):
        """Write the encoded data to *filename*.

        The file holds one marshal record: the pickled tree root, the bit
        count and the packed code array.  Only a message is printed when
        no data has been set.
        """
        if not self._long_str:
            print("You haven't set 'long_str' attribute.")
            return
        with open(filename, 'wb') as fcompressed:
            marshal.dump(
                (cPickle.dumps(self.root), self.code_length,
                 self.array_codes),
                fcompressed)
class Decoder(object):
    """Decode data produced by ``Encoder.write``.

    After loading, the instance carries:

        root         -- root node of the Huffman tree (unpickled)
        code_length  -- number of meaningful bits in ``array_codes``
        array_codes  -- array('B') with the packed code bits
    """

    def __init__(self, filename_or_raw_str=None):
        """Load immediately when an argument is given.

        An argument naming an existing path is read as a compressed file;
        any other non-empty argument is taken as the raw marshal payload.
        """
        if not filename_or_raw_str:
            return
        try:
            is_file = os.path.exists(filename_or_raw_str)
        except (TypeError, ValueError):
            # Raised for values containing NUL bytes, which a raw marshal
            # payload normally does; such a value cannot be a path.
            is_file = False
        if is_file:
            self.read(filename_or_raw_str)
        else:
            print("[Decoder] take '%s' as raw string" % filename_or_raw_str)
            self._load(marshal.loads(filename_or_raw_str))

    def _load(self, payload):
        """Store the ``(pickled_root, bit_count, packed_bits)`` record."""
        pickled_root, length, array_codes = payload
        # SECURITY: unpickling can execute arbitrary code.  Only decode
        # data that comes from a trusted source.
        self.root = cPickle.loads(pickled_root)
        self.code_length = length
        self.array_codes = array.array('B', array_codes)

    def _decode(self):
        """Walk the tree bit by bit and return the decoded data.

        Bits are consumed most significant bit first; a 1 bit follows the
        right child, a 0 bit the left child.  Reaching a node that carries
        a symbol emits it and restarts at the root.  Decoding stops once
        ``code_length`` bits have been consumed, so the padding bits of the
        last byte are ignored.

        Raises:
            ValueError: if the bit stream walks onto a missing child.
        """
        symbols = []
        consumed = 0
        node = self.root
        for code in self.array_codes:
            if consumed == self.code_length:
                break
            for shift in range(MAX_BITS - 1, -1, -1):
                if consumed == self.code_length:
                    break
                consumed += 1
                node = node.R if (code >> shift) & 1 else node.L
                if node is None:
                    raise ValueError(
                        'corrupt Huffman stream: no branch for bit %d'
                        % consumed)
                # Internal nodes carry c=None.  Testing for None rather
                # than truthiness keeps falsy symbols (e.g. the integer 0)
                # decodable.
                if node.c is not None:
                    symbols.append(node.c)
                    node = self.root
        if symbols and isinstance(symbols[0], int):
            # Integer symbols come from iterating a bytes object.
            return bytes(bytearray(symbols))
        return ''.join(symbols)

    def read(self, filename):
        """Load the compressed file *filename*."""
        with open(filename, 'rb') as fp:
            self._load(marshal.load(fp))

    def decode_as(self, filename):
        """Decode the loaded data and write it to *filename*."""
        # Decode first so a failure does not leave a truncated output file.
        decoded = self._decode()
        with open(filename, 'wb') as fout:
            fout.write(decoded)


if __name__ == '__main__':
    original_file = 'filename.txt'
    compressed_file = 'compressed.scw'
    decompressed_file = 'filename2.txt'

    # First way to use Encoder/Decoder: pass the file name to the
    # constructor.
    enc = Encoder(original_file)
    enc.write(compressed_file)
    dec = Decoder(compressed_file)
    dec.decode_as(decompressed_file)

    # Second way: create empty objects, then call enc.encode(original_file)
    # and dec.read(compressed_file) before write() / decode_as().