# Welcome, guest | Sign In | My Account | Store | Cart
"""
A scheduled queue is a queue with priorities that are scheduled. It is not preemptive: higher priorities are not
always served before lower priorities, only more often.
USAGE:
init args:
maxsize: maximum size of the queue, if maxsize<=0 then the queue size is infinite
realtime: if true then the queue has a priority REAL_TIME. REAL_TIME items are served before those of any other priority.
idle: if true then the queue has a priority IDLE. IDLE priorities are executed when there are no other priorities left.
priorities: a dictionary with the definitions of the priorities. The key defines the priority name and the value (an int>0)
defines the relative importance of the priority. A higher number implies that the priority will be checked first more often.
In this implementation:
36 = VERY_HIGH + HIGH + ABOVE_DEFAULT + DEFAULT + BELOW_DEFAULT + LOW + VERY_LOW
VERY_HIGH priorities are checked first 10/36 times
VERY_LOW only 1/36 times.
Out of every 36 gets, at least one will be VERY_LOW (if the VERY_LOW queue is not empty): even if there are
non-empty queues of higher priority, VERY_LOW will be checked first once every 36 gets.
"""

import threading
from Queue import Queue, Full, Empty

# Standard priorities.
# REAL_TIME and IDLE name the two optional special queues; every other
# constant is also used as the relative weight of its priority.
REAL_TIME = 999
VERY_HIGH = 10
HIGH = 8
ABOVE_DEFAULT = 6
DEFAULT = 5
BELOW_DEFAULT = 4
LOW = 2
VERY_LOW = 1
IDLE = -1

# Default priority table: each standard priority maps to its own weight.
standard_priorities = dict(
    (level, level)
    for level in (
        VERY_HIGH,
        HIGH,
        ABOVE_DEFAULT,
        DEFAULT,
        BELOW_DEFAULT,
        LOW,
        VERY_LOW,
    )
)

