import gobject
# Infrastructure: tasks and jobs.
def task(function, *args):
    """Wrap a task function and its arguments into a ``(function, args)`` pair.

    The task function is expected to have the signature
    ``function(task_return, *args)``: its first argument is the callable it
    must invoke with the result of the task.

    Returns a 2-tuple ``(function, args)`` where ``args`` is the tuple of the
    extra positional arguments.
    """
    wrapped = (function, args)
    return wrapped
def start_job(generator):
    """Start a job (a generator that yields tasks).

    Every value yielded by ``generator`` must be a ``(function, args)`` pair;
    ``function`` is called as ``function(resume, *args)`` where ``resume`` is
    the callable the task must invoke with its result. That result is then
    sent back into the generator as the value of the ``yield`` expression.
    The job ends silently when the generator is exhausted.
    """
    def _resume(value):
        def _step():
            try:
                task_function, task_args = generator.send(value)
            except StopIteration:
                # The generator finished: nothing left to schedule.
                return
            else:
                task_function(_resume, *task_args)
        # Advance the coroutine from a separate main-loop iteration instead of
        # doing it directly inside the caller of _resume.
        gobject.idle_add(_step)

    # A just-created generator can only be sent None.
    _resume(None)
# Tasks: A task is a normal function, except for the first argument, a function
# that must be called with the result of the task.
def sleep(task_return, seconds):
    """Task: wait ``seconds`` seconds, then report ``None`` as the result.

    ``seconds`` may be fractional; it is converted to whole milliseconds
    (truncated) for the gobject timeout.
    """
    milliseconds = int(seconds * 1000)

    def _wake_up():
        # Always returns None, whatever task_return returns.
        task_return(None)

    gobject.timeout_add(milliseconds, _wake_up)
def threaded(task_return, function, *args, **kwargs):
    """Task: run ``function(*args, **kwargs)`` in a thread and return the value.

    The call is executed in a daemon thread. The main loop polls every 0.1
    seconds for the result and, once it is available, passes it to
    ``task_return``.

    If the worker thread ends without producing a result (for instance
    because ``function`` raised an exception), polling stops and
    ``task_return`` is never called, instead of leaving a timer running
    forever.
    """
    from Queue import Queue
    from threading import Thread

    def worker(queue):
        queue.put(function(*args, **kwargs))

    def receiver(queue, thread):
        # Sample liveness BEFORE looking at the queue: if the thread was
        # already dead at this point, any result it was going to produce has
        # already been queued, so an empty queue means there will never be one.
        alive = thread.is_alive()
        if queue.empty():
            # True keeps the timeout installed; False removes it.
            return alive
        task_return(queue.get())
        return False  # result delivered: stop polling

    queue = Queue()
    thread = Thread(target=worker, args=(queue,))
    thread.setDaemon(True)
    thread.start()
    gobject.timeout_add(100, receiver, queue, thread)  # poll every 0.1 seconds
# Job example
def heavy_function(a, b, delay=1.0):
    """Simulate a slow, blocking computation and return ``a + b``.

    ``delay`` is the number of seconds to block before returning; it defaults
    to 1.0, the value that used to be hard-coded.
    """
    import time
    time.sleep(delay)
    return a + b
def myjob(a, b):
    """Example job: sleep a random 1-3 seconds, then add ``a`` and ``b``.

    The addition is delegated to ``heavy_function`` through the ``threaded``
    task; the value sent back into the generator is printed.
    """
    from random import uniform
    pause = uniform(1.0, 3.0)
    yield task(sleep, pause)
    total = yield task(threaded, heavy_function, a, b)
    print(total)
def basso_continuo():
    """Write a single dot to stderr and return True.

    Used as a periodic timeout callback; the dots show that the main loop
    keeps running while jobs are in progress.
    """
    from sys import stderr
    stderr.write(".")
    return True
# Demo: launch three example jobs, print a dot every 0.1 seconds, and run the
# gobject main loop.
gobject.threads_init()
for first, second in ((1, 2), (3, 4), (5, 6)):
    start_job(myjob(first, second))
