I needed multiple consumers to retrieve data from a queue fed by one producer. Could not find a good working code for that, so I implemented my own queue. Docstring should describe how this works.
Two notes: 1) my code uses multiprocessing code, but in this module, the Lock and Condition could be easily replaced with the same objects from the threading module 2) the attached test uses syntax for "nose" testing package, I did not convert it to doctest or UnitTest.
| '''
A tagged queue. The producer(s) place items in the queue, each item _may_ have
a list of tags attached to it. Consumer(s) may request an item of a given tag.
The tag is then "consumed" from this item. The item is removed from the queue
when all its tags are consumed.
For better understanding, think about it as set of discinct queues,
one for a given tag (and one for no tag).
This will be appropriate for most cases (although internal
plumbing is different), except for full() which counts all
items regardless of tags to guarrantee maximum object count in memory
If tags are not used, the TaggedQueue behaves exactly as Queue.Queue,
but performance is worse (see below).
Assumptions:
- tag can be anything that works as a dictionary index
- when putting an item in the queue, any number of tags can be used,
but list must be provided (or the tag_list parameter not used at all)
- when getting an item for the queue, up to 1 tag can be provided.
Example:
>>> q.put('red only', tag_list=['red'])
>>> q.put('red and blue', tag_list=['red', 'blue'])
>>> q.put('blue only', tag_list=['blue'])
>>> q.put('both colours', tag_list=['red', 'blue'])
>>> q.get(tag='blue')
'red and blue'
>>> q.get(tag='blue')
'blue only'
>>> q.get(tag='red')
'red only'
>>> q.get(tag='red')
'red and blue'
>>> q.get(tag='red')
'both colours'
>>> q.get(tag='blue')
'both colours'
Performance (in seconds, smaller is better):
- adding an object in the queue 100000 times, then getting it back 100000 times
(max queue length is 100000)
Queue.Queue: 1.124342
TaggedQueue without tags: 5.291548
TaggedQueue with 10 distinct tags: 2.100272
TaggedQueue with 1000 distinct tags: 1.692542
- 100000 consecutive put() - get() calls, (max queue length is 1)
Queue.Queue: 1.158024
TaggedQueue without tags: 2.218153
TaggedQueue with 10 distinct tags: 2.270345
TaggedQueue with 1000 distinct tags: 2.344355
'''
import multiprocessing
from sys import maxint
from time import time as _time
def id_generator():
'''a generator returning numeric ID's as identifiers'''
_id = 0
while True:
_id += 1
if _id == maxint:
_id = 0
yield _id
class Empty(Exception):
'''raised when TaggedQueue is empty for a given tag'''
class Full(Exception):
'''raised when TaggedQueue is full (regardless of tags)'''
NOTAG = '_notag'
# based on Queue.Queue (no surprise)
class TaggedQueue(object):
'''A queue similar to Queue.Queue, but producers can tag items
and consumers can specify what tag are they interested in.
If an item has multiple tags, it is removed from the queue only after
all tags attached to it have been consumed.
'''
def __init__(self, maxsize=0):
'''initialize the queue'''
self.mutex = multiprocessing.Lock()
self.not_empty = multiprocessing.Condition(self.mutex)
self.not_full = multiprocessing.Condition(self.mutex)
self.maxsize = maxsize
self._tags = {} # list of refid's for each tag
self._queue = {} # the actual queue data
self._refcount = {} # how many tags refer to a given refid in the queue
self.id_generator = id_generator()
def qsize(self, tag=NOTAG):
'''Return the approximate size of the queue for a given tag.'''
self.mutex.acquire()
try:
n = len(self._tags[tag])
except KeyError:
n = 0
self.mutex.release()
return n
def empty(self, tag=NOTAG):
'''Return True if the queue is empty for a given tag'''
self.mutex.acquire()
n = self._empty(tag)
self.mutex.release()
return n
def full(self):
'''Return True if the queue is full, regardless of tags'''
self.mutex.acquire()
n = self._full()
self.mutex.release()
return n
def get(self, tag=NOTAG, block=True, timeout=None):
'''get item object with defined tag from the queue, assuming only
one tag
'''
self.not_empty.acquire()
try:
if not block:
if self._empty(tag):
raise Empty
elif timeout is None:
while self._empty(tag):
self.not_empty.wait()
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
while self._empty(tag):
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get(tag)
self.not_full.notify()
return item
finally:
self.not_empty.release()
def get_nowait(self, tag=NOTAG):
'''get tagged item immediatelly or raise an exception'''
return self.get(tag, block=False)
def put(self, item, tag_list=[NOTAG], block=True, timeout=None):
'''put an item in queue, tag_list is an optional list of tags
to be attached to the item
'''
self.not_full.acquire()
try:
if not block:
if self._full():
raise Full
elif timeout is None:
while self._full():
self.not_full.wait()
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
while self._full():
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item, tag_list)
self.not_empty.notify()
finally:
self.not_full.release()
def put_nowait(self, item, tag_list=[NOTAG]):
'''put an item to the list immediatelly or raise an exception'''
return self.put(item, tag_list, False)
def _empty(self, tag):
return not (tag in self._tags)
def _full(self):
return self.maxsize > 0 and len(self._refcount) == self.maxsize
def _get(self, tag):
if not tag in self._tags:
raise Empty
# get item's refid of the item
refid = self._tags[tag].pop(0) # get and remove first item from list
if not self._tags[tag]: # no item in the queue have this tag anymore
del(self._tags[tag])
# get the item from the queue
item = self._queue[refid]
# decrease the reference counter
self._refcount[refid] -= 1
if self._refcount[refid] == 0: # no more references
del self._refcount[refid]
del self._queue[refid]
return item
def _put(self, item, tag_list):
refid = self.id_generator.next()
self._queue[refid] = item
self._refcount[refid] = len(tag_list)
for t in tag_list:
try:
self._tags[t].append(refid)
except KeyError:
self._tags[t] = [refid]
## --- test_tagged_queue.py
'''
Test that TaggedQueue works as expected
'''
from tagged_queue import TaggedQueue, Empty
def test_empty_queue():
'''test that TaggedQueue returns Empty correctly'''
q = TaggedQueue()
try:
q.get_nowait()
assert False
except Empty:
pass
try:
q.get_nowait(tag='non_existing')
assert False
except Empty:
pass
def test_put_and_get():
'''test that tags are honoured for basic queue operations'''
q = TaggedQueue()
q.put_nowait('no tag')
assert q.get_nowait() == 'no tag'
q.put_nowait('test data', tag_list=['tag1', 'tag2', 3])
try:
q.get_nowait()
assert False
except Empty:
pass
q.get_nowait(tag=3)
q.get_nowait(tag='tag1')
q.get_nowait(tag='tag2')
try:
q.get_nowait(tag='tag2')
assert False
except Empty:
pass
def test_empty():
'''test that .empty() function works as expected'''
q = TaggedQueue()
assert q.empty()
assert q.empty(tag='tag1')
q.put_nowait('test data')
assert not q.empty()
assert q.empty(tag='tag1')
q.put_nowait('other test data', tag_list=['tag1'])
assert not q.empty()
assert not q.empty(tag='tag1')
q.get_nowait()
assert q.empty()
assert not q.empty(tag='tag1')
q.get_nowait(tag='tag1')
assert q.empty()
assert q.empty('tag1')
def test_qsize():
'''test that .qsize() function works as expected'''
q = TaggedQueue()
assert q.qsize() == 0
assert q.qsize(tag='tag1') == 0
q.put_nowait('test data')
assert q.qsize() == 1
assert q.qsize(tag='tag1') == 0
q.put_nowait('other test data', tag_list=['tag1'])
assert q.qsize() == 1
assert q.qsize(tag='tag1') == 1
q.put_nowait('other test data', tag_list=['tag1'])
assert q.qsize() == 1
assert q.qsize(tag='tag1') == 2
q.get_nowait()
assert q.qsize() == 0
assert q.qsize(tag='tag1') == 2
q.get_nowait(tag='tag1')
assert q.qsize() == 0
assert q.qsize(tag='tag1') == 1
q.get_nowait(tag='tag1')
assert q.qsize() == 0
assert q.qsize(tag='tag1') == 0
def test_full():
'''test that .full() function works as expected'''
q = TaggedQueue(maxsize=3)
assert not q.full()
q.put_nowait('data')
assert not q.full()
q.put_nowait('other data', tag_list=['some_tag'])
assert not q.full()
q.put_nowait('yet another data')
assert q.full()
def test_ordering():
'''test that queue ordering works properly'''
q = TaggedQueue()
q.put_nowait('red only', tag_list=['red'])
q.put_nowait('red and blue', tag_list=['red', 'blue'])
q.put_nowait('blue only', tag_list=['blue'])
q.put_nowait('both colours', tag_list=['red', 'blue'])
assert q.get_nowait(tag='blue') == 'red and blue'
assert q.get_nowait(tag='blue') == 'blue only'
assert q.get_nowait(tag='red') == 'red only'
assert q.get_nowait(tag='red') == 'red and blue'
assert q.get_nowait(tag='red') == 'both colours'
assert q.get_nowait(tag='blue') == 'both colours'
|