class ScheduledQueue(Queue):
    """Thread-safe queue that serves its items by weighted priority.

    Items are kept in one FIFO list per priority.  Every ``get`` takes the
    next row of a round-robin schedule and returns the oldest item of the
    first non-empty queue in that row, so priorities with a higher weight
    are served more often without starving the lower ones.  When enabled,
    the REAL_TIME queue is first in every row and the IDLE queue is last.

    Locking protocol: ``mutex`` guards the internal state, ``esema`` is held
    while the queue is empty and ``fsema`` is held while the queue is full.
    """

    def __init__(self, maxsize=0, priorities=standard_priorities, realtime=1, idle=1):
        """Initialize a queue object with a given maximum size.

        maxsize: if <= 0, the queue size is infinite.
        priorities: a dictionary mapping each priority to its weight (int > 0).
        realtime: if true, the queue accepts the REAL_TIME priority.
        idle: if true, the queue accepts the IDLE priority.
        """
        assert self._check_priorities(priorities)       # skipped when asserts are disabled
        self._init(priorities, maxsize, realtime, idle)
        # Queue.__init__ is not called: it invokes self._init(maxsize), which
        # does not match the signature used here.  The locks are therefore
        # created explicitly.
        self.mutex = threading.Lock()
        self.esema = threading.Lock()
        self.fsema = threading.Lock()
        # The queue starts empty, so the "not empty" semaphore starts held.
        self.esema.acquire()

    def put(self, item, priority=DEFAULT, block=1):
        """Put an item into the queue under the given priority.

        If optional arg 'block' is 1 (the default), block if
        necessary until a free slot is available.  Otherwise (block
        is 0), put an item on the queue if a free slot is immediately
        available, else raise the Full exception.
        """
        assert priority in self._queues, "inexistent priority "+str(priority)
        self._acquirePUT(block)
        was_empty = self._empty()
        try:
            self._put(item, priority)
        finally:
            self._releasePUT(was_empty)

    def put_nowait(self, item, priority=DEFAULT):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, priority, 0)

    def get(self, block=1):
        """Remove and return an item from the queue.

        If optional arg 'block' is 1 (the default), block if
        necessary until an item is available.  Otherwise (block is 0),
        return an item if one is immediately available, else raise the
        Empty exception.
        """
        self._acquireGET(block)
        was_full = self._full()
        try:
            item = self._get()
        finally:
            self._releaseGET(was_full)
        return item

    def drain(self):
        """Discard every queued item and leave the queue empty."""
        self.mutex.acquire()
        try:
            was_empty = self._empty()
            was_full = self._full()
            self._drain()
            # Bring the semaphores in line with the new (empty) state.
            if not was_empty:
                # Non-blocking: a getter that already took esema keeps it.
                self.esema.acquire(0)
            if was_full:
                self.fsema.release()
        finally:
            self.mutex.release()

    def _acquirePUT(self, block):
        """Take fsema (raising Full if non-blocking and unavailable), then mutex."""
        if block:
            self.fsema.acquire()
        elif not self.fsema.acquire(0):
            raise Full
        self.mutex.acquire()

    def _acquireGET(self, block):
        """Take esema (raising Empty if non-blocking and unavailable), then mutex."""
        if block:
            self.esema.acquire()
        elif not self.esema.acquire(0):
            raise Empty
        self.mutex.acquire()

    def _releasePUT(self, was_empty):
        """Update the semaphores after a put attempt and release mutex."""
        # Wake getters only if the put really made the queue non-empty.
        if was_empty and not self._empty():
            self.esema.release()
        if not self._full():
            self.fsema.release()
        self.mutex.release()

    def _releaseGET(self, was_full):
        """Update the semaphores after a get attempt and release mutex."""
        # Wake putters only if the get really freed a slot.
        if was_full and not self._full():
            self.fsema.release()
        if not self._empty():
            self.esema.release()
        self.mutex.release()

    def __len__(self):
        return self.qsize()

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held
    def _qsize(self):
        """Return the total number of queued items."""
        return self._len

    def _empty(self):
        """Check whether the queue is empty"""
        return not self._len

    def _full(self):
        """Check whether the queue is full"""
        return self.maxsize > 0 and self._len >= self.maxsize

    def _put(self, item, priority):
        """Put a new item in the queue"""
        self._queues[priority].append(item)
        self._len += 1

    # Get an item from the queue
    def _get(self):
        """Return the oldest item of the first non-empty queue in the next row.

        Raises Empty if every queue in the row is empty.
        """
        for queue in self._roundRobinQueues.get():
            if queue:
                self._len -= 1
                return queue.pop(0)
        raise Empty

    def _drain(self):
        """Empty every per-priority list in place and reset the item count."""
        for queue in self._queues.values():
            # In place: the schedule rows hold references to these lists.
            del queue[:]
        self._len = 0

    # Initialize the queue representation
    def _init(self, priorities, maxsize, realtime, idle):
        self.maxsize = maxsize
        self._index = 0
        self._len = 0
        self._queues = self._buildDictQueues(priorities, realtime, idle)
        self._roundRobinQueues = RoundRobin(self._buildMatrix(priorities))
        # Remove the seed values placed by _buildDictQueues.
        self._drain()

    def _buildDictQueues(self, priorities, realtime, idle):
        """Return a dict mapping every accepted priority to its own list.

        Each list is seeded with its key so that the lists compare unequal
        while the schedule is built (remove_duplicates compares with ==);
        _init drains the seeds afterwards.
        """
        result = {}
        if realtime:
            result[REAL_TIME] = [REAL_TIME]
        if idle:
            result[IDLE] = [IDLE]
        for key in priorities.keys():
            result[key] = [key]
        return result

    def _buildMatrix(self, priorities):
        """Build the schedule: one row per rotation of the weighted queue list.

        Each row lists the queues in the order they are checked by one get.
        """
        result = []
        _list = self._buildQueueList(priorities)
        for i in index(_list):
            result.append(remove_duplicates(_list[i:] + _list[:i]))
        # Must happen while the rows are still lists.
        self._addSpecialQueues(result)
        return matrix2tuple(result)

    def _buildQueueList(self, priorities):
        """Return a shuffled list holding each queue once per unit of weight."""
        result = []
        for key, value in priorities.items():
            for i in range(value):
                result.append(self._queues[key])
        return shuffle_list(result)

    def _addSpecialQueues(self, matrix):
        """Put the REAL_TIME queue first and the IDLE queue last in every row.

        Only the special queues that were enabled at construction are added.
        """
        realtime = REAL_TIME in self._queues
        idle = IDLE in self._queues
        if not realtime and not idle:
            return
        for row in matrix:
            if realtime:
                row.insert(0, self._queues[REAL_TIME])
            if idle:
                row.append(self._queues[IDLE])

    def _check_priorities(self, priorities):
        """Assert that every weight is an int > 0.

        Returns 1 so the call itself can be wrapped in an assert.
        """
        for value in priorities.values():
            # Type first, so a non-numeric weight fails the assert instead of
            # raising from the comparison.
            assert type(value) == type(1) and value > 0, "Incorrect definition of priorities"+str(priorities)
        return 1

