Welcome, guest | Sign In | My Account | Store | Cart
import gobject

# Infrastructure: tasks and jobs.

def task(function, *args, **kwargs):
    """Wrap a task invocation so that a job can yield it.

    'function' is expected to have the signature
    'def function(return_task, [args])': its first argument is supplied
    later by start_job, the remaining ones are the 'args'/'kwargs' given here.

    Returns the tuple (function, args, kwargs).
    """
    wrapped = (function, args, kwargs)
    return wrapped

def start_job(generator):
    """Start a job, i.e. drive a generator that yields tasks.

    Each value yielded by 'generator' must unpack into (function, args,
    kwargs), as produced by task(). The function is called with a
    continuation as its first argument; when the task calls that continuation
    with its result, the generator is resumed and receives the result as the
    value of its 'yield' expression. The job ends when the generator raises
    StopIteration.
    """
    def _resume(result):
        def _advance():
            try:
                step = generator.send(result)
            except StopIteration:
                # The generator is exhausted: nothing more to schedule.
                return
            func, positional, keywords = step
            func(_resume, *positional, **keywords)
        # Advance the coroutine from an idle callback of the gobject loop
        # instead of doing it directly inside the caller (usually a task).
        gobject.idle_add(_advance)
    # The first send() must carry None: it just starts the generator.
    _resume(None)

# Tasks: a task is a normal function whose first argument is a callback
#        that the task must call with its result once it has finished.

def sleep(task_return, seconds):
    """Task: finish after 'seconds' seconds, delivering None as the result."""
    delay_ms = int(seconds * 1000)  # timeout_add is given milliseconds

    def _wake_up():
        task_return(None)

    gobject.timeout_add(delay_ms, _wake_up)

def threaded(task_return, function, *args, **kwargs):
    """Task: run function(*args, **kwargs) in a thread and return the value.

    The call is executed in a daemon thread; its outcome is handed back
    through a queue that is polled from the gobject loop every 0.1 seconds.
    On success 'task_return' is called with the value returned by 'function'.

    If 'function' raises, the exception is re-raised inside the worker thread
    (so it is reported there) and the polling is stopped. 'task_return' is
    NOT called in that case, because a task has no way to report an error:
    the job that yielded this task is simply never resumed.
    """
    try:
        from Queue import Queue  # Python 2 name of the module
    except ImportError:
        from queue import Queue  # Python 3 name of the module
    from threading import Thread

    def _thread(queue):
        try:
            x = function(*args, **kwargs)
        except BaseException:
            # Tell the receiver that no value will ever arrive; otherwise it
            # would keep polling an empty queue forever.
            queue.put((False, None))
            raise
        queue.put((True, x))

    def _receiver(queue):
        if queue.empty():
            return True  # nothing yet: keep the timeout source alive
        succeeded, value = queue.get()
        if succeeded:
            task_return(value)
        return False  # outcome consumed: stop polling

    queue = Queue()
    thread = Thread(target=_thread, args=(queue,))
    thread.setDaemon(True)
    thread.start()
    gobject.timeout_add(100, _receiver, queue) # poll queue every 0.1 seconds

# Job example

def heavy_function(a, b):
    """Example of a slow call: block for one second, then return a + b."""
    from time import sleep as block_for
    block_for(1.0)
    total = a + b
    return total
        
def myjob(a, b):
    """Example job: sleep for a random time, add a and b in a thread, print it.

    Written as a generator to be passed to start_job: every 'yield' hands a
    task to the scheduler and evaluates to that task's result.
    """
    from random import uniform
    pause = uniform(1.0, 3.0)  # between 1 and 3 seconds
    yield task(sleep, pause)
    total = yield task(threaded, heavy_function, a, b)
    print(total)

def basso_continuo():
    """Write a single dot to stderr and return True.

    Used below as a periodic gobject timeout callback.
    """
    from sys import stderr
    stderr.write(".")
    return True
          
# Demo: start ten example jobs, print a dot every 0.1 seconds while they
# run, and hand control to the gobject main loop.
gobject.threads_init()  # enable thread support before any thread is started
for x in range(1, 11):
    start_job(myjob(3 * x, 7 * x))
gobject.timeout_add(100, basso_continuo)
loop = gobject.MainLoop()
loop.run()  # blocks; the jobs started above advance from inside this loop

Diff to Previous Revision

--- revision 7 2010-03-26 16:23:30
+++ revision 8 2010-03-26 16:33:15
@@ -2,19 +2,19 @@
 
 # Infrastructure: tasks and jobs.
 
-def task(function, *args):
-    """Wrapps a task function (expected signature 'function(return_task, [args])'"""
-    return function, args
+def task(function, *args, **kwargs):
+    """Wrapps a task (expected signature: 'def function(return_task, [args])'"""
+    return function, args, kwargs
 
 def start_job(generator):
     """Start a job (a generator that yield tasks)."""
     def _next(result):
         def _callback():
             try:
-                fun, args = generator.send(result)
+                fun, args, kwargs = generator.send(result)
             except StopIteration:
                 return
-            fun(_next, *args)
+            fun(_next, *args, **kwargs)
         # isolate the advance of the coroutine to another gobject loop
         gobject.idle_add(_callback)            
     _next(None)    
@@ -63,10 +63,9 @@
     sys.stderr.write(".")
     return True
           
-gobject.threads_init()      
-start_job(myjob(1, 2))
-start_job(myjob(3, 4))
-start_job(myjob(5, 6))
+gobject.threads_init()
+for x in range(1, 11):      
+    start_job(myjob(3*x, 7*x))
 gobject.timeout_add(100, basso_continuo)
 loop = gobject.MainLoop()
 loop.run()

History