A class that contains a dictionary of named queues, with read requests blocking until a message has been added to any one of a supplied list of queues.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | from sets import Set
import thread
from threading import Thread
class Lockable:
def __init__(self):
self.mutex = thread.allocate_lock()
def locked(method):
def _locked(self, *args, **kwargs):
self.mutex.acquire()
try:
result = method(self, *args, **kwargs)
finally:
self.mutex.release()
return result
return _locked
class TaggedSignal:
def __init__(self, names):
self.names = names
self.lock = thread.allocate_lock()
self.lock.acquire()
self.tag = None
def set(self, tag):
self.tag = tag
self.lock.release()
def wait(self):
self.lock.acquire()
self.lock.release()
return self.tag
class MultiQueue(Lockable):
def __init__(self, maxsize):
self.maxsize = maxsize
self.queues = {}
self.writeLocks = {}
self.waitingLists = {}
Lockable.__init__(self)
def put(self, name, value):
writeLock = self.getWriteLock(name)
must_release = True
writeLock.acquire()
try:
must_release = self._put(name, value, writeLock)
finally:
if must_release:
writeLock.release()
def _put(self, name, value, writeLock):
if self.waitingLists.has_key(name):
self.notify_signal(name, value)
return True
else:
queue = self.getQueue(name)
queue.append(value)
return len(queue)<self.maxsize
_put = locked(_put)
def notify_signal(self, name, value):
signal = self.waitingLists[name][0]
signal.set((name, value))
for name in signal.names:
waitingList = self.waitingLists[name]
waitingList.remove(signal)
if len(waitingList)==0:
del self.waitingLists[name]
def get(self, *names):
must_wait, result = self._get(names)
if must_wait:
return result.wait()
else:
return result
def _get(self, names):
populatedQueues = Set(self.queues.keys()).intersection(names)
if len(populatedQueues)==0:
return True, self.createSignal(names)
else:
return False, self.getValue(populatedQueues.pop())
_get = locked(_get)
def getValue(self, queueName):
queue = self.queues[queueName]
was_full = len(queue)==self.maxsize
value = queue.pop(0)
if len(queue)==0:
del self.queues[queueName]
del self.writeLocks[queueName]
else:
if was_full:
self.writeLocks[queueName].release()
return queueName, value
def createSignal(self, names):
signal = TaggedSignal(names)
for name in names:
try:
waitingList = self.waitingLists[name]
except KeyError:
waitingList = self.waitingLists[name] = []
waitingList.append(signal)
return signal
def getWriteLock(self, name):
try:
writeLock = self.writeLocks[name]
except KeyError:
writeLock = self.writeLocks[name] = thread.allocate_lock()
return writeLock
getWriteLock = locked(getWriteLock)
def getQueue(self, name):
try:
queue = self.queues[name]
except KeyError:
queue = self.queues[name] = []
return queue
|
It is sometimes useful to be able to poll a number of message queues simultaneously to see if any message has arrived on any queue. The MultiQueue class demonstrated here makes it possible to add messages to a named queue, and retrieve a message from any of a list of named queues.
The test code below illustrates the desired behaviour of the multiqueue:
<pre> from threading import Thread, RLock, Event
class AsyncResult: """Represents an asynchronous operation that may not have completed yet.""" def __init__(self): self.completed = False self.failed = False self.__wait = Event() self.__callbacks = [] self.__errbacks = [] self.__retval = None self.__error = None self.__lock = RLock()
def complete(self):
self.__lock.acquire()
self.completed = True
self.__wait.set()
self.__lock.release()
def succeed(self, retval):
self.__retval = retval
self.complete()
for callback in self.__callbacks:
callback(retval)
self.clearCallbacks()
def fail(self, error):
self.__error = error
self.failed = True
self.complete()
for errback in self.__errbacks:
errback(error)
self.clearCallbacks()
def clearCallbacks(self):
self.__callbacks = []
self.__errbacks = []
def addCallback(self, callback, errback=None):
self.__lock.acquire()
try:
if self.completed:
if not self.failed:
callback(self.__retval)
else:
self.__callbacks.append(callback)
if not errback == None:
self.addErrback(errback)
finally:
self.__lock.release()
def addErrback(self, errback):
self.__lock.acquire()
try:
if self.completed:
if self.failed:
errback(self.__error)
else:
self.__errbacks.append(errback)
finally:
self.__lock.release()
def __getResult(self):
self.__wait.wait()
if not self.failed:
return self.__retval
else:
raise self.__error
result=property(__getResult)
def threaded(method): def _threaded(args, *kwargs): async_result = AsyncResult() def _method(): try: result = method(args, *kwargs) except Exception, e: async_result.fail(e) return async_result.succeed(result) Thread(target = _method).start() return async_result return _threaded
class TestMultiQueue: def __init__(self, maxsize): self.mq = MultiQueue(maxsize)
def test_put(self, name, value):
print "Putting value %s into queue %s" % (str(value), str(name))
self.mq.put(name, value)
print "--Put value %s into queue %s" % (str(value), str(name))
test_put=threaded(test_put)
def test_get(self, *names):
print "Getting value from queue(s) %s" % (str(names))
name, value = self.mq.get(*names)
print "--Got value %s from queue %s while waiting for queues(s) %s" % (str(value), str(name), str(names))
test_get = threaded(test_get)
print "\nStart\n" tmq = TestMultiQueue(3)
One queue at a time:
tmq.test_get(1) tmq.test_put(1, 1) # Should be fetched immediately tmq.test_put(1, 2) tmq.test_put(1, 3) tmq.test_put(1, 4) tmq.test_put(1, 5) # Should not be put immediately tmq.test_put(2, 1) # Should be put immediately tmq.test_get(1) # Should be fetched immediately, and allow 5 to be put into queue 1 for i in xrange(3): tmq.test_get(1) # Should empty queue 1 tmq.test_get(2) tmq.test_get(2) # Should wait until a value is available tmq.test_put(2, 2) # Should be fetched immediately
Several queues at once:
for i in xrange(3): tmq.test_get(1, 2, 3) tmq.test_put(1, 1) tmq.test_put(2, 2) tmq.test_put(3, 3)
print "\nEnd\n" </pre>