import gobject
# Infrastructure: tasks and jobs (a coroutine that yields tasks).
def task(fun):
    """Decorator for tasks.

    Calling the decorated function does not run ``fun``. It returns the
    tuple ``(fun, args, kwargs)``, which is the value a job is expected to
    yield so that the scheduler can run the task later.
    """
    from functools import wraps

    def _wrapper(*args, **kwargs):
        return fun, args, kwargs

    # Keep the task's name and docstring visible on the wrapper. Some
    # callables (e.g. functools.partial on Python 2) lack those attributes;
    # they are still valid tasks, so fall back to the plain wrapper.
    try:
        _wrapper = wraps(fun)(_wrapper)
    except AttributeError:
        pass
    return _wrapper
def start_job(generator):
    """Start a job (a coroutine that yields tasks) and return the coroutine itself.

    Every value yielded by ``generator`` must be a ``(fun, args, kwargs)``
    tuple. ``fun`` is called as ``fun(_next, *args, **kwargs)`` and must
    eventually call ``_next(result)``; ``result`` is then sent back into the
    coroutine as the value of its ``yield`` expression.
    """
    def _next(result):
        def _callback():
            try:
                fun, args, kwargs = generator.send(result)
            except StopIteration:
                # the coroutine finished: nothing left to schedule
                return
            fun(_next, *args, **kwargs)
        # isolate advance of the coroutine
        gobject.idle_add(_callback)
    # the first send() on a fresh generator must carry None
    _next(None)
    return generator
# Tasks: decorate with @task. Call the first argument with the result of the task.
@task
def sleep_task(task_return, seconds):
    """Wait some seconds and return.

    ``seconds`` may be fractional; it is truncated to whole milliseconds.
    The task's result is always None.
    """
    def _callback():
        # implicitly returns None, so the callback is not a repeating one
        task_return(None)
    gobject.timeout_add(int(seconds * 1000), _callback)
@task
def threaded_task(task_return, function, *args, **kwargs):
    """Run function(*args, **kwargs) in a thread and return the value.

    The call runs in a daemon thread; a timeout callback polls a queue every
    0.1 seconds and hands the value to ``task_return`` once it arrives.

    If ``function`` raises, the exception propagates in the worker thread as
    before, ``task_return`` is never called (the job stays suspended), and
    polling stops instead of running forever.
    """
    from Queue import Queue
    from threading import Thread

    def _thread(queue):
        # this runs inside the thread, dare not touch anything but the queue
        try:
            value = function(*args, **kwargs)
        except BaseException:
            # tell the receiver that no result will ever arrive, then let
            # the exception propagate exactly as it did before
            queue.put((False, None))
            raise
        queue.put((True, value))

    def _receiver(queue):
        if queue.empty():
            return True  # nothing yet: keep polling
        succeeded, value = queue.get()
        if succeeded:
            task_return(value)
        return False  # finished either way: stop polling

    queue = Queue()
    thread = Thread(target=_thread, args=(queue,))
    thread.setDaemon(True)
    thread.start()
    gobject.timeout_add(100, _receiver, queue)  # poll queue every 0.1 seconds
# Example
import sys
import time
import random
import urllib2
def myjob(url):
    """Example job: sleep a random time, then download ``url`` in a thread.

    Progress markers are written to stderr: ``[start:URL]`` before the
    download begins and ``[HTML:URL:LENGTH]`` once the body has arrived.
    """
    def download(url):
        response = urllib2.urlopen(url)
        try:
            return response.read()
        finally:
            # release the connection even if read() fails
            response.close()
    yield sleep_task(random.uniform(0.0, 3.0))  # just to test the task
    sys.stderr.write("[start:%s]" % url)
    html = yield threaded_task(download, url)
    sys.stderr.write("[HTML:%s:%d]" % (url, len(html)))
def basso_continuo():
    """Write a dot to stderr and return True.

    Meant to be used as a repeating timeout callback: the True return value
    asks the main loop to keep calling it.
    """
    sys.stderr.write(".")
    return True
urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
for url in urls:
    # single-string form: prints the same "myjob <generator ...>" line and
    # parses both as a print statement and as a print() call
    print("myjob %s" % (start_job(myjob(url)),))
# heartbeat: one dot every 0.1 seconds while the main loop is running
gobject.timeout_add(100, basso_continuo)
# enable thread support before the loop runs; the jobs only start their
# worker threads from callbacks dispatched by the main loop
gobject.threads_init()
loop = gobject.MainLoop()
loop.run()
Diff to Previous Revision
--- revision 10 2010-03-27 18:51:34
+++ revision 11 2010-03-29 19:48:09
@@ -1,13 +1,15 @@
import gobject
-# Infrastructure: tasks and jobs.
+# Infrastructure: tasks and jobs (a coroutine that yields tasks).
-def task(fun, *args, **kwargs):
- """Wraps a task (expected signature: 'def fun(task_return, [args], [kwargs])'"""
- return fun, args, kwargs
+def task(fun):
+ """Decorator for tasks."""
+ def _wrapper(*args, **kwargs):
+ return fun, args, kwargs
+ return _wrapper
def start_job(generator):
- """Start a job (a generator that yield tasks)."""
+ """Start a job (a coroutine that yield tasks) and return the coroutine itself."""
def _next(result):
def _callback():
try:
@@ -15,24 +17,27 @@
except StopIteration:
return
fun(_next, *args, **kwargs)
- # isolate the advance of the coroutine to another gobject loop
+ # isolate advance of the coroutine
gobject.idle_add(_callback)
- _next(None)
+ _next(None)
+ return generator
-# Tasks: A task is a normal function, except for the first argument, a function
-# that must be called with the result of the task.
+# Tasks: decorate with @task. Call the first argument with the result of the task.
-def sleep(task_return, seconds):
- """Wait 'seconds' seconds. Returns nothing."""
+@task
+def sleep_task(task_return, seconds):
+ """Wait some seconds and return."""
def _callback():
task_return(None)
gobject.timeout_add(int(seconds * 1000), _callback)
-def threaded(task_return, function, *args, **kwargs):
+@task
+def threaded_task(task_return, function, *args, **kwargs):
"""Run function(*args, **kwargs) in a thread and return the value."""
from Queue import Queue
from threading import Thread
def _thread(queue):
+ # this runs inside the thread, dare not touch anything but the queue
queue.put(function(*args, **kwargs))
def _receiver(queue):
if queue.empty():
@@ -44,28 +49,29 @@
thread.start()
gobject.timeout_add(100, _receiver, queue) # poll queue every 0.1 seconds
-# Job example
+# Example
import sys
import time
import random
+import urllib2
-def heavy_function(a, b):
- time.sleep(1.0)
- return a + b
-
-def myjob(a, b):
- yield task(sleep, random.uniform(1.0, 3.0))
- result = yield task(threaded, heavy_function, a, b)
- print result
+def myjob(url):
+ def download(url):
+ return urllib2.urlopen(url).read()
+ yield sleep_task(random.uniform(0.0, 3.0)) # just to test the task
+ sys.stderr.write("[start:%s]" % url)
+ html = yield threaded_task(download, url)
+ sys.stderr.write("[HTML:%s:%d]" % (url, len(html)))
def basso_continuo():
sys.stderr.write(".")
return True
-
+
+urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
+for url in urls:
+ print "myjob", start_job(myjob(url))
+gobject.timeout_add(100, basso_continuo)
gobject.threads_init()
-for x in range(1, 11):
- start_job(myjob(3*x, 7*x))
-gobject.timeout_add(100, basso_continuo)
loop = gobject.MainLoop()
loop.run()