gobject.timeout_add(100, basso_continuo)
loop = gobject.MainLoop()
loop.run()
Diff to Previous Revision
--- revision 5 2010-03-25 17:40:44
+++ revision 6 2010-03-26 16:15:52
@@ -1,139 +1,72 @@
-import time
-import functools
-from threading import Thread
-from Queue import Queue
-
import gobject
-class Job:
- """Wrap a co-routines that yields asynchronous tasks (see Task class)."""
- def __init__(self, generator):
- self.generator = generator
- self._advance_task(generator.send, None)
-
- def _start_task(self, task):
- return_cb = functools.partial(self._advance_task, self.generator.send)
- exception_cb = functools.partial(self._advance_task, self.generator.throw)
- task.config(return_cb, exception_cb)
- task.run()
-
- def _advance_task(self, genmethod, result=None):
- try:
- task = genmethod(result)
- except StopIteration:
- return
- self._start_task(task)
+# Infrastructure: tasks and jobs.
-class TaskError(Exception):
- pass
-
-class Task:
- """Base class for asynchronous tasks."""
- def config(self, return_cb, exception_cb):
- """Set return and exception callbacks."""
- self.return_cb = return_cb
- self.exception_cb = exception_cb
+def task(function, *args):
+ """Wrapps a task function (expected signature 'function(return_task, [args])'"""
+ return function, args
- def run(self):
- raise RuntimeError, "Run method must be overriden"
+def start_job(generator):
+ """Start a job (a generator that yield tasks)."""
+ def _next(result):
+ def _callback():
+ try:
+ fun, args = generator.send(result)
+ except StopIteration:
+ return
+ fun(_next, *args)
+ # isolate the advance of the coroutine to another gobject loop
+ gobject.idle_add(_callback)
+ _next(None)
-# Tasks examples
+# Tasks: A task is a normal function, except for the first argument, a function
+# that must be called with the result of the task.
-class SleepTask(Task):
- """Sleep for some time and return."""
- def __init__(self, seconds):
- self.seconds = seconds
+def sleep(task_return, seconds):
+ """Wait 'seconds' seconds. Returns nothing."""
+ def callback():
+ task_return(None)
+ gobject.timeout_add(int(seconds * 1000), callback)
+
+def threaded(task_return, function, *args, **kwargs):
+ """Run function(*args, **kwargs) in a thread and return the value."""
+ from Queue import Queue
+ from threading import Thread
+ def thread(queue):
+ x = function(*args, **kwargs)
+ queue.put(x)
+ def receiver(queue):
+ if queue.empty():
+ return True
+ task_return(queue.get())
+ queue = Queue()
+ thread = Thread(target=thread, args=(queue,))
+ thread.setDaemon(True)
+ thread.start()
+ gobject.timeout_add(100, receiver, queue) # poll queue every 0.1 seconds
+
+# Job example
+
+def heavy_function(a, b):
+ import time
+ time.sleep(1.0)
+ return a + b
- def run(self):
- def _return():
- self.return_cb()
- return False
- self.source_id = gobject.timeout_add(int(self.seconds * 1000), _return)
-
-class ThreadedTask(Task):
- """Run a function in a new thread and return its output."""
- def __init__(self, fun, *args, **kwargs):
- self.function = (fun, args, kwargs)
-
- def run(self):
- """Start thread and set callback to get the result value."""
- queue = Queue()
- thread = Thread(target=self._thread, args=(self.function, queue))
- thread.setDaemon(True)
- thread.start()
- self.source_id = gobject.timeout_add(50, self._queue_manager, thread, queue)
-
- def _queue_manager(self, thread, queue):
- if queue.empty():
- if not thread.isAlive():
- # Thread is not active and the queue is empty: something went wrong!
- self.exception_cb(TaskError)
- return False
- return True
- rtype, rvalue = queue.get()
- if rtype == "return":
- self.return_cb(rvalue)
- else:
- self.exception_cb(rvalue)
- return False
+def myjob(a, b):
+ import random
+ yield task(sleep, random.uniform(1.0, 3.0))
+ result = yield task(threaded, heavy_function, a, b)
+ print result
- def _thread(self, function, queue):
- fun, args, kwargs = function
- try:
- result = fun(*args, **kwargs)
- except Exception, exception:
- queue.put(("exception", exception))
- raise
- queue.put(("return", result))
-
-# Usage example with PyGTK
-
-def heavy_function(x, y):
- time.sleep(1.0)
- return int(x) + int(y)
-
-def my_job(wait, a, b):
- sys.stderr.write("[W%0.1f]" % wait)
- yield SleepTask(wait)
- sys.stderr.write("[H1,%s,%s]" % (a, b))
- result1 = (yield ThreadedTask(heavy_function, a, b))
- sys.stderr.write("[H2,%s,%s]" % (result1, b))
- result2 = (yield ThreadedTask(heavy_function, result1, b))
- sys.stderr.write("[RES:%s]" % result2)
-
-def other_async_work():
+def basso_continuo():
import sys
sys.stderr.write(".")
return True
-
-def on_start(button, entrya, entryb):
- import random
- job = Job(my_job(random.uniform(1, 5), entrya.get_text(), entryb.get_text()))
- sys.stderr.write("J%d" % id(job))
-
-def main(args):
- import gtk
- gobject.threads_init()
- window = gtk.Window()
- start = gtk.Button("start")
- window.connect("delete-event", lambda *args: gtk.main_quit())
- box = gtk.VBox()
- entrya, entryb = gtk.Entry(), gtk.Entry()
- entrya.set_text("1")
- entryb.set_text("2")
- start.connect("clicked", on_start, entrya, entryb)
- for widget in (entrya, entryb, start):
- box.pack_start(widget)
- window.add(box)
- start.set_flags(gtk.CAN_DEFAULT)
- entrya.set_activates_default(True)
- entryb.set_activates_default(True)
- window.set_default(start)
- window.show_all()
-
- gobject.timeout_add(100, other_async_work)
- gtk.main()
-
-if __name__ == '__main__':
- import sys
- sys.exit(main(sys.argv[1:]))
+
+gobject.threads_init()
+start_job(myjob(1, 2))
+start_job(myjob(3, 4))
+start_job(myjob(5, 6))
+gobject.timeout_add(100, basso_continuo)
+loop = gobject.MainLoop()
+loop.run()