Welcome, guest | Sign In | My Account | Store | Cart
NOTE: Recipes have moved! Please visit GitHub.com/activestate/code for the current versions.

A class that contains a dictionary of named queues, with read requests blocking until a message has been added to any one of a supplied list of queues.

Python, 125 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from sets import Set
import thread
from threading import Thread


class Lockable:
    """Base class that equips every instance with its own mutex.

    The lock is stored in ``self.mutex``; methods wrapped with ``locked``
    acquire it on entry and release it on exit.
    """

    def __init__(self):
        # A plain (non-reentrant) lock from the low-level thread module.
        instance_lock = thread.allocate_lock()
        self.mutex = instance_lock
        
def locked(method):
    """Return a version of *method* that runs while holding ``self.mutex``.

    The wrapper acquires ``self.mutex`` before calling *method* and always
    releases it afterwards, whether *method* returns or raises.  The return
    value (or exception) of *method* is passed through unchanged.
    """
    def _locked(self, *args, **kwargs):
        self.mutex.acquire()
        try:
            # Returning from inside the try still runs the finally clause,
            # so the mutex is released before the caller sees the value.
            return method(self, *args, **kwargs)
        finally:
            self.mutex.release()
    return _locked


class TaggedSignal:
    """A one-shot signal that hands a value (the "tag") to whoever waits.

    The internal lock is created already held, so ``wait`` blocks until
    ``set`` stores a tag and releases the lock.  ``names`` is kept on the
    instance unchanged; ``MultiQueue.notify_signal`` iterates over it to
    find the waiting lists this signal was registered in.
    """

    def __init__(self, names):
        self.tag = None
        self.names = names
        # Start in the "not yet signalled" state: the lock is held.
        gate = thread.allocate_lock()
        gate.acquire()
        self.lock = gate

    def set(self, tag):
        """Store *tag* and wake up any thread blocked in ``wait``."""
        # The tag must be stored before the lock is released so that a
        # woken waiter is guaranteed to see it.
        self.tag = tag
        self.lock.release()

    def wait(self):
        """Block until ``set`` has been called, then return the tag."""
        self.lock.acquire()
        # Release immediately so that further calls to wait() do not block.
        self.lock.release()
        return self.tag


