Makes it easier to execute async calls or deal with external systems calls to which can block forever or occassionally take long time to complete.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | #!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
################################################################################
#
# Decorator @threadmethod(sec), makes decorated method calls to always
# execute in a separate new thread with a specified timeout, propagating
# exceptions, as well as a result.
# Dmitry Dvoinikov <dmitry@targeted.org>
#
# from threadmethod import *
#
# class NetworkedSomething(object):
# @threadmethod(10.0)
# def connect(self, host, port):
# ... this could take long long time ...
#
# # the following call throws ThreadMethodTimeoutError upon a 10 sec. timeout
# NetworkedSomething().connect("123.45.67.89", 1234). Similarly,
#
# @threadmethod()
# def foo():
# ...
#
# makes foo() an async method, which just executes in a new separate thread
# each time, but that thread is not waited for, it's just launched to execute
# in parallel. Besides, in the latter case foo() returns a reference to the
# created thread, so that it can be join()ed.
#
################################################################################
__all__ = [ "threadmethod", "ThreadMethodTimeoutError" ]
################################################################################
class ThreadMethodTimeoutError(Exception): pass
################################################################################
from threading import Thread
class ThreadMethodThread(Thread):
"ThreadMethodThread, daemonic descendant class of threading.Thread which " \
"simply runs the specified target method with the specified arguments."
def __init__(self, target, args, kwargs):
Thread.__init__(self)
self.setDaemon(True)
self.target, self.args, self.kwargs = target, args, kwargs
self.start()
def run(self):
try:
self.result = self.target(*self.args, **self.kwargs)
except Exception, e:
self.exception = e
except:
self.exception = Exception()
else:
self.exception = None
################################################################################
def threadmethod(timeout = None):
"@threadmethod(timeout), decorator function, returns a method wrapper " \
"which runs the wrapped method in a separate new thread."
def threadmethod_proxy(method):
if hasattr(method, "__name__"):
method_name = method.__name__
else:
method_name = "unknown"
def threadmethod_invocation_proxy(*args, **kwargs):
worker = ThreadMethodThread(method, args, kwargs)
if timeout is None:
return worker
worker.join(timeout)
if worker.isAlive():
raise ThreadMethodTimeoutError("A call to %s() has timed out"
% method_name)
elif worker.exception is not None:
raise worker.exception
else:
return worker.result
threadmethod_invocation_proxy.__name__ = method_name
return threadmethod_invocation_proxy
return threadmethod_proxy
################################################################################
if __name__ == "__main__": # run self-tests
print "self-testing module threadmethod.py:"
from threading import currentThread
mainthread = currentThread()
@threadmethod(5)
def tryme():
assert currentThread() is not mainthread
tryme()
@threadmethod(5)
def foo(a, b, c):
return a + b + c
assert foo(1, 2, 3) == 6
@threadmethod(5)
def foo(*args):
assert args == ("foo", )
return args[0]
assert foo("foo") == "foo"
@threadmethod(5)
def foo(**kwargs):
assert kwargs == { "foo" : "bar" }
return kwargs["foo"]
assert foo(foo = "bar") == "bar"
@threadmethod(5)
def foo(a, b, *args, **kwargs):
assert a == 1 and b == "foo" and args == ("bar", ) and kwargs == { "biz" : "baz" }
assert foo(1, "foo", "bar", biz = "baz") is None
from time import sleep
class bar(object):
@threadmethod(3)
def __init__(self, timeout):
sleep(timeout)
@threadmethod(1)
def throw(self, e):
raise e
try:
bar(5)
except ThreadMethodTimeoutError:
pass
else:
assert False, "Constructor should have timed out"
try:
bar(1).throw(IOError("fatal"))
except IOError, e:
assert str(e) == "fatal"
else:
assert False, "Expected IOError(\"fatal\")"
x = 0
@threadmethod()
def async():
global x
sleep(0.25)
x += 1
async()
while x == 0:
pass
@threadmethod()
def foo():
sleep(1.0)
foo().join()
print "ok"
################################################################################
|
Note that a separate thread is launched on each call, and is marked as daemonic. Therefore be careful as not to make thousands of calls and beware of threads dying in the middle of something when the application terminates. In short, this recipe should be used with conscious care.
Needs docstrings. Wonderful recipe, but please add docstrings.
Re: Needs docstrings. Added docstrings.