Welcome, guest | Sign In | My Account | Store | Cart

A buffer that will partially consume an iterator in the background.

Very useful for reading files and merging the data using the excellent http://code.activestate.com/recipes/491285/

Python, 96 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/python
# vim: ts=4 sw=4 expandtab fileencoding=UTF-8 :

import Queue
from threading import Thread


# Object used by _background_consumer to signal the source is exhausted
# to the main thread.
_sentinel = object()


class _background_consumer(Thread):
    """Will fill the queue with content of the source in a separate thread.

    >>> import Queue
    >>> q = Queue.Queue()
    >>> c = _background_consumer(q, range(3))
    >>> c.start()
    >>> q.get(True, 1)
    0
    >>> q.get(True, 1)
    1
    >>> q.get(True, 1)
    2
    >>> q.get(True, 1) is _sentinel
    True
    """
    def __init__(self, queue, source):
        Thread.__init__(self)

        self._queue = queue
        self._source = source

    def run(self):
        for item in self._source:
            self._queue.put(item)

        # Signal the consumer we are done.
        self._queue.put(_sentinel)


class ibuffer(object):
    """Buffers content of an iterator polling the contents of the given
    iterator in a separate thread.
    When the consumer is faster than many producers, this kind of
    concurrency and buffering makes sense.

    The size parameter is the number of elements to buffer.

    The source must be threadsafe.

    Next is a slow task:
    >>> from itertools import chain
    >>> import time
    >>> def slow_source():
    ...     for i in range(10):
    ...         time.sleep(0.1)
    ...         yield i
    ...
    >>>
    >>> t0 = time.time()
    >>> max(chain(*( slow_source() for _ in range(10) )))
    9
    >>> int(time.time() - t0)
    10

    Now with the ibuffer:
    >>> t0 = time.time()
    >>> max(chain(*( ibuffer(5, slow_source()) for _ in range(10) )))
    9
    >>> int(time.time() - t0)
    4

    60% FASTER!!!!!11
    """
    def __init__(self, size, source):
        self._queue = Queue.Queue(size)

        self._poller = _background_consumer(self._queue, source)
        self._poller.daemon = True
        self._poller.start()

    def __iter__(self):
        return self

    def next(self):
        item = self._queue.get(True)
        if item is _sentinel:
            raise StopIteration()
        return item


if __name__ == "__main__":
    import doctest
    doctest.testmod()

Created this module to speed up IO but it is insanely slow for that.

Without ibuffer:

python -m timeit -s 'from ibuffer import ibuffer; from glob import iglob; from itertools import chain' "for i in chain.from_iterable([ open(fn) for fn in iglob('/var/log/everything.*') ]): pass" 10 loops, best of 3: 34.9 msec per loop

With ibuffer and queues of 10 elements:

python -m timeit -s 'from ibuffer import ibuffer; from glob import iglob; from itertools import chain' "for i in chain.from_iterable([ ibuffer(10, open(fn)) for fn in iglob('/var/log/everything.*') ]): pass" 10 loops, best of 3: 4.44 sec per loop

With ibuffer and queues of 1000 elements:

[19:30:00] pts/5$ python -m timeit -s 'from ibuffer import ibuffer; from glob import iglob; from itertools import chain' "for i in chain.from_iterable([ ibuffer(1000, open(fn)) for fn in iglob('/var/log/everything.*') ]): pass" 10 loops, best of 3: 2.73 sec per loop

Those timings are with warm caches. I expected this case to be somewhat slower but this is insane!

Suggestions to make it faster?

3 comments

Gabriel Genellina 14 years, 3 months ago  # | flag

You may avoid the timeout if you put a sentinel value in the queue when the source is exhausted, instead of setting done=True. That is:

sentinel = object() # near top of module

# _background_consumer
def run(): 
    ... 
    self._queue.put(sentinel)

# ibuffer
def next(self):
    item = self._queue.get()
    if item is sentinel:
        raise StopIteration
    return item
Javier Ruere (author) 13 years, 11 months ago  # | flag

Thanks! That's an excellent way to do it!

Javier Ruere 9 years, 1 month ago  # | flag