Welcome, guest | Sign In | My Account | Store | Cart

Given that threads shouldn't be killed, multiprocessing is one mechanism for limiting the time spent on a potentially long-running computation.

Python, 83 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""
Code to timeout with processes.

>>> @timeout(.5)
... def sleep(x):
...     print "ABOUT TO SLEEP {0} SECONDS".format(x)
...     time.sleep(x)
...     return x

>>> sleep(1)
Traceback (most recent call last):
   ...
TimeoutException: timed out after 0 seconds

>>> sleep(.2)
0.2

>>> @timeout(.5)
... def exc():
...     raise Exception('Houston we have problems!')

>>> exc()
Traceback (most recent call last):
   ...
Exception: Houston we have problems!

"""
import multiprocessing
import time
import logging
# Attach a stderr handler to multiprocessing's own logger and set its level
# to INFO in a single call (log_to_stderr applies the level it is given).
logger = multiprocessing.log_to_stderr(logging.INFO)


class TimeoutException(Exception):
    """Raised by ``timeout``-wrapped calls whose worker process is still
    running when the allowed time has elapsed."""


class RunableProcessing(multiprocessing.Process):
    """Process that runs a single callable and publishes its outcome on a queue.

    The outcome is a ``(succeeded, payload)`` pair: ``(True, return_value)``
    when the callable returns, or ``(False, exception)`` when it raises an
    ``Exception``.
    """

    def __init__(self, func, *args, **kwargs):
        """Prepare a process that will call ``func(*args, **kwargs)``."""
        # A single slot suffices: run_func publishes exactly one outcome.
        self.queue = multiprocessing.Queue(maxsize=1)
        super(RunableProcessing, self).__init__(
            target=self.run_func,
            args=(func,) + args,
            kwargs=kwargs,
        )

    def run_func(self, func, *args, **kwargs):
        """Call ``func`` and put ``(True, value)`` or ``(False, exc)`` on the queue."""
        try:
            outcome = (True, func(*args, **kwargs))
            self.queue.put(outcome)
        except Exception as error:
            self.queue.put((False, error))

    def done(self):
        """Return whether the one-slot queue currently reports itself full.

        NOTE(review): the multiprocessing docs describe ``Queue.full()`` as
        not reliable, so treat this as a hint rather than a guarantee.
        """
        return self.queue.full()

    def result(self):
        """Block until an outcome is available and return the ``(succeeded, payload)`` pair."""
        return self.queue.get()


def timeout(seconds, force_kill=True):
    """Return a decorator that runs the wrapped callable in a child process
    and stops waiting for it after ``seconds``.

    Args:
        seconds: Maximum time to wait for the child, in seconds. ``None``
            waits without a deadline.
        force_kill: When true, terminate the child process if the deadline
            passes while it is still running; otherwise leave it running.

    The decorated callable returns whatever the wrapped function returns and
    re-raises any ``Exception`` the wrapped function raised in the child.

    Raises (from the decorated callable):
        TimeoutException: the child was still running at the deadline.
        RuntimeError: the child exited without delivering a result (for
            example it called ``sys.exit`` or was killed).

    NOTE(review): the function, its arguments and its result travel between
    processes, so they presumably must be picklable on platforms that do not
    fork (see the Windows report in the recipe comments) -- confirm on the
    target platform.
    """
    # Function-scope imports keep this block self-contained.
    import functools
    try:
        from queue import Empty  # Python 3
    except ImportError:
        from Queue import Empty  # Python 2

    # Upper bound on one blocking wait; only affects how quickly a child
    # that died without a result is noticed, not how fast results arrive.
    poll_interval = 0.05

    def wrapper(function):
        @functools.wraps(function)
        def inner(*args, **kwargs):
            now = time.time()
            deadline = None if seconds is None else now + seconds
            proc = RunableProcessing(function, *args, **kwargs)
            proc.start()

            # Read the result BEFORE joining: a child that has put data on a
            # multiprocessing queue may not exit until that data is consumed,
            # so joining first can turn a large result into a false timeout.
            outcome = None
            while outcome is None:
                if deadline is None:
                    wait = poll_interval
                else:
                    wait = min(poll_interval, deadline - time.time())
                    if wait <= 0:
                        break
                try:
                    outcome = proc.queue.get(timeout=wait)
                except Empty:
                    if not proc.is_alive():
                        break

            alive = proc.is_alive()
            if outcome is None and not alive:
                # The child may have finished right at the deadline; give its
                # already-flushed result one last chance to be read.
                try:
                    outcome = proc.queue.get(timeout=poll_interval)
                except Empty:
                    pass

            if outcome is None:
                if alive:
                    if force_kill:
                        proc.terminate()
                    runtime = int(time.time() - now)
                    raise TimeoutException('timed out after {0} seconds'.format(runtime))
                # Explicit error instead of an assert (stripped under -O) or
                # an unbounded blocking read on a queue nobody will fill.
                raise RuntimeError(
                    'process exited with code {0} without returning a result'.format(
                        proc.exitcode))

            # The result is in hand; reap the child without risking a hang.
            proc.join(poll_interval)
            success, result = outcome
            if success:
                return result
            raise result
        return inner
    return wrapper


if __name__ == '__main__':
    # Executed as a script: run the usage examples embedded in the module
    # docstring as doctests.
    from doctest import testmod
    testmod()

2 comments

Stephen Chappell 11 years, 10 months ago  # | flag

This recipe appears to be synchronous in nature. For an asynchronous solution, recipe 577028 is available.

Jason 8 years, 2 months ago  # | flag

I tried to use the recipe in a Windows environment and it raises an exception:

INFO:root:=================Start test=================
E
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError
[INFO/RunableProcessing-1] process shutting down
[INFO/RunableProcessing-1] process shutting down
E
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

I am wondering if this recipe is not portable?