Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/python
"""Run asynchronous tasks with gobject and coroutines."""
import gobject

def start_job(generator):
    """Drive a job to completion and hand the job back to the caller.

    A job is a generator (coroutine) that yields tasks.  A task is any
    callable that accepts one argument, a "return" callback, and arranges
    for that callback to be invoked with the task's result.  The result
    is then sent into the generator as the value of its ``yield``.

    The generator passed in is returned unchanged.
    """
    def _resume(value):
        def _step():
            try:
                task = generator.send(value)
            except StopIteration:
                # The job ran to its end: nothing more to schedule.
                pass
            else:
                # Start the yielded task; it calls _resume when it is done.
                task(_resume)
        # Every step is scheduled through idle_add instead of being run
        # directly inside whatever callback delivered the result.
        gobject.idle_add(_step)
    # The first send() on a fresh generator must carry None.
    _resume(None)
    return generator

# Task examples

def sleep_task(secs):
    """Build a task that pauses the job for about ``secs`` seconds.

    The job is resumed with the measured elapsed time (in seconds, as
    reported by ``time.time()``) as the value of its ``yield``.
    """
    def _run(task_return):
        began = time.time()

        def _wake_up():
            # Returns None, so the timeout fires only once.
            waited = time.time() - began
            task_return(waited)

        # timeout_add takes its interval in whole milliseconds.
        interval_ms = int(secs * 1000)
        gobject.timeout_add(interval_ms, _wake_up)
    return _run

def threaded_task(function, *args, **kwargs):
    """Run function(*args, **kwargs) in a thread and return the value.

    Returns a task (a callable taking ``task_return``).  When the task is
    started, ``function`` runs in a daemon thread while a 100 ms gobject
    timeout polls a queue for its result; the result is then handed to
    ``task_return``.

    If ``function`` raises, no value is ever produced.  The job is not
    resumed in that case, but the polling timeout is removed as soon as
    the worker thread is found dead, instead of firing forever.
    """
    from Queue import Queue
    from threading import Thread
    def _task(task_return):
        def _thread(queue):
            # Runs inside the worker thread: touch nothing but the queue.
            queue.put(function(*args, **kwargs))
        def _manager(queue):
            # Sample liveness BEFORE looking at the queue.  If the worker
            # was already dead at this point, any result it produced is
            # already queued, so "dead and empty" really means it failed.
            worker_alive = thread.is_alive()
            if queue.empty():
                # Keep polling only while a result can still arrive.
                return worker_alive
            task_return(queue.get())
            # Result delivered: remove the polling timeout.
            return False
        queue = Queue()
        thread = Thread(target=_thread, args=(queue,))
        thread.setDaemon(True)
        thread.start()
        # Poll the queue every 0.1 seconds from the main loop.
        gobject.timeout_add(100, _manager, queue)
    return _task

###

import sys
import time
import random
import urllib2

def myjob(url):
    """Example job: sleep a random time, then download ``url`` in a thread.

    Progress markers are written to stderr: the time slept, the start of
    the download, and the length of the downloaded document.
    """
    def _fetch(address):
        # Blocking download; meant to be run through threaded_task.
        response = urllib2.urlopen(address)
        return response.read()

    nap = random.uniform(0.0, 3.0)
    elapsed = yield sleep_task(nap)
    sys.stderr.write("[slept:%0.2f]" % elapsed)
    sys.stderr.write("[start:%s]" % url)
    document = yield threaded_task(_fetch, url)
    sys.stderr.write("[HTML:%s:%d]" % (url, len(document)))

def basso_continuo():
    """Write one dot to stderr and return True.

    Used as a periodic callback; the True return value asks for the
    callback to be called again.
    """
    tick = "."
    sys.stderr.write(tick)
    keep_running = True
    return keep_running

# Demo driver: start one job per URL, then run the main loop.
urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
for url in urls:      
    # start_job returns the generator it was given; its repr is what is logged.
    sys.stderr.write("myjob: %s\n" % start_job(myjob(url)))    
