#!/usr/bin/python
"""
Run asynchronous tasks in gobject using coroutines. Terminology used:
* Job: A coroutine that yield tasks.
* Task: A function which returns a callable whose only parameter
(task_return) is called with the result of the task.
Be aware that tasks themselves must be written in a asynchronous implementation,
as they are run in the main thread of the events loop. So odds are you'll
use functions like gobject.idle_add/timeout_add/io_add_watch. If you don't see
how to implement your task in a asynchronous way (or you just can't, i.e. a IO
operation), you can always use a generic threaded_task (see example below).
"""
import gobject
def start_job(generator):
"""Start a job (a coroutine that yield generic tasks)."""
def _task_return(result):
"""Function to be sent to tasks to be used as task_return."""
def _advance_generator():
try:
new_task = generator.send(result)
except StopIteration:
return
new_task(_task_return)
# make sure the generator is advanced in the main thread
gobject.idle_add(_advance_generator)
_task_return(None)
return generator
# 2 task examples: sleep_task, threaded_task
def sleep_task(secs):
"""Suspend job for the given number of seconds and return elapsed time."""
def _task(task_return):
start_time = time.time()
def _on_timeout():
task_return(time.time() - start_time)
gobject.timeout_add(int(secs * 1000), _on_timeout)
return _task
import threading
gobject.threads_init()
def threaded_task(function, *args, **kwargs):
"""Run function(*args, **kwargs) inside a thread and return the result."""
def _task(task_return):
def _thread():
result = function(*args, **kwargs)
gobject.idle_add(task_return, result)
thread = threading.Thread(target=_thread, args=())
thread.setDaemon(True)
thread.start()
return _task
# Example of usage
import sys
import time
import random
import urllib2
def myjob(url):
def download(url):
return urllib2.urlopen(url).read()
elapsed = yield sleep_task(random.uniform(0.0, 3.0))
sys.stderr.write("[slept_for:%0.2f]" % elapsed)
sys.stderr.write("[start_download:%s]" % url)
html = yield threaded_task(download, url)
sys.stderr.write("[done:%s:%d]" % (url, len(html)))
def basso_continuo():
sys.stderr.write(".")
return True
urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
jobs = [start_job(myjob(url)) for url in urls]
# See how easily can we raise a exception in the job couroutine:
# gobject.timeout_add(1000, lambda: jobs[0].throw(JobStopped))
gobject.timeout_add(100, basso_continuo)
loop = gobject.MainLoop()
loop.run()
Diff to Previous Revision
--- revision 18 2010-04-25 23:04:58
+++ revision 19 2010-08-06 16:09:08
@@ -1,22 +1,35 @@
#!/usr/bin/python
-"""Run asynchronous tasks with gobject and coroutines."""
+"""
+Run asynchronous tasks in gobject using coroutines. Terminology used:
+
+ * Job: A coroutine that yield tasks.
+ * Task: A function which returns a callable whose only parameter
+ (task_return) is called with the result of the task.
+
+Be aware that tasks themselves must be written in a asynchronous implementation,
+as they are run in the main thread of the events loop. So odds are you'll
+use functions like gobject.idle_add/timeout_add/io_add_watch. If you don't see
+how to implement your task in a asynchronous way (or you just can't, i.e. a IO
+operation), you can always use a generic threaded_task (see example below).
+"""
import gobject
def start_job(generator):
- """Start a job (a coroutine that yield tasks)."""
+ """Start a job (a coroutine that yield generic tasks)."""
def _task_return(result):
+ """Function to be sent to tasks to be used as task_return."""
def _advance_generator():
try:
new_task = generator.send(result)
except StopIteration:
return
new_task(_task_return)
- # isolate the advance of the coroutine
+ # make sure the generator is advanced in the main thread
gobject.idle_add(_advance_generator)
_task_return(None)
return generator
-# Task examples
+# 2 task examples: sleep_task, threaded_task
def sleep_task(secs):
"""Suspend job for the given number of seconds and return elapsed time."""
@@ -26,27 +39,22 @@
task_return(time.time() - start_time)
gobject.timeout_add(int(secs * 1000), _on_timeout)
return _task
-
-from threading import Thread
-from Queue import Queue
+
+import threading
+gobject.threads_init()
def threaded_task(function, *args, **kwargs):
- """Run function(*args, **kwargs) in a thread and return the value."""
+ """Run function(*args, **kwargs) inside a thread and return the result."""
def _task(task_return):
- def _thread(queue):
- queue.put(function(*args, **kwargs))
- def _manager(queue):
- if queue.empty():
- return True
- task_return(queue.get())
- queue = Queue()
- thread = Thread(target=_thread, args=(queue,))
+ def _thread():
+ result = function(*args, **kwargs)
+ gobject.idle_add(task_return, result)
+ thread = threading.Thread(target=_thread, args=())
thread.setDaemon(True)
thread.start()
- gobject.timeout_add(100, _manager, queue)
return _task
-###
+# Example of usage
import sys
import time
@@ -56,21 +64,22 @@
def myjob(url):
def download(url):
return urllib2.urlopen(url).read()
- sys.stderr.write("[sleep]")
elapsed = yield sleep_task(random.uniform(0.0, 3.0))
- sys.stderr.write("[sleep_done:%0.2f]" % elapsed)
- sys.stderr.write("[download:%s]" % url)
+ sys.stderr.write("[slept_for:%0.2f]" % elapsed)
+ sys.stderr.write("[start_download:%s]" % url)
html = yield threaded_task(download, url)
- sys.stderr.write("[HTML:%s:%d]" % (url, len(html)))
+ sys.stderr.write("[done:%s:%d]" % (url, len(html)))
def basso_continuo():
sys.stderr.write(".")
return True
urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
-for url in urls:
- start_job(myjob(url))
+jobs = [start_job(myjob(url)) for url in urls]
+
+# See how easily can we raise a exception in the job couroutine:
+# gobject.timeout_add(1000, lambda: jobs[0].throw(JobStopped))
+
gobject.timeout_add(100, basso_continuo)
-gobject.threads_init() # only needed if you use threaded tasks
loop = gobject.MainLoop()
-loop.run() # or gtk.main()
+loop.run()