""" Code to timeout with processes. >>> @timeout(.5) ... def sleep(x): ... print "ABOUT TO SLEEP {0} SECONDS".format(x) ... time.sleep(x) ... return x >>> sleep(1) Traceback (most recent call last): ... TimeoutException: timed out after 0 seconds >>> sleep(.2) 0.2 >>> @timeout(.5) ... def exc(): ... raise Exception('Houston we have problems!') >>> exc() Traceback (most recent call last): ... Exception: Houston we have problems! """ import multiprocessing import time import logging logger = multiprocessing.log_to_stderr() logger.setLevel(logging.INFO) class TimeoutException(Exception): pass class RunableProcessing(multiprocessing.Process): def __init__(self, func, *args, **kwargs): self.queue = multiprocessing.Queue(maxsize=1) args = (func,) + args multiprocessing.Process.__init__(self, target=self.run_func, args=args, kwargs=kwargs) def run_func(self, func, *args, **kwargs): try: result = func(*args, **kwargs) self.queue.put((True, result)) except Exception as e: self.queue.put((False, e)) def done(self): return self.queue.full() def result(self): return self.queue.get() def timeout(seconds, force_kill=True): def wrapper(function): def inner(*args, **kwargs): now = time.time() proc = RunableProcessing(function, *args, **kwargs) proc.start() proc.join(seconds) if proc.is_alive(): if force_kill: proc.terminate() runtime = int(time.time() - now) raise TimeoutException('timed out after {0} seconds'.format(runtime)) assert proc.done() success, result = proc.result() if success: return result else: raise result return inner return wrapper if __name__ == '__main__': import doctest doctest.testmod()