#utility classes and methods
def index(list):
    """Return the positions of ``list`` as ``range(len(list))``."""
    length = len(list)
    return range(length)

def index_list(list):
    """Pair every element of ``list`` with its position, in order."""
    positions = range(len(list))
    return zip(positions, list)

def matrix2tuple(matrix):
    """Convert a list of rows into a tuple of tuples.

    Every row of ``matrix`` is replaced in place by its tuple form; the
    result is the mutated ``matrix`` converted to a tuple.
    """
    for position, row in enumerate(matrix):
        matrix[position] = tuple(row)
    return tuple(matrix)

def shuffle_list(_list):
    """Shuffle ``_list`` in place and return the same list object.

    Uses the standard library shuffle instead of swapping every position
    with an arbitrary one, which does not produce all orderings with
    equal probability.
    """
    # Function-local import, as before, so the module's top-level imports
    # are unchanged.
    import random
    random.shuffle(_list)
    return _list

def remove_duplicates(list):
    """Return the elements of ``list`` without repeats, in first-seen order.

    Elements are compared with ``==``, so unhashable elements are accepted.
    A list argument gives a list, a tuple argument gives a tuple.
    """
    is_list = type(list) == type([])
    is_tuple = type(list) == type(())
    assert is_list or is_tuple, "List should be a [] or ()"
    unique = []
    for candidate in list:
        if candidate in unique:
            continue
        unique.append(candidate)
    if is_tuple:
        return tuple(unique)
    return unique

class RoundRobin:
    """Cycle endlessly over a fixed, non-empty sequence."""

    def __init__(self, round_robin_list):
        """Store a tuple copy of ``round_robin_list``, which must not be empty."""
        assert len(round_robin_list), str(round_robin_list)
        self._list = tuple(round_robin_list)
        self._size = len(self._list)
        # Start one step before the first element so the first get() returns it.
        self._index = -1

    def get(self):
        """Advance to the next element, wrapping around, and return it."""
        self._index = (self._index + 1) % self._size
        return self._list[self._index]

#tests
def _test():
    """Run every self-check in order; each one prints a line on success."""
    # The previously constructed local queue was never used and is removed.
    for check in (_empty_test, _idle_last, _real_first, _max_size_test):
        check()

def _empty_test(q=None):
    """Check that a non-blocking get on an empty queue raises Empty.

    q: queue to test; a fresh ScheduledQueue is created when omitted, so no
    queue is built at definition time or shared between calls.
    """
    if q is None:
        q = ScheduledQueue()
    try:
        q.get(0)
    except Empty:
        print("Empty test OK")

def _idle_last(q=None):
    """Check that an IDLE item is returned after a DEFAULT item.

    q: queue to test; a fresh ScheduledQueue is created when omitted, so no
    queue is built at definition time or shared between calls.
    """
    if q is None:
        q = ScheduledQueue()
    q.put(0, IDLE)
    q.put(1)
    q.get()
    # The second get must return the IDLE item, whose value is 0.
    if not q.get():
        print("IDLE LAST OK")

def _real_first(q=None):
    """Check that a REAL_TIME item is returned before a DEFAULT item.

    q: queue to test; a fresh ScheduledQueue is created when omitted, so no
    queue is built at definition time or shared between calls.
    """
    if q is None:
        q = ScheduledQueue()
    q.put(0)
    q.put(1, REAL_TIME)
    # The first get must return the REAL_TIME item, whose value is 1.
    if q.get():
        print("REAL_TIME test OK")
    q.get()

def _max_size_test(q=None):
    """Check that a non-blocking put on a full queue raises Full.

    q: queue to test; when omitted a fresh ScheduledQueue(maxsize=1) is
    created.  A shared default queue would still be full after the first
    call, so a second call would block forever in the first put.
    """
    if q is None:
        q = ScheduledQueue(maxsize=1)
    q.put(0)
    try:
        q.put(0, block=0)
    except Full:
        print("Full test OK")

# Run the self-checks only when the module is executed as a script.
if __name__ == "__main__":
    _test()