Welcome, guest | Sign In | My Account | Store | Cart
class QueueInterrupt(Exception):
    """Raised by InterruptibleQueue.get()/put() once that side is interrupted."""


class InterruptibleQueue(Queue.Queue):
    """Subclass of Queue that allows interrupting producers and consumers.

    After interrupt_all_consumers() every get() raises QueueInterrupt, and
    after interrupt_all_producers() every put() raises QueueInterrupt.  This
    applies both to threads already blocked in the call and to later callers.

    get() and put() are overridden here instead of relying on the base class
    to call _empty()/_full(): only very old Queue implementations consult
    those hooks, newer ones wait on _qsize() directly, which would leave
    woken threads blocked forever.

    NOTE: because the blocking protocol is reimplemented here, the Python
    3.13 Queue.shutdown() state is not consulted by get()/put().
    """

    def __init__(self, maxsize=0):
        # Explicit base-class call: under Python 2 Queue.Queue is an
        # old-style class, so super() is not usable.
        Queue.Queue.__init__(self, maxsize)
        self.consumer_interrupt = False
        self.producer_interrupt = False

    def interrupt_all_consumers(self):
        """Raise QueueInterrupt in all consumer threads.

        Any thread currently waiting to get an item, and any subsequent thread
        that calls the get() method, will receive the QueueInterrupt exception.

        """
        with self.not_empty:
            self.consumer_interrupt = True
            # Wake every blocked consumer so it re-checks the flag.
            self.not_empty.notify_all()

    def interrupt_all_producers(self):
        """Raise QueueInterrupt in all producer threads.

        Any thread currently waiting to put an item, and any subsequent thread
        that calls the put() method, will receive the QueueInterrupt exception.

        """
        with self.not_full:
            self.producer_interrupt = True
            # Wake every blocked producer so it re-checks the flag.
            self.not_full.notify_all()

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        Same contract as Queue.get(): raises Queue.Empty when no item is
        available (immediately if block is false, otherwise after timeout
        seconds) and ValueError for a negative timeout.  Once consumers have
        been interrupted it raises QueueInterrupt instead, even if items
        remain in the queue.
        """
        with self.not_empty:
            self._wait_while(self._empty, self.not_empty, block, timeout,
                             Queue.Empty)
            item = self._get()  # raises QueueInterrupt when interrupted
            self.not_full.notify()
            return item

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        Same contract as Queue.put(): raises Queue.Full when no slot is
        free (immediately if block is false, otherwise after timeout
        seconds) and ValueError for a negative timeout.  Once producers have
        been interrupted it raises QueueInterrupt instead and the item is not
        stored.
        """
        with self.not_full:
            self._wait_while(self._full, self.not_full, block, timeout,
                             Queue.Full)
            self._put(item)  # raises QueueInterrupt when interrupted
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def _wait_while(self, blocked, condition, block, timeout, error):
        """Wait on condition until blocked() is false.

        The caller must hold the lock of condition.  Raises error when the
        caller may not wait (block is false) or when timeout seconds elapse
        while blocked() is still true.
        """
        if not block:
            if blocked():
                raise error
        elif timeout is None:
            while blocked():
                condition.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            import time
            # Prefer the monotonic clock where the interpreter provides one.
            clock = getattr(time, "monotonic", time.time)
            deadline = clock() + timeout
            while blocked():
                remaining = deadline - clock()
                if remaining <= 0.0:
                    raise error
                condition.wait(remaining)

    def _empty(self):
        """Return True when a consumer has to wait.

        Reports "not empty" once consumers are interrupted so that they stop
        waiting and proceed to _get(), which raises QueueInterrupt.
        """
        if self.consumer_interrupt:
            return False
        return not self._qsize()

    def _full(self):
        """Return True when a producer has to wait.

        Reports "not full" once producers are interrupted so that they stop
        waiting and proceed to _put(), which raises QueueInterrupt.  A
        maxsize of zero or less means the queue is unbounded.
        """
        if self.producer_interrupt:
            return False
        return 0 < self.maxsize <= self._qsize()

    def _get(self):
        """Pop the next item, or raise QueueInterrupt if consumers are interrupted."""
        if self.consumer_interrupt:
            raise QueueInterrupt
        return Queue.Queue._get(self)

    def _put(self, item):
        """Store item, or raise QueueInterrupt if producers are interrupted."""
        if self.producer_interrupt:
            raise QueueInterrupt
        Queue.Queue._put(self, item)

History

  • revision 3 (15 years ago)
  • previous revisions are not available