A class that contains a dictionary of named queues, with read requests blocking until a message has been added to any one of a supplied list of queues.
Python, 125 lines
It is sometimes useful to be able to poll a number of message queues simultaneously to see if any message has arrived on any queue. The MultiQueue class demonstrated here makes it possible to add messages to a named queue, and retrieve a message from any of a list of named queues.
The test code below illustrates the desired behaviour of the multiqueue:
<pre> from threading import Thread, RLock, Event
class AsyncResult: """Represents an asynchronous operation that may not have completed yet.""" def __init__(self): self.completed = False self.failed = False self.__wait = Event() self.__callbacks =  self.__errbacks =  self.__retval = None self.__error = None self.__lock = RLock()
def threaded(method): def _threaded(args, *kwargs): async_result = AsyncResult() def _method(): try: result = method(args, *kwargs) except Exception, e: async_result.fail(e) return async_result.succeed(result) Thread(target = _method).start() return async_result return _threaded
class TestMultiQueue: def __init__(self, maxsize): self.mq = MultiQueue(maxsize)
print "\nStart\n" tmq = TestMultiQueue(3)
One queue at a time:
tmq.test_get(1) tmq.test_put(1, 1) # Should be fetched immediately tmq.test_put(1, 2) tmq.test_put(1, 3) tmq.test_put(1, 4) tmq.test_put(1, 5) # Should not be put immediately tmq.test_put(2, 1) # Should be put immediately tmq.test_get(1) # Should be fetched immediately, and allow 5 to be put into queue 1 for i in xrange(3): tmq.test_get(1) # Should empty queue 1 tmq.test_get(2) tmq.test_get(2) # Should wait until a value is available tmq.test_put(2, 2) # Should be fetched immediately
Several queues at once:
for i in xrange(3): tmq.test_get(1, 2, 3) tmq.test_put(1, 1) tmq.test_put(2, 2) tmq.test_put(3, 3)
print "\nEnd\n" </pre>