This recipe uses the multiprocessing module to kill functions that have run longer than intended. Unlike other recipes that use threading, this code actually terminates execution that has continued for too long. The implementation does not rely on signals or anything else that is OS-specific, so it should work on any system where you might need generic timeouts.
import multiprocessing as MP
from sys import exc_info
# time.clock() was removed in Python 3.8; monotonic() measures elapsed wall time.
from time import monotonic

DEFAULT_TIMEOUT = 60

################################################################################

def timeout(limit=None):
    # Decorator factory: validate the limit, then wrap the function.
    if limit is None:
        limit = DEFAULT_TIMEOUT
    if limit <= 0:
        raise ValueError()
    def wrapper(function):
        return _Timeout(function, limit)
    return wrapper

class TimeoutError(Exception): pass

################################################################################

def _target(queue, function, *args, **kwargs):
    # Runs in the child process: report (success flag, result or exception).
    try:
        queue.put((True, function(*args, **kwargs)))
    except:
        queue.put((False, exc_info()[1]))

class _Timeout:

    def __init__(self, function, limit):
        self.__limit = limit
        self.__function = function
        self.__timeout = monotonic()
        self.__process = MP.Process()
        self.__queue = MP.Queue()

    def __call__(self, *args, **kwargs):
        # Kill any earlier run, then start the function in a fresh process.
        self.cancel()
        self.__queue = MP.Queue(1)
        args = (self.__queue, self.__function) + args
        self.__process = MP.Process(target=_target, args=args, kwargs=kwargs)
        self.__process.daemon = True
        self.__process.start()
        self.__timeout = self.__limit + monotonic()

    def cancel(self):
        if self.__process.is_alive():
            self.__process.terminate()

    @property
    def ready(self):
        if self.__queue.full():
            return True
        elif not self.__queue.empty():
            return True
        elif self.__timeout < monotonic():
            # Deadline passed: kill the process (this branch returns None).
            self.cancel()
        else:
            return False

    @property
    def value(self):
        if self.ready is True:
            flag, load = self.__queue.get()
            if flag:
                return load
            raise load
        raise TimeoutError()

    def __get_limit(self):
        return self.__limit

    def __set_limit(self, value):
        if value <= 0:
            raise ValueError()
        self.__limit = value

    limit = property(__get_limit, __set_limit)
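To make the core technique easier to see in isolation, here is a minimal sketch of the same idea as a single blocking call: run the function in a child process, wait on a queue for at most the limit, and terminate the child if nothing arrives. It is not the recipe's polling interface; the names `run_with_timeout` and `_worker` are hypothetical, and it raises Python 3's built-in `TimeoutError` rather than the recipe's own exception class.

```python
import multiprocessing as mp
import queue
import time


def _worker(result_queue, function, args):
    # Runs in the child process: report (success flag, result or exception).
    try:
        result_queue.put((True, function(*args)))
    except Exception as error:
        result_queue.put((False, error))


def run_with_timeout(function, limit, *args):
    """Hypothetical helper: run function(*args), killing it after `limit` seconds."""
    result_queue = mp.Queue(1)
    process = mp.Process(target=_worker, args=(result_queue, function, args))
    process.daemon = True
    process.start()
    try:
        # Block until the child reports back or the limit expires.
        ok, payload = result_queue.get(timeout=limit)
    except queue.Empty:
        raise TimeoutError("no result within %s seconds" % limit) from None
    finally:
        # Either the result has been received or the deadline passed,
        # so the child is no longer needed.
        if process.is_alive():
            process.terminate()
        process.join()
    if ok:
        return payload
    raise payload  # re-raise the exception captured in the child


if __name__ == "__main__":
    print(run_with_timeout(pow, 5, 2, 10))  # prints 1024
    try:
        run_with_timeout(time.sleep, 0.5, 30)
    except TimeoutError:
        print("sleep was terminated")
```

The sketch trades the recipe's non-blocking `ready`/`value` properties for a simpler call that waits. The function, its arguments, its result, and any exception it raises must all be picklable on platforms where multiprocessing starts children with the spawn method.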
Could you provide an example? If the intended way is to use the @timeout decorator, I could not make it work.
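One possible reason the decorator form fails, assuming a platform where multiprocessing uses the spawn start method (Windows, and macOS on recent Python versions): the child process receives the original function by pickling it, and pickle stores functions by name. After decoration, that name refers to the wrapper object instead of the function, so pickling fails. The sketch below reproduces only the pickling behaviour; `Wrapper` is a hypothetical stand-in for the recipe's `_Timeout` class and starts no process.

```python
import pickle


class Wrapper:
    """Hypothetical stand-in for the recipe's _Timeout class."""

    def __init__(self, function):
        self.function = function

    def __call__(self, *args, **kwargs):
        return self.function(*args, **kwargs)


def can_pickle(obj):
    # Functions are pickled by name, so this fails when the name in the
    # module no longer refers to the same object.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


@Wrapper
def decorated(x):
    return x * 2


def plain(x):
    return x * 2


wrapped = Wrapper(plain)  # explicit wrapping under a different name

if __name__ == "__main__":
    # False: the name "decorated" now refers to the Wrapper instance.
    print(can_pickle(decorated.function))
    # True when run as a script: "plain" still refers to the function.
    print(can_pickle(wrapped.function))
```

If this is the cause, applying the recipe without the `@` syntax and binding the result to a different name, along the lines of `limited_work = timeout(5)(work)`, keeps the original function picklable. On platforms that fork, the function is never pickled, and the decorator form should not hit this particular problem.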
This is your test.py program to see how the library works. Support libraries for this test follow.
More test.py here:
Part 1 of compare.py
Part 2 of compare.py
This is the diff.py module used in the code:
Do you want a practical application example? I can upload the program this code was developed for to YouSendIt.
Steve -- this is an excellent recipe for teaching a number of useful programming techniques and CS concepts. Thanks for posting it!
It is great to hear that someone found it useful! May I suggest looking at a better example of practical application? If you go to Google Code, you can find the finished version of the code above. These three modules are part of a much larger verse-quizzing program:
Could you provide a simple example using the map_async() function in multiprocessing?
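A minimal sketch of what such an example might look like. It is not taken from the recipe; `square` and `map_with_timeout` are hypothetical names. `Pool.map_async()` returns a result object immediately, and its `get(timeout=...)` raises `multiprocessing.TimeoutError` if the results are not ready in time.

```python
import multiprocessing as mp
import time


def square(number):
    return number * number


def map_with_timeout(function, items, limit):
    """Hypothetical helper: map function over items, waiting at most `limit` seconds."""
    # Leaving the with-block terminates the pool, which also stops any
    # workers that are still busy after a timeout.
    with mp.Pool(processes=2) as pool:
        async_result = pool.map_async(function, items)
        return async_result.get(timeout=limit)


if __name__ == "__main__":
    print(map_with_timeout(square, [1, 2, 3, 4], limit=10))  # prints [1, 4, 9, 16]
    try:
        map_with_timeout(time.sleep, [30], limit=0.5)
    except mp.TimeoutError:
        print("workers were too slow and the pool was terminated")
```

Unlike the recipe, the limit here applies to the whole batch rather than to a single call.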
I am new to Python. From your code, is your function timeout(limit=None) a timeout wrapper?
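Going by the listing, `timeout(limit=None)` is a decorator factory: calling it validates the limit and returns `wrapper`, and `wrapper` replaces the decorated function with a `_Timeout` object. The sketch below shows only that two-step pattern and starts no process; `limited`, `Limited`, and `DEFAULT_LIMIT` are hypothetical stand-ins for `timeout`, `_Timeout`, and `DEFAULT_TIMEOUT`.

```python
DEFAULT_LIMIT = 60  # stand-in for DEFAULT_TIMEOUT in the recipe


class Limited:
    """Hypothetical stand-in for _Timeout: it only stores its arguments."""

    def __init__(self, function, limit):
        self.function = function
        self.limit = limit


def limited(limit=None):
    # Outer call: receives the decorator's argument and validates it.
    if limit is None:
        limit = DEFAULT_LIMIT
    if limit <= 0:
        raise ValueError("limit must be positive")

    def wrapper(function):
        # Inner call: receives the function being decorated.
        return Limited(function, limit)

    return wrapper


@limited(5)
def work():
    return "done"


# The decorator line above is shorthand for: work = limited(5)(work)
print(type(work).__name__, work.limit)  # prints "Limited 5"
```

In the recipe itself, calling the resulting object starts the child process and returns nothing; the caller then checks the `ready` property and reads `value`, which raises the recipe's `TimeoutError` if the limit has passed without a result.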