Welcome, guest | Sign In | My Account | Store | Cart

A scheduled queue is a queue with priorities that are scheduled. It is not preemtitive, higher priorities are not executed always before than lower priorities (only more often).

Python, 286 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
"""
A scheduled queue is a queue with priorities that are scheduled. It is not preemtitive, higher priorities are not
executed always before than lower priorities (only more often).
USAGE:
init args:
maxsize: maximum size of the queue, if maxsize<=0 then the queue size is infinite
realtime: if true then the queue has a priority REAL_TIME. REAL_TIME priorities are executed before any other priorities.items
idle: if true then the queue has a priority IDLE. IDLE priorities are executed when there are no other priorities left.
priorities: a dictionary with the definitions of the priorities. The key defines the priority name and the value (an int>0)
            defines the relative importance of the priority. A higher number implies that the priority will be checked first more often.
            In this implementation:
            36 = VERY_HIGH + HIGH + ABOVE_DEFAULT + BELOW_DEFAULT + DEFAULT + BELOW_DEFAULT + LOW + VERY_LOW
            VERY_HIGH priorities are checked first 10/36 times
            VERY_LOW only of 1/36
            Of every 36 gets at least one will be VERY_LOW (if VERY_LOW queue is not empty). Even if there are higher non
            empty queue of higher priority VERY_LOW will be checked once every 36 gets.
"""

from Queue import Queue, Full, Empty

#standard priorities
REAL_TIME       = 999
VERY_HIGH       = 10
HIGH            = 8
ABOVE_DEFAULT   = 6
DEFAULT         = 5
BELOW_DEFAULT   = 4
LOW             = 2
VERY_LOW        = 1
IDLE            = -1

standard_priorities = {VERY_HIGH        :10,
                       HIGH             :8,
                       ABOVE_DEFAULT    :6,
                       DEFAULT          :5,
                       BELOW_DEFAULT    :4,
                       LOW              :2,
                       VERY_LOW         :1}

