import gobject

# Infrastructure: tasks and jobs (a job is a coroutine that yields tasks).

def task(fun):
    """Decorator for tasks."""
    def _wrapper(*args, **kwargs):
        return fun, args, kwargs
    return _wrapper

def start_job(generator):
    """Start a job (a coroutine that yield tasks) and return the coroutine itself."""
    def _next(result):
        def _callback():
            try:
                fun, args, kwargs = generator.send(result)
            except StopIteration:
                return
            fun(_next, *args, **kwargs)
        # advance the coroutine from an idle callback, so it always runs inside the main loop
        gobject.idle_add(_callback)
    _next(None)
    return generator
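
# A minimal sketch of the protocol (trivial_task and trivial_job are
# illustrative names, not part of the recipe): calling a decorated task
# returns a (fun, args, kwargs) tuple, start_job invokes fun(_next, *args,
# **kwargs), and the task reports its result by calling its first argument.

@task
def trivial_task(task_return, value):
    """Complete immediately, returning 'value' unchanged."""
    task_return(value)

def trivial_job():
    result = yield trivial_task(42) # the job resumes here with 42
    assert result == 42

# start_job(trivial_job()) # would run once the main loop below is started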

# Tasks: decorate with @task. A task receives a callback as its first argument
# and must call it with the task's result when the task is done.

@task
def sleep_task(task_return, seconds):
    """Wait some seconds and return."""
    def _callback():
        task_return(None)
    gobject.timeout_add(int(seconds * 1000), _callback)

@task
def threaded_task(task_return, function, *args, **kwargs):
    """Run function(*args, **kwargs) in a thread and return the value."""
    from Queue import Queue
    from threading import Thread
    def _thread(queue):
        # this runs inside the worker thread: touch nothing but the queue (no gobject/GTK calls here)
        queue.put(function(*args, **kwargs))
    def _receiver(queue):
        if queue.empty():
            return True # nothing yet, keep polling
        task_return(queue.get())
        # implicit None return removes this timeout source once the result arrives
    queue = Queue()
    thread = Thread(target=_thread, args=(queue,))
    thread.daemon = True # do not keep the process alive just for worker threads
    thread.start()
    gobject.timeout_add(100, _receiver, queue) # poll queue every 0.1 seconds
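
# Another task written the same way, as a sketch (io_wait_task is an
# illustrative name, not part of the recipe): wait until a file descriptor
# becomes ready and return the condition flags. Returning False from the
# watch callback removes the watch after it fires once.

@task
def io_wait_task(task_return, fd, condition=gobject.IO_IN):
    """Wait until 'fd' satisfies 'condition' (readable by default) and return the flags."""
    def _callback(source, cond):
        task_return(cond)
        return False # fire once, then remove the io watch
    gobject.io_add_watch(fd, condition, _callback)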

# Example

import sys
import time
import random
import urllib2

def myjob(url):
    def download(url):
        return urllib2.urlopen(url).read()
    yield sleep_task(random.uniform(0.0, 3.0)) # exercise sleep_task and stagger the downloads
    sys.stderr.write("[start:%s]" % url)
    html = yield threaded_task(download, url)
    sys.stderr.write("[HTML:%s:%d]" % (url, len(html)))

def basso_continuo():
    """Print a dot ten times a second to show that the main loop stays responsive."""
    sys.stderr.write(".")
    return True # keep the timeout source installed

urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
for url in urls:
    print "myjob", start_job(myjob(url))
gobject.timeout_add(100, basso_continuo)
gobject.threads_init() # enable thread support before the main loop starts running tasks
loop = gobject.MainLoop()
loop.run()
