A framework for easy two-way inter-thread communication that resembles normal function calling.
Especially useful for non-blocking UI techniques and for distributing load across slow or erratic resources. It can replace rigid Queue.Queue techniques in most cases, making threading code more readable and functional.
CallQueue lets you express a function call directly in the local context but have it executed in a target thread. It focuses naturally on two-way communication (with return-value responses) and includes a flexible scheme for transferring exceptions between threads. Multi-producer, multi-consumer communication is supported as well.
A target thread merely has to call callqueue.receive() periodically, without worrying about any data passing. CallQueue thus also naturally supports a high-level bulk-threading concept with anonymous "default consumer threads": allocated thread resources can be thrown efficiently at batches of jobs.
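For orientation, here is a minimal sketch of the basic pattern (it assumes the CallQueue class from the recipe code below; the consumer function and the example lambdas are only illustrative):

import thread, time

cq = CallQueue()                     # CallQueue class as defined in the recipe below

def consumer():
    # the target thread only polls the queue; all data passing is handled by CallQueue
    while not cq.closed:
        cq.receive()                 # executes any pending call jobs in this thread
        time.sleep(0.01)

thread.start_new(consumer, ())

# producer side: the calls are expressed locally but run in the consumer thread
cqitem = cq.call(lambda: time.time())                  # asynchronous call; returns a CQItem
result = cq.call(lambda a, b: a + b, (1, 2), wait=1)   # synchronous call, returns 3
print result, cqitem.is_done(), cqitem.get_return()
cq.close()                                             # lets the consumer loop terminate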
def example_CallQueue():
    # 3 default consumer threads work and collect on 6 I/O jobs
    import urllib, time
    l = ['http://www.python.org/',
         'http://www.python.org/about/gettingstarted',
         'http://www.python.org/about/apps/',
         'http://www.python.org/about/quotes',
         'http://www.python.org/about/website',
         'http://www.python.org/about/help', ]
    def work(url):
        return len(urllib.urlopen(url).read())
    #
    cq = CallQueue(max_default_consumer_threads=3)
    for url in l:
        cqitem = cq.call_and_collect(work, (url,))   #schedule
    while not cq.is_done():
        for cqitem in cq.get_next_collected():       #harvest
            print cqitem.args, cqitem.get_return()
        print ".",
        time.sleep(0.001)
    print "example_CallQueue done."
def example2_CallQueue():
    # interacts with my_thread and finally kills it
    import sys, thread, time, copy, random
    cq = CallQueue()
    l = []   # to be filled by my_thread
    def my_thread():
        try:
            while 1:
                cq.receive()
                l.append(random.random())
        finally:
            print "my_thread terminated."
    thread.start_new(my_thread, ())
    time.sleep(0.020)
    l_snapshot = cq.call(lambda: copy.copy(l), wait=1)   #synchronous call
    print "1st snapshot:", len(l_snapshot), "numbers"
    cqitem = cq.call(lambda: copy.copy(l))   #non-blocking call
    while 1:
        if cqitem.is_done():
            l_snapshot2 = cqitem.get_return()
            print "2nd snapshot:", len(l_snapshot2), "numbers"
            break
        else:
            print ".",
            time.sleep(0.001)
    cq.call(sys.exit, wait=1, raise_exception=2)   #raises SystemExit inside my_thread
    print "end state:", len(l), "numbers"
    print "example2_CallQueue done."
import sys
from time import time as _time, sleep as _sleep
class Full(Exception): pass
class Empty(Exception): pass

class CQItem:
    args = None
    kwargs = None
    done = 0             # 1 = return value available; 2 = exception occurred
    delivered = 0
    raise_exception = 1
    def get_return(self, alt_return=None):
        """Delivers the return value or (by default) re-raises the exception of the call job.
        """
        if self.done == 2:
            if self.raise_exception & 1:   # by default the exception is re-raised in the caller
                exc = self.exc
                del self.exc
                raise exc[0], exc[1], exc[2]
            else:
                return alt_return
        return self.ret
    def get_exception(self):
        return self.exc
    def is_done(self):
        """Returns 1 if the call returned a value, 2 if an exception was raised (0 while still pending).
        """
        return self.done
class CallQueue:
    closed = 0
    exc = None
    max_dthreads = 0
    dthreads_count = 0
    def __init__(self, maxsize=None, max_default_consumer_threads=0):
        self.fifo = []          # Queue.Queue not necessary, as long as .append() and .pop(0) are atomic (true in CPython)
        self.collected = []
        self.maxsize = maxsize  # only an approximate guarantee, since Queue.Queue is not used
        self.max_dthreads = max_default_consumer_threads
    def call(self, func, args=(), kwargs={}, wait=0, timeout=None, raise_exception=1, alt_return=None):
        """Puts a call into the queue and optionally waits for the return value.
        wait:            0 = asynchronous call; a call queue item (CQItem) is returned
                         1 = wait for the return value or exception
                         callable -> wait, and call wait() repeatedly while waiting
        raise_exception: 1 = raise in caller, 2 = raise in receiver, 3 = raise in both,
                         0 = silently replace the exception with alt_return
        """
        if self.dthreads_count < self.max_dthreads:
            self.add_default_consumer_threads(n=1)
        if self.closed:
            raise Full, "queue already closed"
        cqitem = CQItem()
        cqitem.func = func
        cqitem.args = args
        cqitem.kwargs = kwargs
        cqitem.wait = wait
        cqitem.raise_exception = raise_exception
        if self.maxsize and len(self.fifo) >= self.maxsize:
            raise Full, "queue's maxsize exceeded"
        self.fifo.append(cqitem)
        if self.closed:
            raise Full, "queue already closed"
        if wait:
            starttime = _time()
            delay = 0.0005
            while not cqitem.is_done():
                if timeout:
                    remaining = starttime + timeout - _time()
                    if remaining <= 0:    # time is over and no result arrived
                        if raise_exception:
                            raise Empty, "return timed out"
                        else:
                            return alt_return
                    delay = min(delay * 2, remaining, .05)
                else:
                    delay = min(delay * 2, .05)
                if callable(wait): wait()
                _sleep(delay)             # reduce CPU usage while polling
            return cqitem.get_return()
        return cqitem
    def call_and_collect(self, *args, **kwargs):
        r = self.call(*args, **kwargs)
        self.collected.append(r)
        return r
    def add_default_consumer_threads(self, n=1, maxdelay=0.016):
        import thread, weakref
        weak_self = weakref.proxy(self)
        for i in range(n):
            self.dthreads_count += 1
            tid = thread.start_new(_default_consumer_thread, (weak_self, maxdelay))
    def is_done(self):
        """check if call queue and collected items are flushed"""
        if self.fifo or self.collected:
            return False
        return True
    def get_next_collected(self):
        next = []
        for cqitem in self.collected[:]:
            if not isinstance(cqitem, CQItem) or cqitem.is_done():
                next.append(cqitem)
                self.collected.remove(cqitem)
        return next
    def receive(self):
        """To be called (periodically) by the target thread(s). Returns the number of calls handled.
        """
        count = 0
        while self.fifo:
            try:
                cqitem = self.fifo.pop(0)
            except IndexError:
                break    # multi-consumer race lost
            try:
                cqitem.ret = cqitem.func(*cqitem.args, **cqitem.kwargs)
                cqitem.done = 1
            except:
                if cqitem.raise_exception & 1:
                    cqitem.exc = sys.exc_info()
                cqitem.done = 2      # mark as done (with exception) only after exc is stored
                if cqitem.raise_exception & 2:
                    raise
            count += 1
        return count
    def qsize(self):
        """Returns the current number of unconsumed calls in the queue.
        """
        return len(self.fifo)
    def close(self):
        """Stops further calls and terminates the default consumer threads.
        """
        self.closed = 1
    def close_and_receive_last(self):
        self.close()
        self.receive()
    def __del__(self):
        self.close()

def _default_consumer_thread(cq, maxdelay=0.016):
    delay = 0.001
    try:
        while not cq.closed:
            count = cq.receive()
            if count: delay = 0.001          # reset the back-off delay when there was work
            _sleep(delay)
            delay = min(delay * 2, maxdelay)
    except ReferenceError:
        pass    # the CallQueue has been garbage collected -> terminate
if __name__ == '__main__':
    example_CallQueue()
    example2_CallQueue()
While bare Queue.Queue techniques require the design of a full-blown client/server-like protocol in the style of inter-process communication (or clumsy Java-ish alter-ego classes), the function-oriented CallQueue makes inter-thread communication a snap, and threading code remains readable and cohesive.
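To illustrate the difference, here is a rough sketch of the hand-made request/reply plumbing that a bare Queue.Queue solution typically requires (the (func, args) job layout and the None sentinel are only one possible ad-hoc protocol):

import Queue, thread

requests  = Queue.Queue()
responses = Queue.Queue()

def worker():
    while 1:
        func, args = requests.get()    # hand-made protocol: a (function, arguments) tuple
        if func is None:               # sentinel value shuts the worker down
            break
        responses.put(func(*args))     # results travel back on a second queue

thread.start_new(worker, ())
requests.put((len, ("hello",)))
print responses.get()                  # prints 5
requests.put((None, None))             # terminate the worker

With CallQueue the same round trip collapses to a single cq.call(len, ("hello",), wait=1).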
See also the twin-brother recipe "BackgroundCall": http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/491280 .
I have requested that this kind of facility be added to Python's Queue module. Feedback / suggestions?