Welcome, guest | Sign In | My Account | Store | Cart
import time
import functools
from threading import Thread
from Queue import Queue

import gobject

class Job:
    """Wrap a co-routines that yields asynchronous tasks (see Task class).""" 
    def __init__(self, generator):
        self.generator = generator
        self._advance_task(generator.send, None)
            
    def _start_task(self, task):
        return_cb = functools.partial(self._advance_task, self.generator.send)
        exception_cb = functools.partial(self._advance_task, self.generator.throw)
        task.config(return_cb, exception_cb)
        task.run()
    
    def _advance_task(self, genmethod, result=None):
        try:
            task = genmethod(result)
        except StopIteration:
            return
        self._start_task(task)                    

class TaskError(Exception):
    pass
                
class Task:
    """Base class for asynchronous tasks."""
    def config(self, return_cb, exception_cb):
        """Set return and exception callbacks."""
        self.return_cb = return_cb
        self.exception_cb = exception_cb

    def run(self):        
        raise RuntimeError, "Run method must be overriden"

# Tasks examples

class SleepTask(Task):
    """Sleep for some time and return."""
    def __init__(self, seconds):
        self.seconds = seconds
        
    def run(self):
        def _return():
            self.return_cb()
            return False
        self.source_id = gobject.timeout_add(int(self.seconds * 1000), _return)
    
class ThreadedTask(Task):
    """Run a function in a new thread and return its output."""               
    def __init__(self, fun, *args, **kwargs):
        self.function = (fun, args, kwargs)
                
    def run(self):
        """Start thread and set callback to get the result value."""
        queue = Queue()        
        thread = Thread(target=self._thread, args=(self.function, queue))
        thread.setDaemon(True)
        thread.start()
        self.source_id = gobject.timeout_add(50, self._queue_manager, thread, queue)
    
    def _queue_manager(self, thread, queue):
        if queue.empty():
            if not thread.isAlive():
                # Thread is not active and the queue is empty: something went wrong!
                self.exception_cb(TaskError)
                return False
            return True
        rtype, rvalue = queue.get()
        if rtype == "return":
            self.return_cb(rvalue)
        else:
            self.exception_cb(rvalue)
        return False

    def _thread(self, function, queue):
        fun, args, kwargs = function
        try:
            result = fun(*args, **kwargs)
        except Exception, exception:
            queue.put(("exception", exception))
            raise
        queue.put(("return", result))

# Usage example with PyGTK

def heavy_function(x, y):
    time.sleep(1.0)
    return int(x) + int(y)

def my_job(wait, a, b):
    sys.stderr.write("[W%0.1f]" % wait)
    yield SleepTask(wait)
    sys.stderr.write("[H1,%s,%s]" % (a, b))
    result1 = (yield ThreadedTask(heavy_function, a, b))
    sys.stderr.write("[H2,%s,%s]" % (result1, b))
    result2 = (yield ThreadedTask(heavy_function, result1, b))
    sys.stderr.write("[RES:%s]" % result2)
    
def other_async_work():
    import sys
    sys.stderr.write(".")
    return True

def on_start(button, entrya, entryb):
    import random
    job = Job(my_job(random.uniform(1, 5), entrya.get_text(), entryb.get_text()))
    sys.stderr.write("J%d" % id(job))

def main(args):
    import gtk
    gobject.threads_init()
    window = gtk.Window()
    start = gtk.Button("start")
    window.connect("delete-event", lambda *args: gtk.main_quit())
    box = gtk.VBox()
    entrya, entryb = gtk.Entry(), gtk.Entry()
    entrya.set_text("1")
    entryb.set_text("2")
    start.connect("clicked", on_start, entrya, entryb)
    for widget in (entrya, entryb, start):    
        box.pack_start(widget)
    window.add(box)
    start.set_flags(gtk.CAN_DEFAULT)
    entrya.set_activates_default(True)
    entryb.set_activates_default(True)
    window.set_default(start)
    window.show_all()
    
    gobject.timeout_add(100, other_async_work)
    gtk.main()

if __name__ == '__main__':
    import sys
    sys.exit(main(sys.argv[1:]))

Diff to Previous Revision

