This recipe shows a simple, transparent (and hopefully pythonic) way of running asynchronous tasks when writing an event-driven application (e.g. a GUI). The aim is to let a programmer write time-consuming functions (usually IO-bound, but not only) as sequential-looking code, instead of scattering the logic over a bunch of callbacks. We will take advantage of the coroutines introduced in Python 2.5 (see http://www.python.org/dev/peps/pep-0342).
The goal: wouldn't it be great if we could write something like this?
```python
def myjob(entry, arg1, arg2, arg3):
    result1 = function_that_takes_eons_to_complete(arg1, arg2)
    result2 = another_function_that_downloads_a_big_really_big_file(result1, arg3)
    entry.set_text("The result is: %d" % result2)

def on_start_button_clicked(button, entry):
    myjob(entry, 1, 2, 3)

...
gtk.main()
```
Indeed, but we can't! The GUI would hang until the job is done, and the user would be rightfully angry. Coroutines to the rescue: the absolute minimal change we can make to this code is to transform myjob into a coroutine and yield every time we do blocking work:
```python
def myjob(entry, arg1, arg2, arg3):
    result1 = yield some_task(arg1, arg2)
    result2 = yield some_other_task(result1, arg3)
    entry.set_text("The result is: %d" % result2)

def on_start_clicked(button, entry):
    start_job(myjob(entry, 1, 2, 3))
```
Here, some_task and some_other_task are the asynchronous implementations of the sequential tasks used in the first fragment, and start_job is the wrapper that drives the coroutine. Note that we still have to implement non-blocking versions of the tasks, but they are usually pretty generic (wait some time, download a file, ...) and can be reused. If you happen to have a CPU-bound function, or IO-bound code you cannot split (urllib2, anyone?), you can always fall back to a generic threaded task (granted, the whole point of using coroutines is to avoid threads, but there is no alternative here).
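To make the task contract concrete before the full gobject code below, here is a toy, event-loop-free sketch (names like immediate_task and run_job are mine, not part of the recipe): a task is a function that returns a callable; that callable receives a task_return function and must eventually call it with the task's result. This toy driver resumes the coroutine synchronously, which is fine for a handful of steps but would recurse for long jobs; the real version below schedules each resume through the event loop.

```python
def immediate_task(value):
    """A trivial task that 'completes' at once with the given value."""
    def _task(task_return):
        task_return(value)
    return _task

def run_job(generator):
    """Drive a job coroutine to completion (synchronous toy version)."""
    def _task_return(result):
        try:
            # send() the last task's result back into the coroutine;
            # the first call sends None, which primes the generator
            new_task = generator.send(result)
        except StopIteration:
            return
        new_task(_task_return)
    _task_return(None)

results = []

def myjob():
    a = yield immediate_task(1)
    b = yield immediate_task(2)
    results.append(a + b)

run_job(myjob())
print(results)  # [3]
```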
In the end, all the plumbing we need to make this work is a single function: start_job, the wrapper that manages the flow of the coroutine. The rest of the code (two asynchronous tasks, sleep_task and threaded_task, plus a demo app) is shown solely as an example.
```python
#!/usr/bin/python
"""
Run asynchronous tasks in gobject using coroutines.

Terminology used:

* Job: A coroutine that yields tasks.
* Task: A function which returns a callable whose only parameter
  (task_return) is called with the result of the task.

Tasks themselves must be asynchronous (they are run in the main thread
of the event loop), so you will probably use functions like
gobject.idle_add/timeout_add/io_add_watch to implement them. If you are
unable to write a task in an asynchronous way (e.g. a blocking IO
operation), you can always use the generic threaded_task (see below).
"""
import sys
import time
import random
import threading
import urllib2

import gobject

def start_job(generator):
    """Start a job (a coroutine that yields generic tasks)."""
    def _task_return(result):
        """Function to be sent to tasks to be used as task_return."""
        def _advance_generator():
            try:
                new_task = generator.send(result)
            except StopIteration:
                return
            new_task(_task_return)
        # make sure the generator is advanced in the main thread
        gobject.idle_add(_advance_generator)
    _task_return(None)
    return generator

# 2 task examples: sleep_task, threaded_task

def sleep_task(secs):
    """Suspend job for the given number of seconds and return elapsed time."""
    def _task(task_return):
        start_time = time.time()
        def _on_timeout():
            task_return(time.time() - start_time)
        gobject.timeout_add(int(secs * 1000), _on_timeout)
    return _task

gobject.threads_init()

def threaded_task(function, *args, **kwargs):
    """Run function(*args, **kwargs) inside a thread and return the result."""
    def _task(task_return):
        def _thread():
            result = function(*args, **kwargs)
            # deliver the result back in the main thread
            gobject.idle_add(task_return, result)
        thread = threading.Thread(target=_thread, args=())
        thread.setDaemon(True)
        thread.start()
    return _task

# Example of usage

def myjob(url):
    def download(url):
        return urllib2.urlopen(url).read()
    elapsed = yield sleep_task(random.uniform(0.0, 3.0))
    sys.stderr.write("[slept_for:%0.2f]" % elapsed)
    sys.stderr.write("[start_download:%s]" % url)
    html = yield threaded_task(download, url)
    sys.stderr.write("[done:%s:%d]" % (url, len(html)))

def basso_continuo():
    sys.stderr.write(".")
    return True

urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
jobs = [start_job(myjob(url)) for url in urls]
# See how easily we can raise an exception in a job coroutine:
# gobject.timeout_add(1000, lambda: jobs[0].throw(JobStopped))
gobject.timeout_add(100, basso_continuo)
loop = gobject.MainLoop()
loop.run()
```
The first version of this recipe used classes; I hope this new approach is simpler to grasp.
The use of coroutines to run asynchronous tasks is anything but new, but my aim here is to present the idea in its simplest form, focusing on GUIs and event-based libraries. Note that while this code uses gobject/gtk, it should be straightforward to make it work with other event-based libraries.
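To back up that claim: the only loop-specific calls in the recipe are idle_add and timeout_add, so porting means supplying those two primitives. As a sketch (a hypothetical hand-rolled scheduler of my own, not any real library's API), here is the same start_job/sleep_task pair running on a minimal heap-based loop:

```python
import heapq
import time

_queue = []   # heap of (due_time, seq, callback)
_seq = [0]

def _schedule(delay, callback):
    heapq.heappush(_queue, (time.time() + delay, _seq[0], callback))
    _seq[0] += 1

def idle_add(callback, *args):
    _schedule(0.0, lambda: callback(*args))

def timeout_add(msecs, callback, *args):
    _schedule(msecs / 1000.0, lambda: callback(*args))

def run_loop():
    """Run callbacks in due-time order until the queue drains."""
    while _queue:
        due, _, callback = heapq.heappop(_queue)
        time.sleep(max(0.0, due - time.time()))
        callback()

# start_job and sleep_task are verbatim from the recipe, minus the
# gobject. prefix on idle_add/timeout_add
def start_job(generator):
    def _task_return(result):
        def _advance_generator():
            try:
                new_task = generator.send(result)
            except StopIteration:
                return
            new_task(_task_return)
        idle_add(_advance_generator)
    _task_return(None)
    return generator

def sleep_task(secs):
    def _task(task_return):
        start_time = time.time()
        timeout_add(int(secs * 1000),
                    lambda: task_return(time.time() - start_time))
    return _task

out = []

def job():
    elapsed = yield sleep_task(0.01)
    out.append(elapsed)

start_job(job())
run_loop()
print(out)  # a single elapsed time of roughly 0.01 seconds
```

One caveat: this toy loop fires every timeout exactly once, whereas gobject repeats a timeout whose callback returns True (as basso_continuo does in the demo).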
This is a very simplified example; in the Real World(TM) you would want to pause/resume/cancel jobs and tasks, query their current state, do some exception management (to make sure a job receives an exception raised inside a task), and handle other details. PySheng is an example of a real PyGTK application using a full-fledged version of this recipe (asyncjobs.py).
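The exception-management part can be sketched in a few lines (this is my own illustration, not the asyncjobs.py implementation): if a task hands task_return an exception instance, the driver re-raises it inside the job with generator.throw(), so the job can use an ordinary try/except around the yield.

```python
def start_job(generator):
    """Toy synchronous driver that routes task failures into the job."""
    def _task_return(result):
        try:
            if isinstance(result, Exception):
                # re-raise the task's failure at the job's yield point
                new_task = generator.throw(result)
            else:
                new_task = generator.send(result)
        except StopIteration:
            return
        new_task(_task_return)
    _task_return(None)

def failing_task():
    def _task(task_return):
        task_return(ValueError("boom"))
    return _task

caught = []

def job():
    try:
        yield failing_task()
    except ValueError as exc:
        caught.append(str(exc))

start_job(job())
print(caught)  # ['boom']
```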
One serious drawback of using generators/coroutines is that you cannot easily refactor the code. Check this recipe to see how to overcome the problem.
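A small example of why refactoring hurts: you cannot simply move a yield into a helper function, because calling a generator function only builds a generator object without running it. Before `yield from` (PEP 380, Python 3.3) you had to delegate explicitly with a loop:

```python
def gen():
    yield 1
    yield 2

def broken_wrapper():
    gen()          # does nothing: the sub-generator is never iterated
    yield 3

def working_wrapper():
    for value in gen():   # explicit delegation, pre-PEP 380 style
        yield value
    yield 3

print(list(broken_wrapper()))   # [3]
print(list(working_wrapper()))  # [1, 2, 3]
```

And even the for-loop form only forwards yielded values; forwarding send() results and thrown exceptions through a sub-job takes extra plumbing, which is what makes extracting helpers from a job awkward.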
If you found this asynchronous/non-blocking/coroutines stuff interesting, you may want to check these links: David Beazley's "A Curious Course on Coroutines and Concurrency", Weightless, Cogen, Twisted inline callbacks, Eventlet, Gevent.
Feedback much appreciated.