This recipe sorts very large files (millions of records) in Python. No record-termination character is required, so a record may contain embedded binary data, newlines, and so on. You can specify how many temporary files to use; they are created in a fresh directory obtained from tempfile.mkdtemp(), so their location can be controlled through the usual temporary-directory environment variables.
from heapq import heapify, heappop, heappush
import tempfile
import os
import sys
import struct

class VariableLengthRecordFile(file):
    def __init__(self, name, mode, bufsize = -1):
        file.__init__(self, name, mode, bufsize)
        self.headerFormat = "i"
        self.headerLength = struct.calcsize(self.headerFormat)

    def readline(self):
        header = self.read(self.headerLength)
        if header == "":
            return (-2, "")
        recordLength = struct.unpack(self.headerFormat, header)[0]
        if recordLength == -1:
            return (-1, "")
        return (1, self.read(recordLength))

    def writeline(self, s):
        self.write(struct.pack(self.headerFormat, len(s)))
        self.write(s)

    def mark(self):
        self.write(struct.pack(self.headerFormat, -1))

class SortExternal:
    def __init__(self, buffer_size = 200000, filenum = 16):
        self.buffer_size = buffer_size
        self.tempdir = tempfile.mkdtemp()
        self.chunk = []
        self.chunksize = 0

        self.inputChunkFiles = []
        self.outputChunkFiles = []
        for i in range(filenum):
            filename = os.path.join(self.tempdir, "sort-%06i" % i)
            self.inputChunkFiles.append(VariableLengthRecordFile(filename, 'w+b', 64*1024))
        for i in range(filenum, filenum * 2):
            filename = os.path.join(self.tempdir, "sort-%06i" % i)
            self.outputChunkFiles.append(VariableLengthRecordFile(filename, 'w+b', 64*1024))

        self.currOutputFile = -1
        self.chunkDepth = 1

    def __iter__(self):
        return self

    def put(self, value):
        self.chunk.append(value)
        self.chunksize = self.chunksize + len(value)
        if self.chunksize < self.buffer_size:
            return
        self.chunk.sort()
        self.put_chunk(self.chunk)
        self.chunk = []
        self.chunksize = 0

    def put_chunk(self, valueIterator):
        self.currOutputFile = self.currOutputFile + 1
        if self.currOutputFile >= len(self.outputChunkFiles):
            self.currOutputFile = 0
            self.chunkDepth = self.chunkDepth + 1
        for value in valueIterator:
            #sys.stderr.write(value + "\n")
            self.outputChunkFiles[self.currOutputFile].writeline(value)
        self.outputChunkFiles[self.currOutputFile].mark()

    def sort(self):
        if len(self.chunk) > 0:
            self.chunk.sort()
            self.put_chunk(self.chunk)
        while self.chunkDepth > 1:
            self.mergeFiles()
        t = self.inputChunkFiles
        self.inputChunkFiles = self.outputChunkFiles
        self.outputChunkFiles = t
        for f in self.inputChunkFiles:
            f.flush()
            f.seek(0)
        self.prepareChunkMerge()

    def prepareChunkMerge(self):
        self.chunkHeap = []
        for chunkFile in self.inputChunkFiles:
            status, value = chunkFile.readline()
            if status > 0:
                heappush(self.chunkHeap, (value, chunkFile))

    def mergeFiles(self):
        t = self.inputChunkFiles
        self.inputChunkFiles = self.outputChunkFiles
        self.outputChunkFiles = t
        self.currOutputFile = -1
        self.chunkDepth = 1
        for f in self.outputChunkFiles:
            f.flush()
            f.truncate(0)
            f.seek(0)
        for f in self.inputChunkFiles:
            f.flush()
            f.seek(0)
        # for each layer of chunks
        while True:
            self.prepareChunkMerge()
            if not self.chunkHeap:
                break
            self.put_chunk(self)

    def next(self):
        # merges current chunk layer
        if not self.chunkHeap:
            raise StopIteration
        value, chunkFile = heappop(self.chunkHeap)
        returnValue = value
        status, value = chunkFile.readline()
        if status > 0:
            heappush(self.chunkHeap, (value, chunkFile))
        #sys.stderr.write("Value: %s\n" % returnValue)
        return returnValue

    def cleanup(self):
        for chunkFile in self.inputChunkFiles:
            chunkFile.close()
            os.remove(chunkFile.name)
        for chunkFile in self.outputChunkFiles:
            chunkFile.close()
            os.remove(chunkFile.name)
        os.rmdir(self.tempdir)

if __name__ == '__main__':
    # example usage
    import random

    s = SortExternal(buffer_size=100000, filenum=32)
    for i in range(100000):
        line = "%08i" % random.randint(0, 99999999)
        sys.stderr.write(">" + repr(line) + "\n")
        s.put(line)
    s.sort()
    for line in s:
        sys.stderr.write("<" + repr(line) + "\n")
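In practice you would read records from a file, write them back out after sorting, and remove the temporary directory when finished. A minimal sketch of that pattern follows; the file names "unsorted.dat" and "sorted.dat" are illustrative, not part of the recipe:

# Minimal end-to-end usage sketch; file names are illustrative.
s = SortExternal(buffer_size=200000, filenum=16)
for line in open("unsorted.dat", "rb"):   # newline-terminated records
    s.put(line)
s.sort()
out = open("sorted.dat", "wb")
for record in s:                          # records come back in sorted order
    out.write(record)                     # any trailing newline is preserved
out.close()
s.cleanup()                               # delete the temporary chunk files

Note that the demonstration code in the listing never calls cleanup(), so its temporary files are left behind; calling it after iterating, as shown here, removes them.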
This recipe borrows heavily from another recipe: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/466302
It can handle even bigger files than that recipe can, because it stores multiple sequences of sorted records ("chunks") in each temporary file and merges them in multiple passes.
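As a rough illustration of how the passes play out, the small helper below estimates the number of on-disk merge passes; the figures are illustrative estimates, not measurements taken from the recipe:

import math

def merge_passes(total_bytes, buffer_size, filenum):
    # Rough estimate of the on-disk merge passes performed by
    # SortExternal.sort(); an approximation, not part of the recipe.
    chunks = math.ceil(float(total_bytes) / buffer_size)   # initial sorted chunks
    passes = 0
    while chunks > filenum:                 # i.e. while chunkDepth > 1
        # each pass merges one chunk per input file into a single chunk
        chunks = math.ceil(chunks / float(filenum))
        passes += 1
    return passes

# e.g. a 2 GB input with buffer_size=200000 and filenum=16 gives about
# 10,000 initial chunks, which shrink 10,000 -> 625 -> 40 -> 3 over three
# passes; the final 3-way merge then happens on the fly while you iterate.
print merge_passes(2 * 10**9, 200000, 16)   # -> 3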
The size of file that can be sorted is limited only by the available temporary disk space. You need roughly twice as much temporary space as the size of the original file, because during a merge pass the input and output chunk files together hold a full copy of the data.
The number of temporary files to use can be specified as an argument. More files means fewer merge passes, but the returns diminish beyond 10 or so.
Input records are transformed into "variable length" records on disk by prepending a 4-byte record-length field. For this reason there are no practical limitations on the size or contents of records, and no particular end-of-line character is required. If records already end with a newline or a zero byte, it doesn't hurt to leave them there -- they will be retained.
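The framing can be pictured with plain struct calls. The following is just a sketch of the on-disk layout written by VariableLengthRecordFile, not a separate API:

import struct

# Each record is preceded by a 4-byte signed length (struct format "i");
# mark() terminates a chunk by writing a length of -1.
payload = "hello\x00world\n"                  # embedded binary data is fine
framed = struct.pack("i", len(payload)) + payload
end_of_chunk = struct.pack("i", -1)

# readline() reverses this: read 4 bytes, unpack the length, then read
# exactly that many bytes of payload.
length = struct.unpack("i", framed[:4])[0]    # 12
record = framed[4:4 + length]                 # "hello\x00world\n"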
For simplicity and speed, the algorithm sorts using binary collation starting at the first byte of each record. Because the comparison covers the entire record, the notion of sort "stability" does not apply.
If other comparison functions are needed, the above-mentioned recipe (466302) shows how to do that.
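Another simple approach, which keeps the whole-record binary comparison intact, is to prepend a fixed-width sort key to each record before calling put() and strip it off again on the way out. The sketch below only illustrates that idea; the field layout and file name are hypothetical, and this is not something the recipe itself does:

# Sketch: sort lines by a numeric field by prefixing a zero-padded key,
# since SortExternal always compares whole records byte-by-byte.
def keyed(record):
    amount = int(record.split(",")[2])   # assume field 3 is a non-negative integer
    return "%012d|" % amount + record    # zero-padding makes the key sort correctly

s = SortExternal()
for line in open("transactions.csv", "rb"):   # hypothetical input file
    s.put(keyed(line))
s.sort()
for rec in s:
    original = rec.split("|", 1)[1]      # strip the key prefix again
s.cleanup()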