# Call basso_continuo every 100 ms; it returns True, so it keeps being called.
gobject.timeout_add(100, basso_continuo)
gobject.threads_init()  # needed because we use threaded tasks
loop = gobject.MainLoop()
# NOTE(review): nothing visible here ever quits the loop -- presumably the
# demo is meant to be interrupted by hand; confirm.
loop.run()

Diff to Previous Revision

--- revision 12 2010-03-29 21:56:25
+++ revision 13 2010-03-30 09:15:19
@@ -1,51 +1,49 @@
+#!/usr/bin/python
+"""Run asynchronous tasks with gobject and coroutines."""
 import gobject
 
-def task(fun):
-    """Decorator for tasks."""
-    def _wrapper(*args, **kwargs):
-        return fun, args, kwargs
-    return _wrapper
-
 def start_job(generator):
-    """Start a job (a coroutine that yield tasks) and return the coroutine itself."""
-    def _next(result):
-        def _callback():
+    """Start a job (a coroutine that yield tasks)."""
+    def _task_return(result):
+        def _advance_generator():
             try:
-                fun, args, kwargs = generator.send(result)
+                new_task = generator.send(result)
             except StopIteration:
                 return
-            fun(_next, *args, **kwargs)
-        # isolate advance of the coroutine
-        gobject.idle_add(_callback)            
-    _next(None)
+            new_task(_task_return)
+        # isolate the advance of the coroutine
+        gobject.idle_add(_advance_generator)            
+    _task_return(None)
     return generator
 
-# Example tasks
+# Task examples
 
-@task
-def sleep_task(task_return, seconds):
-    """Wait some seconds and return."""
-    def _callback():
-        task_return(None)
-    gobject.timeout_add(int(seconds * 1000), _callback)
+def sleep_task(secs):
+    """Suspend job for the given number of seconds and return elapsed time."""
+    def _task(task_return):
+        start_time = time.time()
+        def _on_timeout():
+            task_return(time.time() - start_time)
+        gobject.timeout_add(int(secs * 1000), _on_timeout)
+    return _task
 
-@task
-def threaded_task(task_return, function, *args, **kwargs):
+def threaded_task(function, *args, **kwargs):
     """Run function(*args, **kwargs) in a thread and return the value."""
     from Queue import Queue
     from threading import Thread
-    def _thread(queue):
-        # this runs inside the thread, dare not touch anything but the queue
-        queue.put(function(*args, **kwargs))
-    def _receiver(queue):
-        if queue.empty():
-            return True
-        task_return(queue.get())
-    queue = Queue()
-    thread = Thread(target=_thread, args=(queue,))
-    thread.setDaemon(True)
-    thread.start()
-    gobject.timeout_add(100, _receiver, queue) # poll queue every 0.1 seconds
+    def _task(task_return):
+        def _thread(queue):
+            queue.put(function(*args, **kwargs))
+        def _manager(queue):
+            if queue.empty():
+                return True
+            task_return(queue.get())
+        queue = Queue()
+        thread = Thread(target=_thread, args=(queue,))
+        thread.setDaemon(True)
+        thread.start()
+        gobject.timeout_add(100, _manager, queue)
+    return _task
 
 ###
 
@@ -57,7 +55,8 @@
 def myjob(url):
     def download(url):
         return urllib2.urlopen(url).read()
-    yield sleep_task(random.uniform(0.0, 3.0)) # just to test the task
+    elapsed = yield sleep_task(random.uniform(0.0, 3.0))
+    sys.stderr.write("[slept:%0.2f]" % elapsed)
     sys.stderr.write("[start:%s]" % url)
     html = yield threaded_task(download, url)
     sys.stderr.write("[HTML:%s:%d]" % (url, len(html)))
@@ -68,8 +67,8 @@
 
 urls = ["http://www.google.com", "http://python.com", "http://www.pygtk.org"]
 for url in urls:      
-    print "myjob", start_job(myjob(url))    
+    sys.stderr.write("myjob: %s\n" % start_job(myjob(url)))    
 gobject.timeout_add(100, basso_continuo)
-gobject.threads_init()
+gobject.threads_init()  # needed because we use threaded tasks
 loop = gobject.MainLoop()
 loop.run()

History