A Queue that allows the producer thread to raise an exception in the consumer threads, or vice versa.
import Queue  # Python 2 standard library module


class QueueInterrupt(Exception):
    pass


class InterruptibleQueue(Queue.Queue):
    """Subclass of Queue that allows one to interrupt producers and consumers."""

    def __init__(self, maxsize=0):
        Queue.Queue.__init__(self, maxsize)
        self.consumer_interrupt = False
        self.producer_interrupt = False

    def interrupt_all_consumers(self):
        """Raise QueueInterrupt in all consumer threads.

        Any thread currently waiting to get an item, and any subsequent
        thread that calls the get() method, will receive the
        QueueInterrupt exception.
        """
        self.not_empty.acquire()
        self.consumer_interrupt = True
        self.not_empty.notifyAll()
        self.not_empty.release()

    def interrupt_all_producers(self):
        """Raise QueueInterrupt in all producer threads.

        Any thread currently waiting to put an item, and any subsequent
        thread that calls the put() method, will receive the
        QueueInterrupt exception.
        """
        self.not_full.acquire()
        self.producer_interrupt = True
        self.not_full.notifyAll()
        self.not_full.release()

    # Report "not empty" / "not full" while interrupted so that blocked
    # threads stop waiting and proceed to _get() / _put(), which raise.
    def _empty(self):
        if self.consumer_interrupt:
            return False
        return Queue.Queue._empty(self)

    def _full(self):
        if self.producer_interrupt:
            return False
        return Queue.Queue._full(self)

    def _get(self):
        if self.consumer_interrupt:
            raise QueueInterrupt
        return Queue.Queue._get(self)

    def _put(self, item):
        if self.producer_interrupt:
            raise QueueInterrupt
        Queue.Queue._put(self, item)
There are many cases in threaded applications where a producer or consumer thread becomes aware of some situation and has to notify the other thread of that situation immediately.
One obvious case is when the user cancels the entire operation: whichever thread is aware of it has to notify the other one. Consider a slide show viewer that has a producer thread that runs in the background and prerenders slides. It puts the slides into a Queue, and the consumer thread gets one whenever the user requests a new slide. If the user were to quit the application, only the consumer thread would know about it (since it is the GUI thread). The consumer has to notify the producer to stop producing and terminate.
A straightforward way to do this is to cause an exception to be raised in the threads on the other end of the Queue. That's what this class does. It adds methods to set flags indicating whether a current or subsequent call to either put or get should result in an exception.
In the above example, when the user quits, the consumer thread would call queue.interrupt_all_producers(), which sets the producer interrupt flag and notifies all waiting threads. If the producer is waiting on the queue, an exception is immediately raised; otherwise the exception is raised as soon as it calls put().
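The slide show scenario can be sketched roughly as follows. This is a Python 3 adaptation rather than the recipe's own code: in Python 3 the module is named queue, and its get() and put() appear not to go through the _empty() and _full() hooks, so the sketch overrides get() and put() directly and, for brevity, drops the block and timeout arguments. The prerender function, the slide strings and the log list are hypothetical names used only for illustration.

```python
import queue
import threading


class QueueInterrupt(Exception):
    """Raised in a thread that is blocked on, or calls, an interrupted queue."""


class InterruptibleQueue(queue.Queue):
    """Simplified sketch: blocking get()/put() only, no timeouts."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.consumer_interrupt = False
        self.producer_interrupt = False

    def interrupt_all_consumers(self):
        with self.not_empty:
            self.consumer_interrupt = True
            self.not_empty.notify_all()

    def interrupt_all_producers(self):
        with self.not_full:
            self.producer_interrupt = True
            self.not_full.notify_all()

    def put(self, item):
        with self.not_full:
            # Wait for room unless an interrupt has been requested.
            while (not self.producer_interrupt
                   and 0 < self.maxsize <= self._qsize()):
                self.not_full.wait()
            if self.producer_interrupt:
                raise QueueInterrupt
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self):
        with self.not_empty:
            # Wait for an item unless an interrupt has been requested.
            while not self.consumer_interrupt and not self._qsize():
                self.not_empty.wait()
            if self.consumer_interrupt:
                raise QueueInterrupt
            item = self._get()
            self.not_full.notify()
            return item


def prerender(numbers, out, log):
    """Hypothetical producer: 'renders' slides until it is interrupted."""
    try:
        for n in numbers:
            out.put("slide %d" % n)
    except QueueInterrupt:
        log.append("producer stopped")


slide_queue = InterruptibleQueue(maxsize=2)
log = []
producer = threading.Thread(target=prerender,
                            args=(range(1000), slide_queue, log))
producer.start()

first = slide_queue.get()              # the user views one slide
slide_queue.interrupt_all_producers()  # the user quits
producer.join(timeout=5)
```

Because the queue is bounded, the producer is usually blocked inside put() when the interrupt arrives; if it happens to be between calls instead, its next put() raises. Either way the producer thread leaves its loop without any polling of a shared "quit" variable.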
This recipe is a minimal implementation of this technique and can be improved in several obvious ways, such as allowing threads to pass data to the threads they're interrupting via the QueueInterrupt object, or providing methods to cancel the interrupt state.
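As a rough illustration of those two improvements, the sketch below lets the interrupting thread attach data to the exception and adds a method that clears the interrupt state. It is a Python 3 sketch covering only the consumer side, and the names ResettableQueue, clear_consumer_interrupt and the data attribute are assumptions made for this example, not part of the recipe.

```python
import queue


class QueueInterrupt(Exception):
    """Carries optional data from the interrupting thread."""

    def __init__(self, data=None):
        super().__init__(data)
        self.data = data


class ResettableQueue(queue.Queue):
    """Consumer-side sketch: blocking get() only, no timeouts."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.consumer_interrupt = False
        self.consumer_data = None

    def interrupt_all_consumers(self, data=None):
        with self.not_empty:
            self.consumer_interrupt = True
            self.consumer_data = data
            self.not_empty.notify_all()

    def clear_consumer_interrupt(self):
        """Return the queue to normal operation for consumers."""
        with self.not_empty:
            self.consumer_interrupt = False
            self.consumer_data = None

    def get(self):
        with self.not_empty:
            while not self.consumer_interrupt and not self._qsize():
                self.not_empty.wait()
            if self.consumer_interrupt:
                # A fresh exception per caller, so threads do not share one.
                raise QueueInterrupt(self.consumer_data)
            item = self._get()
            self.not_full.notify()
            return item


q = ResettableQueue()
q.put("slide 0")
q.interrupt_all_consumers(data="source file missing")
try:
    q.get()
except QueueInterrupt as exc:
    reason = exc.data

q.clear_consumer_interrupt()
item = q.get()  # the queued item is still there after the reset
```

Items already in the queue are left untouched by the interrupt, so a consumer can resume where it left off once the interrupt is cleared. Whether that is desirable depends on the application; discarding pending items on interrupt would be an equally reasonable design.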