Welcome, guest | Sign In | My Account | Store | Cart
NOTE: Recipes have moved! Please visit GitHub.com/activestate/code for the current versions.

Makes it easier to execute async calls or deal with external systems calls to which can block forever or occassionally take long time to complete.

Python, 174 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
################################################################################
#
# Decorator @threadmethod(sec), makes decorated method calls to always 
# execute in a separate new thread with a specified timeout, propagating 
# exceptions, as well as a result. 
# Dmitry Dvoinikov <dmitry@targeted.org>
#
# from threadmethod import *
#
# class NetworkedSomething(object):
#     @threadmethod(10.0)
#     def connect(self, host, port):
#         ... this could take long long time ...
#    
# # the following call throws ThreadMethodTimeoutError upon a 10 sec. timeout
# NetworkedSomething().connect("123.45.67.89", 1234). Similarly, 
#
# @threadmethod()
# def foo():
#     ...
#
# makes foo() an async method, which just executes in a new separate thread
# each time, but that thread is not waited for, it's just launched to execute 
# in parallel. Besides, in the latter case foo() returns a reference to the
# created thread, so that it can be join()ed.
#
################################################################################

__all__ = [ "threadmethod", "ThreadMethodTimeoutError" ]

################################################################################

class ThreadMethodTimeoutError(Exception): pass

################################################################################

from threading import Thread

class ThreadMethodThread(Thread):
    "ThreadMethodThread, daemonic descendant class of threading.Thread which " \
    "simply runs the specified target method with the specified arguments."

    def __init__(self, target, args, kwargs):
        Thread.__init__(self)
        self.setDaemon(True)
        self.target, self.args, self.kwargs = target, args, kwargs
        self.start()

    def run(self):
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception, e:
            self.exception = e
        except:
            self.exception = Exception()
        else:
            self.exception = None

################################################################################

def threadmethod(timeout = None):
    "@threadmethod(timeout), decorator function, returns a method wrapper " \
    "which runs the wrapped method in a separate new thread."

    def threadmethod_proxy(method):
    
        if hasattr(method, "__name__"):
            method_name = method.__name__
        else:
            method_name = "unknown"

        def threadmethod_invocation_proxy(*args, **kwargs):
            worker = ThreadMethodThread(method, args, kwargs)
            if timeout is None:
                return worker
            worker.join(timeout)
            if worker.isAlive():
                raise ThreadMethodTimeoutError("A call to %s() has timed out" 
                                               % method_name)
            elif worker.exception is not None:
                raise worker.exception
            else:
                return worker.result

        threadmethod_invocation_proxy.__name__ = method_name

        return threadmethod_invocation_proxy

    return threadmethod_proxy

################################################################################

if __name__ == "__main__": # run self-tests

    print "self-testing module threadmethod.py:"

    from threading import currentThread

    mainthread = currentThread()
    @threadmethod(5)
    def tryme():
        assert currentThread() is not mainthread
    tryme()

    @threadmethod(5)
    def foo(a, b, c):
        return a + b + c
    assert foo(1, 2, 3) == 6

    @threadmethod(5)
    def foo(*args):
        assert args == ("foo", )
        return args[0]
    assert foo("foo") == "foo"

    @threadmethod(5)
    def foo(**kwargs):
        assert kwargs == { "foo" : "bar" }
        return kwargs["foo"]
    assert foo(foo = "bar") == "bar"

    @threadmethod(5)
    def foo(a, b, *args, **kwargs):
        assert a == 1 and b == "foo" and args == ("bar", ) and kwargs == { "biz" : "baz" }
    assert foo(1, "foo", "bar", biz = "baz") is None

    from time import sleep
    
    class bar(object):
        @threadmethod(3)
        def __init__(self, timeout):
            sleep(timeout)
        @threadmethod(1)
        def throw(self, e):
            raise e

    try:
        bar(5)
    except ThreadMethodTimeoutError:
        pass
    else:
        assert False, "Constructor should have timed out"

    try:
        bar(1).throw(IOError("fatal"))
    except IOError, e:
        assert str(e) == "fatal"
    else:
        assert False, "Expected IOError(\"fatal\")"

    x = 0

    @threadmethod()
    def async():
        global x
        sleep(0.25)
        x += 1

    async()

    while x == 0:
        pass

    @threadmethod()
    def foo():
        sleep(1.0)
        
    foo().join()

    print "ok"

################################################################################

Note that a separate thread is launched on each call, and is marked as daemonic. Therefore be careful as not to make thousands of calls and beware of threads dying in the middle of something when the application terminates. In short, this recipe should be used with conscious care.

2 comments

Michael Chermside 12 years ago  # | flag

Needs docstrings. Wonderful recipe, but please add docstrings.

Re: Needs docstrings. Added docstrings.