#!/usr/bin/env python
"""
A heapq-based scheduler for asyncore.
Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
License: MIT
"""
import sys
import time
import heapq
import asyncore
import traceback
import errno
# Module-wide min-heap of pending CallLater/CallEvery instances, ordered by
# their ``timeout`` attribute and maintained through the heapq functions.
_tasks = []


class CallLater(object):
    """Calls a function at a later time.

    It can be used to asynchronously schedule a call within the polling
    loop without blocking it. The instance returned is an object that
    can be used to cancel or reschedule the call.
    """

    # Upper bound accepted for a delay. sys.maxint only exists on
    # Python 2; sys.maxsize is used where it is missing.
    _max_delay = getattr(sys, 'maxint', None) or sys.maxsize

    def __init__(self, seconds, target, *args, **kwargs):
        """
        - (int) seconds: the number of seconds to wait
        - (obj) target: the callable object to call later
        - args: the arguments to call it with
        - kwargs: the keyword arguments to call it with; a special
          '_errback' parameter can be passed: it is a callable
          called in case target function raises an exception.
        """
        assert callable(target), "%s is not callable" % target
        assert self._max_delay >= seconds >= 0, \
            "%s is not greater than or equal to 0 seconds" % seconds
        self._delay = seconds
        self._target = target
        self._args = args
        # '_errback' is consumed here so it is never passed to the target
        self._errback = kwargs.pop('_errback', None)
        self._kwargs = kwargs
        # True when the timeout was pushed forward without re-sorting the
        # heap; _scheduler() then re-inserts the task instead of calling it
        self._repush = False
        # seconds from the epoch at which to call the function
        self.timeout = time.time() + self._delay
        self.cancelled = False
        heapq.heappush(_tasks, self)

    def __lt__(self, other):
        # heapq orders its items with "<"; without this method the heap
        # operations raise TypeError on Python 3.
        return self.timeout < other.timeout

    def __le__(self, other):
        return self.timeout <= other.timeout

    def call(self):
        """Call this scheduled function.

        If the target raises and an '_errback' was given, the errback is
        called (with no arguments) and the exception is swallowed;
        otherwise the exception propagates. KeyboardInterrupt, SystemExit
        and asyncore.ExitNow always propagate. In every case the call ends
        up cancelled afterwards.
        """
        assert not self.cancelled, "Already cancelled"
        try:
            try:
                self._target(*self._args, **self._kwargs)
            except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
                raise
            except:
                if self._errback is not None:
                    self._errback()
                else:
                    raise
        finally:
            # the target may already have cancelled this very call
            if not self.cancelled:
                self.cancel()

    def reset(self):
        """Reschedule this call resetting the current countdown."""
        assert not self.cancelled, "Already cancelled"
        self.timeout = time.time() + self._delay
        # the heap is fixed up lazily by the scheduler
        self._repush = True

    def delay(self, seconds):
        """Reschedule this call for a later time.

        - (int) seconds: the new delay, counted from now.
        """
        assert not self.cancelled, "Already cancelled."
        assert self._max_delay >= seconds >= 0, \
            "%s is not greater than or equal to 0 seconds" % seconds
        self._delay = seconds
        newtime = time.time() + self._delay
        if newtime > self.timeout:
            # postponed: let the scheduler re-insert it when it pops it
            self.timeout = newtime
            self._repush = True
        else:
            # anticipated: the heap must be re-ordered right away
            # XXX - slow, can be improved
            self.timeout = newtime
            heapq.heapify(_tasks)

    def cancel(self):
        """Unschedule this call."""
        assert not self.cancelled, "Already cancelled"
        self.cancelled = True
        # drop the references to the callable and its arguments
        del self._target, self._args, self._kwargs, self._errback
        try:
            pos = _tasks.index(self)
        except ValueError:
            # not in the heap (e.g. already popped by the scheduler)
            return
        if pos == 0:
            heapq.heappop(_tasks)
            return
        last = _tasks.pop()
        if pos < len(_tasks):
            # Fill the hole with the former last item. That item may be
            # smaller than its new parent as well as bigger than its new
            # children, so sifting in a single direction is not enough:
            # rebuild the heap to restore the invariant.
            _tasks[pos] = last
            heapq.heapify(_tasks)
class CallEvery(CallLater):
    """Calls a function every x seconds.

    It accepts the same arguments as CallLater and shares the same API.
    """

    def call(self):
        """Run the target once, then put this task back on the heap.

        The task is rescheduled after a successful run and also when a
        failure was handed over to the '_errback' callable. When the
        exception is re-raised to the caller the task is cancelled
        instead.
        """
        assert not self.cancelled, "Already cancelled"
        # becomes True only on the paths that re-raise to the caller
        propagating = False
        try:
            try:
                self._target(*self._args, **self._kwargs)
            except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
                propagating = True
                raise
            except:
                if self._errback is None:
                    propagating = True
                    raise
                self._errback()
        finally:
            # nothing to do if the target cancelled the task itself
            still_scheduled = not self.cancelled
            if still_scheduled and propagating:
                self.cancel()
            elif still_scheduled:
                self.timeout = time.time() + self._delay
                heapq.heappush(_tasks, self)
