#!/usr/bin/env python # -*- coding: iso-8859-1 -*- ################################################################################ # # Decorator @threadmethod(sec), makes decorated method calls to always # execute in a separate new thread with a specified timeout, propagating # exceptions, as well as a result. # Dmitry Dvoinikov # # from threadmethod import * # # class NetworkedSomething(object): # @threadmethod(10.0) # def connect(self, host, port): # ... this could take long long time ... # # # the following call throws ThreadMethodTimeoutError upon a 10 sec. timeout # NetworkedSomething().connect("123.45.67.89", 1234). Similarly, # # @threadmethod() # def foo(): # ... # # makes foo() an async method, which just executes in a new separate thread # each time, but that thread is not waited for, it's just launched to execute # in parallel. Besides, in the latter case foo() returns a reference to the # created thread, so that it can be join()ed. # ################################################################################ __all__ = [ "threadmethod", "ThreadMethodTimeoutError" ] ################################################################################ class ThreadMethodTimeoutError(Exception): pass ################################################################################ from threading import Thread class ThreadMethodThread(Thread): "ThreadMethodThread, daemonic descendant class of threading.Thread which " \ "simply runs the specified target method with the specified arguments." def __init__(self, target, args, kwargs): Thread.__init__(self) self.setDaemon(True) self.target, self.args, self.kwargs = target, args, kwargs self.start() def run(self): try: self.result = self.target(*self.args, **self.kwargs) except Exception, e: self.exception = e except: self.exception = Exception() else: self.exception = None ################################################################################ def threadmethod(timeout = None): "@threadmethod(timeout), decorator function, returns a method wrapper " \ "which runs the wrapped method in a separate new thread." def threadmethod_proxy(method): if hasattr(method, "__name__"): method_name = method.__name__ else: method_name = "unknown" def threadmethod_invocation_proxy(*args, **kwargs): worker = ThreadMethodThread(method, args, kwargs) if timeout is None: return worker worker.join(timeout) if worker.isAlive(): raise ThreadMethodTimeoutError("A call to %s() has timed out" % method_name) elif worker.exception is not None: raise worker.exception else: return worker.result threadmethod_invocation_proxy.__name__ = method_name return threadmethod_invocation_proxy return threadmethod_proxy ################################################################################ if __name__ == "__main__": # run self-tests print "self-testing module threadmethod.py:" from threading import currentThread mainthread = currentThread() @threadmethod(5) def tryme(): assert currentThread() is not mainthread tryme() @threadmethod(5) def foo(a, b, c): return a + b + c assert foo(1, 2, 3) == 6 @threadmethod(5) def foo(*args): assert args == ("foo", ) return args[0] assert foo("foo") == "foo" @threadmethod(5) def foo(**kwargs): assert kwargs == { "foo" : "bar" } return kwargs["foo"] assert foo(foo = "bar") == "bar" @threadmethod(5) def foo(a, b, *args, **kwargs): assert a == 1 and b == "foo" and args == ("bar", ) and kwargs == { "biz" : "baz" } assert foo(1, "foo", "bar", biz = "baz") is None from time import sleep class bar(object): @threadmethod(3) def __init__(self, timeout): sleep(timeout) @threadmethod(1) def throw(self, e): raise e try: bar(5) except ThreadMethodTimeoutError: pass else: assert False, "Constructor should have timed out" try: bar(1).throw(IOError("fatal")) except IOError, e: assert str(e) == "fatal" else: assert False, "Expected IOError(\"fatal\")" x = 0 @threadmethod() def async(): global x sleep(0.25) x += 1 async() while x == 0: pass @threadmethod() def foo(): sleep(1.0) foo().join() print "ok" ################################################################################