Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/python
# vim: ts=4 sw=4 expandtab fileencoding=UTF-8 :

import Queue
from threading import Thread


# Object used by _background_consumer to signal the source is exhausted
# to the main thread. It is compared by identity, so it can never collide
# with a real item produced by the source.
_sentinel = object()


class _background_consumer(Thread):
    """Fill ``queue`` with the items of ``source`` from a separate thread.

    Items are put on the queue in iteration order, followed by the
    module-level ``_sentinel`` once iteration stops. The sentinel is put
    even when iterating ``source`` raises, so a reader blocked on the queue
    is always released instead of waiting forever. An ``Exception`` raised
    by the source is stored in ``self.error`` (``None`` otherwise) so the
    reader can inspect it after it sees the sentinel.

    >>> import Queue
    >>> q = Queue.Queue()
    >>> c = _background_consumer(q, range(3))
    >>> c.start()
    >>> q.get(True, 1)
    0
    >>> q.get(True, 1)
    1
    >>> q.get(True, 1)
    2
    >>> q.get(True, 1) is _sentinel
    True
    >>> c.join()
    >>> c.error is None
    True
    """
    def __init__(self, queue, source):
        Thread.__init__(self)

        self._queue = queue
        self._source = source
        # Exception raised while iterating the source, if any.
        self.error = None

    def run(self):
        try:
            for item in self._source:
                self._queue.put(item)
        except Exception as exc:
            # Hand the failure to the reader instead of letting it die with
            # this thread; ibuffer.next re-raises it on the consumer side.
            self.error = exc
        finally:
            # Signal the consumer we are done -- on success *and* on
            # failure, otherwise a reader blocked in get() hangs forever.
            self._queue.put(_sentinel)


class ibuffer(object):
    """Iterate over ``source`` while a background thread prefetches it.

    On construction a daemon thread starts pulling items from ``source``
    into a bounded queue that holds at most ``size`` items (``Queue.Queue``
    semantics: ``size <= 0`` means unbounded). Iterating the ibuffer yields
    the items in their original order. This pays off when items are slow
    to produce and several sources are consumed one after another: each
    buffer keeps filling while the consumer is still working through the
    previous ones.

    ``source`` is iterated only from the background thread, so it must not
    be iterated anywhere else at the same time.

    If iterating ``source`` raises an ``Exception``, the items produced
    before the failure are yielded first. The exception is then re-raised
    from ``next()``, after which the ibuffer is exhausted.

    Next is a slow task:
    >>> from itertools import chain
    >>> import time
    >>> def slow_source():
    ...     for i in range(10):
    ...         time.sleep(0.1)
    ...         yield i
    ...
    >>>
    >>> t0 = time.time()
    >>> max(chain(*( slow_source() for _ in range(10) )))
    9
    >>> int(time.time() - t0)
    10

    Now with the ibuffer. ``chain(*...)`` creates all ten buffers up front,
    so they fill concurrently and the run takes roughly 4-5 seconds
    instead of 10:
    >>> t0 = time.time()
    >>> max(chain(*( ibuffer(5, slow_source()) for _ in range(10) )))
    9
    >>> time.time() - t0 < 6
    True

    An exhausted ibuffer stays exhausted:
    >>> b = ibuffer(2, iter([1, 2]))
    >>> list(b), list(b)
    ([1, 2], [])
    """
    def __init__(self, size, source):
        self._queue = Queue.Queue(size)
        # Set once the end-of-stream sentinel has been read, so that later
        # next() calls do not block on a queue that will never refill.
        self._exhausted = False

        self._poller = _background_consumer(self._queue, source)
        # Daemon thread: a producer blocked on the full queue of an
        # abandoned ibuffer must not keep the interpreter alive at exit.
        self._poller.daemon = True
        self._poller.start()

    def __iter__(self):
        return self

    def next(self):
        """Return the next buffered item, blocking until one is available.

        Raises StopIteration once the source is exhausted (and on every
        call after that). Re-raises, once, any exception raised while the
        background thread iterated the source.
        """
        if self._exhausted:
            raise StopIteration()
        item = self._queue.get(True)
        if item is _sentinel:
            self._exhausted = True
            # The producer sets .error before putting the sentinel, so it is
            # visible here once the sentinel has been received.
            error = self._poller.error
            if error is not None:
                raise error
            raise StopIteration()
        return item

    # Python 3 spelling of the iterator protocol.
    __next__ = next


if __name__ == "__main__":
    # Executed as a script: run the examples embedded in this module's
    # docstrings as self-tests.
    from doctest import testmod
    testmod()

Diff to Previous Revision

--- revision 1 2010-01-10 17:48:11
+++ revision 2 2010-05-05 22:47:33
@@ -1,11 +1,16 @@
+#!/usr/bin/python
 # vim: ts=4 sw=4 expandtab fileencoding=UTF-8 :
 
-import itertools
 import Queue
-import threading
+from threading import Thread
 
 
-class _background_consumer(threading.Thread):
+# Object used by _background_consumer to signal the source is exhausted
+# to the main thread.
+_sentinel = object()
+
+
+class _background_consumer(Thread):
     """Will fill the queue with content of the source in a separate thread.
 
     >>> import Queue
@@ -18,27 +23,21 @@
     1
     >>> q.get(True, 1)
     2
-    >>> q.get_nowait()
-    Traceback (most recent call last):
-      ...
-    Empty
-    >>> c.done
+    >>> q.get(True, 1) is _sentinel
     True
     """
     def __init__(self, queue, source):
-        threading.Thread.__init__(self)
+        Thread.__init__(self)
 
         self._queue = queue
         self._source = source
-
-        self.done = False
 
     def run(self):
         for item in self._source:
             self._queue.put(item)
 
         # Signal the consumer we are done.
-        self.done = True
+        self._queue.put(_sentinel)
 
 
 class ibuffer(object):
@@ -63,35 +62,33 @@
     >>> t0 = time.time()
     >>> max(chain(*( slow_source() for _ in range(10) )))
     9
-    >>> print "Time:", int(time.time() - t0)
-    Time: 10
+    >>> int(time.time() - t0)
+    10
 
     Now with the ibuffer:
     >>> t0 = time.time()
     >>> max(chain(*( ibuffer(5, slow_source()) for _ in range(10) )))
     9
-    >>> print "Time:", int(time.time() - t0)
-    Time: 4
+    >>> int(time.time() - t0)
+    4
 
     60% FASTER!!!!!11
     """
     def __init__(self, size, source):
         self._queue = Queue.Queue(size)
-        self._source = source
 
         self._poller = _background_consumer(self._queue, source)
+        self._poller.daemon = True
         self._poller.start()
 
     def __iter__(self):
         return self
 
     def next(self):
-        while True:
-            try:
-                return self._queue.get(True, 0.01)
-            except Queue.Empty:
-                if self._poller.done:
-                    raise StopIteration()
+        item = self._queue.get(True)
+        if item is _sentinel:
+            raise StopIteration()
+        return item
 
 
 if __name__ == "__main__":

History