Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
################################################################################
#
# Decorator @threadmethod(sec), makes decorated method calls to always 
# execute in a separate new thread with a specified timeout, propagating 
# exceptions, as well as a result. 
# Dmitry Dvoinikov <dmitry@targeted.org>
#
# from threadmethod import *
#
# class NetworkedSomething(object):
#     @threadmethod(10.0)
#     def connect(self, host, port):
#         ... this could take long long time ...
#    
# # the following call throws ThreadMethodTimeoutError upon a 10 sec. timeout
# NetworkedSomething().connect("123.45.67.89", 1234). Similarly, 
#
# @threadmethod()
# def foo():
#     ...
#
# makes foo() an async method, which just executes in a new separate thread
# each time, but that thread is not waited for, it's just launched to execute 
# in parallel. Besides, in the latter case foo() returns a reference to the
# created thread, so that it can be join()ed.
#
################################################################################

__all__ = [ "threadmethod", "ThreadMethodTimeoutError" ]

################################################################################

class ThreadMethodTimeoutError(Exception): pass

################################################################################

from threading import Thread

class ThreadMethodThread(Thread):
    "ThreadMethodThread, daemonic descendant class of threading.Thread which " \
    "simply runs the specified target method with the specified arguments."

    def __init__(self, target, args, kwargs):
        Thread.__init__(self)
        self.setDaemon(True)
        self.target, self.args, self.kwargs = target, args, kwargs
        self.start()

    def run(self):
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception, e:
            self.exception = e
        except:
            self.exception = Exception()
        else:
            self.exception = None

################################################################################

def threadmethod(timeout = None):
    "@threadmethod(timeout), decorator function, returns a method wrapper " \
    "which runs the wrapped method in a separate new thread."

    def threadmethod_proxy(method):
    
        if hasattr(method, "__name__"):
            method_name = method.__name__
        else:
            method_name = "unknown"

        def threadmethod_invocation_proxy(*args, **kwargs):
            worker = ThreadMethodThread(method, args, kwargs)
            if timeout is None:
                return worker
            worker.join(timeout)
            if worker.isAlive():
                raise ThreadMethodTimeoutError("A call to %s() has timed out" 
                                               % method_name)
            elif worker.exception is not None:
                raise worker.exception
            else:
                return worker.result

        threadmethod_invocation_proxy.__name__ = method_name

        return threadmethod_invocation_proxy

    return threadmethod_proxy

################################################################################

if __name__ == "__main__": # run self-tests

    print "self-testing module threadmethod.py:"

    from threading import currentThread

    mainthread = currentThread()
    @threadmethod(5)
    def tryme():
        assert currentThread() is not mainthread
    tryme()

    @threadmethod(5)
    def foo(a, b, c):
        return a + b + c
    assert foo(1, 2, 3) == 6

    @threadmethod(5)
    def foo(*args):
        assert args == ("foo", )
        return args[0]
    assert foo("foo") == "foo"

    @threadmethod(5)
    def foo(**kwargs):
        assert kwargs == { "foo" : "bar" }
        return kwargs["foo"]
    assert foo(foo = "bar") == "bar"

    @threadmethod(5)
    def foo(a, b, *args, **kwargs):
        assert a == 1 and b == "foo" and args == ("bar", ) and kwargs == { "biz" : "baz" }
    assert foo(1, "foo", "bar", biz = "baz") is None

    from time import sleep
    
    class bar(object):
        @threadmethod(3)
        def __init__(self, timeout):
            sleep(timeout)
        @threadmethod(1)
        def throw(self, e):
            raise e

    try:
        bar(5)
    except ThreadMethodTimeoutError:
        pass
    else:
        assert False, "Constructor should have timed out"

    try:
        bar(1).throw(IOError("fatal"))
    except IOError, e:
        assert str(e) == "fatal"
    else:
        assert False, "Expected IOError(\"fatal\")"

    x = 0

    @threadmethod()
    def async():
        global x
        sleep(0.25)
        x += 1

    async()

    while x == 0:
        pass

    @threadmethod()
    def foo():
        sleep(1.0)
        
    foo().join()

    print "ok"

################################################################################

History

  • revision 8 (18 years ago)
  • previous revisions are not available