class MultiQueue(Lockable):
    """A dictionary of named, bounded FIFO queues with multi-queue reads.

    ``put`` appends a value to one named queue and blocks while that queue
    is full.  ``get`` returns a value from any one of several named queues
    and blocks until one of them receives a value.

    Internal state, all guarded by ``self.mutex``:
      queues       -- name -> list of pending values (non-empty queues only)
      writeLocks   -- name -> lock that is left held while the queue is full
      waitingLists -- name -> list of TaggedSignal objects of blocked readers
    """

    def __init__(self, maxsize):
        """Create an empty multi-queue.

        maxsize -- number of values a single named queue may hold before
                   further ``put`` calls on that name block.
        """
        self.maxsize = maxsize
        self.queues = {}
        self.writeLocks = {}
        self.waitingLists = {}
        Lockable.__init__(self)

    def put(self, name, value):
        """Add *value* to the queue called *name*.

        Blocks while that queue is full.  If a reader is already waiting on
        *name*, the value is handed to it directly instead of being queued.
        """
        writeLock = self.getWriteLock(name)
        must_release = True
        # Blocks here while the queue is full: the put that filled the queue
        # left this lock held, and getValue releases it once there is room.
        writeLock.acquire()
        try:
            must_release = self._put(name, value, writeLock)
        finally:
            # Also reached if _put raises; must_release is still True then,
            # so the lock is never leaked.
            if must_release:
                writeLock.release()

    def _put(self, name, value, writeLock):
        """Deliver or enqueue *value*; runs with ``self.mutex`` held.

        Returns True when the caller must release the write lock, False
        when the queue is now full and the lock has to stay held.
        *writeLock* is accepted for interface compatibility and is unused.
        """
        if name in self.waitingLists:
            self.notify_signal(name, value)
            return True
        queue = self.getQueue(name)
        queue.append(value)
        return len(queue) < self.maxsize
    _put = locked(_put)

    def notify_signal(self, name, value):
        """Hand ``(name, value)`` to the longest-waiting reader on *name*.

        The reader's signal is removed from the waiting list of every queue
        it was registered on.  Must be called with ``self.mutex`` held.
        """
        signal = self.waitingLists[name][0]
        signal.set((name, value))
        # A separate loop variable keeps the *name* argument intact.
        for waitedName in signal.names:
            waitingList = self.waitingLists[waitedName]
            waitingList.remove(signal)
            if not waitingList:
                del self.waitingLists[waitedName]

    def get(self, *names):
        """Return ``(queue_name, value)`` from one of the queues in *names*.

        If several of the named queues hold values, the first one listed is
        used.  If none holds a value, blocks until a ``put`` on one of them.

        Raises ValueError when called without any queue name, because such a
        reader could never be woken up.
        """
        if not names:
            raise ValueError("get() requires at least one queue name")
        must_wait, result = self._get(names)
        if must_wait:
            # result is a TaggedSignal; waiting happens outside the mutex so
            # that writers can get in and fire it.
            return result.wait()
        return result

    def _get(self, names):
        """Take a value or register a waiter; runs with ``self.mutex`` held.

        Returns ``(False, (name, value))`` when a value was available, or
        ``(True, signal)`` when the caller has to wait on *signal*.
        """
        for name in names:
            # Only non-empty queues are present in self.queues.
            if name in self.queues:
                return False, self.getValue(name)
        return True, self.createSignal(names)
    _get = locked(_get)

    def getValue(self, queueName):
        """Pop the oldest value of *queueName* and return ``(name, value)``.

        Must be called with ``self.mutex`` held and only for a queue that is
        present in ``self.queues``.
        """
        queue = self.queues[queueName]
        # ">=" rather than "==" so that maxsize <= 0 acts as a one-slot queue
        # instead of leaving the write lock held forever.
        was_full = len(queue) >= self.maxsize
        value = queue.pop(0)
        if not queue:
            del self.queues[queueName]
        if was_full:
            # The put that filled the queue left the write lock held; release
            # it now that there is room again.  This must happen even when
            # the queue just became empty (maxsize == 1), otherwise a writer
            # blocked on the lock would never wake up.
            self.writeLocks[queueName].release()
        # The write lock is deliberately kept in self.writeLocks: a writer
        # may already hold a reference to it, and replacing it would let two
        # writers use different locks for the same queue.  The cost is one
        # small lock object per distinct queue name ever written to.
        return queueName, value

    def createSignal(self, names):
        """Create a TaggedSignal and register it as a waiter on *names*.

        Must be called with ``self.mutex`` held.
        """
        signal = TaggedSignal(names)
        for name in names:
            try:
                waitingList = self.waitingLists[name]
            except KeyError:
                waitingList = self.waitingLists[name] = []
            waitingList.append(signal)
        return signal

    def getWriteLock(self, name):
        """Return the write lock for queue *name*, creating it on first use."""
        try:
            writeLock = self.writeLocks[name]
        except KeyError:
            writeLock = self.writeLocks[name] = thread.allocate_lock()
        return writeLock
    getWriteLock = locked(getWriteLock)

    def getQueue(self, name):
        """Return the list backing queue *name*, creating it on first use.

        Must be called with ``self.mutex`` held.
        """
        try:
            queue = self.queues[name]
        except KeyError:
            queue = self.queues[name] = []
        return queue

It is sometimes useful to be able to poll a number of message queues simultaneously to see if any message has arrived on any queue. The MultiQueue class demonstrated here makes it possible to add messages to a named queue, and retrieve a message from any of a list of named queues.

The test code below illustrates the desired behaviour of the multiqueue:

<pre>
from threading import Thread, RLock, Event

