#!/usr/bin/python
"""Run asynchronous tasks with gobject and coroutines."""
import gobject
import time

def start_job(generator):
    """Start a job (a coroutine that yields tasks)."""
    def _task_return(result):
        """Resume the coroutine, sending it the result of the finished task."""
        def _advance_generator():
            try:
                new_task = generator.send(result)
            except StopIteration:
                return
            new_task(_task_return)
        # isolate the advance of the coroutine (resume it from the mainloop)
        gobject.idle_add(_advance_generator)
    _task_return(None)
    return generator
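
# The contract start_job relies on (illustrative sketch, not part of the
# original recipe): a task is a callable that takes a single 'task_return'
# callback and arranges for it to be called exactly once with the task's
# result, which start_job then send()s back into the coroutine. A minimal
# custom task following that contract, which just lets the mainloop run one
# iteration:
def one_iteration_task():
    """Suspend the job for a single mainloop iteration and return None."""
    def _task(task_return):
        def _on_idle():
            task_return(None)
            return False  # run this idle callback only once
        gobject.idle_add(_on_idle)
    return _task
# Inside a job: yield one_iteration_task()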

# Task examples

def sleep_task(secs):
    """Suspend the job for the given number of seconds and return the elapsed time."""
    def _task(task_return):
        start_time = time.time()
        def _on_timeout():
            task_return(time.time() - start_time)  # implicit None return stops the timeout
        gobject.timeout_add(int(secs * 1000), _on_timeout)
    return _task

def threaded_task(function, *args, **kwargs):
    """Run function(*args, **kwargs) in a thread and return its value."""
    from Queue import Queue
    from threading import Thread
    def _task(task_return):
        def _thread(queue):
            # runs inside the worker thread: touch nothing but the queue
            queue.put(function(*args, **kwargs))
        def _manager(queue):
            if queue.empty():
                return True  # keep polling
            task_return(queue.get())  # implicit None return stops the polling
        queue = Queue()
        thread = Thread(target=_thread, args=(queue,))
        thread.setDaemon(True)
        thread.start()
        gobject.timeout_add(100, _manager, queue)  # poll the queue every 0.1 seconds
    return _task
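
# A further task sketched here for illustration (not part of the original
# recipe), assuming the usual gobject.io_add_watch behaviour: the callback
# receives (source, condition) and the watch is removed when it returns False.
def wait_readable_task(fd):
    """Suspend the job until fd is readable and return the descriptor."""
    def _task(task_return):
        def _on_readable(source, condition):
            task_return(source)
            return False  # fire once, then remove the watch
        gobject.io_add_watch(fd, gobject.IO_IN, _on_readable)
    return _task
# Inside a job: fd = yield wait_readable_task(some_fd)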

###

import sys
import random
import urllib2

def myjob(url):
    def download(url):
        return urllib2.urlopen(url).read()
    elapsed = yield sleep_task(random.uniform(0.0, 3.0))
    sys.stderr.write("[slept:%0.2f]" % elapsed)
    sys.stderr.write("[start:%s]" % url)
    html = yield threaded_task(download, url)
    sys.stderr.write("[HTML:%s:%d]" % (url, len(html)))

def basso_continuo():
    """Print a dot to show that the mainloop keeps running."""
    sys.stderr.write(".")
    return True

urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
for url in urls:
    sys.stderr.write("myjob: %s\n" % start_job(myjob(url)))
gobject.timeout_add(100, basso_continuo)

gobject.threads_init()  # needed because we use threaded tasks
loop = gobject.MainLoop()
loop.run()