Welcome, guest | Sign In | My Account | Store | Cart

A Queue that allows the producer thread to raise an exception in the consumer threads, or vice versa.

Python, 54 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class QueueInterrupt(Exception):
    pass

class InterruptibleQueue(Queue.Queue):
    """Subclass of Queue allows one to interrupt producers and consumers."""

    def __init__(self,maxsize=0):
        Queue.Queue.__init__(self,maxsize)
        self.consumer_interrupt = False
        self.producer_interrupt = False
    
    def interrupt_all_consumers(self):
        """Raise QueueInterrupt in all consumer threads.

        Any thread currently waiting to get an item, and any subsequent thread
        that calls the get() method, will receive the QueueInterrupt exception.

        """
        self.not_empty.acquire()
        self.consumer_interrupt = True
        self.not_empty.notifyAll()
        self.not_empty.release()
    
    def interrupt_all_producers(self):
        """Raise QueueInterrupt in all producer threads.

        Any thread currently waiting to put an item, and any subsequent thread
        that calls the put() method, will receive the QueueInterrupt exception.

        """
        self.not_full.acquire()
        self.producer_interrupt = True
        self.not_full.notifyAll()
        self.not_full.release()

    def _empty(self):
        if self.consumer_interrupt:
            return False
        return Queue.Queue._empty(self)

    def _full(self):
        if self.producer_interrupt:
            return False
        return Queue.Queue._full(self)

    def _get(self):
        if self.consumer_interrupt:
            raise QueueInterrupt
        return Queue.Queue._get(self)

    def _put(self,item):
        if self.producer_interrupt:
            raise QueueInterrupt
        Queue.Queue._put(self,item)

There are many cases in threaded applications where a producer or consumer thread becomes aware of some situation and has to notify the other thread of that situation immediately.

One obvious case is when the user cancels the entire operation: whichever thread is aware of it has to notify the other one. Consider a slide show viewer that has a producer thread that runs in the background and prerenders slides. It puts the slides into a Queue, and the consumer thread gets one whenever the user requests a new slide. If the user were to quit the application, only the consumer thead would know about it (since it is the GUI thread). The consumer has to notify the producer to stop producing and terminate.

A straightforward way to do this is to cause an exception to be raised in the threads on the other end of the Queue. That's what this class does. It adds methods to set flags indicating whether a current or subsequent call to either put or get should result in an exception.

In the above example, when user quits, the consumer thread would call queue.interrupt_all_producers(), which sets the producer interrupt flag and notifies all waiting threads. If the producer is waiting on the queue an exception is immediately raised; otherwise the exception is raised as soon as it calls queue.put().

This recipe is a minimal implementation of this technique and can be improved in several obvious ways, such as allowing threads to pass data to the threads they're interrupting via the QueueInterrupt object, or providing methods to cancel the interrupt state.

3 comments

Carl Banks (author) 13 years ago  # | flag

I realized I had misspelled "Interruptible" as "Interruptable". It is now corrected.

Jason Heeris 11 years, 3 months ago  # | flag

There is one problem with this. Say you have one thread that calls interrupt_all_consumers() when the queue is empty. Then in the consumer thread, the get() call will appear to block. Internally to Queue, it is looping over:

while not self._qsize():
    self.not_empty.wait()

...before it ever makes the call to _get().

I get around this by writing:

self.queue.put(None)
self.queue.interrupt_all_consumers()

But this may not be appropriate in all cases.

Carl Banks (author) 10 years, 10 months ago  # | flag

The reason for that problem is the implementation of Queue changed sometime between 2.5 and 2.7. Queue.get used to call _empty(); for some reason--a good one, I'm sure--they changed it to call _qsize().

Such is the danger of subclassing code you don't control.

Now I'll probably have to come up with a version that works with all Python 2.x at least.

Created by Carl Banks on Thu, 28 Aug 2008 (MIT)
Python recipes (4591)
Carl Banks's recipes (4)

Required Modules

  • (none specified)

Other Information and Tasks