A Queue that allows the producer thread to raise an exception in the consumer threads, or vice versa.
    import Queue  # Python 2 standard library module


    class QueueInterrupt(Exception):
        pass


    class InterruptibleQueue(Queue.Queue):
        """Subclass of Queue allows one to interrupt producers and consumers."""

        def __init__(self, maxsize=0):
            Queue.Queue.__init__(self, maxsize)
            self.consumer_interrupt = False
            self.producer_interrupt = False

        def interrupt_all_consumers(self):
            """Raise QueueInterrupt in all consumer threads.

            Any thread currently waiting to get an item, and any subsequent thread
            that calls the get() method, will receive the QueueInterrupt exception.
            """
            self.not_empty.acquire()
            self.consumer_interrupt = True
            self.not_empty.notifyAll()  # wake every thread blocked in get()
            self.not_empty.release()

        def interrupt_all_producers(self):
            """Raise QueueInterrupt in all producer threads.

            Any thread currently waiting to put an item, and any subsequent thread
            that calls the put() method, will receive the QueueInterrupt exception.
            """
            self.not_full.acquire()
            self.producer_interrupt = True
            self.not_full.notifyAll()  # wake every thread blocked in put()
            self.not_full.release()

        def _empty(self):
            # Report "not empty" so a waiting get() proceeds to _get() and raises.
            if self.consumer_interrupt:
                return False
            return Queue.Queue._empty(self)

        def _full(self):
            # Report "not full" so a waiting put() proceeds to _put() and raises.
            if self.producer_interrupt:
                return False
            return Queue.Queue._full(self)

        def _get(self):
            if self.consumer_interrupt:
                raise QueueInterrupt
            return Queue.Queue._get(self)

        def _put(self, item):
            if self.producer_interrupt:
                raise QueueInterrupt
            Queue.Queue._put(self, item)
There are many cases in threaded applications where a producer or consumer thread becomes aware of some situation and has to notify the other thread of that situation immediately.
One obvious case is when the user cancels the entire operation: whichever thread is aware of it has to notify the other one. Consider a slide show viewer that has a producer thread that runs in the background and prerenders slides. It puts the slides into a Queue, and the consumer thread gets one whenever the user requests a new slide. If the user were to quit the application, only the consumer thread would know about it (since it is the GUI thread). The consumer has to notify the producer to stop producing and terminate.
A straightforward way to do this is to cause an exception to be raised in the threads on the other end of the Queue. That's what this class does. It adds methods to set flags indicating whether a current or subsequent call to either put or get should result in an exception.
In the above example, when the user quits, the consumer thread would call queue.interrupt_all_producers(), which sets the producer interrupt flag and notifies all waiting threads. If the producer is waiting on the queue, an exception is raised immediately; otherwise the exception is raised as soon as it calls queue.put().
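The slide show scenario can be sketched roughly as follows. This is an illustration under stated assumptions, not the recipe's class: ProducerInterruptibleQueue, prerender and the slide strings are hypothetical names, and the sketch hooks _qsize() rather than _full() so that it also runs on Python versions whose put() no longer consults _full().

```python
import threading

try:
    import queue as queue_module   # Python 3 name
except ImportError:
    import Queue as queue_module   # Python 2 name


class QueueInterrupt(Exception):
    pass


class ProducerInterruptibleQueue(queue_module.Queue):
    """Hypothetical cut-down variant: only producers can be interrupted."""

    def __init__(self, maxsize=0):
        queue_module.Queue.__init__(self, maxsize)
        self.producer_interrupt = False

    def interrupt_all_producers(self):
        with self.not_full:
            self.producer_interrupt = True
            self.not_full.notify_all()

    def _qsize(self):
        # Assumption: put() waits while _qsize() has reached maxsize, so
        # reporting 0 lets a blocked put() fall through to _put().
        if self.producer_interrupt:
            return 0
        return queue_module.Queue._qsize(self)

    def _put(self, item):
        if self.producer_interrupt:
            raise QueueInterrupt
        queue_module.Queue._put(self, item)


def prerender(slides, log):
    """Producer: keep prerendering slides until interrupted."""
    n = 0
    try:
        while True:
            slides.put("slide %d" % n)  # blocks while the queue is full
            n += 1
    except QueueInterrupt:
        log.append("producer stopped")


slides = ProducerInterruptibleQueue(maxsize=2)
log = []
producer = threading.Thread(target=prerender, args=(slides, log))
producer.daemon = True
producer.start()

first = slides.get()              # the GUI thread shows one slide
slides.interrupt_all_producers()  # the user quits
producer.join(5)
```

Once the producer flag is set, this variant also reports a size of 0 to consumers, so it only suits the case where the whole pipeline is being torn down.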
This recipe is a minimal implementation of this technique and can be improved in several obvious ways, such as allowing threads to pass data to the threads they're interrupting via the QueueInterrupt object, or providing methods to cancel the interrupt state.
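Both of those improvements might look something like the sketch below, shown for the consumer side only. The reason attribute, the clear_consumer_interrupt() method and the class name ReasonedQueue are assumptions made for illustration; none of them are part of the recipe. The sketch hooks _qsize() so that it runs on Python versions whose get() no longer calls _empty().

```python
try:
    import queue as queue_module   # Python 3 name
except ImportError:
    import Queue as queue_module   # Python 2 name


class QueueInterrupt(Exception):
    """Carries an optional payload from the interrupting thread."""

    def __init__(self, reason=None):
        Exception.__init__(self, reason)
        self.reason = reason


class ReasonedQueue(queue_module.Queue):
    """Hypothetical variant: consumer interrupts carry data and can be cleared."""

    def __init__(self, maxsize=0):
        queue_module.Queue.__init__(self, maxsize)
        self.consumer_interrupt = False
        self._consumer_reason = None

    def interrupt_all_consumers(self, reason=None):
        with self.not_empty:
            self.consumer_interrupt = True
            self._consumer_reason = reason
            self.not_empty.notify_all()

    def clear_consumer_interrupt(self):
        # Return the queue to normal operation.
        with self.not_empty:
            self.consumer_interrupt = False
            self._consumer_reason = None

    def _qsize(self):
        # Pretend to be non-empty so a waiting get() reaches _get().
        if self.consumer_interrupt:
            return 1
        return queue_module.Queue._qsize(self)

    def _get(self):
        if self.consumer_interrupt:
            raise QueueInterrupt(self._consumer_reason)
        return queue_module.Queue._get(self)


q = ReasonedQueue()
q.put("a")
q.interrupt_all_consumers("user cancelled")
try:
    q.get()
    reason = None
except QueueInterrupt as exc:
    reason = exc.reason          # data passed by the interrupting thread

q.clear_consumer_interrupt()
item = q.get()                   # the queued item is still there
```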
I realized I had misspelled "Interruptible" as "Interruptable". It is now corrected.
There is one problem with this. Say you have one thread that calls interrupt_all_consumers() when the queue is empty. Then in the consumer thread, the get() call will appear to block. Internally, Queue.get() is looping over "while not self._qsize(): self.not_empty.wait()" before it ever makes the call to _get().
I get around this by writing:
But this may not be appropriate in all cases.
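One possible workaround, offered as an assumption and not necessarily what the commenter wrote, is to override _qsize() so that an interrupted queue reports itself as non-empty and a waiting get() falls through to _get(). The class name ConsumerInterruptibleQueue and the consume helper are hypothetical.

```python
import threading
import time

try:
    import queue as queue_module   # Python 3 name
except ImportError:
    import Queue as queue_module   # Python 2 name


class QueueInterrupt(Exception):
    pass


class ConsumerInterruptibleQueue(queue_module.Queue):
    """Hypothetical variant that hooks _qsize() instead of _empty()."""

    def __init__(self, maxsize=0):
        queue_module.Queue.__init__(self, maxsize)
        self.consumer_interrupt = False

    def interrupt_all_consumers(self):
        with self.not_empty:
            self.consumer_interrupt = True
            self.not_empty.notify_all()

    def _qsize(self):
        # Pretend one item is present so get() leaves its wait loop.
        if self.consumer_interrupt:
            return 1
        return queue_module.Queue._qsize(self)

    def _get(self):
        if self.consumer_interrupt:
            raise QueueInterrupt
        return queue_module.Queue._get(self)


q = ConsumerInterruptibleQueue()
result = []


def consume():
    try:
        q.get()                      # blocks: the queue is empty
    except QueueInterrupt:
        result.append("interrupted")


consumer = threading.Thread(target=consume)
consumer.daemon = True
consumer.start()
time.sleep(0.1)                      # give the consumer time to block
q.interrupt_all_consumers()
consumer.join(5)
```

The cost is that qsize() and the fullness check in put() now see a fake size of 1 while the flag is set, which is one reason this kind of override is not appropriate everywhere.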
The reason for that problem is that the implementation of Queue changed sometime between Python 2.5 and 2.7. Queue.get used to call _empty(); for some reason--a good one, I'm sure--they changed it to call _qsize().
Such is the danger of subclassing code you don't control.
Now I'll probably have to come up with a version that works with all Python 2.x at least.