#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
################################################################################
#
# Decorator @threadmethod(sec), makes decorated method calls to always
# execute in a separate new thread with a specified timeout, propagating
# exceptions, as well as a result.
# Dmitry Dvoinikov <dmitry@targeted.org>
#
# from threadmethod import *
#
# class NetworkedSomething(object):
#     @threadmethod(10.0)
#     def connect(self, host, port):
#         ... this could take long long time ...
#
# # the following call throws ThreadMethodTimeoutError upon a 10 sec. timeout
# NetworkedSomething().connect("123.45.67.89", 1234)
#
# Similarly,
#
# @threadmethod()
# def foo():
#     ...
#
# makes foo() an async method, which just executes in a new separate thread
# each time, but that thread is not waited for, it's just launched to execute
# in parallel. Besides, in the latter case foo() returns a reference to the
# created thread, so that it can be join()ed.
#
################################################################################
__all__ = [ "threadmethod", "ThreadMethodTimeoutError" ]
################################################################################
class ThreadMethodTimeoutError(Exception):
    """Raised when a call made through @threadmethod(timeout) is still
    running after the timeout has elapsed."""
################################################################################
from threading import Thread
class ThreadMethodThread(Thread):
    """Daemon thread that runs a single callable and records its outcome.

    The thread starts itself at the end of construction. After it has
    finished:

    * on success, ``result`` holds the target's return value and
      ``exception`` is None;
    * on failure, ``exception`` holds the exception instance raised by the
      target (or a blank ``Exception()`` if what was raised does not derive
      from ``Exception``) and ``result`` stays None.

    Both attributes exist (as None) from construction onwards, so reading
    them never raises AttributeError.
    """

    def __init__(self, target, args, kwargs):
        """Store the call to make and start the thread immediately.

        target -- callable to invoke in the new thread
        args   -- tuple of positional arguments for the target
        kwargs -- dict of keyword arguments for the target
        """
        Thread.__init__(self)
        # The ``daemon`` attribute replaces the deprecated setDaemon().
        self.daemon = True
        self.target, self.args, self.kwargs = target, args, kwargs
        # Predefine the outcome slots before start(): otherwise ``result``
        # would be missing whenever the target raises, and both would be
        # missing while the thread is still running.
        self.result = None
        self.exception = None
        self.start()

    def run(self):
        """Invoke the target and capture its return value or exception."""
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception as e:  # "as" form works on Python 2.6+ and 3
            self.exception = e
        except:
            # Deliberate catch-all: anything not derived from Exception
            # (e.g. SystemExit) is reported as a plain Exception so that it
            # is not re-raised verbatim in the calling thread.
            self.exception = Exception()
        else:
            self.exception = None
################################################################################
def threadmethod(timeout = None):
    """@threadmethod(timeout), decorator factory: returns a decorator whose
    wrapper runs the wrapped callable in a separate new thread on each call.

    timeout -- seconds to wait for the call to finish, or None (default)
               to not wait at all.

    With a timeout, the wrapper blocks for up to ``timeout`` seconds and
    then either returns the callable's result, re-raises the exception the
    callable raised, or raises ThreadMethodTimeoutError if the thread is
    still running. The thread is not stopped when the timeout fires; it is
    simply no longer waited for.

    With timeout None, the wrapper returns the started thread object
    immediately, so that the caller may join() it.
    """
    def threadmethod_proxy(method):
        # Callables without a __name__ are reported as "unknown".
        method_name = getattr(method, "__name__", "unknown")

        def threadmethod_invocation_proxy(*args, **kwargs):
            worker = ThreadMethodThread(method, args, kwargs)
            if timeout is None:
                # Asynchronous mode: hand the running thread to the caller.
                return worker
            worker.join(timeout)
            # is_alive() replaces isAlive(), which was removed in Python 3.9.
            if worker.is_alive():
                raise ThreadMethodTimeoutError("A call to %s() has timed out"
                                               % method_name)
            if worker.exception is not None:
                raise worker.exception
            return worker.result

        # Make the wrapper look like the wrapped callable to introspection.
        threadmethod_invocation_proxy.__name__ = method_name
        threadmethod_invocation_proxy.__doc__ = getattr(method, "__doc__", None)
        return threadmethod_invocation_proxy
    return threadmethod_proxy
################################################################################
if __name__ == "__main__": # run self-tests

    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("self-testing module threadmethod.py:")

    from threading import current_thread
    from time import sleep

    mainthread = current_thread()

    # the decorated function must not run in the calling thread
    @threadmethod(5)
    def tryme():
        assert current_thread() is not mainthread
    tryme()

    # positional arguments and the return value are passed through
    @threadmethod(5)
    def foo(a, b, c):
        return a + b + c
    assert foo(1, 2, 3) == 6

    # *args are passed through
    @threadmethod(5)
    def foo(*args):
        assert args == ("foo", )
        return args[0]
    assert foo("foo") == "foo"

    # **kwargs are passed through
    @threadmethod(5)
    def foo(**kwargs):
        assert kwargs == { "foo" : "bar" }
        return kwargs["foo"]
    assert foo(foo = "bar") == "bar"

    # mixed arguments; a function without a return statement yields None
    @threadmethod(5)
    def foo(a, b, *args, **kwargs):
        assert a == 1 and b == "foo" and args == ("bar", ) and kwargs == { "biz" : "baz" }
    assert foo(1, "foo", "bar", biz = "baz") is None

    class bar(object):
        @threadmethod(3)
        def __init__(self, timeout):
            sleep(timeout)
        @threadmethod(1)
        def throw(self, e):
            raise e

    # a constructor sleeping for longer than its 3 sec. timeout must time out
    try:
        bar(5)
    except ThreadMethodTimeoutError:
        pass
    else:
        assert False, "Constructor should have timed out"

    # an exception raised in the worker thread is re-raised in the caller
    try:
        bar(1).throw(IOError("fatal"))
    except IOError as e:
        assert str(e) == "fatal"
    else:
        assert False, "Expected IOError(\"fatal\")"

    # asynchronous mode: the call returns at once, the work happens later
    x = 0
    @threadmethod()
    def async_call(): # "async" is a reserved keyword since Python 3.7
        global x
        sleep(0.25)
        x += 1
    async_call()
    while x == 0:
        sleep(0.01) # poll politely instead of spinning at full CPU

    # asynchronous mode returns the thread, which can be join()ed
    @threadmethod()
    def foo():
        sleep(1.0)
    foo().join()

    print("ok")
################################################################################
# Reader comments on this recipe:
#   "Needs docstrings. Wonderful recipe, but please add docstrings."
#   "Re: Needs docstrings. Added docstrings."