I needed multiple consumers to retrieve data from a queue fed by one producer. I could not find good working code for that, so I implemented my own queue. The docstring describes how it works.
Two notes: 1) my code uses the multiprocessing module, but the Lock and Condition used in this module could easily be replaced with the equivalent objects from the threading module; 2) the attached tests use the syntax of the "nose" testing package; I did not convert them to doctest or unittest.
'''
A tagged queue. The producer(s) place items in the queue, each item _may_ have
a list of tags attached to it. Consumer(s) may request an item of a given tag.
The tag is then "consumed" from this item. The item is removed from the queue
when all its tags are consumed.
For better understanding, think of it as a set of distinct queues,
one for a given tag (and one for no tag).
This will be appropriate for most cases (although internal
plumbing is different), except for full() which counts all
items regardless of tags to guarantee a maximum object count in memory.
If tags are not used, the TaggedQueue behaves exactly as Queue.Queue,
but performance is worse (see below).
Assumptions:
- tag can be anything that works as a dictionary index
- when putting an item in the queue, any number of tags can be used,
but list must be provided (or the tag_list parameter not used at all)
- when getting an item from the queue, at most one tag can be provided.
Example:
>>> q.put('red only', tag_list=['red'])
>>> q.put('red and blue', tag_list=['red', 'blue'])
>>> q.put('blue only', tag_list=['blue'])
>>> q.put('both colours', tag_list=['red', 'blue'])
>>> q.get(tag='blue')
'red and blue'
>>> q.get(tag='blue')
'blue only'
>>> q.get(tag='red')
'red only'
>>> q.get(tag='red')
'red and blue'
>>> q.get(tag='red')
'both colours'
>>> q.get(tag='blue')
'both colours'
Performance (in seconds, smaller is better):
- adding an object in the queue 100000 times, then getting it back 100000 times
(max queue length is 100000)
Queue.Queue: 1.124342
TaggedQueue without tags: 5.291548
TaggedQueue with 10 distinct tags: 2.100272
TaggedQueue with 1000 distinct tags: 1.692542
- 100000 consecutive put() - get() calls, (max queue length is 1)
Queue.Queue: 1.158024
TaggedQueue without tags: 2.218153
TaggedQueue with 10 distinct tags: 2.270345
TaggedQueue with 1000 distinct tags: 2.344355
'''
import collections
import multiprocessing
from sys import maxint
from time import time as _time
def id_generator():
    '''Yield an endless stream of numeric identifiers.

    The sequence starts at 1 and counts upwards; instead of ever yielding
    maxint it wraps around to 0 and keeps counting from there.
    '''
    counter = 0
    while True:
        # The modulo folds the value maxint back to 0 and leaves every
        # smaller value untouched.
        counter = (counter + 1) % maxint
        yield counter
class Empty(Exception):
    '''Signals that a TaggedQueue holds no item for the requested tag.'''
    pass
class Full(Exception):
    '''Signals that a TaggedQueue has no free slot left, whatever the tags.'''
    pass
