Welcome, guest | Sign In | My Account | Store | Cart

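Serialize the sensitive parts of an asynchronous, event-driven program with EventQueue: function calls pushed onto the queue are executed one at a time, in the order they were pushed, on a single worker thread. For example, serialize changes to a document buffer that may come from multiple threads.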

Python, 77 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import functools
import inspect
import sys
import threading
import time

class EventQueue(threading.Thread):
    ''' Worker thread that executes queued function calls one at a time, in
        the order in which they were pushed, and optionally records what
        each call returned.

        >>> eq = EventQueue()    # starts the worker thread immediately
        >>> k = eq.push(slow_func, args, kwargs)    # queue a [terminating!] call
        >>> eq.get(k)    # block until call number k is done, return its result
        >>> eq.stop()    # stop the thread as soon as the current call returns, possibly preventing some calls from being executed.

        Calls are numbered 1, 2, 3, ... in push order.  The thread is not a
        daemon thread, so stop() must be called for the program to exit.

        Public attributes:
            queue          list of pending (function, a, kw) entries
            nap_time       longest time in seconds the worker/get() sleep
                           between checks when nobody wakes them up
            n              number of calls finished so far
            serving        the (function, a, kw) entry most recently started
            keep_history   if true, return values are stored in return_values
            return_values  maps call numbers to their return values; calls
                           that raised have no entry
            running        False once stop() has been called
    '''
    def __init__(self):
        threading.Thread.__init__(self)
        self.queue = []
        self.nap_time = .1
        self.n = 0
        self.serving = None
        self.keep_history = True
        self.return_values = {}    # maps numbers of calls to their return values; unused if not self.keep_history
        # Set here rather than in run() so that a stop() issued before the
        # thread gets scheduled is not overwritten.
        self.running = True
        self._pushed = 0    # number of calls handed to push() so far
        # Guards queue/n/_pushed and wakes the worker and get() waiters.
        self._cond = threading.Condition()
        # Start last: run() must only ever see a fully initialised object.
        self.start()

    def error(self, error, function, a=(), kw=None):
        ''' called when function raises error; reports the failed call on
            sys.stderr.  Override to handle errors differently.
        '''
        sys.stderr.write('EventQueue event raised exception ( %s %s %s ): %s\n'
                         % (function, a, kw or {}, error))

    def push(self, function, a=(), kw=None):
        ''' queue the call function(*a, **kw), return the number of the call
            [1 for the first call pushed, 2 for the second, ...].  The number
            can be passed to get() to wait for the call's return value.
        '''
        with self._cond:
            self.queue.append( (function, a, kw or {}) )
            # A dedicated counter is used instead of n + len(queue): that sum
            # is off by one while a call is in flight (popped, not finished).
            self._pushed += 1
            number = self._pushed
            self._cond.notify_all()    # wake the worker right away
        return number

    def stop(self):
        ''' ask the worker to exit once the call it is currently serving [if
            any] returns.  Calls still waiting in the queue are not executed.
        '''
        with self._cond:
            self.running = False
            self._cond.notify_all()    # do not make the worker finish its nap

    def get(self, n):
        ''' Block until self.n >= n; return whatever the n-th call returned [assuming self.keep_history].
            Returns None if the call raised, if history is off, or if the
            worker thread has ended without ever reaching call n.
        '''
        with self._cond:
            while self.n < n:
                if not self.is_alive():
                    # The worker is gone, so self.n can never advance again;
                    # waiting any longer would block forever.
                    break
                self._cond.wait(self.nap_time)
        return self.return_values.get(n, None)

    def run(self):
        ''' a blocking loop which continually calls functions as specified in self.queue.
        '''
        while self.running:
            with self._cond:
                if not self.queue:
                    # Woken early by push()/stop(); the timeout keeps the old
                    # polling behaviour for entries appended to queue directly.
                    self._cond.wait(self.nap_time)
                    continue
                call = self.queue.pop(0)
            function, a, kw = call
            self.serving = call
            number = self.n + 1    # same numbering as handed out by push()
            try:
                result = function(*a, **kw)
            except Exception as error:
                self.error(error, function, a, kw)
            else:
                if self.keep_history:
                    self.return_values[number] = result
            with self._cond:
                # Publish n only after the result is stored, so that get()
                # never observes the call as finished without its value.
                self.n = number
                self._cond.notify_all()



def queue_event(f):
    ''' decorator which queues method/function calls in
        self.eventqueue [if f is a method whose first argument is 'self'],
        otherwise f.eventqueue.

        Calling the decorated function does not run f; it pushes the call
        onto the queue and returns whatever the queue's push() returned
        [for an EventQueue: the number of the call].

        For a plain function a new EventQueue is created at decoration time
        and is reachable both as f.eventqueue and as an attribute of the
        decorated function.  For a method, the instance must provide its
        own 'eventqueue' attribute.
    '''
    # inspect.getargspec was removed in Python 3.11; use getfullargspec where
    # available and fall back to getargspec on Python 2.
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    args = getspec(f)[0]
    if args and (args[0] == 'self'):
        @functools.wraps(f)
        def decorated(self, *a, **kw):
            return self.eventqueue.push(f, (self,) + a, kw)
    else:
        f.eventqueue = EventQueue()
        @functools.wraps(f)
        def decorated(*a, **kw):
            return f.eventqueue.push(f, a, kw)
        # After decoration the public name refers to `decorated`, not `f`, so
        # expose the queue there too; otherwise nobody could stop its thread.
        decorated.eventqueue = f.eventqueue
    return decorated

I'm building a pure-Python implementation of a generalization of libobby (http://gobby.0x539.de/index.html). Google around for pysynob (Python Synchronized Objects) in a few weeks. ;-)

I know that list.append is atomic, so I assume there are no issues with EventQueue.push, though I have not tested it extensively.

It is likely that you will want to queue all calls to particular methods or functions, hence the decorator.