Welcome, guest | Sign In | My Account | Store | Cart

An accurate, simple event scheduler using blocking and event notification features available in the threading module.

Python, 45 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import thread
import threading

class Operation(threading._Timer):
    def __init__(self, *args, **kwargs):
        threading._Timer.__init__(self, *args, **kwargs)
        self.setDaemon(True)

    def run(self):
        while True:
            self.finished.clear()
            self.finished.wait(self.interval)
            if not self.finished.isSet():
                self.function(*self.args, **self.kwargs)
            else:
                return
            self.finished.set()

class Manager(object):

    ops = []

    def add_operation(self, operation, interval, args=[], kwargs={}):
        op = Operation(interval, operation, args, kwargs)
        self.ops.append(op)
        thread.start_new_thread(op.run, ())

    def stop(self):
        for op in self.ops:
            op.cancel()
        self._event.set()

if __name__ == '__main__':
    # Print "Hello World!" every 5 seconds
    
    import time

    def hello():
        print "Hello World!"

    timer = Manager()
    timer.add_operation(hello, 5)

    while True:
        time.sleep(.1)

This approach uses blocking features in the threading module (which the threading._Timer class utilizes) to time a loop that calls the desired function. The manager just keeps track of the Operations with a quick index, but if you're turning on and off operations regularly, it's easy to associate a name with each operation and cancel them independently of each other.

A daemon thread is launched for each desired operation.

A different approach (by Simon Foster) here: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/114644, although Mr. Foster's is very similar to using the sched module now.

1 comment

Cameron Christensen 15 years, 7 months ago  # | flag

Hi James,

I really appreciate this succinct example of a timed event class. It works in my code with only a single modification: I had to remove the 'self._event.set()' line from the stop() function of Manager. What is _event supposed to be referring to here?

Thanks,

Cameron