class AsyncResult:
    """Represents an asynchronous operation that may not have completed yet.

    Exactly one of ``succeed`` or ``fail`` is expected to be called by the
    code performing the operation.  Consumers either register callbacks or
    read the blocking ``result`` property.
    """

    def __init__(self):
        self.completed = False
        self.failed = False
        self.__wait = Event()
        self.__callbacks = []
        self.__errbacks = []
        self.__retval = None
        self.__error = None
        # Re-entrant because addCallback calls addErrback while holding it.
        self.__lock = RLock()

    def complete(self):
        """Mark the operation as finished and wake readers of ``result``."""
        self.__lock.acquire()
        try:
            self.completed = True
            self.__wait.set()
        finally:
            self.__lock.release()

    def succeed(self, retval):
        """Record *retval* as the result and run the registered callbacks."""
        self.__retval = retval
        self.complete()
        for callback in self.__callbacks:
            callback(retval)
        self.clearCallbacks()

    def fail(self, error):
        """Record *error* as the failure and run the registered errbacks."""
        self.__error = error
        self.failed = True
        self.complete()
        for errback in self.__errbacks:
            errback(error)
        self.clearCallbacks()

    def clearCallbacks(self):
        """Forget all registered callbacks and errbacks."""
        self.__callbacks = []
        self.__errbacks = []

    def addCallback(self, callback, errback=None):
        """Register *callback* (and optionally *errback*).

        If the operation has already succeeded, *callback* is called at once
        with the result instead of being stored.
        """
        self.__lock.acquire()
        try:
            if self.completed:
                if not self.failed:
                    callback(self.__retval)
            else:
                self.__callbacks.append(callback)
            if errback is not None:
                self.addErrback(errback)
        finally:
            self.__lock.release()

    def addErrback(self, errback):
        """Register *errback*.

        If the operation has already failed, *errback* is called at once with
        the error instead of being stored.
        """
        self.__lock.acquire()
        try:
            if self.completed:
                if self.failed:
                    errback(self.__error)
            else:
                self.__errbacks.append(errback)
        finally:
            self.__lock.release()

    def __getResult(self):
        # Block until complete() has been called, then return the value or
        # re-raise the recorded error.
        self.__wait.wait()
        if not self.failed:
            return self.__retval
        else:
            raise self.__error
    result = property(__getResult)

def threaded(method):
    """Return a version of *method* that runs in a new thread.

    Calling the wrapper starts a thread that invokes *method* with the same
    positional and keyword arguments, and immediately returns an AsyncResult.
    The AsyncResult succeeds with the method's return value, or fails with
    the exception the method raised.
    """
    import sys

    def _threaded(*args, **kwargs):
        async_result = AsyncResult()

        def _method():
            try:
                result = method(*args, **kwargs)
            except Exception:
                # sys.exc_info() is used to fetch the exception instance so
                # that this parses on every Python 2 and Python 3 version.
                async_result.fail(sys.exc_info()[1])
                return
            async_result.succeed(result)

        Thread(target=_method).start()
        return async_result
    return _threaded

class TestMultiQueue:
    """Demo driver: runs MultiQueue puts and gets in their own threads.

    Each operation prints a line before it starts and another once it has
    finished, so blocking behaviour is visible in the output order.
    """

    def __init__(self, maxsize):
        self.mq = MultiQueue(maxsize)

    def test_put(self, name, value):
        """Put *value* into queue *name*, logging before and after."""
        print("Putting value %s into queue %s" % (str(value), str(name)))
        self.mq.put(name, value)
        print("--Put value %s into queue %s" % (str(value), str(name)))
    test_put = threaded(test_put)

    def test_get(self, *names):
        """Get a value from any of the queues *names*, logging both steps."""
        print("Getting value from queue(s) %s" % (str(names)))
        name, value = self.mq.get(*names)
        print("--Got value %s from queue %s while waiting for queues(s) %s" % (str(value), str(name), str(names)))
    test_get = threaded(test_get)

# Demo run.  Every call below starts its own thread, so the comments describe
# the intended behaviour; the actual interleaving depends on the scheduler.
print("\nStart\n")
tmq = TestMultiQueue(3)

# One queue at a time:

tmq.test_get(1)
tmq.test_put(1, 1)  # Should be fetched immediately
tmq.test_put(1, 2)
tmq.test_put(1, 3)
tmq.test_put(1, 4)
tmq.test_put(1, 5)  # Should not be put immediately
tmq.test_put(2, 1)  # Should be put immediately
tmq.test_get(1)  # Should be fetched immediately, and allow 5 to be put into queue 1
for i in range(3):
    tmq.test_get(1)  # Should empty queue 1
tmq.test_get(2)
tmq.test_get(2)  # Should wait until a value is available
tmq.test_put(2, 2)  # Should be fetched immediately

# Several queues at once:

for i in range(3):
    tmq.test_get(1, 2, 3)
tmq.test_put(1, 1)
tmq.test_put(2, 2)
tmq.test_put(3, 3)

print "\nEnd\n"
</pre>