import gobject
# Infrastructure: tasks and jobs.
def task(function, *args, **kwargs):
"""Wrapps a task (expected signature: 'def function(return_task, [args])'"""
return function, args, kwargs
def start_job(generator):
"""Start a job (a generator that yield tasks)."""
def _next(result):
def _callback():
try:
fun, args, kwargs = generator.send(result)
except StopIteration:
return
fun(_next, *args, **kwargs)
# isolate the advance of the coroutine to another gobject loop
gobject.idle_add(_callback)
_next(None)
# Tasks: A task is a normal function, except for the first argument, a function
# that must be called with the result of the task.
def sleep(task_return, seconds):
"""Wait 'seconds' seconds. Returns nothing."""
def _callback():
task_return(None)
gobject.timeout_add(int(seconds * 1000), _callback)
def threaded(task_return, function, *args, **kwargs):
"""Run function(*args, **kwargs) in a thread and return the value."""
from Queue import Queue
from threading import Thread
def _thread(queue):
x = function(*args, **kwargs)
queue.put(x)
def _receiver(queue):
if queue.empty():
return True
task_return(queue.get())
queue = Queue()
thread = Thread(target=_thread, args=(queue,))
thread.setDaemon(True)
thread.start()
gobject.timeout_add(100, _receiver, queue) # poll queue every 0.1 seconds
# Job example
def heavy_function(a, b):
import time
time.sleep(1.0)
return a + b
def myjob(a, b):
import random
yield task(sleep, random.uniform(1.0, 3.0))
result = yield task(threaded, heavy_function, a, b)
print result
def basso_continuo():
import sys
sys.stderr.write(".")
return True
gobject.threads_init()
for x in range(1, 11):
start_job(myjob(3*x, 7*x))
gobject.timeout_add(100, basso_continuo)
loop = gobject.MainLoop()
loop.run()
Diff to Previous Revision
--- revision 7 2010-03-26 16:23:30
+++ revision 8 2010-03-26 16:33:15
@@ -2,19 +2,19 @@
# Infrastructure: tasks and jobs.
-def task(function, *args):
- """Wrapps a task function (expected signature 'function(return_task, [args])'"""
- return function, args
+def task(function, *args, **kwargs):
+ """Wrapps a task (expected signature: 'def function(return_task, [args])'"""
+ return function, args, kwargs
def start_job(generator):
"""Start a job (a generator that yield tasks)."""
def _next(result):
def _callback():
try:
- fun, args = generator.send(result)
+ fun, args, kwargs = generator.send(result)
except StopIteration:
return
- fun(_next, *args)
+ fun(_next, *args, **kwargs)
# isolate the advance of the coroutine to another gobject loop
gobject.idle_add(_callback)
_next(None)
@@ -63,10 +63,9 @@
sys.stderr.write(".")
return True
-gobject.threads_init()
-start_job(myjob(1, 2))
-start_job(myjob(3, 4))
-start_job(myjob(5, 6))
+gobject.threads_init()
+for x in range(1, 11):
+ start_job(myjob(3*x, 7*x))
gobject.timeout_add(100, basso_continuo)
loop = gobject.MainLoop()
loop.run()