Welcome, guest | Sign In | My Account | Store | Cart
import gobject

# Infrastructure: tasks and jobs.

def task(function, *args):
    """Bundle a task function and its arguments for a job to yield.

    'function' is expected to have the signature
    'function(task_return, *args)', where 'task_return' is the callable
    the task must invoke with its result once it has finished.

    Returns the pair (function, args), with 'args' as a tuple.
    """
    bundle = (function, args)
    return bundle

def start_job(generator):
    """Drive a job: a generator that yields (task_function, args) pairs.

    Each yielded pair is started as 'task_function(resume, *args)'. When
    the task calls 'resume(result)', that result is sent back into the
    generator, which then runs on to its next yield. The job ends when
    the generator raises StopIteration. Returns nothing.
    """
    def _resume(value):
        def _step():
            try:
                task_function, task_args = generator.send(value)
            except StopIteration:
                # The generator is exhausted: the job is complete.
                pass
            else:
                task_function(_resume, *task_args)
        # Advance the coroutine from a separate idle callback of the
        # gobject loop rather than from inside the caller of _resume.
        gobject.idle_add(_step)
    # Sending None first runs the generator up to its first yield.
    _resume(None)

# Tasks: A task is a normal function, except for the first argument, a function
#        that must be called with the result of the task.

def sleep(task_return, seconds):
    """Task: wait 'seconds' seconds, then call task_return(None).

    'seconds' may be fractional; it is converted to whole milliseconds
    (truncated by int()) for the gobject timeout.
    """
    milliseconds = int(seconds * 1000)

    def _on_timeout():
        # No explicit return value: the callback hands back None.
        task_return(None)

    gobject.timeout_add(milliseconds, _on_timeout)

def threaded(task_return, function, *args, **kwargs):
    """Task: run function(*args, **kwargs) in a thread and return its value.

    The call is executed in a daemon thread. The gobject main loop polls a
    queue every 0.1 seconds and, once the thread has posted its outcome,
    calls task_return(value) from the main loop.

    If 'function' raises, the exception is re-raised inside the worker
    thread (so it is still reported there), task_return is NOT called, and
    the polling timeout is removed instead of being left to poll an
    always-empty queue forever.
    """
    from Queue import Queue
    from threading import Thread

    def worker(queue):
        # Always post an outcome, even on failure, so the receiver can stop.
        try:
            value = function(*args, **kwargs)
        except BaseException:
            queue.put((False, None))
            raise
        queue.put((True, value))

    def receiver(queue):
        if queue.empty():
            # Nothing posted yet: returning True keeps the timeout alive.
            return True
        succeeded, value = queue.get()
        if succeeded:
            task_return(value)
        # Outcome consumed either way: returning False ends the polling.
        return False

    queue = Queue()
    worker_thread = Thread(target=worker, args=(queue,))
    worker_thread.setDaemon(True)
    worker_thread.start()
    gobject.timeout_add(100, receiver, queue) # poll queue every 0.1 seconds

# Job example

def heavy_function(a, b):
    """Example of a slow, blocking call: pause one second, return a + b."""
    from time import sleep as _pause
    _pause(1.0)
    total = a + b
    return total
        
def myjob(a, b):
    """Example job: sleep a random 1-3 seconds, then add a and b in a thread.

    Yields two tasks in turn (a 'sleep' and a 'threaded' call of
    heavy_function) and prints the value sent back for the second one.
    """
    import random
    pause = random.uniform(1.0, 3.0)
    yield task(sleep, pause)
    total = yield task(threaded, heavy_function, a, b)
    # Parenthesised single value: prints the same as 'print total'.
    print(total)

def basso_continuo():
    """Write a single '.' to stderr and return True.

    Registered below as a periodic gobject timeout; the True return value
    is what the callback hands back to gobject on every call.
    """
    from sys import stderr
    stderr.write(".")
    return True
          
# Demo: initialise gobject threading, start three jobs, keep a periodic
# "heartbeat" running, and enter the main loop.
gobject.threads_init()
for _operands in ((1, 2), (3, 4), (5, 6)):
    start_job(myjob(*_operands))
# Dots are written every 100 ms while the jobs are in progress.
gobject.timeout_add(100, basso_continuo)
loop = gobject.MainLoop()
loop.run()

Diff to Previous Revision

--- revision 5 2010-03-25 17:40:44
+++ revision 6 2010-03-26 16:15:52
@@ -1,139 +1,72 @@
-import time
-import functools
-from threading import Thread
-from Queue import Queue
-
 import gobject
 
-class Job:
-    """Wrap a co-routines that yields asynchronous tasks (see Task class).""" 
-    def __init__(self, generator):
-        self.generator = generator
-        self._advance_task(generator.send, None)
-            
-    def _start_task(self, task):
-        return_cb = functools.partial(self._advance_task, self.generator.send)
-        exception_cb = functools.partial(self._advance_task, self.generator.throw)
-        task.config(return_cb, exception_cb)
-        task.run()
-    
-    def _advance_task(self, genmethod, result=None):
-        try:
-            task = genmethod(result)
-        except StopIteration:
-            return
-        self._start_task(task)                    
+# Infrastructure: tasks and jobs.
 
-class TaskError(Exception):
-    pass
-                
-class Task:
-    """Base class for asynchronous tasks."""
-    def config(self, return_cb, exception_cb):
-        """Set return and exception callbacks."""
-        self.return_cb = return_cb
-        self.exception_cb = exception_cb
+def task(function, *args):
+    """Wrapps a task function (expected signature 'function(return_task, [args])'"""
+    return function, args
 
