This module was written for self-academic purposes in an attempt to understand databases better. Just as college students are required to reinvent "wheels" such as linked lists, trees, and binary searches, developing this program helped me understand some of the things that go on behind the scenes in a database while also teaching me what the desired output of each operation should be. The code in this module provides support for running a simple database engine that runs completely in memory and allows usage of various concepts available in a structured query language to get and set data that may be saved to file.
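Before the listing itself, it may help to see the persistence pattern the engine uses: `Database.save` pickles the object and compresses the bytes with bz2, and `Database.load` reverses the process and type-checks the result. Below is a minimal standalone sketch of that same pattern using only the standard library; the `save`/`load` function names here are illustrative, not part of the module.

```python
import bz2
import pickle

def save(obj, path):
    # Serialize the object with pickle, then compress the bytes with bz2,
    # mirroring what Database.save does with its own instance.
    with open(path, 'wb') as file:
        file.write(bz2.compress(pickle.dumps(obj)))

def load(path, expected_type):
    # Decompress and unpickle, then verify the object's type before
    # returning it, mirroring the identity test in Database.load.
    with open(path, 'rb') as file:
        obj = pickle.loads(bz2.decompress(file.read()))
    assert isinstance(obj, expected_type), 'Could not load expected object!'
    return obj

if __name__ == '__main__':
    import os, tempfile
    path = os.path.join(tempfile.mkdtemp(), 'demo.db')
    save({'answer': 42}, path)
    assert load(path, dict) == {'answer': 42}
```

The type check on load is what lets the real module fail fast with "Could not load a database object!" instead of handing back an arbitrary unpickled value.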
"""Module written to learn and understand more about databases.
The code in this module provides support for running a simple database engine
that runs completely in memory and allows usage of various concepts available
in a structured query language to get and set data that may be saved to file."""
################################################################################
import bz2
import copy
import datetime
import decimal
import pickle
import re
import sys
import types
import _thread
################################################################################
def _slots(names=''):
"Returns the private version of names for __slots__ on a class."
return tuple('__' + name for name in names.split())
################################################################################
class Database:
"Database() -> Database"
@classmethod
def load(cls, path):
"Loads database from path and tests identity."
with open(path, 'rb') as file:
obj = pickle.loads(bz2.decompress(file.read()))
assert isinstance(obj, cls), 'Could not load a database object!'
obj.__path = path
return obj
########################################################################
__slots__ = _slots('path data type view')
def __init__(self):
"Initializes database object void of tables or views."
self.__path = None
self.__setstate__(Table(('name', str),
('type', type),
('data', (Table, _View))))
def __repr__(self):
"Returns the representation of the database."
return repr(self.__view.value)
def __iter__(self):
"Iterates over the names of the tables and views in the database."
for row in rows(self.__data('name')):
yield self[row[0]]
def __getattr__(self, name):
"Allows getting table or view via attribute lookup or index notation."
t = tuple(self.__data.where(ROW.name == name)('data'))
assert len(t) < 3, 'Name is ambiguous!'
assert len(t) > 1, 'Object was not found!'
data = t[1][0]
if isinstance(data, _View):
return data.value
return data
__getitem__ = __getattr__
def __getstate__(self):
"Provides support for pickling and saving the database."
return self.__data
def __setstate__(self, state):
"Helps with unpickling and adding needed instance variables."
self.__data = state
self.__type = Table(('type', type), ('name', str))
self.__type.insert(Table, 'table')
self.__type.insert(_View, 'view')
self.__view = _View(None, lambda _: self.__data.left_join(self.__type, \
'Types', ROW.type == ROW.Types.type) \
.select('name', 'Types.name', (lambda obj: \
float(len(obj) if isinstance(obj, Table) else 'nan'), 'data')),
('Types.name', 'type'), ('<lambda>(data)', 'size'))
########################################################################
def save(self, path=None):
"Saves the database to path or most recently known path."
if path is None:
assert self.__path is not None, 'Path must be provided!'
path = self.__path
with open(path, 'wb') as file:
file.write(bz2.compress(pickle.dumps(self)))
self.__path = path
def create(self, name, schema_or_table_or_query, *name_changes):
"Creates either a table or view for use in the database."
assert not self.__data.where(ROW.name == name), \
'Name is already used and may not be overloaded!'
if isinstance(schema_or_table_or_query, (tuple, list)):
assert not name_changes, 'Name changes not allowed with schema!'
data = Table(*schema_or_table_or_query)
elif isinstance(schema_or_table_or_query, Table):
assert not name_changes, 'Name changes not allowed with table!'
data = schema_or_table_or_query
else:
data = _View(self, schema_or_table_or_query, *name_changes)
self.__data.insert(name=name, type=type(data), data=data)
return data
def drop(self, name):
"Deletes a table or view from the database."
self.__data.delete(ROW.name == name)
def print(self, end='\n\n', file=None):
"Provides a simple way of showing a representation of the database."
self.__view.value.print(end, file)
def create_or_replace(self, name, schema_or_table_or_query, *name_changes):
"Drops table or view before creating one with the same name."
self.drop(name)
self.create(name, schema_or_table_or_query, *name_changes)
def inner_join(self, table_a, table_b, test):
"Inner joins tables and views by name using test."
return inner_join(test, **{table_a: self[table_a],
table_b: self[table_b]})
def full_join(self, table_a, table_b, test):
"Full joins tables and views by name using test."
return full_join(test, **{table_a: self[table_a],
table_b: self[table_b]})
################################################################################
class Database2(Database):
"Database2() -> Database2"
@classmethod
def upgrade(cls, db_old):
"Upgrades the base version of a database into the child version."
assert isinstance(db_old, cls.__base__), \
'Can only upgrade Database objects!'
db_new = cls()
db_new.__setstate__(db_old.__getstate__())
db_new.save(db_old._Database__path)
db_old.__init__()
return db_new
########################################################################
__slots__ = _slots('lock locked view')
def __repr__(self):
"Returns an updated representation of the database."
return repr(self.__view.value)
def __setstate__(self, state):
"Sets up remaining attributes and prepares for transactions."
super().__setstate__(state)
self.__add_transaction_support()
def __getstate__(self):
"Reduces internal table to required columns and returns copy."
self.__del_transaction_support()
data = self.__data.copy()
self.__extend_data()
return data
def __getattr__(self, name):
"Allows contents to be accessed only if not in transaction."
table = self.__data.where(name=name)
assert len(table) < 2, 'Name is ambiguous!'
assert len(table) > 0, 'Object was not found!'
assert not table.first('lock').locked, 'A transaction is in place!'
if table.first('type') is _View:
return table.first('data').value
return table.first('data')
__getitem__ = __getattr__
########################################################################
def begin_transaction(self, table, wait=False):
"Locks and copies table while optionally waiting for unlock."
table = self.__data.where(name=table)
assert table.first('type') is not _View, 'Views are not supported!'
lock = table.first('lock')
if wait:
lock.acquire()
with self.__lock: # Protects Critical Section
data = table.first('data')
table.update(copy=copy.deepcopy(data))
else:
with self.__lock:
assert lock.acquire(False), 'Table is locked in a transaction!'
data = table.first('data')
table.update(copy=copy.deepcopy(data))
return data
def commit_transaction(self, table):
"Deletes reserve copy and unlocks the table."
self.__close_transaction(table, self.__commit)
def rollback_transaction(self, table):
"Restores table with copy, removes copy, and unlocks the table."
self.__close_transaction(table, self.__rollback)
########################################################################
def __add_transaction_support(self):
"Adds attributes so the database can support transactions."
self.__lock = _thread.allocate_lock()
self.__extend_data()
self.__locked = _View(None, lambda _: self.__data \
.select('name', (lambda lock: lock.locked, 'lock')) \
.as_(('<lambda>(lock)', 'locked')))
self.__view = _View(None, lambda _: self._Database__view.value \
.left_join(self.__locked.value, 'Lock', ROW.name == ROW.Lock.name) \
.select('name', 'type', 'size', 'Lock.locked'), \
('Lock.locked', 'locked'))
def __extend_data(self):
"Adds columns to internal table as necessary."
if ('type', type) not in self.__data.schema:
self.__data.alter_add('type', type)
for name, data in rows(self.__data('name', 'data')):
self.__data.where(name=name).update(type=type(data))
self.__data.alter_add('lock', _Lock)
self.__data.alter_add('copy', object)
def __del_transaction_support(self):
"Ensures no pending transactions and removes unsaved columns."
assert not self.__locked.value.where(locked=True), \
'You must commit all transactions before pickling!'
self.__data.alter_drop('type')
self.__data.alter_drop('lock')
self.__data.alter_drop('copy')
def __close_transaction(self, table, action):
"Finishes taking care of a transaction's end."
table = self.__data.where(name=table)
assert table.first('type') is not _View, 'Views are not supported!'
lock = table.first('lock')
# Begin Critical Section
with self.__lock:
try:
lock.release()
except _thread.error:
raise ValueError('Table was not in a transaction!')
action(table)
# End Critical Section
########################################################################
@staticmethod
def __commit(table):
"Deletes the reserve copy of a table."
table.update(copy=object())
@staticmethod
def __rollback(table):
"Restores table from copy and deletes the copy."
table.update(data=table.first('copy'), copy=object())
########################################################################
@property
def __data(self):
"Aliases internal table from Database class."
return self._Database__data
################################################################################
class _Lock:
"_Lock(immediate=False, silent=False) -> _Lock"
__slots__ = _slots('lock verbose')
def __init__(self, immediate=False, silent=False):
"Initializes _Lock instance with internal mechanism."
self.__lock = _thread.allocate_lock()
self.__verbose = not silent
if immediate:
self.acquire()
########################################################################
def acquire(self, wait=True):
"Acquires lock with an optional wait."
return self.__lock.acquire(wait)
def release(self, exc_type=None, exc_value=None, traceback=None):
"Releases the lock if locked; may raise an error otherwise."
try:
self.__lock.release()
except _thread.error:
if self.__verbose:
raise
########################################################################
__enter__ = acquire
__exit__ = release
########################################################################
@property
def locked(self):
"Returns whether or not lock is currently locked."
return self.__lock.locked()
################################################################################
class Table:
"Table(*columns) -> Table"
@classmethod
def from_iter(cls, iterator):
"Generates a table from a column / rows iterator."
title, test_row, *rows = iterator
table = cls(*zip(title, map(type, test_row)))
table.insert(*test_row)
for row in rows:
table.insert(*row)
return table
########################################################################
__slots__ = _slots('columns data_area row_index')
def __init__(self, *columns):
"Initializes Table with columns and row storage area."
self.__columns = _Columns(columns)
self.__data_area = {}
self.__row_index = 1
def __len__(self):
"Returns the number of rows in the table."
return len(self.__data_area)
def __repr__(self):
"Creates a complete representation of the table."
buffer = [list(map(repr, ['ROW_ID'] + [name for index, name, data_type \
in self.__columns]))]
width = [0] * len(buffer[0])
for row in sorted(self.__data_area):
buffer.append(list(map(repr, [row] + [self.__data_area[row][index] \
for index, name, data_type \
in self.__columns])))
for row in buffer:
for index, string in enumerate(row):
width[index] = max(width[index], len(string))
string = ''
for index, value in enumerate(buffer[0]):
string += value.ljust(width[index]) + ' | '
string = string[:-3] + '\n'
for index in range(len(buffer[0])):
string += '-' * width[index] + '-+-'
string = string[:-3] + '\n'
for row in buffer[1:]:
for index, value in enumerate(row):
string += value.ljust(width[index]) + ' | '
string = string[:-3] + '\n'
return string[:-1]
def __str__(self):
"Returns a simple, whitespace-formatted view of the table."
names, *rows = self
columns = {name: [] for name in names}
for row in rows:
for key, value in zip(names, row):
columns[key].append(value)
lengths = tuple(max(len(str(value)) for value in columns[key] + [key])
for key in names)
template = ' '.join(map('{{:{}}}'.format, lengths))
lines = [template.format(*map(str.upper, names)),
' '.join(map('-'.__mul__, lengths))]
for row in zip(*map(columns.__getitem__, names)):
lines.append(template.format(*row))
return '\n'.join(lines)
def __iter__(self):
"Returns an iterator over the table's columns."
return self(*self.columns)
def __call__(self, *columns):
"Returns an iterator over the specified columns."
indexes = tuple(self.__columns[name][1] for name in columns)
yield columns
for row in sorted(self.__data_area):
yield tuple(self.__data_area[row][index] for index in indexes)
########################################################################
def first(self, column=None):
"Returns the first row or, if a column is given, that cell of the first row."
return self.__get_location(min, column)
def last(self, column=None):
"Returns the last row or, if a column is given, that cell of the last row."
return self.__get_location(max, column)
def print(self, end='\n\n', file=None):
"Provides a convenient way of printing representation of the table."
print(repr(self), end=end, file=sys.stdout if file is None else file)
def top(self, amount):
"Iterates over the top rows specified by amount."
if amount == -1:
amount = len(self.__data_area)
elif 0 <= amount < 1:
amount = round(amount * len(self.__data_area))
assert isinstance(amount, int), 'Amount was not understood!'
for row, count in zip(self, range(amount + 1)):
yield row
def insert(self, *values, **columns):
"Inserts provided data into a new row of the table."
if values:
assert len(values) == len(self.__columns), 'Bad number of columns!'
assert not columns, 'Mixed syntax is not accepted!'
row = self.__insert_across(values)
else:
assert columns, 'There is nothing to insert!'
row = self.__insert_select(columns)
self.__data_area[self.__row_index] = row
self.__row_index += 1
def alter_add(self, name, data_type):
"Adds a column to the table and populates it."
index = self.__columns.add(name, data_type)
started = False
try:
for row in self.__data_area.values():
row[index] = data_type()
started = True
except TypeError:
if started:
raise
for row in self.__data_area.values():
row[index] = data_type
def alter_drop(self, name):
"Removes a column from the table and frees memory."
index = self.__columns.drop(name)
for row in self.__data_area.values():
del row[index]
def alter_column(self, name, data_type):
"Changes the data-type of a column and refreshes it."
index = self.__columns.alter(name, data_type)
for row in self.__data_area.values():
row[index] = data_type()
def alter_name(self, old, new):
"Renames a column without altering the rows."
self.__columns.rename(old, new)
def as_(self, *pairs):
"Changes the name of multiple columns at a time."
for old, new in pairs:
self.alter_name(old, new)
return self
def copy(self):
"Copies a table while sharing cell instances."
copy = type(self)()
copy.__columns = self.__columns.copy()
copy.__data_area = {}
for key, value in self.__data_area.items():
copy.__data_area[key] = value.copy()
copy.__row_index = self.__row_index
return copy
def select(self, *column_names):
"Select columns and process them with any given functions."
if not column_names:
return self
columns, functions = [], []
for item in column_names:
if isinstance(item, str):
columns.append(item)
elif isinstance(item, tuple):
functions.append(item)
else:
raise TypeError(type(item))
original = {name for index, name, data_type in self.__columns}
excess = original - set(columns)
if functions:
return self.__select_with_function(excess, functions)
copy = type(self)()
copy.__columns = self.__columns.copy()
copy.__data_area = self.__data_area
copy.__row_index = self.__row_index
for column in excess:
copy.__columns.drop(column)
return copy
def distinct(self):
"Return copy of table having only distinct rows."
copy = type(self)()
copy.__columns = self.__columns
copy.__data_area = self.__data_area.copy()
copy.__row_index = self.__row_index
valid_indexes = set()
distinct_rows = set()
for row in copy.__data_area:
array = pickle.dumps(tuple(copy.__data_area[row][index] \
for index, name, data_type \
in self.__columns))
if array not in distinct_rows:
valid_indexes.add(row)
distinct_rows.add(array)
for row in tuple(copy.__data_area):
if row not in valid_indexes:
del copy.__data_area[row]
return copy
def update(self, **assignments):
"Changes all present rows with given assignments."
assign = []
for name, value in assignments.items():
data_type, index = self.__columns[name]
assert isinstance(value, data_type), \
'Wrong datatype: {} ({!r}, {!r})'.format(name, value, data_type)
assign.append((index, value))
for row in self.__data_area.values():
for index, value in assign:
row[index] = value
def where(self, test='and', **kw):
"Select rows that fit criteria given by the test."
test = self.__process_test(test, kw)
copy = type(self)()
copy.__columns = self.__columns
copy.__data_area = self.__data_area.copy()
copy.__row_index = self.__row_index
self.__remove(copy.__data_area, False, test)
return copy
def delete(self, test='and', **kw):
"Delete rows that fit criteria given by the test."
test = self.__process_test(test, kw)
self.__remove(self.__data_area, True, test)
return self
def truncate(self):
"Deletes all of the rows in the table."
self.__data_area.clear()
return self
def order_by(self, column, desc=False):
"Returns a sorted result of the table."
return _SortedResults(self, column, desc)
def into(self, table):
"Inserts external table into this table by column name."
self_iter = iter(self)
self_colu = next(self_iter)
for row in self_iter:
table.insert(**{name: data for name, data in zip(self_colu, row)})
def left_join(self, table, name, test):
"Returns result of a left join on the given table using test."
return left_join(self, (table, name), test)
def sum_(self, column):
"Adds up all of the cells in a particular column of the table."
data_type, index = self.__columns[column]
total = data_type()
for row in self.__data_area:
total += self.__data_area[row][index]
return total
def avg(self, column):
"Averages the cells in the given column of the table."
size = len(self.__data_area)
return self.sum_(column) / size if size else size
def max_(self, column):
"Finds the largest cell value from the column in the table."
index = self.__columns[column][1]
return max(map(ROW[index], self.__data_area.values()))
def min_(self, column):
"Finds the smallest cell value from the column in the table."
index = self.__columns[column][1]
return min(map(ROW[index], self.__data_area.values()))
def count(self, column=None):
"Counts the total number of 'non-null' cells in the given column."
if column is None:
return len(self.__data_area)
data_type, index = self.__columns[column]
null, total = data_type(), 0
for row in self.__data_area.values():
if row[index] != null:
total += 1
return total
def group_by(self, *columns):
"Creates new tables from this table on matching columns."
column_map = {name: index for index, name, data_type in self.__columns}
index_list = tuple(sorted(column_map.values()))
schema = list(self.schema)
tables = {}
first = True
for row_dict in self.__data_area.values():
interest = []
row = list(row_dict[index] for index in index_list)
for name in columns:
if isinstance(name, str):
interest.append(row_dict[column_map[name]])
else:
interest.append(name(_RowAdapter(row_dict, column_map)))
name = name.name
if name is not None:
data = interest[-1]
row.append(data)
if first:
signature = name, type(data)
if signature not in schema:
schema.append(signature)
first = False
key = tuple(interest)
if key not in tables:
tables[key] = type(self)(*schema)
tables[key].insert(*row)
return tables.values()
########################################################################
def __get_location(self, function, column):
"Returns a row or cell based on function and column."
row = self.__data_area[function(self.__data_area)]
if column is None:
return tuple(row[index] for index in sorted(row))
return row[self.__columns[column][1]]
def __insert_across(self, values):
"Inserts values into new row while checking data types."
row = {}
for value, (index, name, data_type) in zip(values, self.__columns):
assert isinstance(value, data_type), \
'Wrong datatype: {} ({!r}, {!r})'.format(name, value, data_type)
row[index] = value
return row
def __insert_select(self, values):
"Inserts values into new row and fills in blank cells."
row = {}
for name, value in values.items():
data_type, index = self.__columns[name]
assert isinstance(value, data_type), \
'Wrong datatype: {} ({!r}, {!r})'.format(name, value, data_type)
row[index] = value
for index, name, data_type in self.__columns:
if index not in row:
row[index] = data_type()
return row
def __remove(self, data_area, delete, test):
"Removes rows from data area according to criteria."
column_map = {name: index for index, name, data_type in self.__columns}
for row in tuple(data_area):
value = test(_RowAdapter(data_area[row], column_map))
assert not isinstance(value, _RowAdapter), 'Test improperly formed!'
if bool(value) == delete:
del data_area[row]
def __select_with_function(self, excess, functions):
"Creates virtual rows formed by calling functions on columns."
table = self.copy()
for code, data in functions:
if data in table.__columns:
data_name = '{}({})'.format(code.__name__, data)
data_type = type(code(next(rows(table(data)))[0]))
table.alter_add(data_name, data_type)
dest = table.__columns[data_name][1]
sour = table.__columns[data][1]
for row in table.__data_area.values():
row[dest] = code(row[sour])
else:
sour = code()
table.alter_add(data, type(sour))
dest = table.__columns[data][1]
for row in table.__data_area.values():
row[dest] = copy.deepcopy(sour)
for column in excess:
table.alter_drop(column)
return table
########################################################################
@staticmethod
def __process_test(test, kw):
"Ensures that test has been properly formed as necessary."
if kw:
test = _Where(test, kw)
else:
assert callable(test), 'Test must be callable!'
return test
########################################################################
@property
def columns(self):
"Returns a list of column names from the table."
columns = sorted(self.__columns, key=lambda info: info[0])
return tuple(map(lambda info: info[1], columns))
@property
def schema(self):
"Returns table's schema that can be used to create another table."
return tuple((name, self.__columns[name][0]) for name in self.columns)
################################################################################
class _Columns:
"_Columns(columns) -> _Columns"
__slots__ = _slots('column_index column_names')
def __init__(self, columns):
"Initializes Columns instance with names and data types."
self.__column_index = 1
self.__column_names = UniqueDict()
for name, data_type in columns:
self.add(name, data_type)
def __contains__(self, name):
"Checks if the named column already exists."
return name in self.__column_names
def __len__(self):
"Returns the number of columns recognized."
return len(self.__column_names)
def __iter__(self):
"Iterates over columns in sorted order."
cache = []
for name, (data_type, index) in self.__column_names.items():
cache.append((index, name, data_type))
for item in sorted(cache):
yield item
def __getitem__(self, name):
"Returns requested information on the given column name."
return self.__column_names[name]
def __getstate__(self):
"Provides support for class instances to be pickled."
return self.__column_index, self.__column_names
def __setstate__(self, state):
"Sets the state while the object is being unpickled."
self.__column_index, self.__column_names = state
########################################################################
def copy(self):
"Creates a copy of the known columns."
copy = type(self)([])
copy.__column_index = self.__column_index
copy.__column_names = self.__column_names.copy()
return copy
def add(self, name, data_type):
"Adds a column name with data type and assigns an index."
index = self.__column_index
self.__column_names[name] = data_type, index
self.__column_index += 1
return index
def drop(self, name):
"Removes all information regarding the named column."
index = self.__column_names[name][1]
del self.__column_names[name]
return index
def alter(self, name, data_type):
"Changes the data type of the named column."
index = self.__column_names[name][1]
self.__column_names.replace(name, (data_type, index))
return index
def rename(self, old, new):
"Renames a column from old name to new name."
self.__column_names[new] = self.__column_names[old]
del self.__column_names[old]
################################################################################
class UniqueDict(dict):
"UniqueDict(iterable=None, **kwargs) -> UniqueDict"
__slots__ = ()
def __setitem__(self, key, value):
"Sets key with value if key does not exist."
assert key not in self, 'Key already exists!'
super().__setitem__(key, value)
def replace(self, key, value):
"Sets key with value if key already exists."
assert key in self, 'Key does not exist!'
super().__setitem__(key, value)
################################################################################
class _RowAdapter:
"_RowAdapter(row, column_map=None) -> _RowAdapter"
__slots__ = _slots('row map')
def __init__(self, row, column_map=None):
"Initializes _RowAdapter with data and mapping information."
self.__row = row
self.__map = column_map
def __getattr__(self, column):
"Returns a column from the row this instance in adapting."
if self.__map is None:
return self.__unmapped(column)
if column in self.__map:
return self.__row[self.__map[column]]
new_map = {}
column += '.'
for name in self.__map:
if name.startswith(column):
new_map[name[len(column):]] = self.__map[name]
assert new_map, 'Name did not match any known column: ' + repr(column)
return type(self)(self.__row, new_map)
__getitem__ = __getattr__
########################################################################
def __unmapped(self, column):
"Processes a row with column names already filled in."
if column in self.__row:
return self.__row[column]
row = {}
column += '.'
for name in self.__row:
if name.startswith(column):
row[name[len(column):]] = self.__row[name]
assert row, 'Name did not match any known column: ' + repr(column)
return type(self)(row)
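The dotted-name resolution above can be illustrated with a minimal standalone sketch (a hypothetical `resolve` helper, not the module's class): a lookup either hits a key directly or strips the prefix and returns the matching sub-columns, mirroring what `_RowAdapter.__getattr__` does with its column map.

```python
# Rows produced by joins carry prefixed column names such as 'p.City'.
row = {'p.City': 'Sandnes', 'p.P_Id': 1, 'o.OrderNo': 77895}

def resolve(row, column):
    # Direct hit: the column name exists as-is.
    if column in row:
        return row[column]
    # Otherwise treat the name as a prefix and collect the sub-columns.
    prefix = column + '.'
    nested = {name[len(prefix):]: value
              for name, value in row.items()
              if name.startswith(prefix)}
    assert nested, 'Name did not match any known column: ' + repr(column)
    return nested

assert resolve(row, 'o.OrderNo') == 77895
assert resolve(row, 'p') == {'City': 'Sandnes', 'P_Id': 1}
```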
################################################################################
class _SortedResults:
"_SortedResults(iterable column, desc) -> _SortedResults"
__slots__ = _slots('iter column direction')
def __init__(self, iterable, column, desc):
"Initializes sorting adapter with given data."
self.__iter = iterable
self.__column = column
self.__direction = desc
def __iter__(self):
"Iterates over internal data in the order requested."
title, *rows = tuple(self.__iter)
index = title.index(self.__column)
yield title
for row in sorted(rows, key=ROW[index], reverse=self.__direction):
yield row
########################################################################
def order_by(self, column, desc=False):
"Returns results that are sorted on an additional level."
return type(self)(self, column, desc)
def table(self):
"Converts the sorted results into a table object."
return Table.from_iter(self)
################################################################################
class _View:
"_View(database, query, *name_changes) -> _View"
__slots__ = _slots('database query name_changes')
def __init__(self, database, query, *name_changes):
"Initializes _View instance with details of saved query."
self.__database = database
self.__query = query
self.__name_changes = name_changes
def __getstate__(self):
"Returns everything needed to pickle _View instance."
return self.__database, self.__query.__code__, self.__name_changes
def __setstate__(self, state):
"Sets the state of the _View instance when unpickled."
database, query, name_changes = state
self.__database = database
self.__query = types.LambdaType(query, vars(sys.modules[__name__]), '', (), ())
self.__name_changes = name_changes
########################################################################
@property
def value(self):
"Caculates and returns the value of view's query."
data = self.__query(self.__database)
table = data if isinstance(data, Table) else Table.from_iter(data)
for old, new in self.__name_changes:
table.alter_name(old, new)
return table
################################################################################
class _Where:
"_Where(mode, condition) -> _Where"
__slots__ = _slots('call rows')
def __init__(self, mode, condition):
"Initializes _Where support object for simple selections."
self.__call = {'and': all, 'or': any}[mode]
self.__rows = condition
def __call__(self, row):
"Runs test on given row and validates against condition."
return self.__call(row[k] == v for k, v in self.__rows.items())
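The semantics of `_Where` can be sketched standalone with a hypothetical `where` factory (not the module's class): `'and'` requires every column/value pair to match the row, while `'or'` requires at least one.

```python
def where(mode, **condition):
    # 'and' maps to all(), 'or' maps to any(), exactly as in _Where.
    combine = {'and': all, 'or': any}[mode]
    return lambda row: combine(row[k] == v for k, v in condition.items())

row = {'City': 'Sandnes', 'FirstName': 'Tove'}
assert where('and', City='Sandnes', FirstName='Tove')(row)
assert not where('and', City='Sandnes', FirstName='Ola')(row)
assert where('or', City='Oslo', FirstName='Tove')(row)
```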
################################################################################
class NotLike:
"NotLike(column, pattern, flags=IGNORECASE, advanced=False) -> NotLike"
__slots__ = _slots('column method')
def __init__(self, column, pattern, flags=re.IGNORECASE, advanced=False):
"Initializes comparison object for specified column."
self.__column = column
if not advanced:
pattern = '^' + pattern + '$'
self.__method = re.compile(pattern, flags).search
def __call__(self, row):
"Tests if column in row was like the given pattern."
return self.__method(row[self.__column]) is None
################################################################################
class Like(NotLike):
"Like(column, pattern, flags=IGNORECASE, advanced=False) -> Like"
__slots__ = _slots()
def __call__(self, row):
"Reverses the result from calling a NotLike instance."
return not super().__call__(row)
################################################################################
class date(datetime.date):
"date(year=None, month=None, day=None) -> date"
__slots__ = _slots()
def __new__(cls, year=None, month=None, day=None):
"Creates a customized date object that does not require arguments."
if year is None:
year, month, day = cls.max.year, cls.max.month, cls.max.day
elif isinstance(year, bytes):
year_high, year_low, month, day = year
year = (year_high << 8) + year_low
return super().__new__(cls, year, month, day)
def __str__(self):
return self.strftime('%d-%b-%Y').upper()
def __format__(self, length):
return str(self).ljust(int(length))
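The bytes branch of `__new__` above unpacks pickle state whose first two bytes encode the year; a standalone check of that decoding (using an assumed 4-byte state for the year 2008):

```python
# Pickle state layout: high year byte, low year byte, month, day.
# Iterating over a bytes object yields integers in Python 3.
state = bytes([2008 >> 8, 2008 & 0xFF, 11, 12])
year_high, year_low, month, day = state
assert (year_high << 8) + year_low == 2008
assert (month, day) == (11, 12)
```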
################################################################################
class datetime(datetime.datetime):
"""datetime(year=None, month=None, day=None, hour=0,
minute=0, second=0, microsecond=0, tzinfo=None) -> datetime"""
__slots__ = _slots()
def __new__(cls, year=None, month=None, day=None, hour=0,
minute=0, second=0, microsecond=0, tzinfo=None):
"Creates a customized datetime object that does not require arguments."
if year is None:
year, month, day = cls.max.year, cls.max.month, cls.max.day
elif isinstance(year, bytes):
year_high, year_low, _month, day, \
hour, minute, second, a, b, c = year
year = (year_high << 8) + year_low
microsecond = (((a << 8) | b) << 8) | c
if month is None or isinstance(month, sys.modules['datetime'].tzinfo):
tzinfo = month
else:
raise TypeError('bad tzinfo state arg {!r}'.format(month))
month = _month
return super().__new__(cls, year, month, day, hour,
minute, second, microsecond, tzinfo)
def date(self):
d = super().date()
return date(d.year, d.month, d.day)
################################################################################
class _NamedInstance:
"_NamedInstance(*args, **kwargs) -> _NamedInstance"
__slots__ = _slots()
def __init__(self, *args, **kwargs):
"Raises an error since this is an abstract class."
raise NotImplementedError('This is an abstract class!')
@property
def __name__(self):
"Provides a way for callable instances to be identified."
return type(self).__name__
################################################################################
class DatePart(_NamedInstance):
"DatePart(part, column, name=None) -> DatePart"
__slots__ = _slots('part column name')
def __init__(self, part, column, name=None):
"Initializes DatePart instance usable with 'group_by' method."
self.__part = part.upper()
self.__column = column
self.__name = name
def __call__(self, row):
"Extract specified part of date from column in row."
date = row[self.__column]
if self.__part == 'Y':
return date.year
if self.__part == 'Q':
return (date.month - 1) // 3 + 1
if self.__part == 'M':
return date.month
if self.__part == 'D':
return date.day
raise ValueError('DatePart code cannot be processed!')
########################################################################
@property
def name(self):
"Provides a name for us in the 'group_by' method."
return self.__name
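The quarter arithmetic in the `'Q'` branch can be checked in isolation: integer division maps months 1-3 to quarter 1, 4-6 to quarter 2, and so on.

```python
# Same formula as DatePart's 'Q' branch: (month - 1) // 3 + 1.
quarter = lambda month: (month - 1) // 3 + 1
assert [quarter(m) for m in range(1, 13)] == [1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]
```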
################################################################################
class MID(_NamedInstance):
"MID(start, length=None) -> MID"
__slots__ = _slots('start stop')
def __init__(self, start, length=None):
"Intializes MID instance with data to extract a sub-interval."
self.__start = start - 1
self.__stop = None if length is None else self.__start + length
def __call__(self, data):
"Returns sub-internal as specified upon instantiation."
if self.__stop is None:
return data[self.__start:]
return data[self.__start:self.__stop]
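MID follows SQL's 1-based indexing convention, so `MID(1, 4)` corresponds to the Python slice `[0:4]`; a standalone check of that offset math:

```python
# SQL-style MID(start, length) translates to data[start - 1:start - 1 + length].
start, length = 1, 4
assert 'Sandnes'[start - 1:start - 1 + length] == 'Sand'
# Omitting the length keeps everything from start onward.
assert 'Sandnes'[start - 1:] == 'Sandnes'
```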
################################################################################
class FORMAT(_NamedInstance):
"FORMAT(spec) -> FORMAT"
__slots__ = _slots('spec')
def __init__(self, spec):
"Initializes instance with 'spec' for the format function."
self.__spec = spec
def __call__(self, data):
"Returns result from format function based on data and spec."
return format(data, self.__spec)
################################################################################
del _slots
types.StringType = str
next_ = next
NOW = datetime.now
################################################################################
def inner_join(test, *table_arg, **table_kwarg):
"Runs and returns result from inner joining two tables together."
pa, pb, ta, tb = _join_args(table_arg, table_kwarg)
table = _composite_table(pa, pb, ta, tb)
_join_loop(table, test, pa, pb, ta, tb, True, False)
return table
def full_join(test, *table_arg, **table_kwarg):
"Runs and returns result from full joining two tables together."
pa, pb, ta, tb = _join_args(table_arg, table_kwarg)
table = _composite_table(pa, pb, ta, tb)
_join_loop(table, test, pa, pb, ta, tb, False, True)
return table
def left_join(table_a, table_b, test):
"Runs and returns result from left joining two tables together."
assert sum(isinstance(table, tuple) for table in (table_a, table_b)) > 0, \
'At least one table must be given a name!'
ta, pa = table_a if isinstance(table_a, tuple) else (table_a, '_')
tb, pb = table_b if isinstance(table_b, tuple) else (table_b, '_')
table = _composite_table(pa, pb, ta, tb)
_join_loop(table, test, pa, pb, ta, tb, False, False)
return table
def right_join(table_a, table_b, test):
"Runs and returns result from right joining two tables together."
return left_join(table_b, table_a, test)
def union(table_a, table_b, all_=False):
"Creates a table from two tables that have been combined."
table = Table.from_iter(table_a)
for row in rows(table_b):
table.insert(*row)
if all_:
return table
return table.distinct()
def rows(iterable):
"Skips the first row (column names) from a table-style iterator."
iterator = iter(iterable)
next(iterator)
return iterator
################################################################################
def _join_args(table_arg, table_kwarg):
"Determines tables and prefixes from given arguments."
assert len(table_kwarg) > 0, 'At least one table name must be given!'
assert sum(map(len, (table_arg, table_kwarg))) == 2, \
'Two tables must be provided to join!'
if len(table_kwarg) == 2:
(pa, pb), (ta, tb) = zip(*table_kwarg.items())
else:
pa, ta = next(iter(table_kwarg.items()))
pb, tb = '_', table_arg[0]
return pa, pb, ta, tb
def _composite_table(pa, pb, ta, tb):
"Create a new table based on information from tables and prefixes."
columns = []
for table_name, table_obj in zip((pa, pb), (ta, tb)):
iterator = iter(table_obj)
names = next(iterator)
types = map(lambda item: item[1], table_obj.schema)
for column_name, column_type in zip(names, types):
if table_name != '_':
column_name = '{}.{}'.format(table_name, column_name)
columns.append((column_name, column_type))
return Table(*columns)
def _join_loop(table, test, pa, pb, ta, tb, inner, full):
"Joins two tables together into one table based on criteria."
first = True
second = dict()
table_a = tuple(_pre_process(ta, pa))
table_b = tuple(_pre_process(tb, pb))
for row_cache in table_a:
match = False
for add in table_b:
row = row_cache.copy()
row.update(add)
if test(_RowAdapter(row)):
table.insert(**row)
match = True
if not first:
second.pop(id(add), None)
elif first:
second[id(add)] = add
if not (inner or match):
table.insert(**row_cache)
first = False
if full:
for row in second.values():
table.insert(**row)
def _pre_process(table, prefix):
"Creates a table iterator that can cache results with optional prefix."
iterator = iter(table)
columns = next(iterator)
if prefix == '_':
for row in iterator:
yield dict(zip(columns, row))
else:
for row in iterator:
yield {'{}.{}'.format(prefix, column): \
value for column, value in zip(columns, row)}
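The bookkeeping in `_join_loop` can be sketched with plain dict rows (a hypothetical `sketch_join`, simplified from the module's version): inner joins keep only matching combinations, left joins also keep unmatched left-hand rows, and full joins additionally keep unmatched right-hand rows.

```python
left = [{'p.P_Id': 1}, {'p.P_Id': 2}]
right = [{'o.OrderNo': 10, 'o.P_Id': 1}, {'o.OrderNo': 11, 'o.P_Id': 9}]
on = lambda row: row['p.P_Id'] == row['o.P_Id']

def sketch_join(a, b, test, inner=True, full=False):
    result = []
    # Track right-hand rows that never matched, for the full-join case.
    unmatched_b = {id(row): row for row in b}
    for row_a in a:
        match = False
        for row_b in b:
            row = {**row_a, **row_b}
            if test(row):
                result.append(row)
                match = True
                unmatched_b.pop(id(row_b), None)
        # Left/full joins keep left-hand rows that found no partner.
        if not (inner or match):
            result.append(dict(row_a))
    if full:
        result.extend(dict(row) for row in unmatched_b.values())
    return result

assert len(sketch_join(left, right, on)) == 1                            # inner
assert len(sketch_join(left, right, on, inner=False)) == 2               # left
assert len(sketch_join(left, right, on, inner=False, full=True)) == 3    # full
```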
################################################################################
# Unsupported Features:
# =====================
# Constraints:
# ------------
# NOT NULL [forced on all columns]
# UNIQUE
# PRIMARY KEY
# FOREIGN KEY
# CHECK
# DEFAULT [constructed from type]
# Indexes:
# --------
# CREATE
# DROP
# Increment:
# ----------
# AUTO INCREMENT
# Starting Value
# Increment by X
# ["ROW_ID" starts at and increments by 1 but is not accessible]
# Dates:
# ------
# NOW()
# CURDATE()
# CURTIME()
# EXTRACT()
# DATE_ADD()
# DATE_SUB()
# DATEDIFF()
# DATE_FORMAT()
# GETDATE()
# CONVERT()
# ["DatePart" and "date" are supported and may
# be supplemented with the "datetime" module]
# Nulls:
# ------
# ISNULL()
# NVL()
# IFNULL()
# COALESCE()
# [the NOT NULL constraint is forced on all columns]
# Data Types:
# -----------
# Data types that cannot be initialized with a
# parameterless call are not directly supported.
# Functions:
# ----------
# max() [use "table.max_(column)" instead]
# min() [use "table.min_(column)" instead]
# sum() [use "table.sum_(column)" instead]
# Having Statement
# ucase() or upper() [use "(str.upper, 'column')" instead]
# lcase() or lower() [use "(str.lower, 'column')" instead]
# Virtual Columns [Function Based]
# Materialized Views [Cached Functions]
# Transactions:
# -------------
# Table Level Transactions
# [database level transactions are supported;
# table locks are supported in the same way]
################################################################################
import itertools
import operator
class _Repr:
def __repr__(self):
return '{}({})'.format(
type(self).__name__,
', '.join(itertools.starmap('{!s}={!r}'.format,
sorted(vars(self).items()))))
class _Row(_Repr):
def __getattr__(self, name):
return _Column(name)
def __getitem__(self, key):
return lambda row: row[key]
class _Column(_Row):
def __init__(self, name):
self.__name = name
def __call__(self, row):
return row[self.__name]
def __getattr__(self, name):
if name == 'NOT':
return _Comparison(self, lambda a, b: (not a, b)[0], None)
return super().__getattr__(self.__name + '.' + name)
def __lt__(self, other):
return _Comparison(self, operator.lt, other)
def __le__(self, other):
return _Comparison(self, operator.le, other)
def __eq__(self, other):
return _Comparison(self, operator.eq, other)
def __ne__(self, other):
return _Comparison(self, operator.ne, other)
def __gt__(self, other):
return _Comparison(self, operator.gt, other)
def __ge__(self, other):
return _Comparison(self, operator.ge, other)
def in_(self, *items):
return _Comparison(self, lambda a, b: a in b, items)
class _Comparison(_Repr):
def __init__(self, column, op, other):
self.__column, self.__op, self.__other = column, op, other
def __call__(self, row):
if isinstance(self.__other, _Column):
return self.__op(self.__column(row), self.__other(row))
return self.__op(self.__column(row), self.__other)
def __lt__(self, other):
return self & (self.__column < other)
def __le__(self, other):
return self & (self.__column <= other)
def __eq__(self, other):
return self & (self.__column == other)
def __ne__(self, other):
return self & (self.__column != other)
def __gt__(self, other):
return self & (self.__column > other)
def __ge__(self, other):
return self & (self.__column >= other)
def __and__(self, other):
return _Comparison(self, lambda a, b: a and b, other)
def __or__(self, other):
return _Comparison(self, lambda a, b: a or b, other)
ROW = _Row()
################################################################################
def test():
"Runs several groups of tests of the database engine."
# Test simple statements in SQL.
persons = test_basic_sql()
# Test various ways to select rows.
test_row_selection(persons)
# Test the four different types of joins in SQL.
orders = test_all_joins(persons)
# Test unstructured ways of joining tables together.
test_table_addition(persons, orders)
# Test creation and manipulation of databases.
test_database_support()
# Load and run some test on the sample Northwind database.
northwind = test_northwind()
# Test different date operations that can be performed.
test_date_functionality()
# Test various functions that operate on specified column.
test_column_functions()
if northwind:
# Test ability to select columns with function processing.
test_generic_column_functions(persons, northwind)
# Test Database2 instances that support transactions.
nw2 = test_transactional_database()
# Allow for interaction at the end of the test.
globals().update(locals())
def test_basic_sql():
"Tests simple statements in SQL."
# Test create table statement.
persons = Table(('P_Id', int), ('LastName', str), ('FirstName', str),
('Address', str), ('City', str))
# Populate the table with rows.
persons.insert(1, 'Hansen', 'Ola', 'Timoteivn 10', 'Sandnes')
persons.insert(2, 'Svendson', 'Tove', 'Borgvn 23', 'Sandnes')
persons.insert(3, 'Pettersen', 'Kari', 'Storgt 20', 'Stavanger')
persons.print()
# Test the select statement.
persons.select('LastName', 'FirstName').print()
persons.select().print()
# Test the distinct statement.
persons.select('City').distinct().print()
# Test the where clause.
persons.where(ROW.City == 'Sandnes').print()
# Test the and operator.
persons.where((ROW.FirstName == 'Tove') &
(ROW.LastName == 'Svendson')).print()
# Test the or operator.
persons.where((ROW.FirstName == 'Tove') | (ROW.FirstName == 'Ola')).print()
# Test both and & or operators.
persons.where((ROW.LastName == 'Svendson') &
((ROW.FirstName == 'Tove') |
(ROW.FirstName == 'Ola'))).print()
# Test order by statement.
persons.insert(4, 'Nilsen', 'Tom', 'Vingvn 23', 'Stavanger')
persons.order_by('LastName').table().print()
persons.order_by('LastName', True).table().print()
# Test insert statement.
persons.insert(5, 'Nilsen', 'Johan', 'Bakken 2', 'Stavanger')
persons.print()
persons.insert(P_Id=6, LastName='Tjessem', FirstName='Jakob')
persons.print()
# Test update statement.
persons.where((ROW.LastName == 'Tjessem') &
(ROW.FirstName == 'Jakob')).update(Address='Nissestien 67',
City='Sandnes')
persons.print()
copy = persons.order_by('P_Id').table()
copy.update(Address='Nissestien 67', City='Sandnes')
copy.print()
# Test delete statement.
copy = persons.order_by('P_Id').table()
copy.delete((ROW.LastName == 'Tjessem') &
(ROW.FirstName == 'Jakob')).print()
copy.truncate().print()
return persons
def test_row_selection(persons):
"Tests various ways to select rows."
# Test top clause.
Table.from_iter(persons.top(2)).print()
Table.from_iter(persons.top(0.5)).print()
# Test like operator.
persons.where(Like('City', 's.*')).print()
persons.where(Like('City', '.*s')).print()
persons.where(Like('City', '.*tav.*')).print()
persons.where(NotLike('City', '.*tav.*')).print()
# Test wildcard patterns.
persons.where(Like('City', 'sa.*')).print()
persons.where(Like('City', '.*nes.*')).print()
persons.where(Like('FirstName', '.la')).print()
persons.where(Like('LastName', 'S.end.on')).print()
persons.where(Like('LastName', '[bsp].*')).print()
persons.where(Like('LastName', '[^bsp].*')).print()
# Test in operator.
persons.where(ROW.LastName.in_('Hansen', 'Pettersen')).print()
# Test manual between syntax.
persons.where(('Hansen' < ROW.LastName) < 'Pettersen').print()
persons.where(('Hansen' <= ROW.LastName) < 'Pettersen').print()
persons.where(('Hansen' <= ROW.LastName) <= 'Pettersen').print()
persons.where(('Hansen' < ROW.LastName) <= 'Pettersen').print()
def test_all_joins(persons):
"Tests the four different types of joins in SQL."
# Create and populate the Orders table.
orders = Table(('O_Id', int), ('OrderNo', int), ('P_Id', int))
orders.insert(1, 77895, 3)
orders.insert(2, 44678, 3)
orders.insert(3, 22456, 1)
orders.insert(4, 24562, 1)
orders.insert(5, 34764, 15)
# Test the inner join function.
inner_join(ROW.Persons.P_Id == ROW.Orders.P_Id,
Persons=persons, Orders=orders) \
.select('Persons.LastName',
'Persons.FirstName',
'Orders.OrderNo') \
.order_by('Persons.LastName').table().print()
# Test inner join with alias.
inner_join(ROW.p.P_Id == ROW.po.P_Id,
p=persons, po=orders) \
.select('po.OrderNo', 'p.LastName', 'p.FirstName') \
.where((ROW.p.LastName == 'Hansen') &
(ROW.p.FirstName == 'Ola')).print()
# Test left join with and without alias.
left_join((persons, 'Persons'), (orders, 'Orders'),
ROW.Persons.P_Id == ROW.Orders.P_Id) \
.select('Persons.LastName',
'Persons.FirstName',
'Orders.OrderNo') \
.order_by('Persons.LastName').table().print()
left_join((persons, 'p'), (orders, 'o'), ROW.p.P_Id == ROW.o.P_Id) \
.select('p.LastName',
'p.FirstName',
'o.OrderNo') \
.order_by('p.LastName').table().print()
# Test right join with and without alias.
right_join((persons, 'Persons'), (orders, 'Orders'),
ROW.Persons.P_Id == ROW.Orders.P_Id) \
.select('Persons.LastName',
'Persons.FirstName',
'Orders.OrderNo') \
.order_by('Persons.LastName').table().print()
right_join((persons, 'p'), (orders, 'o'), ROW.p.P_Id == ROW.o.P_Id) \
.select('p.LastName', 'p.FirstName', 'o.OrderNo') \
.order_by('p.LastName').table().print()
# Test full join with and without alias.
full_join(ROW.Persons.P_Id == ROW.Orders.P_Id,
Persons=persons, Orders=orders) \
.order_by('Persons.LastName').table().print()
full_join(ROW.p.P_Id == ROW.o.P_Id,
p=persons, o=orders) \
.select('p.LastName', 'p.FirstName', 'o.OrderNo') \
.order_by('p.LastName').table().print()
return orders
def test_table_addition(persons, orders):
"Tests unstructured ways of joining tables together."
# Create two tables to union together.
employees_norway = Table(('E_ID', str), ('E_Name', str))
employees_norway.insert('01', 'Hansen, Ola')
employees_norway.insert('02', 'Svendson, Tove')
employees_norway.insert('03', 'Svendson, Stephen')
employees_norway.insert('04', 'Pettersen, Kari')
employees_usa = Table(('E_ID', str), ('E_Name', str))
employees_usa.insert('01', 'Turner, Sally')
employees_usa.insert('02', 'Kent, Clark')
employees_usa.insert('03', 'Svendson, Stephen')
employees_usa.insert('04', 'Scott, Stephen')
# Test union function on tables.
union(employees_norway('E_Name'), employees_usa('E_Name')).print()
union(employees_norway('E_Name'), employees_usa('E_Name'), True).print()
# Test select into functionality.
backup = Table(*persons.schema)
persons.into(backup)
backup.print()
backup.truncate()
persons.select('LastName', 'FirstName').into(backup)
backup.print()
# Test select into with where and join clauses.
backup = Table(('LastName', str), ('FirstName', str))
persons.where(ROW.City == 'Sandnes') \
.select('LastName', 'FirstName').into(backup)
backup.print()
person_orders = Table(('Persons.LastName', str), ('Orders.OrderNo', int))
inner_join(ROW.Persons.P_Id == ROW.Orders.P_Id,
Persons=persons, Orders=orders) \
.select('Persons.LastName', 'Orders.OrderNo') \
.into(person_orders)
person_orders.print()
def test_database_support():
"Tests creation and manipulation of databases."
# Test ability to create database.
db = Database()
# Test creating and retrieving database tables.
db.create('persons', Table(('Name', str), ('Credit', int)))
db.create('mapdata', (('time', float), ('place', complex)))
db.print()
db.persons.insert('Marty', 7 ** 4)
db.persons.insert(Name='Haddock')
db.persons.print()
def test_northwind():
"Loads and runs some test on the sample Northwind database."
import os, imp
# Patch the module namespace to recognize this file.
name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
module = imp.new_module(name)
vars(module).update(globals())
sys.modules[name] = module
# Load a Northwind database for various testing purposes.
try:
northwind = Database.load('northwind.db')
except IOError:
return
# Create and test a current product list view.
northwind.create('Current Product List', lambda db: db.Products.where(
ROW.Discontinued.NOT).select('ProductID', 'ProductName'))
northwind['Current Product List'].print()
# Find all products having an above-average price.
def above_average_price(db):
return db.Products.where(ROW.UnitPrice > db.Products.avg('UnitPrice')) \
.select('ProductName', 'UnitPrice')
northwind.create('Products Above Average Price', above_average_price)
northwind['Products Above Average Price'].print()
# Calculate total sale per category in 1997.
def category_sales_for_1997(db):
result = Table(('CategoryName', str),
('CategorySales', decimal.Decimal))
for table in db['Product Sales For 1997'] \
.group_by('Categories.CategoryName'):
name = next(rows(table.select('Categories.CategoryName')))[0]
total = table.sum_('ProductSales')
result.insert(name, total)
return result
northwind.create('Category Sales For 1997', category_sales_for_1997)
northwind['Category Sales For 1997'].print()
# Show just the Beverages Category from the previous view.
northwind['Category Sales For 1997'].where(
ROW.CategoryName == 'Beverages').print()
# Add the Category column to the Current Product List view.
northwind.create_or_replace('Current Product List', lambda db: \
db['Products View'].where(ROW.Discontinued.NOT) \
.select('ProductID', 'ProductName', 'Category'))
northwind['Current Product List'].print()
# Drop the Category Sales For 1997 view.
northwind.drop('Category Sales For 1997')
return northwind
def test_date_functionality():
"Tests different date operations that can be performed."
# Create an orderz table to test the date type.
orderz = Table(('OrderId', int), ('ProductName', str), ('OrderDate', date))
orderz.insert(1, 'Geitost', date(2008, 11, 11))
orderz.insert(2, 'Camembert Pierrot', date(2008, 11, 9))
orderz.insert(3, 'Mozzarella di Giovanni', date(2008, 11, 11))
orderz.insert(4, 'Mascarpone Fabioloi', date(2008, 10, 29))
# Query the table for a specific date.
orderz.where(ROW.OrderDate == date(2008, 11, 11)).print()
# Update the orderz table so that times are present with the dates.
orderz.alter_column('OrderDate', datetime)
orderz.where(ROW.OrderId == 1) \
.update(OrderDate=datetime(2008, 11, 11, 13, 23, 44))
orderz.where(ROW.OrderId == 2) \
.update(OrderDate=datetime(2008, 11, 9, 15, 45, 21))
orderz.where(ROW.OrderId == 3) \
.update(OrderDate=datetime(2008, 11, 11, 11, 12, 1))
orderz.where(ROW.OrderId == 4) \
.update(OrderDate=datetime(2008, 10, 29, 14, 56, 59))
# Query the table with a datetime object this time.
orderz.where(ROW.OrderDate == datetime(2008, 11, 11)).print()
def test_column_functions():
"Tests various functions that operate on specified column."
# Create an order table to test various functions on.
order = Table(('O_Id', int), ('OrderDate', date),
('OrderPrice', int), ('Customer', str))
order.insert(1, date(2008, 11, 12), 1000, 'Hansen')
order.insert(2, date(2008, 10, 23), 1600, 'Nilsen')
order.insert(3, date(2008, 9, 2), 700, 'Hansen')
order.insert(4, date(2008, 9, 3), 300, 'Hansen')
order.insert(5, date(2008, 9, 30), 2000, 'Jensen')
order.insert(6, date(2008, 10, 4), 100, 'Nilsen')
# Test the "avg" function.
order_average = order.avg('OrderPrice')
print('OrderAverage =', order_average, '\n')
order.where(ROW.OrderPrice > order_average).select('Customer').print()
# Test the "count" function.
print('CustomerNilsen =', order.where(
ROW.Customer == 'Nilsen').count('Customer'))
print('NumberOfOrders =', order.count())
print('NumberOfCustomers =', order.select('Customer') \
.distinct().count('Customer'))
# Test the "first" function.
print('FirstOrderPrice =', order.first('OrderPrice'))
# Test the "last" function.
print('LastOrderPrice =', order.last('OrderPrice'))
# Test the "max_" function.
print('LargestOrderPrice =', order.max_('OrderPrice'))
# Test the "min_" function.
print('SmallestOrderPrice =', order.min_('OrderPrice'))
# Test the "sum_" function.
print('OrderTotal =', order.sum_('OrderPrice'), '\n')
# Test the "group_by" statement.
result = Table(('Customer', str), ('OrderPrice', int))
for table in order.group_by('Customer'):
result.insert(table.first('Customer'), table.sum_('OrderPrice'))
result.print()
# Add some more orders to the table.
order.insert(7, date(2008, 11, 12), 950, 'Hansen')
order.insert(8, date(2008, 10, 23), 1900, 'Nilsen')
order.insert(9, date(2008, 9, 2), 2850, 'Hansen')
order.insert(10, date(2008, 9, 3), 3800, 'Hansen')
order.insert(11, date(2008, 9, 30), 4750, 'Jensen')
order.insert(12, date(2008, 10, 4), 5700, 'Nilsen')
# Test ability to group by several columns.
result.truncate().alter_add('OrderDate', date)
for table in order.group_by('Customer', 'OrderDate'):
result.insert(table.first('Customer'),
table.sum_('OrderPrice'),
table.first('OrderDate'))
result.print()
def test_generic_column_functions(persons, northwind):
"Tests ability to select columns with function processing."
# Test as_ and select with functions run on columns.
persons.select((str.upper, 'LastName'), 'FirstName') \
.as_(('upper(LastName)', 'LastName')).print()
persons.select((str.lower, 'LastName'), 'FirstName') \
.as_(('lower(LastName)', 'LastName')).print()
persons.select((MID(1, 4), 'City')) \
.as_(('MID(City)', 'SmallCity')).print()
persons.select((len, 'Address')) \
.as_(('len(Address)', 'LengthOfAddress')).print()
northwind['Products'].select('ProductName', (round, 'UnitPrice')) \
.as_(('round(UnitPrice)', 'UnitPrice')).print()
current_products = northwind['Products'].select('ProductName',
'UnitPrice',
(NOW, 'PerDate'))
current_products.print()
current_products.select('ProductName', 'UnitPrice', (FORMAT('%Y-%m-%d'),
'PerDate')).as_(('FORMAT(PerDate)', 'PerDate')).print()
def test_transactional_database():
"Tests Database2 instances that support transactions."
# Create a test database, tables, and dummy data.
db2 = Database2()
db2.create('test', Table(('id', int), ('name', str)))
db2.test.insert(100, 'Adam')
db2.test.print()
# Test the rollback transaction support added in Database2.
test = db2.begin_transaction('test')
test.insert(101, 'Eve')
test.print()
db2.rollback_transaction('test')
db2.test.print()
# Test the commit transaction support added in Database2.
test = db2.begin_transaction('test')
test.insert(102, 'Seth')
test.print()
db2.commit_transaction('test')
db2.test.print()
# Prepare some supports for the test that follows.
import time
def delay(seconds, handler, table):
time.sleep(seconds)
handler(table)
def async_commit(db, action, table, wait):
_thread.start_new_thread(delay,
(wait, getattr(db, action + '_transaction'), table))
try:
nw2 = Database2.load('northwind2.db')
except IOError:
return
# Test waiting on a locked table before transaction.
print('Starting transaction ...')
categories = nw2.begin_transaction('Categories')
print('Simulating processing ...')
async_commit(nw2, 'commit', 'Categories', 2)
print('Holding for release ...')
categories = nw2.begin_transaction('Categories', True)
print('Rolling back the table ...')
nw2.rollback_transaction('Categories')
return nw2
################################################################################
if __name__ == '__main__':
test()
The latest version includes some classes designed to simplify the syntax used to define the relationships among rows. Some extra tools without the example tables are also provided below.
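The idea behind those classes can be shown with a hypothetical miniature (simplified from the module's _Row/_Column/_Comparison): attribute access on a sentinel object produces a column getter, and comparison operators on it produce callables that test a row.

```python
class Column:
    "Getter for one named column; comparisons yield row predicates."
    def __init__(self, name):
        self.name = name
    def __call__(self, row):
        return row[self.name]
    def __eq__(self, other):
        return lambda row: row[self.name] == other
    def __gt__(self, other):
        return lambda row: row[self.name] > other

class Row:
    "Sentinel whose attributes are Column instances."
    def __getattr__(self, name):
        return Column(name)

ROW = Row()
predicate = ROW.City == 'Sandnes'
assert predicate({'City': 'Sandnes'})
assert not predicate({'City': 'Oslo'})
assert (ROW.Price > 100)({'Price': 150})
```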
test.py
import string
import me
from pydb import *
class _PickleData:
def __getstate__(self):
return self.data
def __setstate__(self, state):
self.__init__(state)
class Key(me.Key, _PickleData):
def __init__(self, data=None):
if data is not None:
super().__init__(data)
class Primer(me.Primer, _PickleData):
def __init__(self, data=None):
if data is not None:
super().__init__(data)
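The _PickleData mixin above pickles an instance as nothing but its data attribute and rebuilds it by calling __init__ on that state. A standalone sketch of the pattern, using a hypothetical Payload class in place of the me.Key and me.Primer bases:

```python
import pickle

class _PickleData:
    def __getstate__(self):
        return self.data          # pickle only the raw data
    def __setstate__(self, state):
        self.__init__(state)      # rebuild through the normal constructor

class Payload(_PickleData):       # stand-in for Key/Primer
    def __init__(self, data=None):
        if data is not None:
            self.data = data

original = Payload(b'secret bytes')
clone = pickle.loads(pickle.dumps(original))
assert clone.data == original.data
```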
PRINTABLE = string.digits + string.ascii_letters + string.punctuation
def main():
db = Database2()
db.create('crypt', schema(name=str,
key=Key,
primer=Primer))
key = Key.new(PRINTABLE.encode(), 1 << 8)
primer = Primer.new(key)
db.crypt.insert(name='master', key=key, primer=primer)
create_table(db, 'ccbat', CCBAT)
create_table(db, 'addrs', ADDRS)
create_table(db, 'contacts', CONTACTS)
db.save('rejected_cards.db')
secure = lambda string: encrypt(db, 'master', string)
to_char = lambda integer: '{:04}'.format(integer)
lpad = lambda string: string.strip().zfill(2)
print(Table.from_iter(obfuscate(db.ccbat)))
print(Table.from_iter(obfuscate(db.addrs)))
print(Table.from_iter(obfuscate(db.contacts)))
globals().update(locals())
SK = Key.new(PRINTABLE.encode(), 1 << 10)
SP = Primer.new(SK)
IK = Key.new(string.digits.encode(), 1 << 10)
IP = Primer.new(IK)
def obfuscate(table):
types = {str: lambda s: me.encrypt_str(s, SK, SP)[0],
int: lambda i: i if i in {0, 1} else
int(me.encrypt_str(str(i), IK, IP)[0]),
float: lambda i: i if i in {0, 1} else
float(me.encrypt_str(str(i), IK, IP)[0])}
handler = {c: types.get(t, lambda v: v) for c, t in table.schema}
stream = iter(table)
columns = next(stream)
yield columns
for row in stream:
yield tuple(handler[k](v) for k, v in zip(columns, row))
def minimize(table):
distinct = table.distinct()
table.truncate()
distinct.into(table)
def encrypt(db, name, string):
    # locals().update() cannot rebind local names inside a function in
    # CPython, so unpack the matching crypt row into a dict instead.
    row = dict(zip(*db.crypt.where(ROW.name == name)))
    head, tail = string[:-4], string[-4:]
    return me.encrypt_str(head, row['key'], row['primer'])[0] + tail
schema = lambda **columns: sorted(columns.items())
to_str = lambda value: value
to_bool_y = lambda value: value.upper() == 'Y'
to_bool_n = lambda value: value.upper() == 'N'
to_bool_01 = lambda value: bool(int(value))
to_int = lambda value: int(value) if value else int()
to_float = lambda value: float(value) if value else float()
def to_date_1(value):
if value in {'', '0'}:
return date()
return datetime.strptime(value, '%Y%m%d').date()
def to_date_2(value):
if not value:
return date()
try:
return datetime.strptime(value, '%d-%b-%y').date()
except ValueError:
return datetime.strptime(value, '%d-%b-%Y').date()
def is_type(type_, value):
try:
if value:
type_(value)
return True
except ValueError:
return False
def create_table(db, name, text):
s = guess_schema(text)
table = db.create(name, schema(**{k: v[0] for k, v in s.items()}))
for row in dict_from_table(text):
for key, value in row.items():
row[key] = s[key][1](value)
table.insert(**row)
def guess_schema(table):
columns = {}
for row in dict_from_table(table):
for key, value in row.items():
columns.setdefault(key, []).append(value)
for name, array in columns.items():
if all(value == '' for value in array):
columns[name] = str, to_str
elif all(value.upper() in {'', 'Y'} for value in array):
columns[name] = bool, to_bool_y
elif all(value.upper() in {'', 'N'} for value in array):
columns[name] = bool, to_bool_n
elif all(value.upper() in {'', 'Y', 'N'} for value in array):
columns[name] = bool, to_bool_y
elif all(value.upper() in {'0', '1'} for value in array):
columns[name] = bool, to_bool_01
elif all(is_type(to_date_1, value) for value in array):
columns[name] = date, to_date_1
elif all(is_type(int, value) for value in array):
columns[name] = int, to_int
elif all(is_type(float, value) for value in array):
columns[name] = float, to_float
elif all(is_type(to_date_2, value) for value in array):
columns[name] = date, to_date_2
else:
columns[name] = str, to_str
return columns
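The inference in guess_schema() is order-sensitive: an all-digit column such as '20120905' passes both the date and int tests, so the more specific test must run first. A toy illustration of the same cascade, reduced to just the int/float/str cases:

```python
def is_type(type_, value):
    # empty strings are acceptable for any type
    try:
        if value:
            type_(value)
        return True
    except ValueError:
        return False

def guess_column(values):
    # try the most specific type first, fall back to str
    if all(is_type(int, v) for v in values):
        return int
    if all(is_type(float, v) for v in values):
        return float
    return str

assert guess_column(['1', '2', '']) is int
assert guess_column(['1.5', '2']) is float
assert guess_column(['1', 'x']) is str
```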
def dict_from_table(table):
columns, *rows = rows_from_table(table)
columns = tuple(name.lower() for name in columns)
yield from (dict(zip(columns, row)) for row in rows)
def rows_from_table(table):
_, names, space, *rows = table.split('\n')
space = space.split(' ')
total = sum(column.count('-') for column in space)
space = tuple(map(len, space))
assert sum(space) == total, 'Problem with the spacing information!'
yield tuple(columns_from_row(names, space))
yield from (tuple(columns_from_row(row, space)) for row in rows)
def columns_from_row(row, space):
index = 0
for length in space:
yield row[index:index+length].strip()
index += length + 1
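The three helpers above parse a fixed-width text table: a title line, a header line, a dash ruler whose hyphen groups give the column widths, then the data rows. A condensed, self-contained sketch of the same parse (the sample table is made up, and the dash-count sanity assert is omitted):

```python
sample = '\n'.join((
    'CONTACTS',
    'NAME       CITY ',
    '---------- -----',
    'Alice      Oslo ',
    'Bob        Lima ',
))

def columns_from_row(row, space):
    index = 0
    for length in space:
        yield row[index:index + length].strip()
        index += length + 1     # skip the single-space column separator

def rows_from_table(table):
    _, names, ruler, *rows = table.split('\n')
    widths = tuple(map(len, ruler.split(' ')))  # dash groups give widths
    yield tuple(columns_from_row(names, widths))
    yield from (tuple(columns_from_row(row, widths)) for row in rows)

header, *records = rows_from_table(sample)
assert header == ('NAME', 'CITY')
assert records == [('Alice', 'Oslo'), ('Bob', 'Lima')]
```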
if __name__ == '__main__':
main()
me.py
"""Provide an implementation of Markov Encryption for simplified use.
This module exposes primitives useful for executing Markov Encryption
processes. ME was inspired by a combination of Markov chains with the
puzzles of Sudoku. This implementation has undergone numerous changes
and optimizations since its original design. Please see documentation."""
__author__ = 'Stephen "Zero" Chappell <Noctis.Skytower@gmail.com>'
__date__ = '5 September 2012'
__version__ = 2, 0, 7
################################################################################
# Import several functions needed later in the code.
from random import SystemRandom
from sys import _getframe
from collections import deque
################################################################################
# Create some tools to use in the classes down below.
_CHAOS = SystemRandom()
def slots(names=''):
"""Set the __slots__ variable in the calling context with private names.
This function allows a convenient syntax when specifying the slots
used in a class. Simply call it in a class definition context with
the needed names. Locals are modified with private slot names."""
_getframe(1).f_locals['__slots__'] = \
tuple('__' + name for name in names.replace(',', ' ').split())
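For example, slots('x, y') in a class body is equivalent to writing __slots__ = ('__x', '__y') by hand; this works because a class body executes in a real, writable namespace dict. The helper is restated here so the snippet is self-contained:

```python
from sys import _getframe

def slots(names=''):
    # write __slots__ into the calling class body with private names
    _getframe(1).f_locals['__slots__'] = \
        tuple('__' + name for name in names.replace(',', ' ').split())

class Point:
    slots('x, y')   # same as __slots__ = ('__x', '__y')

assert Point.__slots__ == ('__x', '__y')
```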
################################################################################
# Implement a Key primitive data type for Markov Encryption.
class Key:
"""Key(data) -> Key instance
This class represents a Markov Encryption Key primitive. It allows for
easy key creation, checks for proper data construction, and helps with
encoding and decoding indexes based on cached internal tables."""
slots('data dimensions base size encoder axes order decoder')
@classmethod
def new(cls, bytes_used, chain_size):
"""Return a Key instance created from bytes_used and chain_size.
Creating a new key is easy with this method. Call this class method
with the bytes you want the key to recognize along with the size of
the chains you want the encryption/decryption processes to use."""
selection, blocks = list(set(bytes_used)), []
for _ in range(chain_size):
_CHAOS.shuffle(selection)
blocks.append(bytes(selection))
return cls(tuple(blocks))
def __init__(self, data):
"""Initialize the Key instance's variables after testing the data.
Keys are created with tuples of carefully constructed bytes arrays.
This method tests the given data before going on to build internal
tables for efficient encoding and decoding methods later on."""
self.__test_data(data)
self.__make_vars(data)
@staticmethod
def __test_data(data):
"""Test the data for correctness in its construction.
The data must be a tuple of at least two byte arrays. Each byte
array must have at least two bytes, all of which must be unique.
Furthermore, all arrays should share the exact same byte set."""
if not isinstance(data, tuple):
raise TypeError('Data must be a tuple object!')
if len(data) < 2:
raise ValueError('Data must contain at least two items!')
item = data[0]
if not isinstance(item, bytes):
raise TypeError('Data items must be bytes objects!')
length = len(item)
if length < 2:
raise ValueError('Data items must contain at least two bytes!')
unique = set(item)
if len(unique) != length:
raise ValueError('Data items must contain unique byte sets!')
for item in data[1:]:
if not isinstance(item, bytes):
raise TypeError('Data items must be bytes objects!')
next_length = len(item)
if next_length != length:
raise ValueError('All data items must have the same size!')
next_unique = set(item)
if len(next_unique) != next_length:
raise ValueError('Data items must contain unique byte sets!')
if next_unique ^ unique:
raise ValueError('All data items must use the same byte set!')
def __make_vars(self, data):
"""Build various internal tables for optimized calculations.
Encoding and decoding rely on complex relationships with the given
data. This method caches several of these key relationships for use
when the encryption and decryption processes are being executed."""
self.__data = data
self.__dimensions = len(data)
base, *mutations = data
self.__base = base = tuple(base)
self.__size = size = len(base)
offset = -sum(base.index(block[0]) for block in mutations[:-1]) % size
self.__encoder = base[offset:] + base[:offset]
self.__axes = tuple(reversed([tuple(base.index(byte) for byte in block)
for block in mutations]))
self.__order = key = tuple(sorted(base))
grid = []
for rotation in range(size):
block, row = base[rotation:] + base[:rotation], [None] * size
for byte, value in zip(block, key):
row[key.index(byte)] = value
grid.append(tuple(row))
self.__decoder = tuple(grid[offset:] + grid[:offset])
def test_primer(self, primer):
"""Raise an error if the primer is not compatible with this key.
Keys and primers have a certain relationship that must be maintained
in order for them to work together. Since the primer understands
the requirements, it is asked to check this key for compatibility."""
primer.test_key(self)
def encode(self, index):
"""Encode index based on internal tables and return byte code.
An index probes into the various axes of the multidimensional,
virtual grid that a key represents. The index is evaluated, and
the value at its coordinates is returned by running this method."""
assert len(index) == self.__dimensions, \
'Index size is not compatible with key dimensions!'
*probes, current = index
return self.__encoder[(sum(table[probe] for table, probe in
zip(self.__axes, probes)) + current) % self.__size]
def decode(self, index):
"""Decode index based on internal tables and return byte code.
Decoding does the exact same thing as encoding, but it indexes
into a virtual grid that represents the inverse of the encoding
grid. Tables are used to make the process fast and efficient."""
assert len(index) == self.__dimensions, \
'Index size is not compatible with key dimensions!'
*probes, current = index
return self.__decoder[sum(table[probe] for table, probe in
zip(self.__axes, probes)) % self.__size][current]
@property
def data(self):
"""Data that the instance was initialized with.
This is the tuple of byte arrays used to create this key and can
be used to create an exact copy of this key at some later time."""
return self.__data
@property
def dimensions(self):
"""Dimensions that the internal, virtual grid contains.
The virtual grid has a number of axes that can be referenced when
indexing into it, and this number is the count of its dimensions."""
return self.__dimensions
@property
def base(self):
"""Base value that the internal grid is built from.
The Sudoku nature of the grid comes from rotating this value by
offsets, keeping values unique along any axis while traveling."""
return self.__base
@property
def order(self):
"""Order of base after its values have been sorted.
A sorted base is important when constructing inverse rows and when
encoding raw bytes for use in updating an encode/decode index."""
return self.__order
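The "Sudoku nature" mentioned in the base property can be illustrated directly: rotating a base row through every offset, as __make_vars does when building its grid, yields a square in which each value occurs exactly once per row and per column. A toy sketch with plain integers standing in for key bytes:

```python
base = (3, 1, 4, 2)
size = len(base)

# one row per rotation offset, exactly as the key's virtual grid is built
grid = [base[r:] + base[:r] for r in range(size)]

# every row is a permutation of base, and so is every column
assert all(sorted(row) == sorted(base) for row in grid)
assert all(sorted(col) == sorted(base) for col in zip(*grid))
```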
################################################################################
# Implement a Primer primitive data type for Markov Encryption.
class Primer:
"""Primer(data) -> Primer instance
This class represents a Markov Encryption Primer primitive. It is very
important for starting both the encryption and decryption processes. A
method is provided for their easy creation with a related key."""
slots('data')
@classmethod
def new(cls, key):
"""Return a Primer instance from a parent Key.
Primers must be compatible with the keys they are used with. This
method takes a key and constructs a cryptographically sound primer
that is ready to use in the beginning stages of encryption."""
base = key.base
return cls(bytes(_CHAOS.choice(base)
for _ in range(key.dimensions - 1)))
def __init__(self, data):
"""Initialize the Primer instance after testing validity of data.
Though not as complicated in its requirements as keys, primers do
need some simple structure in the data they are given. A checking
method is run before saving the data to the instance's attribute."""
self.__test_data(data)
self.__data = data
@staticmethod
def __test_data(data):
"""Test the data for correctness and test the data.
In order for the primer to be compatible with the nature of the
Markov Encryption processes, the data must be an array of bytes;
and to act as a primer, it must contain at least some information."""
if not isinstance(data, bytes):
raise TypeError('Data must be a bytes object!')
if not data:
raise ValueError('Data must contain at least one byte!')
def test_key(self, key):
"""Raise an error if the key is not compatible with this primer.
Primers provide needed data to start encryption and decryption. For
a primer to be compatible with a key, it must contain one byte less
than the key's dimensions, and its data must be a subset of the key's base."""
if len(self.__data) != key.dimensions - 1:
raise ValueError('Key dimensions must be one more than the primer length!')
if not set(self.__data).issubset(key.base):
raise ValueError('Key data must be a superset of primer data!')
@property
def data(self):
"""Data that the instance was initialized with.
This is the byte array used to create this primer and can be used
if desired to create an exact copy of this primer at some later time."""
return self.__data
################################################################################
# Create an abstract processing class for use in encryption and decryption.
class _Processor:
"""_Processor(key, primer) -> NotImplementedError exception
This class acts as a base for the encryption and decryption processes.
The given key is saved, and several tables are created along with an
index. Since it is abstract, calling the class will raise an exception."""
slots('key into index from')
def __init__(self, key, primer):
"""Initialize the _Processor instance if it is from a child class.
After passing several tests for creating a valid processing object,
the key is saved, and the primer is used to start an index. Tables
are also formed for converting byte values between systems."""
if type(self) is _Processor:
raise NotImplementedError('This is an abstract class!')
key.test_primer(primer)
self.__key = key
self.__into = table = dict(map(reversed, enumerate(key.order)))
self.__index = deque(map(table.__getitem__, primer.data),
key.dimensions)
self.__from = dict(map(reversed, table.items()))
def process(self, data):
"""Process the data and return its transformed state.
A cache for the data transformation is created and an internal
method is run to quickly encode or decode the given bytes. The
cache is finally converted to immutable bytes when returned."""
cache = bytearray()
self._run(data, cache.append, self.__key, self.__into, self.__index)
return bytes(cache)
@staticmethod
def _run(data, cache_append, key, table, index):
"""Run the processing algorithm in an overloaded method.
Since this is only an abstract base class for encoding/decoding,
this method will raise an exception when run. Inheriting classes
should implement whatever is appropriate for the intended function."""
raise NotImplementedError('This is an abstract method!')
@property
def primer(self):
"""Primer representing the state of the internal index.
The index can be retrieved as a primer, useful for initializing
another processor in the same starting state as the current one."""
index = self.__index
index.append(None)
index.pop()
return Primer(bytes(map(self.__from.__getitem__, index)))
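The primer property above relies on a small deque trick: appending to a deque that is at its maxlen evicts the oldest entry, and the immediate pop() discards the value just appended, leaving the most recent dimensions - 1 items for the new primer. A standalone sketch of that behavior:

```python
from collections import deque

# the processor's index holds `dimensions` entries; a primer needs one fewer
index = deque([10, 20, 30], maxlen=3)
index.append(None)  # a full maxlen deque evicts its oldest entry (10)
index.pop()         # discard the None just appended
assert list(index) == [20, 30]
```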
################################################################################
# Inherit from _Processor and implement the ME encoding algorithm.
class Encrypter(_Processor):
"""Encrypter(key, primer) -> Encrypter instance
This class represents a state-aware encryption engine that can be fed
data and will return a stream of coherent cipher-text. An index is
maintained, and a state-continuation primer can be retrieved at will."""
slots()
@staticmethod
def _run(data, cache_append, key, table, index):
"""Encrypt the data with the given arguments.
To run the encryption process as fast as possible, methods are
cached as names. As the algorithm operates, only recognized bytes
are encoded while running through the selective processing loop."""
encode, index_append = key.encode, index.append
for byte in data:
if byte in table:
index_append(table[byte])
cache_append(encode(index))
else:
cache_append(byte)
################################################################################
# Inherit from _Processor and implement the ME decoding algorithm.
class Decrypter(_Processor):
"""Decrypter(key, primer) -> Decrypter instance
This class represents a state-aware decryption engine that can be fed
data and will return a stream of coherent plain-text. An index is
maintained, and a state-continuation primer can be retrieved at will."""
slots()
@staticmethod
def _run(data, cache_append, key, table, index):
"""Decrypt the data with the given arguments.
To run the decryption process as fast as possible, methods are
cached as names. As the algorithm operates, only recognized bytes
are decoded while running through the selective processing loop."""
decode, index_append = key.decode, index.append
for byte in data:
if byte in table:
index_append(table[byte])
value = decode(index)
cache_append(value)
index[-1] = table[value]
else:
cache_append(byte)
################################################################################
# Provide functions to easily encrypt and decrypt both bytes and strings.
def encrypt_bytes(data, key, primer):
"""Return encoded data processed with the key and primer.
This function is a shortcut for creating an Encrypter instance,
processing the data with the encryption engine, and returning the
cipher-text along with the primer representing the engine's state."""
engine = Encrypter(key, primer)
return engine.process(data), engine.primer
def decrypt_bytes(data, key, primer):
"""Return decoded data processed with the key and primer.
This function is a shortcut for creating a Decrypter instance,
processing the data with the decryption engine, and returning the
plain-text along with the primer representing the engine's state."""
engine = Decrypter(key, primer)
return engine.process(data), engine.primer
def encrypt_str(string, key, primer, encoding='utf-8', errors='ignore'):
"""Encode string with key and primer through binary interface.
This function does its best to encrypt a string with a key and primer,
taking the string's encoding into account and handling errors as given
in the keyword arguments following the standard byte-level arguments."""
engine = Encrypter(key, primer)
return engine.process(string.encode(encoding, errors))\
.decode(encoding, errors), engine.primer
def decrypt_str(string, key, primer, encoding='utf-8', errors='ignore'):
"""Decode string with key and primer through binary interface.
This function does its best to decrypt a string with a key and primer,
taking the string's encoding into account and handling errors as given
in the keyword arguments following the standard byte-level arguments."""
engine = Decrypter(key, primer)
return engine.process(string.encode(encoding, errors))\
.decode(encoding, errors), engine.primer
################################################################################
# Allow immediate encryption with automatically generated keys and primers.
def auto_encrypt_bytes(data, chain_size, plain_text=b''):
"""Encrypt data with automatically generated key and primer.
This function automates key and primer creation and encrypts the
data with them. The two arguments following the data allow some
simple customizations of the key and primer generation process."""
key = Key.new(set(data) - set(plain_text), chain_size)
primer = Primer.new(key)
return Encrypter(key, primer).process(data), key, primer
def auto_encrypt_str(string, chain_size, plain_text='',
encoding='utf-8', errors='ignore'):
"""Encrypt string with automatically generated key and primer.
This function automates key and primer creation and encrypts the
string with them. The two arguments following the data allow some
simple customizations of the key and primer generation process."""
string, plain_text = string.encode(encoding, errors), \
plain_text.encode(encoding, errors)
key = Key.new(set(string) - set(plain_text), chain_size)
primer = Primer.new(key)
return Encrypter(key, primer).process(string)\
.decode(encoding, errors), key, primer