class QueueInterrupt(Exception):
pass
class InterruptibleQueue(Queue.Queue):
"""Subclass of Queue allows one to interrupt producers and consumers."""
def __init__(self,maxsize=0):
Queue.Queue.__init__(self,maxsize)
self.consumer_interrupt = False
self.producer_interrupt = False
def interrupt_all_consumers(self):
"""Raise QueueInterrupt in all consumer threads.
Any thread currently waiting to get an item, and any subsequent thread
that calls the get() method, will receive the QueueInterrupt exception.
"""
self.not_empty.acquire()
self.consumer_interrupt = True
self.not_empty.notifyAll()
self.not_empty.release()
def interrupt_all_producers(self):
"""Raise QueueInterrupt in all producer threads.
Any thread currently waiting to put an item, and any subsequent thread
that calls the put() method, will receive the QueueInterrupt exception.
"""
self.not_full.acquire()
self.producer_interrupt = True
self.not_full.notifyAll()
self.not_full.release()
def _empty(self):
if self.consumer_interrupt:
return False
return Queue.Queue._empty(self)
def _full(self):
if self.producer_interrupt:
return False
return Queue.Queue._full(self)
def _get(self):
if self.consumer_interrupt:
raise QueueInterrupt
return Queue.Queue._get(self)
def _put(self,item):
if self.producer_interrupt:
raise QueueInterrupt
Queue.Queue._put(self,item)