
I needed multiple consumers to retrieve data from a queue fed by a single producer. I could not find good working code for that, so I implemented my own queue. The module docstring describes how it works.

Two notes: 1) the code uses the multiprocessing module, but the Lock and Condition used here can easily be replaced with the same objects from the threading module; 2) the attached tests use the syntax of the "nose" testing package; I did not convert them to doctest or unittest.
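
To illustrate the first note, here is a minimal sketch of the same idea built on threading instead of multiprocessing. It is not the recipe's code: the names MiniTaggedQueue and consume are hypothetical, the queue is unbounded (no maxsize, no timeouts), and it is written for Python 3. It wakes all waiting consumers on every put, on the assumption that consumers may be blocked on different tags.

```python
import threading
from collections import deque
from itertools import count

NOTAG = '_notag'


class MiniTaggedQueue(object):
    """Hypothetical, unbounded, threading-only reduction of the tagged queue."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._tags = {}      # tag -> deque of refids, in arrival order
        self._items = {}     # refid -> item
        self._refcount = {}  # refid -> number of tags not yet consumed
        self._ids = count(1)

    def put(self, item, tag_list=(NOTAG,)):
        with self._cond:
            refid = next(self._ids)
            self._items[refid] = item
            self._refcount[refid] = len(tag_list)
            for tag in tag_list:
                self._tags.setdefault(tag, deque()).append(refid)
            # Waiters may be blocked on different tags, so wake all of them.
            self._cond.notify_all()

    def get(self, tag=NOTAG):
        with self._cond:
            while tag not in self._tags:
                self._cond.wait()
            refid = self._tags[tag].popleft()
            if not self._tags[tag]:
                del self._tags[tag]
            item = self._items[refid]
            self._refcount[refid] -= 1
            if self._refcount[refid] == 0:  # last tag consumed, drop the item
                del self._refcount[refid]
                del self._items[refid]
            return item


def consume(q, tag, n, out):
    """Hypothetical consumer: fetch n items carrying the given tag."""
    for _ in range(n):
        out.append(q.get(tag))


q = MiniTaggedQueue()
red, blue = [], []
workers = [threading.Thread(target=consume, args=(q, 'red', 3, red)),
           threading.Thread(target=consume, args=(q, 'blue', 3, blue))]
for w in workers:
    w.start()
for i in range(3):  # the single producer
    q.put(i, tag_list=['red', 'blue'])
for w in workers:
    w.join()
```

Each consumer sees every item once, in the order the producer put them, and an item is dropped from the internal storage as soon as both tags have been consumed.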

Python, 322 lines
'''
A tagged queue. The producer(s) place items in the queue, each item _may_ have
a list of tags attached to it. Consumer(s) may request an item of a given tag.
The tag is then "consumed" from this item. The item is removed from the queue
when all its tags are consumed.

For better understanding, think of it as a set of distinct queues,
one for each tag (and one for no tag).
This model is appropriate for most cases (although the internal
plumbing is different), except for full(), which counts all
items regardless of tags to guarantee a maximum object count in memory.

If tags are not used, the TaggedQueue behaves exactly as Queue.Queue,
but performance is worse (see below).

Assumptions:
    - a tag can be anything that works as a dictionary key
    - when putting an item in the queue, any number of tags can be used,
      but they must be passed as a list (or the tag_list parameter omitted)
    - when getting an item from the queue, at most one tag can be given

Example:

>>> q = TaggedQueue()
>>> q.put('red only', tag_list=['red'])
>>> q.put('red and blue', tag_list=['red', 'blue'])
>>> q.put('blue only', tag_list=['blue'])
>>> q.put('both colours', tag_list=['red', 'blue'])
>>> q.get(tag='blue')
'red and blue'
>>> q.get(tag='blue')
'blue only'
>>> q.get(tag='red')
'red only'
>>> q.get(tag='red')
'red and blue'
>>> q.get(tag='red')
'both colours'
>>> q.get(tag='blue')
'both colours'


Performance (in seconds, smaller is better):

 - putting an object in the queue 100000 times, then getting it back 100000 times
   (max queue length is 100000)
    Queue.Queue: 1.124342
    TaggedQueue without tags: 5.291548
    TaggedQueue with 10 distinct tags: 2.100272
    TaggedQueue with 1000 distinct tags: 1.692542

 - 100000 consecutive put() - get() calls, (max queue length is 1)
    Queue.Queue: 1.158024
    TaggedQueue without tags: 2.218153
    TaggedQueue with 10 distinct tags: 2.270345
    TaggedQueue with 1000 distinct tags: 2.344355
'''

import multiprocessing
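from sys import maxsize  # available in Python 2.6+ and 3; sys.maxint is Python 2 only
from time import time as _time

def id_generator():
    '''a generator returning numeric IDs as identifiers'''
    _id = 0
    while True:
        _id += 1
        if _id == maxsize:
            _id = 0  # wrap around at the platform's largest native integer
        yield _id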

class Empty(Exception):
    '''raised when TaggedQueue is empty for a given tag'''

class Full(Exception):
    '''raised when TaggedQueue is full (regardless of tags)'''

NOTAG = '_notag'

# based on Queue.Queue (no surprise)
class TaggedQueue(object):
    '''A queue similar to Queue.Queue, but producers can tag items
    and consumers can specify which tag they are interested in.
    If an item has multiple tags, it is removed from the queue only after
    all tags attached to it have been consumed.
    '''
    def __init__(self, maxsize=0):
        '''initialize the queue'''
        self.mutex = multiprocessing.Lock()
        self.not_empty = multiprocessing.Condition(self.mutex)
        self.not_full = multiprocessing.Condition(self.mutex)
        self.maxsize = maxsize
        self._tags = {}  # list of refid's for each tag
        self._queue = {}  # the actual queue data
        self._refcount = {}  # how many tags refer to a given refid in the queue
        self.id_generator = id_generator()

    def qsize(self, tag=NOTAG):
        '''Return the approximate size of the queue for a given tag.'''
        with self.mutex:
            return len(self._tags.get(tag, ()))

    def empty(self, tag=NOTAG):
        '''Return True if the queue is empty for a given tag'''
        with self.mutex:
            return self._empty(tag)

    def full(self):
        '''Return True if the queue is full, regardless of tags'''
        with self.mutex:
            return self._full()

    def get(self, tag=NOTAG, block=True, timeout=None):
        '''Get an item carrying the given tag from the queue; at most
        one tag can be requested per call.
        '''
        self.not_empty.acquire()
        try:
            if not block:
                if self._empty(tag):
                    raise Empty
            elif timeout is None:
                while self._empty(tag):
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._empty(tag):
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get(tag)
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self, tag=NOTAG):
        '''get a tagged item immediately or raise Empty'''
        return self.get(tag, block=False)
    
    def put(self, item, tag_list=(NOTAG,), block=True, timeout=None):
        '''Put an item in the queue; tag_list is an optional list of tags
        to be attached to the item. The default is an immutable tuple so
        that it cannot be modified by accident between calls.
        '''
        self.not_full.acquire()
        try:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item, tag_list)
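            # Consumers may be waiting for different tags; notify() could wake
            # one that is not interested in this item, so wake all of them.
            self.not_empty.notify_all()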
        finally:
            self.not_full.release()

    def put_nowait(self, item, tag_list=(NOTAG,)):
        '''put an item in the queue immediately or raise Full'''
        return self.put(item, tag_list, False)

    def _empty(self, tag):
        return tag not in self._tags

    def _full(self):
        return self.maxsize > 0 and len(self._refcount) >= self.maxsize

    def _get(self, tag):
        if tag not in self._tags:
            raise Empty
        # get the refid of the oldest item carrying this tag
        refid = self._tags[tag].pop(0)  # get and remove first refid from list
        if not self._tags[tag]:  # no item in the queue has this tag anymore
            del self._tags[tag]
        # get the item from the queue
        item = self._queue[refid]
        # decrease the reference counter
        self._refcount[refid] -= 1
        if self._refcount[refid] == 0:  # no more references
            del self._refcount[refid]
            del self._queue[refid]
        return item

    def _put(self, item, tag_list):
        refid = next(self.id_generator)  # works in Python 2.6+ and Python 3
        self._queue[refid] = item
        self._refcount[refid] = len(tag_list)
        for t in tag_list:
            try:
                self._tags[t].append(refid)
            except KeyError:
                self._tags[t] = [refid]

## --- test_tagged_queue.py

'''
Test that TaggedQueue works as expected
'''

from tagged_queue import TaggedQueue, Empty

def test_empty_queue():
    '''test that TaggedQueue returns Empty correctly'''
    q = TaggedQueue()
    try:
        q.get_nowait()
        assert False
    except Empty:
        pass
    
    try:
        q.get_nowait(tag='non_existing')
        assert False
    except Empty:
        pass

def test_put_and_get():
    '''test that tags are honoured for basic queue operations'''
    q = TaggedQueue()
    q.put_nowait('no tag')
    assert q.get_nowait() == 'no tag'
    q.put_nowait('test data', tag_list=['tag1', 'tag2', 3])
    try:
        q.get_nowait()
        assert False
    except Empty:
        pass
    q.get_nowait(tag=3)
    q.get_nowait(tag='tag1')
    q.get_nowait(tag='tag2')
    try:
        q.get_nowait(tag='tag2')
        assert False
    except Empty:
        pass

def test_empty():
    '''test that .empty() function works as expected'''
    q = TaggedQueue()
    assert q.empty()
    assert q.empty(tag='tag1')
    q.put_nowait('test data')
    assert not q.empty()
    assert q.empty(tag='tag1')
    q.put_nowait('other test data', tag_list=['tag1'])
    assert not q.empty()
    assert not q.empty(tag='tag1')
    q.get_nowait()
    assert q.empty()
    assert not q.empty(tag='tag1')
    q.get_nowait(tag='tag1')
    assert q.empty()
    assert q.empty('tag1')

def test_qsize():
    '''test that .qsize() function works as expected'''
    q = TaggedQueue()
    assert q.qsize() == 0
    assert q.qsize(tag='tag1') == 0
    q.put_nowait('test data')
    assert q.qsize() == 1
    assert q.qsize(tag='tag1') == 0
    q.put_nowait('other test data', tag_list=['tag1'])
    assert q.qsize() == 1
    assert q.qsize(tag='tag1') == 1
    q.put_nowait('other test data', tag_list=['tag1'])
    assert q.qsize() == 1
    assert q.qsize(tag='tag1') == 2
    q.get_nowait()
    assert q.qsize() == 0
    assert q.qsize(tag='tag1') == 2
    q.get_nowait(tag='tag1')
    assert q.qsize() == 0
    assert q.qsize(tag='tag1') == 1
    q.get_nowait(tag='tag1')
    assert q.qsize() == 0
    assert q.qsize(tag='tag1') == 0

def test_full():
    '''test that .full() function works as expected'''
    q = TaggedQueue(maxsize=3)
    assert not q.full()
    q.put_nowait('data')
    assert not q.full()
    q.put_nowait('other data', tag_list=['some_tag'])
    assert not q.full()
    q.put_nowait('yet another data')
    assert q.full()

def test_ordering():
    '''test that queue ordering works properly'''
    q = TaggedQueue()
    q.put_nowait('red only', tag_list=['red'])
    q.put_nowait('red and blue', tag_list=['red', 'blue'])
    q.put_nowait('blue only', tag_list=['blue'])
    q.put_nowait('both colours', tag_list=['red', 'blue'])
    assert q.get_nowait(tag='blue') == 'red and blue'
    assert q.get_nowait(tag='blue') == 'blue only'
    assert q.get_nowait(tag='red') == 'red only'
    assert q.get_nowait(tag='red') == 'red and blue'
    assert q.get_nowait(tag='red') == 'both colours'
    assert q.get_nowait(tag='blue') == 'both colours'
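
The tests above use the try / assert False / except Empty pattern that nose accepts. As a rough sketch of how they could be converted to unittest, the block below uses assertRaises as a context manager. To keep it runnable on its own it uses the standard library queue.Queue (Python 3) as a stand-in for TaggedQueue, so tags are not exercised; the class name EmptyQueueTest and the helper run_tests are hypothetical.

```python
import queue
import unittest


class EmptyQueueTest(unittest.TestCase):
    """Hypothetical unittest port; queue.Queue stands in for TaggedQueue."""

    def test_get_nowait_raises_empty(self):
        q = queue.Queue()
        # Replaces: try: q.get_nowait(); assert False / except Empty: pass
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_put_then_get(self):
        q = queue.Queue()
        q.put_nowait('no tag')
        self.assertEqual(q.get_nowait(), 'no tag')
        with self.assertRaises(queue.Empty):
            q.get_nowait()


def run_tests():
    """Run the test case programmatically and return the result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(EmptyQueueTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

With the real module, the same structure should carry over by importing TaggedQueue and Empty from tagged_queue and passing tag=... to get_nowait().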