# Pseudo-tag under which untagged items are filed; it is the default value of
# the tag / tag_list parameters of the TaggedQueue methods.
NOTAG = '_notag'
# based on Queue.Queue (no surprise)
class TaggedQueue(object):
    '''A queue similar to Queue.Queue whose items carry tags.

    Producers attach any number of tags to an item; a consumer asks for the
    oldest item carrying one particular tag.  Every get() consumes exactly
    one tag of the returned item, and the item is dropped from the queue
    once all of its tags have been consumed.  Items put without tags are
    filed under the NOTAG pseudo-tag.

    maxsize limits the number of distinct items held, regardless of how
    many tags each of them carries; maxsize <= 0 means "no limit".

    NOTE(review): the bookkeeping lives in ordinary dicts guarded by
    multiprocessing primitives, so the stored items look process-local;
    confirm whether sharing between processes is really intended.
    '''

    def __init__(self, maxsize=0):
        '''Create an empty queue holding at most maxsize items (0 = no limit).'''
        self.mutex = multiprocessing.Lock()
        # Both conditions are built on the same mutex, so holding either
        # of them protects all three dictionaries below.
        self.not_empty = multiprocessing.Condition(self.mutex)
        self.not_full = multiprocessing.Condition(self.mutex)
        self.maxsize = maxsize
        self._tags = {}      # tag -> deque of refids, oldest first
        self._queue = {}     # refid -> the queued item
        self._refcount = {}  # refid -> number of tags not yet consumed
        self.id_generator = id_generator()

    def qsize(self, tag=NOTAG):
        '''Return the approximate number of items waiting under tag.'''
        with self.mutex:
            return len(self._tags.get(tag, ()))

    def empty(self, tag=NOTAG):
        '''Return True if no item is waiting under tag.'''
        with self.mutex:
            return self._empty(tag)

    def full(self):
        '''Return True if the queue is full, regardless of tags.'''
        with self.mutex:
            return self._full()

    def get(self, tag=NOTAG, block=True, timeout=None):
        '''Consume tag from the oldest item carrying it and return the item.

        With block true and timeout None, wait until such an item exists.
        With a non-negative timeout, wait at most that many seconds.  With
        block false, never wait (timeout is ignored).

        Raises Empty if no matching item is (or becomes) available and
        ValueError if a negative timeout is given while blocking.
        '''
        with self.not_empty:
            if not block:
                if self._empty(tag):
                    raise Empty
            elif timeout is None:
                while self._empty(tag):
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self._empty(tag):
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get(tag)
            # A slot is freed only when this was the item's last tag, but
            # producers re-check _full() after waking up, so notifying
            # unconditionally is harmless.
            self.not_full.notify()
            return item

    def get_nowait(self, tag=NOTAG):
        '''Get an item for tag immediately or raise Empty.'''
        return self.get(tag, block=False)

    def put(self, item, tag_list=(NOTAG,), block=True, timeout=None):
        '''Put item into the queue under every tag in tag_list.

        tag_list is an iterable of hashable tags; leaving it out, or
        passing None or an empty iterable, files the item under NOTAG.
        block and timeout work as in get(), waiting for a free slot.

        Raises Full if no slot is (or becomes) available and ValueError
        if a negative timeout is given while blocking.
        '''
        with self.not_full:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item, tag_list)
            # Consumers of *different* tags all sleep on this one
            # condition, and one item may satisfy several of them.  Waking
            # a single waiter could pick one interested in another tag and
            # leave the right consumer blocked forever, so wake them all
            # and let each re-check its own tag.
            self.not_empty.notify_all()

    def put_nowait(self, item, tag_list=(NOTAG,)):
        '''Put an item into the queue immediately or raise Full.'''
        return self.put(item, tag_list, False)

    # The helpers below must be called with the mutex held.

    def _empty(self, tag):
        '''Return True if no refid is waiting under tag.'''
        return tag not in self._tags

    def _full(self):
        '''Return True if maxsize is set and that many items are held.'''
        # ">=" rather than "==" so the queue still reports full if
        # maxsize is lowered while items are held.
        return self.maxsize > 0 and len(self._refcount) >= self.maxsize

    def _get(self, tag):
        '''Pop the oldest refid for tag and return its item.'''
        if tag not in self._tags:
            raise Empty
        pending = self._tags[tag]
        refid = pending.popleft()
        if not pending:
            # no queued item carries this tag any more
            del self._tags[tag]
        item = self._queue[refid]
        self._refcount[refid] -= 1
        if self._refcount[refid] == 0:
            # last tag consumed: the item leaves the queue
            del self._refcount[refid]
            del self._queue[refid]
        return item

    def _put(self, item, tag_list):
        '''Store item and register its refid under each of its tags.'''
        tags = tuple(tag_list) if tag_list is not None else ()
        if not tags:
            # An item without any tag would get a reference count of 0 and
            # could never be fetched or removed; file it as untagged.
            tags = (NOTAG,)
        # built-in next() works on Python 2.6+ as well as Python 3
        refid = next(self.id_generator)
        self._queue[refid] = item
        self._refcount[refid] = len(tags)
        for tag in tags:
            try:
                self._tags[tag].append(refid)
            except KeyError:
                self._tags[tag] = collections.deque([refid])
