This module provides classes that are useful for executing Markov encryption and decryption on data. ME was inspired by a combination of Markov chains with the puzzles of Sudoku. This implementation is a rewrite from the Python 3.x version and includes various changes and optimizations to work with Python 2.5 and related versions. All documentation has been left in recipe 578075 and should be referred to there.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | import random
import sys
import collections
################################################################################
_CHAOS = random.SystemRandom()
def slots(names=''):
sys._getframe(1).f_locals['__slots__'] = \
tuple('__' + name for name in names.replace(',', ' ').split())
################################################################################
class Key(object):
slots('data, prefix_len, base, size, encoder, axes, order, decoder')
@classmethod
def new(cls, chars_used, chain_size):
selection, blocks = list(set(chars_used)), []
for _ in range(chain_size):
_CHAOS.shuffle(selection)
blocks.append(''.join(selection))
return cls(tuple(blocks))
def __init__(self, data):
self.__test_data(data)
self.__make_vars(data)
@staticmethod
def __test_data(data):
if not isinstance(data, tuple):
raise TypeError('Data must be a tuple object!')
if len(data) < 2:
raise ValueError('Data must contain at least two items!')
item = data[0]
if not isinstance(item, str):
raise TypeError('Data items must be str objects!')
length = len(item)
if length < 2:
raise ValueError('Data items must contain at least two chars!')
unique = set(item)
if len(unique) != length:
raise ValueError('Data items must contain unique char sets!')
for item in data[1:]:
if not isinstance(item, str):
raise TypeError('Data items must be str objects!')
next_length = len(item)
if next_length != length:
raise ValueError('All data items must have the same size!')
next_unique = set(item)
if len(next_unique) != next_length:
raise ValueError('Data items must contain unique char sets!')
if next_unique ^ unique:
raise ValueError('All data items must use the same char set!')
def __make_vars(self, data):
self.__data = data
self.__prefix_len = len(data) - 1
self.__base = base = data[0]
self.__size = size = len(base)
offset = -sum(base.index(block[0]) for block in data[1:-1]) % size
self.__encoder = base[offset:] + base[:offset]
self.__axes = tuple(reversed([tuple(base.index(char) for char in block)
for block in data[1:]]))
self.__order = key = ''.join(sorted(base))
grid = []
for rotation in range(size):
block, row = base[rotation:] + base[:rotation], [None] * size
for char, value in zip(block, key):
row[key.index(char)] = value
grid.append(''.join(row))
self.__decoder = tuple(grid[offset:] + grid[:offset])
def test_primer(self, primer):
primer.test_key(self)
def encode(self, prefix, current):
assert len(prefix) == self.__prefix_len, \
'Prefix size is not compatible with key dimensions!'
return self.__encoder[(sum(table[probe] for table, probe in
zip(self.__axes, prefix)) + current) % self.__size]
def decode(self, prefix, current):
assert len(prefix) == self.__prefix_len, \
'Prefix size is not compatible with key dimensions!'
return self.__decoder[sum(table[probe] for table, probe in
zip(self.__axes, prefix)) % self.__size][current]
@property
def data(self):
return self.__data
@property
def prefix_len(self):
return self.__prefix_len
@property
def base(self):
return self.__base
@property
def order(self):
return self.__order
################################################################################
class Primer(object):
slots('data')
@classmethod
def new(cls, key):
base = key.base
return cls(''.join(_CHAOS.choice(base) for _ in range(key.prefix_len)))
def __init__(self, data):
self.__test_data(data)
self.__data = data
@staticmethod
def __test_data(data):
if not isinstance(data, str):
raise TypeError('Data must be a str object!')
if not data:
raise ValueError('Data must contain at least one char!')
def test_key(self, key):
if len(self.__data) != key.prefix_len:
raise ValueError('Key size must be one more than the primer size!')
if not set(self.__data).issubset(key.base):
raise ValueError('Key data must be a superset of primer data!')
@property
def data(self):
return self.__data
################################################################################
class _Processor(object):
slots('key, into, index, from')
def __init__(self, key, primer):
if self.__class__ is _Processor:
raise NotImplementedError('This is an abstract class!')
key.test_primer(primer)
self.__key = key
self.__into = table = dict(map(reversed, enumerate(key.order)))
self.__index = collections.deque(map(table.__getitem__, primer.data))
self.__index.appendleft(None)
self.__from = dict(map(reversed, table.items()))
def process(self, data):
cache = []
self._run(data, cache.append, self.__key, self.__into, self.__index)
return ''.join(cache)
@staticmethod
def _run(data, cache_push, key, table, index):
raise NotImplementedError('This is an abstract method!')
@property
def primer(self):
self.__index.popleft()
value = Primer(''.join(map(self.__from.__getitem__, self.__index)))
self.__index.appendleft(None)
return value
################################################################################
class Encrypter(_Processor):
slots()
@staticmethod
def _run(data, cache_push, key, table, index):
index_pop, encode, index_push = index.popleft, key.encode, index.append
for char in data:
if char in table:
index_pop()
code = table[char]
cache_push(encode(index, code))
index_push(code)
else:
cache_push(char)
################################################################################
class Decrypter(_Processor):
slots()
@staticmethod
def _run(data, cache_push, key, table, index):
index_pop, decode, index_push = index.popleft, key.decode, index.append
for char in data:
if char in table:
index_pop()
value = decode(index, table[char])
cache_push(value)
index_push(table[value])
else:
cache_push(char)
################################################################################
def encrypt(data, key, primer):
engine = Encrypter(key, primer)
return engine.process(data), engine.primer
def decrypt(data, key, primer):
engine = Decrypter(key, primer)
return engine.process(data), engine.primer
def auto_encrypt(data, chain_size, plain_text=''):
key = Key.new(set(data) - set(plain_text), chain_size)
primer = Primer.new(key)
return Encrypter(key, primer).process(data), key, primer
|
The code above was ported from the original module so that it could be used in the Wabol Talk program.