
This recipe sorts very large files (millions of records) in Python. No record termination character is required, so a record may contain embedded binary data, newlines, and so on. You can specify how many temporary files to use; they are created in a fresh directory under the system's default temporary location.

Python, 184 lines
from heapq import heapify, heappop, heappush
import tempfile
import os
import sys
import struct

class VariableLengthRecordFile(file):
    # a file of records, each preceded by a native-int length header;
    # a header value of -1 marks the end of a sorted chunk

    def __init__(self, name, mode, bufsize = -1):
        file.__init__(self, name, mode, bufsize)
        self.headerFormat = "i"
        self.headerLength = struct.calcsize(self.headerFormat)
    
    def readline(self):
        # returns (status, record): status 1 is a record, -1 is an
        # end-of-chunk mark, -2 is end of file

        header = self.read(self.headerLength)
        if header == "":
            return (-2, "")

        recordLength = struct.unpack(self.headerFormat, header)[0]
        if recordLength == -1:
            return (-1, "")

        return (1, self.read(recordLength))

    def writeline(self, s):
        # writes the record preceded by its length header
        self.write(struct.pack(self.headerFormat, len(s)))
        self.write(s)

    def mark(self):
        # writes a -1 length header to mark the end of a sorted chunk
        self.write(struct.pack(self.headerFormat, -1))

class SortExternal:
    # external merge sort: records are buffered in memory, sorted, and
    # written out in chunks; the chunks are then merged in passes until
    # each temporary file holds a single sorted run

    def __init__(self, buffer_size = 200000, filenum = 16):
        self.buffer_size = buffer_size
        self.tempdir = tempfile.mkdtemp()
        self.chunk = []
        self.chunksize = 0
        
        self.inputChunkFiles = []
        self.outputChunkFiles = []
        
        for i in range(filenum):
            filename = os.path.join(self.tempdir, "sort-%06i" % i)
            self.inputChunkFiles.append(VariableLengthRecordFile(filename, 'w+b', 64*1024))
        for i in range(filenum, filenum * 2):
            filename = os.path.join(self.tempdir, "sort-%06i" % i)
            self.outputChunkFiles.append(VariableLengthRecordFile(filename, 'w+b', 64*1024))

        self.currOutputFile = -1
        self.chunkDepth = 1

    def __iter__(self):
        return self
    
    def put(self, value):
        # buffers the record; when the buffer is full, sorts it and
        # writes it out as one chunk

        self.chunk.append(value)
        self.chunksize = self.chunksize + len(value)
        
        if self.chunksize < self.buffer_size:
            return

        self.chunk.sort()
        self.put_chunk(self.chunk)
        self.chunk = []
        self.chunksize = 0

    def put_chunk(self, valueIterator):
        # writes one sorted run of records to the next output file,
        # round-robin; chunkDepth counts the runs per file

        self.currOutputFile = self.currOutputFile + 1
        if self.currOutputFile >= len(self.outputChunkFiles):
            self.currOutputFile = 0
            self.chunkDepth = self.chunkDepth + 1

        for value in valueIterator:
            #sys.stderr.write(value + "\n")
            self.outputChunkFiles[self.currOutputFile].writeline(value)

        self.outputChunkFiles[self.currOutputFile].mark()

    def sort(self):
        # flushes any remaining records, merges until each file holds a
        # single chunk, then prepares the final in-memory merge

        if len(self.chunk) > 0:
            self.chunk.sort()
            self.put_chunk(self.chunk)

        while self.chunkDepth > 1:
            self.mergeFiles()

        t = self.inputChunkFiles
        self.inputChunkFiles = self.outputChunkFiles
        self.outputChunkFiles = t
        
        for f in self.inputChunkFiles:
            f.flush()
            f.seek(0)

        self.prepareChunkMerge()
        
    def prepareChunkMerge(self):
        # primes the heap with the first record of each input file's
        # current chunk

        self.chunkHeap = []

        for chunkFile in self.inputChunkFiles:
            status, value = chunkFile.readline()
            if status > 0:
                heappush(self.chunkHeap,(value,chunkFile))

    def mergeFiles(self):
        # one merge pass: corresponding chunks from all input files are
        # merged into single chunks on the output files

        t = self.inputChunkFiles
        self.inputChunkFiles = self.outputChunkFiles
        self.outputChunkFiles = t

        self.currOutputFile = -1
        self.chunkDepth = 1

        for f in self.outputChunkFiles:
            f.flush()
            f.truncate(0)
            f.seek(0)
        
        for f in self.inputChunkFiles:
            f.flush()
            f.seek(0)

        # for each layer of chunks
        while True:
            self.prepareChunkMerge()
            if not self.chunkHeap:
                break
            self.put_chunk(self)
        
    def next(self):
        # returns the next record in sorted order from the current merge,
        # refilling the heap from the file the record came from
        
        if not self.chunkHeap:
            raise StopIteration

        value, chunkFile = heappop(self.chunkHeap)

        returnValue = value
        status, value = chunkFile.readline()
        if status > 0:
            heappush(self.chunkHeap, (value, chunkFile))

        #sys.stderr.write("Value: %s\n" % returnValue)

        return returnValue
        

    def cleanup(self):
        # closes and deletes all temporary files and their directory

        for chunkFile in self.inputChunkFiles:
            chunkFile.close()
            os.remove(chunkFile.name)

        for chunkFile in self.outputChunkFiles:
            chunkFile.close()
            os.remove(chunkFile.name)

        os.rmdir(self.tempdir)

if __name__ == '__main__':

    # example usage

    import random
    
    s = SortExternal(buffer_size=100000, filenum=32)

    for i in range(100000):
        line = "%08i" % random.randint(0, 99999999)
        sys.stderr.write(">" + repr(line) + "\n")
        s.put(line)

    s.sort()
        
    for line in s:
        sys.stderr.write("<" + repr(line) + "\n")

This recipe borrows heavily from another recipe: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/466302

It can handle even bigger files than that recipe can, because it stores multiple sequences of sorted records ("chunks") in each temporary file and merges them in multiple passes.
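As a rough sketch (not part of the recipe itself), the number of merge passes grows only logarithmically with the input size, since each pass collapses filenum chunks into one. The hypothetical helper below estimates the pass count for the default settings:

def merge_passes(total_bytes, buffer_size=200000, filenum=16):
    # number of initial in-memory chunks (ceiling division)
    chunks = max(1, -(-total_bytes // buffer_size))
    passes = 0
    # each mergeFiles() pass collapses filenum chunks into one; the
    # last filenum (or fewer) chunks are merged lazily while iterating
    while chunks > filenum:
        chunks = -(-chunks // filenum)
        passes += 1
    return passes

print(merge_passes(10 ** 9))   # a 1 GB input: 5000 chunks, 3 merge passes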

The size of file that can be sorted is limited only by the temporary disk space available. You need about twice as much temporary space as the original file size.

The number of temporary files to use can be specified as an argument. More is better, but you get diminishing returns after 10 or so.

Input records are transformed into "variable length" records by prepending a 4-byte record-length field. For this reason there are no practical limitations on the size or contents of records, and no particular end-of-line character is required. If records already end with a newline or a zero byte, it doesn't hurt to leave them there; they will be retained.
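As an illustration (a sketch, assuming a little-endian machine with 4-byte native ints), this is what writeline() produces on disk for one record:

import struct

record = "hello\nworld\x00"   # embedded newline and zero byte are fine
framed = struct.pack("i", len(record)) + record
# framed == "\x0c\x00\x00\x00hello\nworld\x00"  (a 12-byte record)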

For simplicity and speed, the algorithm sorts using binary collation starting at column 1. Because the sort comparison is based on the entire record, the notion of sort "stability" does not apply.

If other comparison functions are needed, the above-mentioned recipe (466302) shows how to do that.
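A common approach, sketched here rather than copied from that recipe, is key decoration: prepend a fixed-width sort key to each record before put() and strip it off again when iterating the results. The helper names put_keyed and iter_unkeyed below are made up for illustration:

def put_keyed(sorter, key, record):
    # a zero-padded 20-character key collates in numeric order for
    # non-negative integer keys
    sorter.put("%020i" % key + record)

def iter_unkeyed(sorter):
    for record in sorter:
        yield record[20:]   # drop the key prefix

After calling sorter.sort(), iterating iter_unkeyed(sorter) yields the original records in key order.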