-    def run(self):        
-        raise RuntimeError, "Run method must be overriden"
+def start_job(generator):
+    """Start a job (a generator that yield tasks)."""
+    def _next(result):
+        def _callback():
+            try:
+                fun, args = generator.send(result)
+            except StopIteration:
+                return
+            fun(_next, *args)
+        # isolate the advance of the coroutine to another gobject loop
+        gobject.idle_add(_callback)            
+    _next(None)    
 
-# Tasks examples
+# Tasks: A task is a normal function, except for the first argument, a function
+#        that must be called with the result of the task.
 
-class SleepTask(Task):
-    """Sleep for some time and return."""
-    def __init__(self, seconds):
-        self.seconds = seconds
+def sleep(task_return, seconds):
+    """Wait 'seconds' seconds. Returns nothing."""
+    def callback():
+        task_return(None)
+    gobject.timeout_add(int(seconds * 1000), callback)
+
+def threaded(task_return, function, *args, **kwargs):
+    """Run function(*args, **kwargs) in a thread and return the value."""
+    from Queue import Queue
+    from threading import Thread
+    def thread(queue):
+        x = function(*args, **kwargs)
+        queue.put(x)
+    def receiver(queue):
+        if queue.empty():
+            return True
+        task_return(queue.get())
+    queue = Queue()
+    thread = Thread(target=thread, args=(queue,))
+    thread.setDaemon(True)
+    thread.start()
+    gobject.timeout_add(100, receiver, queue) # poll queue every 0.1 seconds
+
+# Job example
+
+def heavy_function(a, b):
+    import time
+    time.sleep(1.0)
+    return a + b
         
-    def run(self):
-        def _return():
-            self.return_cb()
-            return False
-        self.source_id = gobject.timeout_add(int(self.seconds * 1000), _return)
-    
-class ThreadedTask(Task):
-    """Run a function in a new thread and return its output."""               
-    def __init__(self, fun, *args, **kwargs):
-        self.function = (fun, args, kwargs)
-                
-    def run(self):
-        """Start thread and set callback to get the result value."""
-        queue = Queue()        
-        thread = Thread(target=self._thread, args=(self.function, queue))
-        thread.setDaemon(True)
-        thread.start()
-        self.source_id = gobject.timeout_add(50, self._queue_manager, thread, queue)
-    
-    def _queue_manager(self, thread, queue):
-        if queue.empty():
-            if not thread.isAlive():
-                # Thread is not active and the queue is empty: something went wrong!
-                self.exception_cb(TaskError)
-                return False
-            return True
-        rtype, rvalue = queue.get()
-        if rtype == "return":
-            self.return_cb(rvalue)
-        else:
-            self.exception_cb(rvalue)
-        return False
+def myjob(a, b):
+    import random
+    yield task(sleep, random.uniform(1.0, 3.0))
+    result = yield task(threaded, heavy_function, a, b)
+    print result
 
-    def _thread(self, function, queue):
-        fun, args, kwargs = function
-        try:
-            result = fun(*args, **kwargs)
-        except Exception, exception:
-            queue.put(("exception", exception))
-            raise
-        queue.put(("return", result))
-
-# Usage example with PyGTK
-
-def heavy_function(x, y):
-    time.sleep(1.0)
-    return int(x) + int(y)
-
-def my_job(wait, a, b):
-    sys.stderr.write("[W%0.1f]" % wait)
-    yield SleepTask(wait)
-    sys.stderr.write("[H1,%s,%s]" % (a, b))
-    result1 = (yield ThreadedTask(heavy_function, a, b))
-    sys.stderr.write("[H2,%s,%s]" % (result1, b))
-    result2 = (yield ThreadedTask(heavy_function, result1, b))
-    sys.stderr.write("[RES:%s]" % result2)
-    
-def other_async_work():
+def basso_continuo():
     import sys
     sys.stderr.write(".")
     return True
-
-def on_start(button, entrya, entryb):
-    import random
-    job = Job(my_job(random.uniform(1, 5), entrya.get_text(), entryb.get_text()))
-    sys.stderr.write("J%d" % id(job))
-
-def main(args):
-    import gtk
-    gobject.threads_init()
-    window = gtk.Window()
-    start = gtk.Button("start")
-    window.connect("delete-event", lambda *args: gtk.main_quit())
-    box = gtk.VBox()
-    entrya, entryb = gtk.Entry(), gtk.Entry()
-    entrya.set_text("1")
-    entryb.set_text("2")
-    start.connect("clicked", on_start, entrya, entryb)
-    for widget in (entrya, entryb, start):    
-        box.pack_start(widget)
-    window.add(box)
-    start.set_flags(gtk.CAN_DEFAULT)
-    entrya.set_activates_default(True)
-    entryb.set_activates_default(True)
-    window.set_default(start)
-    window.show_all()
-    
-    gobject.timeout_add(100, other_async_work)
-    gtk.main()
-
-if __name__ == '__main__':
-    import sys
-    sys.exit(main(sys.argv[1:]))
+          
+gobject.threads_init()      
+start_job(myjob(1, 2))
+start_job(myjob(3, 4))
+start_job(myjob(5, 6))
+gobject.timeout_add(100, basso_continuo)
+loop = gobject.MainLoop()
+loop.run()

History