## --- test_tagged_queue.py
'''
Test that TaggedQueue works as expected
'''
from tagged_queue import TaggedQueue, Empty
def test_empty_queue():
    '''A fresh queue raises Empty for the default tag and for an unknown tag.'''
    queue = TaggedQueue()
    for kwargs in ({}, {'tag': 'non_existing'}):
        try:
            queue.get_nowait(**kwargs)
        except Empty:
            continue
        # reaching this line means get_nowait() returned something
        assert False
def test_put_and_get():
    '''Items are only reachable through the tags they were put with.'''
    queue = TaggedQueue()

    # an untagged item comes back through an untagged get
    queue.put_nowait('no tag')
    assert queue.get_nowait() == 'no tag'

    # a tagged item is invisible to an untagged get
    queue.put_nowait('test data', tag_list=['tag1', 'tag2', 3])
    try:
        queue.get_nowait()
    except Empty:
        pass
    else:
        assert False

    # every tag can be consumed exactly once
    for tag in (3, 'tag1', 'tag2'):
        queue.get_nowait(tag=tag)
    try:
        queue.get_nowait(tag='tag2')
    except Empty:
        pass
    else:
        assert False
def test_empty():
    '''empty() reports emptiness separately for each tag.'''
    queue = TaggedQueue()

    def check(untagged_empty, tag1_empty):
        # compare the emptiness of the untagged and the 'tag1' queue
        assert bool(queue.empty()) is untagged_empty
        assert bool(queue.empty(tag='tag1')) is tag1_empty

    check(True, True)
    queue.put_nowait('test data')
    check(False, True)
    queue.put_nowait('other test data', tag_list=['tag1'])
    check(False, False)
    queue.get_nowait()
    check(True, False)
    queue.get_nowait(tag='tag1')
    assert queue.empty()
    # the tag may also be passed positionally
    assert queue.empty('tag1')
def test_qsize():
    '''qsize() counts queued items separately for each tag.'''
    queue = TaggedQueue()

    def sizes():
        # (size of the untagged queue, size of the 'tag1' queue)
        return (queue.qsize(), queue.qsize(tag='tag1'))

    assert sizes() == (0, 0)
    queue.put_nowait('test data')
    assert sizes() == (1, 0)
    queue.put_nowait('other test data', tag_list=['tag1'])
    assert sizes() == (1, 1)
    queue.put_nowait('other test data', tag_list=['tag1'])
    assert sizes() == (1, 2)
    queue.get_nowait()
    assert sizes() == (0, 2)
    queue.get_nowait(tag='tag1')
    assert sizes() == (0, 1)
    queue.get_nowait(tag='tag1')
    assert sizes() == (0, 0)
def test_full():
    '''full() turns true once maxsize items are queued, whatever their tags.'''
    queue = TaggedQueue(maxsize=3)
    fills = (
        ('data', {}),
        ('other data', {'tag_list': ['some_tag']}),
        ('yet another data', {}),
    )
    for item, kwargs in fills:
        # there is still room before each of the three puts
        assert not queue.full()
        queue.put_nowait(item, **kwargs)
    assert queue.full()
def test_ordering():
    '''Items come back in insertion order within each tag.'''
    queue = TaggedQueue()
    produced = (
        ('red only', ['red']),
        ('red and blue', ['red', 'blue']),
        ('blue only', ['blue']),
        ('both colours', ['red', 'blue']),
    )
    for item, tags in produced:
        queue.put_nowait(item, tag_list=tags)

    expected = (
        ('blue', 'red and blue'),
        ('blue', 'blue only'),
        ('red', 'red only'),
        ('red', 'red and blue'),
        ('red', 'both colours'),
        ('blue', 'both colours'),
    )
    for tag, item in expected:
        assert queue.get_nowait(tag=tag) == item
|