#!/usr/bin/python
"""Run generator-based jobs made of asynchronous tasks.

A job is a co-routine (generator) that yields Task objects.  Every yielded
task is configured with two callbacks and started; when the task reports a
result it is sent back into the generator, and when it reports an exception
that exception is thrown into the generator.  The generator then resumes
until it yields the next task or finishes.

The bundled tasks (ThreadedTask, SleepTask) schedule their callbacks with
gobject.timeout_add.  Running this file as a script starts a small PyGTK
demo.
"""

import functools
import random
import sys
import time
from threading import Thread

try:
    from Queue import Queue  # Python 2 module name
except ImportError:
    from queue import Queue  # Python 3 module name

import gobject


class Job(object):
    """Wrap a co-routine that yields asynchronous tasks (see Task class)."""

    def __init__(self, generator):
        """Start the job by running *generator* up to its first task.

        generator -- generator object that yields Task instances.
        """
        self.generator = generator
        # Prime the generator through _advance_task (instead of a bare
        # generator.send(None)) so that a co-routine which finishes without
        # yielding any task ends the job cleanly rather than leaking
        # StopIteration out of the constructor.
        self._advance_task("send")

    def _start_task(self, task):
        """Hook *task* up to this job's generator and run it."""
        return_cb = functools.partial(self._advance_task, "send")
        exception_cb = functools.partial(self._advance_task, "throw")
        task.config(return_cb, exception_cb)
        task.run()

    def _advance_task(self, method, result=None):
        """Resume the generator and start the next task it yields.

        method -- "send" to deliver *result* as the value of the pending
                  yield, "throw" to raise *result* inside the generator.
        result -- value or exception instance reported by the finished task.

        Returns silently when the generator is exhausted; any other
        exception raised by the generator propagates to the caller.
        """
        assert method in ("send", "throw")
        try:
            task = getattr(self.generator, method)(result)
        except StopIteration:
            # The co-routine has finished: nothing left to schedule.
            return
        self._start_task(task)


class Task(object):
    """Base class for asynchronous tasks."""

    def config(self, return_cb, exception_cb):
        """Set return and exception callbacks."""
        self.return_cb = return_cb
        self.exception_cb = exception_cb

    def run(self):
        """Start the task; subclasses must override this method."""
        raise RuntimeError("Run method must be overriden")


class ThreadedTask(Task):
    """Run a function in a new thread and return its output."""

    def __init__(self, fun, *args, **kwargs):
        """Remember the callable and the arguments it will be called with."""
        self.function = (fun, args, kwargs)

    def run(self):
        """Start thread and set callback to get the result value."""
        queue = Queue()
        thread = Thread(target=self._thread_manager,
                        args=(self.function, queue))
        thread.daemon = True
        thread.start()
        # Check the queue every 50 ms from a gobject timeout callback.
        self.source_id = gobject.timeout_add(50, self._thread_receiver, queue)

    def _thread_receiver(self, queue):
        """Timeout callback: forward the worker's outcome once available.

        Returns True while the queue is still empty (keep polling) and
        False after the outcome has been passed to the matching callback.
        """
        if queue.empty():
            return True
        rtype, rvalue = queue.get()
        if rtype == "return":
            self.return_cb(rvalue)
        else:
            self.exception_cb(rvalue)
        return False

    def _thread_manager(self, function, queue):
        """Thread target: call the function and queue its outcome.

        Puts ("return", value) on success or ("exception", exc) on failure.
        """
        fun, args, kwargs = function
        try:
            result = fun(*args, **kwargs)
        except Exception as exc:
            queue.put(("exception", exc))
            # Re-raise so the failure also surfaces in the worker thread.
            raise
        else:
            queue.put(("return", result))


class SleepTask(Task):
    """Sleep for some time and return."""

    def __init__(self, seconds):
        """seconds -- delay before the task returns (may be fractional)."""
        self.seconds = seconds

    def run(self):
        """Schedule the return callback after the configured delay."""
        def _return():
            self.return_cb()
            # One-shot: do not reschedule the timeout.
            return False
        self.source_id = gobject.timeout_add(int(self.seconds * 1000),
                                             _return)


# Example with PyGTK

def heavy_function(x, y):
    """Simulate slow work: block for one second, then add the two values."""
    time.sleep(1.0)
    return int(x) + int(y)


def my_job(wait, a, b):
    """Example co-routine: sleep, then run two threaded computations in turn."""
    # Single-argument print(...) with % formatting emits the same text as the
    # Python 2 print statement while also being valid Python 3 syntax.
    print("wait %s" % wait)
    yield SleepTask(wait)
    print("do sequential heavy work in a thread")
    result1 = (yield ThreadedTask(heavy_function, a, b))
    print("do more sequential work")
    result2 = (yield ThreadedTask(heavy_function, result1, b))
    print("done! result of work was %s" % result2)


def other_async_work():
    """Periodic callback that writes a dot to stderr on every call."""
    sys.stderr.write(".")
    # Returning True keeps the timeout installed.
    return True


def on_start(button, entrya, entryb):
    """Button handler: launch a job using the text of both entries."""
    job = Job(my_job(random.uniform(1, 5),
                     entrya.get_text(), entryb.get_text()))
    print("start job %s" % (job,))


def main(args):
    """Build the demo window and run the GTK main loop."""
    # Kept local: only the demo needs GTK.
    import gtk

    gobject.threads_init()
    window = gtk.Window()
    start = gtk.Button("start")
    window.connect("delete-event", lambda *args: gtk.main_quit())
    box = gtk.VBox()
    entrya, entryb = gtk.Entry(), gtk.Entry()
    entrya.set_text("1")
    entryb.set_text("2")
    start.connect("clicked", on_start, entrya, entryb)
    for widget in (entrya, entryb, start):
        box.pack_start(widget)
    window.add(box)
    # Make "start" the default widget so pressing Enter in an entry clicks it.
    start.set_flags(gtk.CAN_DEFAULT)
    entrya.set_activates_default(True)
    entryb.set_activates_default(True)
    window.set_default(start)
    window.show_all()
    gobject.timeout_add(100, other_async_work)
    gtk.main()


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))