#!/usr/bin/python
# vim: ts=4 sw=4 expandtab fileencoding=UTF-8 :
"""Iterator wrapper that prefetches items from its source in a background thread."""

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module

from threading import Thread

# Object used by _background_consumer to signal the source is exhausted
# to the main thread.
_sentinel = object()


class _background_consumer(Thread):
    """Fill the queue with the content of the source in a separate thread.

    Every item of ``source`` is put on ``queue`` in order, followed by
    ``_sentinel``.  The sentinel is enqueued even when iterating the source
    raises, so a reader blocked on the queue is always woken up.  An
    ``Exception`` raised by the source is stored in the ``error`` attribute
    (``None`` otherwise) before the sentinel is enqueued.

    >>> q = Queue.Queue()
    >>> c = _background_consumer(q, range(3))
    >>> c.start()
    >>> q.get(True, 1)
    0
    >>> q.get(True, 1)
    1
    >>> q.get(True, 1)
    2
    >>> q.get(True, 1) is _sentinel
    True
    """

    def __init__(self, queue, source):
        Thread.__init__(self)
        self._queue = queue
        self._source = source
        # Exception raised while iterating the source, if any.
        self.error = None

    def run(self):
        """Drain the source into the queue, then signal completion."""
        try:
            for item in self._source:
                self._queue.put(item)
        except Exception as exc:
            # Keep the failure so the reading thread can re-raise it.
            self.error = exc
        finally:
            # Signal the consumer we are done, whether or not the source
            # failed; otherwise the reader would block forever.
            self._queue.put(_sentinel)


class ibuffer(object):
    """Buffer the content of an iterator, polling it in a separate thread.

    When the consumer is faster than many producers, this kind of
    concurrency and buffering makes sense.

    ``size`` is the number of elements to buffer and ``source`` is the
    iterable to read from.  The source must be threadsafe, because it is
    iterated from a background thread.

    If the source raises an exception, it is re-raised by ``next`` once
    all items produced before the failure have been consumed.  After the
    iterator is exhausted, every further call raises ``StopIteration``.

    Next is a slow task:

    >>> from itertools import chain
    >>> import time
    >>> def slow_source():
    ...     for i in range(10):
    ...         time.sleep(0.1)
    ...         yield i
    ...
    >>> t0 = time.time()
    >>> max(chain(*( slow_source() for _ in range(10) )))
    9
    >>> int(time.time() - t0)
    10

    Now with the ibuffer:

    >>> t0 = time.time()
    >>> max(chain(*( ibuffer(5, slow_source()) for _ in range(10) )))
    9
    >>> int(time.time() - t0)
    4

    Roughly 60% faster.
    """

    def __init__(self, size, source):
        self._queue = Queue.Queue(size)
        self._exhausted = False
        self._poller = _background_consumer(self._queue, source)
        # Daemon thread: an abandoned buffer must not keep the process alive.
        self._poller.daemon = True
        self._poller.start()

    def __iter__(self):
        return self

    def next(self):
        """Return the next buffered item, blocking until one is available.

        Raises StopIteration when the source is exhausted, or the
        exception the source raised while being iterated.
        """
        if self._exhausted:
            # The sentinel was already consumed; do not block on the queue.
            raise StopIteration()
        item = self._queue.get(True)
        if item is _sentinel:
            self._exhausted = True
            error = self._poller.error
            if error is not None:
                raise error
            raise StopIteration()
        return item

    # Python 3 spelling of the iterator protocol method.
    __next__ = next


if __name__ == "__main__":
    import doctest
    doctest.testmod()