--- revision 4 2010-03-21 03:43:05
+++ revision 5 2010-03-25 17:40:44
@@ -1,4 +1,3 @@
-#!/usr/bin/python
 import time
 import functools
 from threading import Thread
@@ -24,6 +23,9 @@
         except StopIteration:
             return
         self._start_task(task)                    
+
+class TaskError(Exception):
+    pass
                 
 class Task:
     """Base class for asynchronous tasks."""
@@ -34,39 +36,9 @@
 
     def run(self):        
         raise RuntimeError, "Run method must be overriden"
-    
-class ThreadedTask(Task):
-    """Run a function in a new thread and return its output."""               
-    def __init__(self, fun, *args, **kwargs):
-        self.function = (fun, args, kwargs)
-                
-    def run(self):
-        """Start thread and set callback to get the result value."""
-        queue = Queue()        
-        thread = Thread(target=self._thread_manager, args=(self.function, queue))
-        thread.setDaemon(True)
-        thread.start()
-        self.source_id = gobject.timeout_add(50, self._thread_receiver, queue)
-    
-    def _thread_receiver(self, queue):
-        if queue.empty():
-            return True
-        rtype, rvalue = queue.get()
-        if rtype == "return":
-            self.return_cb(rvalue)
-        else:
-            self.exception_cb(rvalue)
-        return False
 
-    def _thread_manager(self, function, queue):
-        fun, args, kwargs = function
-        try:
-            result = fun(*args, **kwargs)
-        except Exception, exc:
-            queue.put(("exception", Exception(exc)))
-            raise
-        queue.put(("return", result))
-          
+# Tasks examples
+
 class SleepTask(Task):
     """Sleep for some time and return."""
     def __init__(self, seconds):
@@ -77,21 +49,57 @@
             self.return_cb()
             return False
         self.source_id = gobject.timeout_add(int(self.seconds * 1000), _return)
+    
+class ThreadedTask(Task):
+    """Run a function in a new thread and return its output."""               
+    def __init__(self, fun, *args, **kwargs):
+        self.function = (fun, args, kwargs)
+                
+    def run(self):
+        """Start thread and set callback to get the result value."""
+        queue = Queue()        
+        thread = Thread(target=self._thread, args=(self.function, queue))
+        thread.setDaemon(True)
+        thread.start()
+        self.source_id = gobject.timeout_add(50, self._queue_manager, thread, queue)
+    
+    def _queue_manager(self, thread, queue):
+        if queue.empty():
+            if not thread.isAlive():
+                # Thread is not active and the queue is empty: something went wrong!
+                self.exception_cb(TaskError)
+                return False
+            return True
+        rtype, rvalue = queue.get()
+        if rtype == "return":
+            self.return_cb(rvalue)
+        else:
+            self.exception_cb(rvalue)
+        return False
 
-# Example with PyGTK
+    def _thread(self, function, queue):
+        fun, args, kwargs = function
+        try:
+            result = fun(*args, **kwargs)
+        except Exception, exception:
+            queue.put(("exception", exception))
+            raise
+        queue.put(("return", result))
+
+# Usage example with PyGTK
 
 def heavy_function(x, y):
     time.sleep(1.0)
     return int(x) + int(y)
 
 def my_job(wait, a, b):
-    print "wait", wait
+    sys.stderr.write("[W%0.1f]" % wait)
     yield SleepTask(wait)
-    print "do sequential heavy work in a thread"
+    sys.stderr.write("[H1,%s,%s]" % (a, b))
     result1 = (yield ThreadedTask(heavy_function, a, b))
-    print "do more sequential work"
+    sys.stderr.write("[H2,%s,%s]" % (result1, b))
     result2 = (yield ThreadedTask(heavy_function, result1, b))
-    print "done! result of work was", result2
+    sys.stderr.write("[RES:%s]" % result2)
     
 def other_async_work():
     import sys
@@ -101,7 +109,7 @@
 def on_start(button, entrya, entryb):
     import random
     job = Job(my_job(random.uniform(1, 5), entrya.get_text(), entryb.get_text()))
-    print "start job", job
+    sys.stderr.write("J%d" % id(job))
 
 def main(args):
     import gtk

History