A scheduled queue is a queue with priorities that are scheduled. It is not preemtitive, higher priorities are not executed always before than lower priorities (only more often).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 | """
A scheduled queue is a queue with priorities that are scheduled. It is not preemtitive, higher priorities are not
executed always before than lower priorities (only more often).
USAGE:
init args:
maxsize: maximum size of the queue, if maxsize<=0 then the queue size is infinite
realtime: if true then the queue has a priority REAL_TIME. REAL_TIME priorities are executed before any other priorities.items
idle: if true then the queue has a priority IDLE. IDLE priorities are executed when there are no other priorities left.
priorities: a dictionary with the definitions of the priorities. The key defines the priority name and the value (an int>0)
defines the relative importance of the priority. A higher number implies that the priority will be checked first more often.
In this implementation:
36 = VERY_HIGH + HIGH + ABOVE_DEFAULT + BELOW_DEFAULT + DEFAULT + BELOW_DEFAULT + LOW + VERY_LOW
VERY_HIGH priorities are checked first 10/36 times
VERY_LOW only of 1/36
Of every 36 gets at least one will be VERY_LOW (if VERY_LOW queue is not empty). Even if there are higher non
empty queue of higher priority VERY_LOW will be checked once every 36 gets.
"""
from Queue import Queue, Full, Empty
#standard priorities
REAL_TIME = 999
VERY_HIGH = 10
HIGH = 8
ABOVE_DEFAULT = 6
DEFAULT = 5
BELOW_DEFAULT = 4
LOW = 2
VERY_LOW = 1
IDLE = -1
standard_priorities = {VERY_HIGH :10,
HIGH :8,
ABOVE_DEFAULT :6,
DEFAULT :5,
BELOW_DEFAULT :4,
LOW :2,
VERY_LOW :1}
class ScheduledQueue(Queue):
def __init__(self, maxsize=0, priorities = standard_priorities, realtime = 1, idle = 1):
"""Initialize a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
priorities: a dictionary with definition of priorities
"""
assert self._check_priorities(priorities) #check only if not -OO
import thread
self._init(priorities, maxsize, realtime, idle)
self.mutex = thread.allocate_lock()
self.esema = thread.allocate_lock()
self.esema.acquire()
self.fsema = thread.allocate_lock()
def put(self, item, priority = DEFAULT, block=1):
"""Put an item into the queue.
If optional arg 'block' is 1 (the default), block if
necessary until a free slot is available. Otherwise (block
is 0), put an item on the queue if a free slot is immediately
available, else raise the Full exception.
"""
assert self._queues.has_key(priority),"inexistent priority "+str(priority)
self._acquirePUT(block)
was_empty = self._empty()
try:
self._put(item, priority)
finally:
self._releasePUT(was_empty)
def put_nowait(self, item, priority = DEFAULT):
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
"""
return self.put(item, priority, 0)
def get(self, block=1):
"""Remove and return an item from the queue.
If optional arg 'block' is 1 (the default), block if
necessary until an item is available. Otherwise (block is 0),
return an item if one is immediately available, else raise the
Empty exception.
"""
self._acquireGET(block)
was_full = self._full()
try:
item = self._get()
finally:
self._releaseGET(was_full)
return item
def drain(self):
self.mutex.acquire()
self._drain()
self.mutex.release()
def _acquirePUT(self, block):
if block:
self.fsema.acquire()
elif not self.fsema.acquire(0):
raise Full
self.mutex.acquire()
def _acquireGET(self, block):
if block:
self.esema.acquire()
elif not self.esema.acquire(0):
raise Empty
self.mutex.acquire()
was_full = self._full()
def _releasePUT(self, was_empty):
if was_empty:
self.esema.release()
if not self._full():
self.fsema.release()
self.mutex.release()
def _releaseGET(self, was_full):
if was_full:
self.fsema.release()
if not self._empty():
self.esema.release()
self.mutex.release()
def __len__(self):
return self.qsize()
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
def _qsize(self):
return self._len
def _empty(self):
"""Check whether the queue is empty"""
return not self._len
def _full(self):
"""Check whether the queue is full"""
return self.maxsize > 0 and self._len >= self.maxsize
def _put(self, item, priority):
"""Put a new item in the queue"""
self._queues[priority].append(item)
self._len += 1
# Get an item from the queue
def _get(self):
item = filter(None,self._roundRobinQueues.get())[0].pop(0)
self._len -= 1
return item
def _drain(self):
for queue in self._queues.values():
while len(queue)>0:queue.pop()
self._len = 0
# Initialize the queue representation
def _init(self, priorities, maxsize, realtime, idle):
self.maxsize = maxsize
self._index = 0
self._len = 0
self._queues = self._buildDictQueues(priorities, realtime, idle)
self._roundRobinQueues = RoundRobin(self._buildMatrix(priorities))
self._drain()
def _buildDictQueues(self, priorities, realtime, idle):
result = {}
if realtime: result[REAL_TIME] = [REAL_TIME]
if idle: result[IDLE] = [IDLE]
for key in priorities.keys():
result[key]=[key]
return result
def _buildMatrix(self, priorities):
result = []
_list = self._buildQueueList(priorities)
for i in index(_list):
result.append((remove_duplicates(_list[i:]+ _list[:i])))
self._addRealTimeIdle2Matrix(result)
return matrix2tuple(result)
def _buildQueueList(self, priorities):
result = []
for key, value in priorities.items():
for i in range(value):
result.append(self._queues[key])
return shuffle_list(result)
def _addRealTimeIdle2Matrix(self, matrix):
realtime = self._queues.has_key(REAL_TIME)
idle = self._queues.has_key(IDLE)
if not realtime and not idle:return
for row in matrix:
if realtime:
row.insert(0,self._queues[REAL_TIME])
if idle:
row.append(self._queues[IDLE])
def _check_priorities(self, priorities):
for value in priorities.values():
assert value>0 and type(value)==type(1),"Incorrect definition of priorities"+str(priorities)
return 1
#utility classes and methods
def index(list):
return range(len(list))
def index_list(list):
return zip(index(list),list)
def matrix2tuple(matrix):
for i,row in index_list(matrix):
matrix[i]=tuple(row)
return tuple(matrix)
def shuffle_list(_list):
import random
for i in index(_list):
j = int(random.random()*len(_list))
_list[i],_list[j]=_list[j],_list[i]
return _list
def remove_duplicates(list):
assert type(list)==type([]) or type(list)==type(()), "List should be a [] or ()"
result = []
for elem in list:
if elem not in result:result.append(elem)
if type(list)==type(()):result=tuple(result)
return result
class RoundRobin:
def __init__(self, round_robin_list):
assert len(round_robin_list), str(round_robin_list)
self._index = -1
self._list = tuple(round_robin_list)
self._size = len(self._list)
def get(self):
self._index += 1
self._index %= self._size
return self._list[self._index]
#tests
def _test():
q = ScheduledQueue()
_empty_test()
_idle_last()
_real_first()
_max_size_test()
def _empty_test(q = ScheduledQueue()):
try:
q.get(0)
except Empty:
print "Empty test OK"
def _idle_last(q = ScheduledQueue()):
q.put(0,IDLE)
q.put(1)
q.get()
if not q.get():print "IDLE LAST OK"
def _real_first(q = ScheduledQueue()):
q.put(0)
q.put(1,REAL_TIME)
if q.get():print "REAL_TIME test OK"
q.get()
def _max_size_test(q = ScheduledQueue(maxsize = 1)):
q.put(0)
try:
q.put(0, block=0)
except Full:
print "Full test OK"
if __name__=='__main__':
_test()
|
Use a scheduled queue when you will use a priority queue, but you need to get low priority items even if there are higher priorities items in the queue. In this implementation there is a list for every priority, put an item is simple. With this dictionary of queues I build a matrix of queues. Every row is a tuple of the queues, but every row has a diferent order. This order is build in a manner that higher priorities are at first place more often than low priorities. Every time that get is called a row is obtained of the matrix in a round robin form. With a filter are obtained the non empty queues.