import threading,Queue,time,sys,traceback
#Globals (start with a capital letter)
# Work waiting for the worker threads: put() adds [flag, data] pairs and
# stop_threads() adds ('stop', None) messages.
Qin = Queue.Queue()
# Results produced by the workers; read with get() or get_all().
Qout = Queue.Queue()
# Error records (the tuples built by err_msg()) for items whose processing
# raised; read with get_errors().
Qerr = Queue.Queue()
# Threads started by start_threads() that stop_threads() has not yet removed.
Pool = []
def err_msg():
    """Describe the exception currently being handled as three strings.

    Intended to be called from inside an ``except`` block. All three parts
    are taken from ``sys.exc_info()``, which reports the exception of the
    calling thread; the older ``sys.exc_type`` / ``sys.exc_value`` globals
    are shared by every thread and can describe another thread's exception.

    Returns:
        A ``(traceback_text, type_text, value_text)`` tuple:
        ``str()`` of the list returned by ``traceback.format_tb``,
        ``str()`` of the exception class, and ``str()`` of the exception
        instance (``''`` when that conversion itself fails).
    """
    exc_type, exc_value, trace = sys.exc_info()
    try:
        trace_text = str(traceback.format_tb(trace))
    finally:
        # Keeping the traceback in a local of the frame that is handling
        # the exception creates a reference cycle; drop it right away.
        del trace
    try:
        value_text = str(exc_value)
    except Exception:
        # Best effort: an exception whose text cannot be rendered must not
        # break error reporting.
        value_text = ''
    return trace_text, str(exc_type), value_text
def get_errors():
    """Yield the error records currently waiting in Qerr, without blocking.

    Iteration ends as soon as the queue reports that it is empty, so
    records added afterwards are only seen by a later call.
    """
    while True:
        try:
            record = Qerr.get_nowait()
        except Queue.Empty:
            return
        yield record
def process_queue():
    """Worker loop: take (flag, item) messages from Qin until told to stop.

    For an 'ok' message the result 'new' + item is placed on Qout. A 'stop'
    message ends the loop. Messages with any other flag are dropped. Any
    exception raised while fetching or processing a message is recorded on
    Qerr via err_msg() and the loop carries on.
    """
    while True:
        try:
            flag, payload = Qin.get()  # blocks until a message arrives
            if flag == 'ok':
                Qout.put('new' + payload)
        except:
            # Deliberately catches everything: one bad item (for example a
            # non-string payload) must not kill the worker thread.
            Qerr.put(err_msg())
            continue
        if flag == 'stop':
            break
def start_threads(amount=5):
    """Start `amount` worker threads and record them in Pool.

    Each thread runs process_queue(). A thread is added to Pool only after
    it has been started.
    """
    for _ in range(amount):
        worker = threading.Thread(target=process_queue)
        worker.start()
        Pool.append(worker)
def put(data, flag='ok'):
    """Queue one message for the worker threads.

    Args:
        data: payload handed to a worker.
        flag: 'ok' (the default) asks a worker to process `data`; 'stop'
            makes the worker that receives the message exit its loop.
    """
    message = [flag, data]
    Qin.put(message)
def get():
    """Return the next result from Qout, blocking until one is available."""
    result = Qout.get()
    return result
def get_all():
    """Yield the results currently waiting in Qout, without blocking.

    Stops at the first moment the queue is found empty; it does not wait
    for work that is still being processed.
    """
    fetch = Qout.get_nowait
    while True:
        try:
            result = fetch()
        except Queue.Empty:
            break
        yield result
def stop_threads():
    """Stop every worker in Pool and return once all of them have exited.

    One ('stop', None) message is queued per pooled thread. The queue is
    FIFO, so messages queued earlier are taken by the workers before the
    stop messages. Pool is empty when this function returns. Calling it
    with an empty Pool returns immediately.
    """
    workers = list(Pool)
    for _ in workers:
        Qin.put(('stop', None))
    for worker in workers:
        # Timed joins return as soon as the worker exits, instead of
        # sleeping a fixed second per thread as the old polling loop did.
        # Unlike an untimed join on Python 2, they also keep the wait
        # interruptible by KeyboardInterrupt.
        while worker.is_alive():
            worker.join(0.1)
    del Pool[:]
# STANDARD use: queue the work first, run the pool once, then collect.
# print(x) with a single argument prints exactly what `print x` does.
for letter in ('b', 'c'):
    put(letter)
start_threads()
stop_threads()
for result in get_all():
    print(result)
for error in get_errors():
    print(error)

# POOL use: the workers stay alive between batches until stop_threads().
# Work may be queued before the workers exist.
put('a')
# Start the pool; the threads keep running until they are shut down.
start_threads()
for letter in ('b', 'c'):
    put(letter)
# Block until one result is available.
print(get())
# Queue more work; the integer 7 makes a worker record an error.
for payload in ('d', 'e', 7):
    put(payload)
# Print whatever results happen to be ready right now.
for result in get_all():
    print(result)
# stop_threads() only returns once every worker has stopped.
stop_threads()
print('__threads finished last data available__')
for result in get_all():
    print(result)
for error in get_errors():
    print(error)
# The pool can be started again after a shutdown.
start_threads()
put('f')
stop_threads()
print('__threads finished(again) last data available__')
for result in get_all():
    print(result)
for error in get_errors():
    print(error)