Please refer to wikipedia: http://en.wikipedia.org/wiki/Huffman_coding
Huffman coding is an entropy encoding algorithm used for lossless data compression. The term refers to the use of a variable-length code table for encoding a source symbol (such as a character in a file) where the variable-length code table has been derived in a particular way based on the estimated probability of occurrence for each possible value of the source symbol. It was developed by David A. Huffman while he was a Ph.D. student at MIT, and published in the 1952 paper "A Method for the Construction of Minimum-Redundancy Codes".
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | import os
import marshal
import cPickle
import array
class HuffmanNode(object):
recurPrint = False
def __init__(self, ch=None, fq=None, lnode=None, rnode=None, parent=None):
self.L = lnode
self.R = rnode
self.p = parent
self.c = ch
self.fq = fq
def __repr__(self):
if HuffmanNode.recurPrint:
lnode = self.L if self.L else '#'
rnode = self.R if self.R else '#'
return ''.join( ('(%s:%d)'%(self.c, self.fq), str(lnode), str(rnode) ) )
else:
return '(%s:%d)'%(self.c, self.fq)
def __cmp__(self, other):
if not isinstance(other, HuffmanNode):
return super(HuffmanNode, self).__cmp__(other)
return cmp(self.fq, other.fq)
def _pop_first_two_nodes(nodes):
if len(nodes)>1:
first=nodes.pop(0)
second=nodes.pop(0)
return first, second
else:
#print "[popFirstTwoNodes] nodes's length <= 1"
return nodes[0], None
def _build_tree(nodes):
nodes.sort()
while(True):
first, second = _pop_first_two_nodes(nodes)
if not second:
return first
parent = HuffmanNode(lnode=first, rnode=second, fq=first.fq+second.fq)
first.p = parent
second.p = parent
nodes.insert(0, parent)
nodes.sort()
def _gen_huffman_code(node, dict_codes, buffer_stack=[]):
if not node.L and not node.R:
dict_codes[node.c] = ''.join(buffer_stack)
return
buffer_stack.append('0')
_gen_huffman_code(node.L, dict_codes, buffer_stack)
buffer_stack.pop()
buffer_stack.append('1')
_gen_huffman_code(node.R, dict_codes, buffer_stack)
buffer_stack.pop()
def _cal_freq(long_str):
from collections import defaultdict
d = defaultdict(int)
for c in long_str:
d[c] += 1
return d
MAX_BITS = 8
class Encoder(object):
def __init__(self, filename_or_long_str=None):
if filename_or_long_str:
if os.path.exists(filename_or_long_str):
self.encode(filename_or_long_str)
else:
print '[Encoder] take \'%s\' as a string to be encoded.'\
% filename_or_long_str
self.long_str = filename_or_long_str
def __get_long_str(self):
return self._long_str
def __set_long_str(self, s):
self._long_str = s
if s:
self.root = self._get_tree_root()
self.code_map = self._get_code_map()
self.array_codes, self.code_length = self._encode()
long_str = property(__get_long_str, __set_long_str)
def _get_tree_root(self):
d = _cal_freq(self.long_str)
return _build_tree(
[HuffmanNode(ch=ch, fq=int(fq)) for ch, fq in d.iteritems()]
)
def _get_code_map(self):
a_dict={}
_gen_huffman_code(self.root, a_dict)
return a_dict
def _encode(self):
array_codes = array.array('B')
code_length = 0
buff, length = 0, 0
for ch in self.long_str:
code = self.code_map[ch]
for bit in list(code):
if bit=='1':
buff = (buff << 1) | 0x01
else: # bit == '0'
buff = (buff << 1)
length += 1
if length == MAX_BITS:
array_codes.extend([buff])
buff, length = 0, 0
code_length += len(code)
if length != 0:
array_codes.extend([buff << (MAX_BITS-length)])
return array_codes, code_length
def encode(self, filename):
fp = open(filename, 'rb')
self.long_str = fp.read()
fp.close()
def write(self, filename):
if self._long_str:
fcompressed = open(filename, 'wb')
marshal.dump(
(cPickle.dumps(self.root), self.code_length, self.array_codes),
fcompressed)
fcompressed.close()
else:
print "You haven't set 'long_str' attribute."
class Decoder(object):
def __init__(self, filename_or_raw_str=None):
if filename_or_raw_str:
if os.path.exists(filename_or_raw_str):
filename = filename_or_raw_str
self.read(filename)
else:
print '[Decoder] take \'%s\' as raw string' % filename_or_raw_str
raw_string = filename_or_raw_str
unpickled_root, length, array_codes = marshal.loads(raw_string)
self.root = cPickle.loads(unpickled_root)
self.code_length = length
self.array_codes = array.array('B', array_codes)
def _decode(self):
string_buf = []
total_length = 0
node = self.root
for code in self.array_codes:
buf_length = 0
while (buf_length < MAX_BITS and total_length != self.code_length):
buf_length += 1
total_length += 1
if code >> (MAX_BITS - buf_length) & 1:
node = node.R
if node.c:
string_buf.append(node.c)
node = self.root
else:
node = node.L
if node.c:
string_buf.append(node.c)
node = self.root
return ''.join(string_buf)
def read(self, filename):
fp = open(filename, 'rb')
unpickled_root, length, array_codes = marshal.load(fp)
self.root = cPickle.loads(unpickled_root)
self.code_length = length
self.array_codes = array.array('B', array_codes)
fp.close()
def decode_as(self, filename):
decoded = self._decode()
fout = open(filename, 'wb')
fout.write(decoded)
fout.close()
if __name__=='__main__':
original_file = 'filename.txt'
compressed_file = 'compressed.scw'
decompressed_file = 'filename2.txt'
# first way to use Encoder/Decoder
enc = Encoder(original_file)
enc.write(compressed_file)
dec = Decoder(compressed_file)
dec.decode_as(decompressed_file)
# second way
#enc = Encoder()
#enc.encode(original_file)
#enc.write(compressed_file)
#dec = Decoder()
#dec.read(compressed_file)
#dec.decode_as(decompressed_file)
|
The compressed file's size is about 400kb from an original text file with 600kb.
Hi what is scw file extension and how do I open it.
The scw file extension is the compressed file, by using this Huffman compressing algorithm. You simply use the Decoder in this code to decode the file. For example,
dec = Decoder() dec.read('xxxx.scw') dec.decode_as('decoded_filename')