This is a pure Python implementation of the rsync algorithm. On my desktop (3.0GHz dual core, 7200RPM), best case throughput for target file hash generation and delta generation is around 2.9MB/s. Absolute worst case scenario (no blocks in common) throughput for delta generation is 200KB/s to 300KB/s on the same system.
Tested in Python 2.5, 2.6, and 3.1. In 2.7, io.BufferedReader should yield the best throughput. On all other versions use __builtin__.open.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This is a pure Python implementation of the [rsync algorithm](TM96).
[TM96] Andrew Tridgell and Paul Mackerras. The rsync algorithm.
Technical Report TR-CS-96-05, Canberra 0200 ACT, Australia, 1996.
http://samba.anu.edu.au/rsync/.
### Example Use Case: ###
# On the system containing the file that needs to be patched
>>> unpatched = open("unpatched.file", "rb")
>>> hashes = blockchecksums(unpatched)
# On the remote system after having received `hashes`
>>> patchedfile = open("patched.file", "rb")
>>> delta = rsyncdelta(patchedfile, hashes)
# System with the unpatched file after receiving `delta`
>>> unpatched.seek(0)
>>> save_to = open("locally-patched.file", "wb")
>>> patchstream(unpatched, save_to, delta)
"""
import collections
import hashlib
if not(hasattr(__builtins__, "bytes")) or str is bytes:
# Python 2.x compatibility
def bytes(var, *args):
try:
return ''.join(map(chr, var))
except TypeError:
return map(ord, var)
__all__ = ["rollingchecksum", "weakchecksum", "patchstream", "rsyncdelta",
"blockchecksums"]
def rsyncdelta(datastream, remotesignatures, blocksize=4096):
"""
Generates a binary patch when supplied with the weak and strong
hashes from an unpatched target and a readable stream for the
up-to-date data. The blocksize must be the same as the value
used to generate remotesignatures.
"""
remote_weak, remote_strong = remotesignatures
match = True
matchblock = -1
deltaqueue = collections.deque()
while True:
if match and datastream is not None:
# Whenever there is a match or the loop is running for the first
# time, populate the window using weakchecksum instead of rolling
# through every single byte which takes at least twice as long.
window = collections.deque(bytes(datastream.read(blocksize)))
checksum, a, b = weakchecksum(window)
try:
# If there are two identical weak checksums in a file, and the
# matching strong hash does not occur at the first match, it will
# be missed and the data sent over. May fix eventually, but this
# problem arises very rarely.
matchblock = remote_weak.index(checksum, matchblock + 1)
stronghash = hashlib.md5(bytes(window)).hexdigest()
matchblock = remote_strong.index(stronghash, matchblock)
match = True
deltaqueue.append(matchblock)
if datastream.closed:
break
continue
except ValueError:
# The weakchecksum did not match
match = False
try:
if datastream:
# Get the next byte and affix to the window
newbyte = ord(datastream.read(1))
window.append(newbyte)
except TypeError:
# No more data from the file; the window will slowly shrink.
# newbyte needs to be zero from here on to keep the checksum
# correct.
newbyte = 0
tailsize = datastream.tell() % blocksize
datastream = None
if datastream is None and len(window) <= tailsize:
# The likelihood that any blocks will match after this is
# nearly nil so call it quits.
deltaqueue.append(window)
break
# Yank off the extra byte and calculate the new window checksum
oldbyte = window.popleft()
checksum, a, b = rollingchecksum(oldbyte, newbyte, a, b, blocksize)
# Add the old byte the file delta. This is data that was not found
# inside of a matching block so it needs to be sent to the target.
try:
deltaqueue[-1].append(oldbyte)
except (AttributeError, IndexError):
deltaqueue.append([oldbyte])
# Return a delta that starts with the blocksize and converts all iterables
# to bytes.
deltastructure = [blocksize]
for element in deltaqueue:
if isinstance(element, int):
deltastructure.append(element)
elif element:
deltastructure.append(bytes(element))
return deltastructure
def blockchecksums(instream, blocksize=4096):
"""
Returns a list of weak and strong hashes for each block of the
defined size for the given data stream.
"""
weakhashes = list()
stronghashes = list()
read = instream.read(blocksize)
while read:
weakhashes.append(weakchecksum(bytes(read))[0])
stronghashes.append(hashlib.md5(read).hexdigest())
read = instream.read(blocksize)
return weakhashes, stronghashes
def patchstream(instream, outstream, delta):
"""
Patches instream using the supplied delta and write the resultantant
data to outstream.
"""
blocksize = delta[0]
for element in delta[1:]:
if isinstance(element, int) and blocksize:
instream.seek(element * blocksize)
element = instream.read(blocksize)
outstream.write(element)
def rollingchecksum(removed, new, a, b, blocksize=4096):
"""
Generates a new weak checksum when supplied with the internal state
of the checksum calculation for the previous window, the removed
byte, and the added byte.
"""
a -= removed - new
b -= removed * blocksize - a
return (b << 16) | a, a, b
def weakchecksum(data):
"""
Generates a weak checksum from an iterable set of bytes.
"""
a = b = 0
l = len(data)
for i in range(l):
a += data[i]
b += (l - i)*data[i]
return (b << 16) | a, a, b
|
Example Use Case:
# On the system containing the file that needs to be patched
>>> unpatched = open("unpatched.file", "rb")
>>> hashes = blockchecksums(unpatched)
# On the remote system after having received `hashes`
>>> patchedfile = open("patched.file", "rb")
>>> delta = rsyncdelta(patchedfile, hashes)
# System with the unpatched file after receiving `delta`
>>> unpatched.seek(0)
>>> save_to = open("locally-patched.file", "wb")
>>> patchstream(unpatched, save_to, delta)
Module test can be found here (just stick it at the end or from rsyncpy import *
).
Thanks for the code. Please, don't use small-L for a variable name. It looks like number one: 1.