class ScheduledQueue(Queue):
    def __init__(self, maxsize=0, priorities = standard_priorities, realtime = 1, idle = 1):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0, the queue size is infinite.
        priorities: a dictionary with definition of priorities
        """
        assert self._check_priorities(priorities)       #check only if not -OO
        import thread
        self._init(priorities, maxsize, realtime, idle)
        self.mutex = thread.allocate_lock()
        self.esema = thread.allocate_lock()
        self.esema.acquire()
        self.fsema = thread.allocate_lock()

    def put(self, item, priority = DEFAULT, block=1):
        """Put an item into the queue.

        If optional arg 'block' is 1 (the default), block if
        necessary until a free slot is available.  Otherwise (block
        is 0), put an item on the queue if a free slot is immediately
        available, else raise the Full exception.
        """
        assert self._queues.has_key(priority),"inexistent priority "+str(priority)
        self._acquirePUT(block)
        was_empty = self._empty()
        try:
            self._put(item, priority)
        finally:
            self._releasePUT(was_empty)

    def put_nowait(self, item, priority = DEFAULT):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, priority, 0)

    def get(self, block=1):
        """Remove and return an item from the queue.

        If optional arg 'block' is 1 (the default), block if
        necessary until an item is available.  Otherwise (block is 0),
        return an item if one is immediately available, else raise the
        Empty exception.
        """
        self._acquireGET(block)
        was_full = self._full()
        try:
            item = self._get()
        finally:
            self._releaseGET(was_full)        
        return item


    def drain(self):
        self.mutex.acquire()
        self._drain()
        self.mutex.release()
        
    def _acquirePUT(self, block):
        if block:
            self.fsema.acquire()
        elif not self.fsema.acquire(0):
            raise Full
        self.mutex.acquire()

    def _acquireGET(self, block):
        if block:
            self.esema.acquire()
        elif not self.esema.acquire(0):
            raise Empty
        self.mutex.acquire()
        was_full = self._full()        
        
    def _releasePUT(self, was_empty):
        if was_empty:
            self.esema.release()
        if not self._full():
            self.fsema.release()
        self.mutex.release()

    def _releaseGET(self, was_full):
        if was_full:
            self.fsema.release()
        if not self._empty():
            self.esema.release()
        self.mutex.release()  
        
    def __len__(self):
        return self.qsize()
    
    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held
    def _qsize(self):
        return self._len

    def _empty(self):
        """Check whether the queue is empty"""
        return not self._len

    def _full(self):
        """Check whether the queue is full"""
        return self.maxsize > 0 and self._len >= self.maxsize

    def _put(self, item, priority):
        """Put a new item in the queue"""
        self._queues[priority].append(item)
        self._len += 1

    # Get an item from the queue
    def _get(self):
        item = filter(None,self._roundRobinQueues.get())[0].pop(0)
        self._len -= 1
        return item

    def _drain(self):
        for queue in self._queues.values():
            while len(queue)>0:queue.pop()
        self._len = 0    

    # Initialize the queue representation
    def _init(self, priorities, maxsize, realtime, idle):
        self.maxsize = maxsize
        self._index = 0
        self._len = 0
        self._queues = self._buildDictQueues(priorities, realtime, idle)
        self._roundRobinQueues = RoundRobin(self._buildMatrix(priorities))
        self._drain()

    def _buildDictQueues(self, priorities, realtime, idle):
        result = {}
        if realtime: result[REAL_TIME] = [REAL_TIME]
        if idle: result[IDLE] = [IDLE]
        for key in priorities.keys():
            result[key]=[key]
        return result

    def _buildMatrix(self, priorities):
        result = []
        _list = self._buildQueueList(priorities)
        for i in index(_list):
            result.append((remove_duplicates(_list[i:]+ _list[:i])))
        self._addRealTimeIdle2Matrix(result)
        return matrix2tuple(result)

    
    def _buildQueueList(self, priorities):
        result = []
        for key, value in priorities.items():
            for i in range(value):
                result.append(self._queues[key])
        return shuffle_list(result)
    
    def _addRealTimeIdle2Matrix(self, matrix):
        realtime = self._queues.has_key(REAL_TIME)
        idle = self._queues.has_key(IDLE)
        if not realtime and not idle:return
        for row in matrix:
            if realtime:
                row.insert(0,self._queues[REAL_TIME])
            if idle:
                row.append(self._queues[IDLE])

    def _check_priorities(self, priorities):
        for value in priorities.values():
            assert value>0 and type(value)==type(1),"Incorrect definition of priorities"+str(priorities)
        return 1


#utility classes and methods
def index(list):
    return range(len(list))

def index_list(list):
	return zip(index(list),list)
             
def matrix2tuple(matrix):
    for i,row in index_list(matrix):
        matrix[i]=tuple(row)
    return tuple(matrix)

def shuffle_list(_list):
    import random
    for i in index(_list):
        j = int(random.random()*len(_list))
        
        _list[i],_list[j]=_list[j],_list[i]
    return _list

def remove_duplicates(list):
    assert type(list)==type([]) or type(list)==type(()), "List should be a [] or ()"
    result = []
    for elem in list:
        if elem not in result:result.append(elem)
    if type(list)==type(()):result=tuple(result)
    return result

class RoundRobin:
    def __init__(self, round_robin_list):
        assert len(round_robin_list), str(round_robin_list)
        self._index = -1
        self._list = tuple(round_robin_list)
        self._size = len(self._list)
        
    def get(self):
        self._index += 1
        self._index %= self._size
        return self._list[self._index]

#tests
def _test():
    q = ScheduledQueue()
    _empty_test()
    _idle_last()
    _real_first()
    _max_size_test()

def _empty_test(q = ScheduledQueue()):
    try:
        q.get(0)
    except Empty:
        print "Empty test OK"
        
def _idle_last(q = ScheduledQueue()):
    q.put(0,IDLE)
    q.put(1)
    q.get()
    if not q.get():print "IDLE LAST OK"

def _real_first(q = ScheduledQueue()):
    q.put(0)
    q.put(1,REAL_TIME)
    if q.get():print "REAL_TIME test OK"
    q.get()
    
def _max_size_test(q = ScheduledQueue(maxsize = 1)):
    q.put(0)
    try:
        q.put(0, block=0)
    except Full:
        print "Full test OK"

if __name__=='__main__':
    _test()

Use a scheduled queue when you will use a priority queue, but you need to get low priority items even if there are higher priorities items in the queue. In this implementation there is a list for every priority, put an item is simple. With this dictionary of queues I build a matrix of queues. Every row is a tuple of the queues, but every row has a diferent order. This order is build in a manner that higher priorities are at first place more often than low priorities. Every time that get is called a row is obtained of the matrix in a round robin form. With a filter are obtained the non empty queues.