def _scheduler():
    """Run the scheduled functions due to expire soonest (if any).

    Tasks are popped from the heap for as long as the earliest one is
    due. A task flagged with ``_repush`` is only re-inserted, so that it
    lands at the position matching its updated timeout. Exceptions raised
    by a task are printed and swallowed, except KeyboardInterrupt,
    SystemExit and asyncore.ExitNow, which propagate.
    """
    now = time.time()
    while _tasks and now >= _tasks[0].timeout:
        task = heapq.heappop(_tasks)
        if not task._repush:
            try:
                task.call()
            except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
                raise
            except:
                print(traceback.format_exc())
        else:
            # timeout was postponed by reset()/delay(): re-sort, don't call
            heapq.heappush(_tasks, task)
            task._repush = False
def close_all(map=None, ignore_all=False):
    """Close all scheduled functions and opened sockets.

    - (dict) map: the socket map whose objects get closed; defaults to
      asyncore.socket_map.
    - (bool) ignore_all: when True, errors raised while closing an object
      or cancelling a scheduled call are ignored. KeyboardInterrupt,
      SystemExit and asyncore.ExitNow always propagate.
    """
    if map is None:
        map = asyncore.socket_map

    # Work on a snapshot: close() may remove the object from the map and
    # a dict must not change size while it is being iterated.
    for dispatcher in list(map.values()):
        try:
            dispatcher.close()
        except OSError as err:
            # an already closed file descriptor is not worth reporting
            if err.errno != errno.EBADF and not ignore_all:
                raise
        except (asyncore.ExitNow, KeyboardInterrupt, SystemExit):
            raise
        except:
            if not ignore_all:
                # NOTE(review): this clears the default socket map even
                # when a custom ``map`` was passed in -- confirm intent.
                asyncore.socket_map.clear()
                del _tasks[:]
                raise
    map.clear()

    # Snapshot again: cancel() removes the task from _tasks, and mutating
    # the list under iteration would skip tasks, leaving them scheduled-
    # looking (cancelled == False) although the heap is emptied below.
    for task in list(_tasks):
        try:
            if not task.cancelled:
                task.cancel()
        except (asyncore.ExitNow, KeyboardInterrupt, SystemExit):
            raise
        except:
            if not ignore_all:
                del _tasks[:]
                raise
    del _tasks[:]
def loop(timeout=0.001, use_poll=False, map=None, count=None):
    """Start asyncore and scheduler loop.

    Use this as replacement of the original asyncore.loop() function.

    - timeout: value handed to the asyncore poll function on every pass.
    - use_poll: prefer asyncore.poll2 when select.poll is available.
    - map: socket map to poll; defaults to asyncore.socket_map.
    - count: maximum number of passes; None means keep going for as long
      as there are sockets in the map or scheduled calls pending.
    """
    if not (use_poll and hasattr(asyncore.select, 'poll')):
        poller = asyncore.poll
    else:
        poller = asyncore.poll2

    if map is None:
        map = asyncore.socket_map

    bounded = count is not None
    while map or _tasks:
        if bounded and not count > 0:
            break
        poller(timeout, map)
        _scheduler()
        if bounded:
            count -= 1
if __name__ == '__main__':
    # Demo: every section below schedules something; the single loop()
    # call at the bottom then drives all of it.

    # ==============================================================
    # schedule a function
    # ==============================================================

    def foo():
        print("I'm called after 2.5 seconds")

    CallLater(2.5, foo)
    #loop()

    # ==============================================================
    # call a function every second
    # ==============================================================

    def bar():
        print("I'm called every second")

    CallEvery(1, bar)
    #loop()

    # ==============================================================
    # scheduled functions can be reset, delayed or cancelled
    # ==============================================================

    fun = CallLater(1, foo)
    fun.reset()
    fun.delay(1.5)
    fun.cancel()

    # ==============================================================
    # example of a basic asyncore client shutting down if
    # server does not reply for more than 3 seconds
    # ==============================================================

    import socket

    class UselessClient(asyncore.dispatcher):
        """Client that connects, collects what it receives and gives up
        when nothing arrives for ``timeout`` seconds."""

        timeout = 3

        def __init__(self, address):
            asyncore.dispatcher.__init__(self)
            # chunks received so far; must exist before any handler runs,
            # handle_read() and handle_close() both rely on it
            self.in_buffer = []
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect(address)
            self.callback = CallLater(self.timeout, self.handle_timeout)

        def handle_timeout(self):
            print("no response from server; disconnecting")
            self.close()

        def handle_connect(self):
            print("connected")

        def writable(self):
            # only interested in write events until the connection is up
            return not self.connected

        def handle_read(self):
            # reset timeout on data received
            self.callback.reset()
            data = self.recv(8192)
            self.in_buffer.append(data)

        def handle_close(self):
            if self.in_buffer:
                print("".join(self.in_buffer))
            self.close()

        def handle_error(self):
            # re-raise the exception being handled instead of logging it
            raise

        def close(self):
            # the timer is already cancelled when we get here from
            # handle_timeout(), since call() cancels it afterwards
            if not self.callback.cancelled:
                self.callback.cancel()
            asyncore.dispatcher.close(self)

    UselessClient(('google.com', 80))

    # ==============================================================
    # close this demo after 5 seconds
    # ==============================================================

    CallLater(5, close_all)
    #loop()

    # ==============================================================
    # finally, start the loop to take care of all the functions
    # (and connections) scheduled so far
    # ==============================================================

    loop()
# Diff to Previous Revision
# --- revision 4 2011-07-25 23:20:29
# +++ revision 5 2011-07-25 23:42:21
# @@ -4,6 +4,7 @@
#  A heapq-based scheduler for asyncore.
#  Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
# +License: MIT
#  """
#  import sys