This is just a rewrite of Recipe 466302 "Sorting big files the Python 2.4 way", taking advantage of heapq.merge, context managers, and other niceties of newer Python versions. It can be used to sort very large files (millions of records) in Python. Each record is one line of the input file: the file is opened in binary mode and split on newline characters, so a record cannot itself contain an embedded newline. You can specify how many lines go into each temporary chunk file and which temporary directories are used.

Python, 100 lines
# based on Recipe 466302: Sorting big files the Python 2.4 way
# by Nicolas Lehuen

import os
from tempfile import gettempdir
from itertools import islice, cycle
from collections import namedtuple
import heapq

Keyed = namedtuple("Keyed", ["key", "obj"])

def merge(key=None, *iterables):
    # based on code posted by Scott David Daniels in c.l.p.
    # http://groups.google.com/group/comp.lang.python/msg/484f01f1ea3c832d

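    if key is None:
        # No key function: the lines themselves are merged, so there is
        # no Keyed wrapper to unpack.
        for element in heapq.merge(*iterables):
            yield element
    else:
        # Wrap each line as (key, line) so heapq.merge orders by key first.
        keyed_iterables = [(Keyed(key(obj), obj) for obj in iterable)
                            for iterable in iterables]
        for element in heapq.merge(*keyed_iterables):
            yield element.obj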


def batch_sort(input, output, key=None, buffer_size=32000, tempdirs=None):
    if tempdirs is None:
        tempdirs = []
    if not tempdirs:
        tempdirs.append(gettempdir())

    chunks = []
    try:
        with open(input,'rb',64*1024) as input_file:
            input_iterator = iter(input_file)
            for tempdir in cycle(tempdirs):
                current_chunk = list(islice(input_iterator,buffer_size))
                if not current_chunk:
                    break
                current_chunk.sort(key=key)
                output_chunk = open(os.path.join(tempdir,'%06i'%len(chunks)),'w+b',64*1024)
                chunks.append(output_chunk)
                output_chunk.writelines(current_chunk)
                output_chunk.flush()
                output_chunk.seek(0)
        with open(output,'wb',64*1024) as output_file:
            output_file.writelines(merge(key, *chunks))
    finally:
        for chunk in chunks:
            try:
                chunk.close()
                os.remove(chunk.name)
            except Exception:
                pass


if __name__ == '__main__':
    import optparse
    parser = optparse.OptionParser()
    parser.add_option(
        '-b','--buffer',
        dest='buffer_size',
        type='int',default=32000,
        help='''Size of the line buffer. The file to sort is
            divided into chunks of that many lines. Default : 32,000 lines.'''
    )
    parser.add_option(
        '-k','--key',
        dest='key',
        help='''Python expression used to compute the key for each
            line, "lambda line:" is prepended.\n
            Example : -k "line[5:10]". By default, the whole line is the key.'''
    )
    parser.add_option(
        '-t','--tempdir',
        dest='tempdirs',
        action='append',
        default=[],
        help='''Temporary directory to use. You might get performance
            improvements if the temporary directory is not on the same physical
            disk as the input and output directories. You can even try
            providing multiple directories on different physical disks.
            Use multiple -t options to do that.'''
    )
    parser.add_option(
        '-p','--psyco',
        dest='psyco',
        action='store_true',
        default=False,
        help='''Use Psyco.'''
    )
    options,args = parser.parse_args()

    if options.key:
        options.key = eval('lambda line : (%s)'%options.key)

    if options.psyco:
        import psyco
        psyco.full()

    batch_sort(args[0],args[1],options.key,options.buffer_size,options.tempdirs)

Interface and command line usage are identical to the original recipe; most comments still apply too.
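
For readers on Python 3.5 or later, the same chunk-and-merge idea can be sketched more compactly, because heapq.merge accepts a key argument directly and the Keyed wrapper is no longer needed. This is only an illustrative sketch, not the recipe's code: the name batch_sort_py3 is hypothetical, it uses tempfile.mkstemp instead of the recipe's numbered file names, and it assumes every input line ends with a newline. Like the recipe, it opens all chunk files at once during the merge.

```python
import heapq
import os
import tempfile
from itertools import islice


def batch_sort_py3(input_path, output_path, key=None, buffer_size=32000):
    """Sort the lines of input_path into output_path (hypothetical helper)."""
    chunk_paths = []
    try:
        # Phase 1: read buffer_size lines at a time, sort them in memory,
        # and write each sorted chunk to its own temporary file.
        with open(input_path, 'rb') as input_file:
            while True:
                chunk = list(islice(input_file, buffer_size))
                if not chunk:
                    break
                chunk.sort(key=key)
                fd, path = tempfile.mkstemp()
                chunk_paths.append(path)
                with os.fdopen(fd, 'wb') as chunk_file:
                    chunk_file.writelines(chunk)

        # Phase 2: lazily merge the sorted chunks into the output file.
        chunk_files = []
        try:
            for path in chunk_paths:
                chunk_files.append(open(path, 'rb'))
            with open(output_path, 'wb') as output_file:
                # key= on heapq.merge requires Python 3.5+.
                output_file.writelines(heapq.merge(*chunk_files, key=key))
        finally:
            for chunk_file in chunk_files:
                chunk_file.close()
    finally:
        # Always remove the temporary chunk files.
        for path in chunk_paths:
            os.remove(path)
```

A call such as batch_sort_py3('input.txt', 'output.txt', key=lambda line: line[5:10]) would mirror the -k "line[5:10]" example from the command-line help.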

9 comments

harry m 12 years, 6 months ago

Like it - this could be pretty useful. Just one thing: shouldn't the default argument for tempdirs on line 25 be None, and then be assigned [gettempdir()] inside the function? Lists used as default arguments are shared between calls (not that it matters in this code, as the function is executed once, but it could avoid some headaches if the function were used elsewhere). Nice little script though!

Gabriel Genellina (author) 12 years, 6 months ago

Thanks - default argument fixed.

Dario Beraldi 11 years ago

Thanks a lot for this! A probably naive question: what expression should I put in -k to sort in reverse order?
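
A note on the reverse-order question above: with the recipe as posted, the -k expression itself has to invert the order, which is straightforward for numeric keys (negate them) but awkward for arbitrary strings. On Python 3.5 or later, both list sorting and heapq.merge accept reverse=True, so a modified script could pass that flag through instead. The snippet below is only a sketch of those two options on in-memory data; the variable names are illustrative and not part of the recipe.

```python
import heapq

# Option 1: numeric keys. Negating the key reverses the order, so a
# command-line key such as -k "-int(line)" would sort descending.
numbers = [b"10\n", b"2\n", b"33\n"]
descending = sorted(numbers, key=lambda line: -int(line))

# Option 2 (Python 3.5+): sort every chunk in reverse and merge with
# reverse=True, which works for any key, including plain strings.
chunk_a = sorted([b"pear\n", b"apple\n", b"fig\n"], reverse=True)
chunk_b = sorted([b"kiwi\n", b"date\n"], reverse=True)
merged = list(heapq.merge(chunk_a, chunk_b, reverse=True))
```

For option 2 the chunks and the merge must agree: heapq.merge with reverse=True assumes each input is already sorted from largest to smallest.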

johannes thom 11 years ago

Thanks for the script!

I'm trying to use this script to sort huge files in which each line contains a timestamp, in ascending order of that timestamp. I added an import of the dateutil package to the script and tried dateutil.parser.parse(line[9:28]) as the key, which returns a datetime object for each line.

The problem: the resulting order is just alphabetical according to the datetime string, not chronological. How can I achieve the right order?

tom 9 years, 6 months ago

I'm running Python 2.6.5 on Ubuntu 10.04. Sorting a small text file gives me the following error:

Traceback (most recent call last):
  File "./batch_sort.py", line 108, in <module>
    batch_sort(args[0],args[1],options.key,options.buffer_size,options.tempdirs)
  File "./batch_sort.py", line 54, in batch_sort
    output_file.writelines(merge(key, *chunks))
  File "./batch_sort.py", line 30, in merge
    yield element.obj
AttributeError: 'str' object has no attribute 'obj'

Is this specific to ActivePython?

tom 9 years, 6 months ago

Found the solution with some help on stackoverflow.com (http://stackoverflow.com/questions/10665925/how-to-sort-huge-files-with-python)

def merge(key=None, *iterables):
    # based on code posted by Scott David Daniels in c.l.p.
    # http://groups.google.com/group/comp.lang.python/msg/484f01f1ea3c832d

    if key is None:
        for element in heapq.merge(*iterables):
            yield element
    else:
        keyed_iterables = [(Keyed(key(obj), obj) for obj in iterable)
                        for iterable in iterables]
        for element in heapq.merge(*keyed_iterables):
            yield element.obj

Think 8 years, 6 months ago

Thank you very much for the sort functionality. I am trying to use this for a large CSV file (250 million lines).

python sort.py -k"(line[2:8], int(line[1]))" filename.csv output.csv

When I execute the above command, I get the following error at line 44:

output_chunk = open(os.path.join(tempdir, '%06i'%len(chunks)),'w+b', 64*1024)
IOError: [Errno 24] Too many open files : C:\users\username\local\temp000508'

Any idea? I really appreciate your help.

FYI: I am running on Windows 7.

Gabriel Genellina (author) 8 years, 6 months ago

@Think: replace line 44 with:

with open(os.path.join(tempdir,'%06i'%len(chunks)),'w+b',64*1024) as output_chunk:

and indent the next 4 lines.

@Tom: thanks for the fix!

Think 8 years, 6 months ago

Thank you for the help. I added it to my script and was able to sort a small file. However, when I try to sort a large file of around 25 GB (it could be more later), I get an error at line 26 in merge [for iterable in iterables]:

ValueError: I/O operation on closed file. Any idea?
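
A note on the two errors reported above: wrapping the chunk file in a with block closes it as soon as the block exits, so the later merge is handed closed file objects, which explains the ValueError. Keeping every chunk open instead runs into the operating system's limit on open files once there are thousands of chunks. One possible way around both, sketched below, is to store only the chunk paths and merge them in several passes, opening at most max_open files at a time. This is an assumption-laden sketch rather than the recipe author's fix: merge_files, merge_in_passes and max_open are hypothetical names, and the key argument of heapq.merge requires Python 3.5 or later.

```python
import heapq
import os
import tempfile


def merge_files(paths, output_path, key=None):
    """Merge already-sorted line files into output_path (hypothetical helper)."""
    files = []
    try:
        for path in paths:
            files.append(open(path, 'rb'))
        with open(output_path, 'wb') as output_file:
            output_file.writelines(heapq.merge(*files, key=key))
    finally:
        for f in files:
            f.close()


def merge_in_passes(paths, output_path, key=None, max_open=64):
    """Merge sorted chunk files, opening at most max_open inputs at once.

    The input chunk files are deleted once they have been merged.
    """
    if max_open < 2:
        raise ValueError("max_open must be at least 2")
    paths = list(paths)
    # Repeatedly merge groups of max_open files into intermediate files
    # until few enough remain for a single final merge.
    while len(paths) > max_open:
        next_paths = []
        for start in range(0, len(paths), max_open):
            group = paths[start:start + max_open]
            fd, merged_path = tempfile.mkstemp()
            os.close(fd)
            merge_files(group, merged_path, key)
            for path in group:
                os.remove(path)
            next_paths.append(merged_path)
        paths = next_paths
    merge_files(paths, output_path, key)
    for path in paths:
        os.remove(path)
```

A simpler alternative that needs no code change is a larger -b value: with the default of 32,000 lines per chunk, a 250-million-line file produces roughly 7,800 chunk files, whereas a few million lines per chunk keeps the count low, at the cost of more memory per chunk.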