Synchronize/sequentialize sensitive parts of an asynchronous event solution with EventQueue. For example, sequentialize changes to a document buffer which may come from multiple threads.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | import threading
class EventQueue(threading.Thread):
    ''' queues function calls, saves their return values in self.return_values.
        >>> eq = EventQueue()    # start a thread watching self.queue
        >>> eq.push(slow_func); eq.push(slow_func, args); eq.push(slow_func, args, kwargs)
        # push some slow [but terminating!] function calls, which will be executed in the order in which they were pushed
        >>> eq.stop()    # stop the thread as soon as the current call returns, possibly preventing some calls from being executed.
    '''
    def __init__(self):
        threading.Thread.__init__(self)
        self.queue = []
        self.nap_time = .1
        self.start()
        self.n = 0
        self.serving = None
        self.keep_history = True
        self.return_values = {}    # maps numbers of calls to their return values; unused if not self.keep_history
    
    def error(self, error, function, a=(), kw=None):
        ''' called when function raises error.
        '''
        print >> sys.stderr, 'EventQueue event raised exception (', function, a, kw or {}, '):', error
    
    def push(self, function, a=(), kw=None):
        ''' queue the call to function, return the number of the call.
        '''
        self.queue.append( (function, a, kw or {}) )
        return self.n + len(self.queue)
    
    def stop(self):
        self.running = False
    
    def get(self, n):
        ''' Block until self.n >= n; return whatever the n-th call returned [assuming self.keep_history].
        '''
        while self.n < n:
            time.sleep(self.nap_time)
        return self.return_values.get(n, None)
    
    def run(self):
        ''' a blocking loop which continually calls functions as specified in self.queue.
        '''
        self.running = True
        while self.running:
            if len(self.queue) == 0:
                time.sleep(self.nap_time)
            else:
                function, a, kw = self.queue.pop(0)
                self.serving = (function, a, kw)
                try:
                    if self.keep_history:
                        self.return_values[self.n] = function(*a, **kw)
                    else:
                        function(*a, **kw)
                except Exception, error:
                    self.error(error, function, a, kw)
                self.n += 1
def queue_event(f):
    ''' decorator which queues method/function calls in
        self.eventqueue [if f is a method whose first argument is 'self'],
        otherwise f.eventqueue.
    '''
    args = inspect.getargspec(f)[0]
    if args and (args[0] == 'self'):
        def decorated(self, *a, **kw):
            self.eventqueue.push(f, (self,) + a, kw)
    else:
        f.eventqueue = EventQueue()
        def decorated(*a, **kw):
            f.eventqueue.push(f, a, kw)
    decorated.__name__ = f.__name__
    decorated.__doc__ = f.__doc__
    return decorated
 | 
I'm building a pure-python implementation of a generalization of libobby. http://gobby.0x539.de/index.html Google around for pysynob (Python Synchronized Objects) in a few weeks. ;-)
I know that list.append is atomic, so I assume there are no issues with EventQueue.push, though I have not tested it extensively.
It is likely that you will want to queue all calls to particular methods or functions, hence the decorator.

 Download
Download Copy to clipboard
Copy to clipboard