This recipe can be used to sort very large files (millions of records) in Python. No record termination character is required, hence a record may contain embedded binary data, newlines, etc. You can specify how many temporary files to use and where they are located.
| from heapq import heapify, heappop, heappush
import tempfile
import os
import sys
import struct
class VariableLengthRecordFile(file):
def __init__(self, name, mode, bufsize = -1):
file.__init__(self, name, mode, bufsize)
self.headerFormat = "i"
self.headerLength = struct.calcsize(self.headerFormat)
def readline(self):
header = self.read(self.headerLength)
if header == "":
return (-2, "")
recordLength = struct.unpack(self.headerFormat, header)[0]
if recordLength == -1:
return (-1, "")
return (1, self.read(recordLength))
def writeline(self, s):
self.write(struct.pack(self.headerFormat, len(s)))
self.write(s)
def mark(self):
self.write(struct.pack(self.headerFormat, -1))
class SortExternal:
def __init__(self, buffer_size = 200000, filenum = 16):
self.buffer_size = buffer_size
self.tempdir = tempfile.mkdtemp()
self.chunk = []
self.chunksize = 0
self.inputChunkFiles = []
self.outputChunkFiles = []
for i in range(filenum):
filename = os.path.join(self.tempdir, "sort-%06i" % i)
self.inputChunkFiles.append(VariableLengthRecordFile(filename,'w+b',64*1024))
for i in range(filenum, filenum * 2):
filename = os.path.join(self.tempdir, "sort-%06i" %i )
self.outputChunkFiles.append(VariableLengthRecordFile(filename,'w+b',64*1024))
self.currOutputFile = -1
self.chunkDepth = 1
def __iter__(self):
return self
def put(self, value):
self.chunk.append(value)
self.chunksize = self.chunksize + len(value)
if self.chunksize < self.buffer_size:
return
self.chunk.sort()
self.put_chunk(self.chunk)
self.chunk = []
self.chunksize = 0
def put_chunk(self, valueIterator):
self.currOutputFile = self.currOutputFile + 1
if self.currOutputFile >= len(self.outputChunkFiles):
self.currOutputFile = 0
self.chunkDepth = self.chunkDepth + 1
for value in valueIterator:
#sys.stderr.write(value + "\n")
self.outputChunkFiles[self.currOutputFile].writeline(value)
self.outputChunkFiles[self.currOutputFile].mark()
def sort(self):
if len(self.chunk) > 0:
self.chunk.sort()
self.put_chunk(self.chunk)
while self.chunkDepth > 1:
self.mergeFiles()
t = self.inputChunkFiles
self.inputChunkFiles = self.outputChunkFiles
self.outputChunkFiles = t
for f in self.inputChunkFiles:
f.flush()
f.seek(0)
self.prepareChunkMerge()
def prepareChunkMerge(self):
self.chunkHeap = []
for chunkFile in self.inputChunkFiles:
status, value = chunkFile.readline()
if status > 0:
heappush(self.chunkHeap,(value,chunkFile))
def mergeFiles(self):
t = self.inputChunkFiles
self.inputChunkFiles = self.outputChunkFiles
self.outputChunkFiles = t
self.currOutputFile = -1
self.chunkDepth = 1
for f in self.outputChunkFiles:
f.flush()
f.truncate(0)
f.seek(0)
for f in self.inputChunkFiles:
f.flush()
f.seek(0)
# for each layer of chunks
while True:
self.prepareChunkMerge()
if not self.chunkHeap:
break
self.put_chunk(self)
def next(self):
# merges current chunk layer
if not self.chunkHeap:
raise StopIteration
value, chunkFile = heappop(self.chunkHeap)
returnValue = value
status, value = chunkFile.readline()
if status > 0:
heappush(self.chunkHeap, (value, chunkFile))
#sys.stderr.write("Value: %s\n" % returnValue)
return returnValue
def cleanup(self):
for chunkFile in self.inputChunkFiles:
chunkFile.close()
os.remove(chunkFile.name)
for chunkFile in self.outputChunkFiles:
chunkFile.close()
os.remove(chunkFile.name)
os.rmdir(self.tempdir)
if __name__ == '__main__':
# example usage
import random
s = SortExternal(buffer_size=100000, filenum=32)
for i in range(100000):
line = "%08i" % random.randint(0, 99999999)
sys.stderr.write(">" + repr(line) + "\n")
s.put(line)
s.sort()
for line in s:
sys.stderr.write("<" + repr(line) + "\n")
|
This recipe borrows heavily from another recipe: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/466302
It can handle files even bigger than that recipe, because it stores multiple sequences of sorted records ("chunks") in each temporary file and merges them in multiple passes.
The size of file that can be sorted is limited only by the temporary disk space available. You need about twice as much temporary space as the original file size.
The number of temporary files to use can be specified as an argument. More is better but you get diminishing returns after 10 or so.
Input records are transformed into "variable length" records by prepending them with a 4-byte record length field. For this reason there are no practical limitations on the size or contents of records. There is no requirement for any particular end-of-line character. If records already have a newline or zero at the end, it doesn't hurt to leave them there -- they will be retained.
For simplicity and speed the algorithm sorts using binary collation starting at column 1. Because the sort comparison is based on the entire record the notion of sort "stability" does not apply.
If other comparison functions are needed the above mentioned script (466302)shows how to do that.