The thing I miss most in asyncore is a way to call a function after a certain amount of time without blocking. This is crucial for simple tasks such as disconnecting a peer after a certain period of inactivity, or for more advanced use cases such as bandwidth throttling.
This recipe was initially inspired by Twisted's internet.base.DelayedCall class:
http://twistedmatrix.com/trac/browser/tags/last_vfs_and_web2/twisted/internet/base.py#L34
...then included into pyftpdlib:
http://code.google.com/p/pyftpdlib/issues/detail?id=72
...and finally proposed for inclusion into asyncore:
#!/usr/bin/env python
"""
A heapq-based scheduler for asyncore.
Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
License: MIT
"""
import sys
import time
import heapq
import asyncore
import traceback
import errno
# Heap (see the heapq module) of pending calls ordered by their
# ``timeout`` attribute.  It is shared by every scheduled call and it is
# consumed by the scheduler.
_tasks = []


class CallLater(object):
    """Calls a function at a later time.

    It can be used to asynchronously schedule a call within the polling
    loop without blocking it. The instance returned is an object that
    can be used to cancel or reschedule the call.

    Public attributes:
     - timeout: seconds from the epoch at which the function is due
     - cancelled: True once the call has been cancelled (a call is
       also cancelled right after it has been executed)
    """

    # Largest delay accepted.  sys.maxint only exists on Python 2, hence
    # the fallback to sys.maxsize.
    _max_delay = getattr(sys, 'maxint', None) or sys.maxsize

    def __init__(self, seconds, target, *args, **kwargs):
        """
        - (int or float) seconds: the number of seconds to wait
        - (obj) target: the callable object to call later
        - args: the arguments to call it with
        - kwargs: the keyword arguments to call it with; a special
          '_errback' parameter can be passed: it is a callable
          called (with no arguments) in case target function raises
          an exception.

        The new instance pushes itself on the module-level heap.
        """
        assert callable(target), "%s is not callable" % (target,)
        self._check_delay(seconds)
        self._delay = seconds
        self._target = target
        self._args = args
        # '_errback' is addressed to us: it must not reach the target
        self._errback = kwargs.pop('_errback', None)
        self._kwargs = kwargs
        # set by reset()/delay() when the heap position is stale
        self._repush = False
        # seconds from the epoch at which to call the function
        self.timeout = time.time() + self._delay
        self.cancelled = False
        heapq.heappush(_tasks, self)

    def _check_delay(self, seconds):
        """Assert that seconds is an acceptable delay."""
        assert self._max_delay >= seconds >= 0, \
            "%s is not greater than or equal to 0 seconds" % (seconds,)

    # heapq orders its items with "<": without __lt__ instances cannot
    # be pushed on the heap on Python 3.  __le__ is kept for backward
    # compatibility.
    def __lt__(self, other):
        return self.timeout < other.timeout

    def __le__(self, other):
        return self.timeout <= other.timeout

    def call(self):
        """Call this scheduled function.

        If the target raises, the errback (if any) is called in its
        place, otherwise the exception is propagated.  In any case the
        call is cancelled afterwards.
        """
        assert not self.cancelled, "Already cancelled"
        # Looked up now: the target is allowed to cancel this very
        # call, and cancel() deletes the attribute.
        errback = self._errback
        try:
            self._target(*self._args, **self._kwargs)
        except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
            raise
        except:
            # deliberately broad: anything else is the errback's business
            if errback is None:
                raise
            errback()
        finally:
            if not self.cancelled:
                self.cancel()

    def reset(self):
        """Reschedule this call resetting the current countdown.

        The heap is not reordered here: the call is only flagged so
        that the scheduler pushes it again, with the new timeout, when
        it pops it.
        """
        assert not self.cancelled, "Already cancelled"
        self.timeout = time.time() + self._delay
        self._repush = True

    def delay(self, seconds):
        """Reschedule this call for a later time.

        seconds is counted from now and replaces the delay given to the
        constructor.
        """
        assert not self.cancelled, "Already cancelled."
        self._check_delay(seconds)
        self._delay = seconds
        newtime = time.time() + self._delay
        postponed = newtime > self.timeout
        self.timeout = newtime
        if postponed:
            # lazily fixed by the scheduler, as for reset()
            self._repush = True
        else:
            # The call moved towards the top of the heap and it must
            # not be missed: reorder right away.
            # XXX - slow, can be improved
            heapq.heapify(_tasks)

    def cancel(self):
        """Unschedule this call."""
        assert not self.cancelled, "Already cancelled"
        self.cancelled = True
        del self._target, self._args, self._kwargs, self._errback
        try:
            _tasks.remove(self)
        except ValueError:
            # not queued, e.g. the scheduler popped it to run it
            return
        # Removing an arbitrary item can break the heap invariant both
        # towards the root and towards the leaves; rebuilding the heap
        # restores it in any case.
        heapq.heapify(_tasks)
class CallEvery(CallLater):
    """Calls a function every x seconds.

    It accepts the same arguments as CallLater and shares the same API.
    """

    def call(self):
        """Call this scheduled function and reschedule it right after.

        The call is pushed on the heap again unless it has been
        cancelled in the meantime.  When the target raises and no
        errback was given (or the exception is KeyboardInterrupt,
        SystemExit or asyncore.ExitNow) the call is cancelled and the
        exception is propagated.
        """
        assert not self.cancelled, "Already cancelled"
        # Looked up now: the target is allowed to cancel this very
        # call, and cancel() deletes the attribute.
        errback = self._errback
        failed = False
        try:
            self._target(*self._args, **self._kwargs)
        except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
            failed = True
            raise
        except:
            # deliberately broad: anything else is the errback's business
            if errback is None:
                failed = True
                raise
            errback()
        finally:
            if not self.cancelled:
                if failed:
                    self.cancel()
                else:
                    # start a new countdown from the current time
                    self.timeout = time.time() + self._delay
                    heapq.heappush(_tasks, self)
def _scheduler():
    """Execute every queued call whose timeout has already expired.

    Calls flagged for re-push are put back on the heap with their
    current timeout instead of being executed.  Exceptions raised by a
    call are printed and do not stop the scheduler, except for
    KeyboardInterrupt, SystemExit and asyncore.ExitNow which are
    propagated.
    """
    now = time.time()
    while _tasks and _tasks[0].timeout <= now:
        task = heapq.heappop(_tasks)
        if task._repush:
            # rescheduled while it was queued: its timeout has changed
            heapq.heappush(_tasks, task)
            task._repush = False
            continue
        try:
            task.call()
        except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
            raise
        except:
            print(traceback.format_exc())
def close_all(map=None, ignore_all=False):
    """Close all scheduled functions and opened sockets.

    - map: the dictionary whose values are the channels to close;
      asyncore.socket_map is used if omitted
    - ignore_all: if True errors raised while closing a channel or
      cancelling a scheduled call are ignored instead of propagated

    An OSError carrying errno.EBADF is always ignored, while
    asyncore.ExitNow, KeyboardInterrupt and SystemExit are always
    propagated.
    """
    if map is None:
        map = asyncore.socket_map
    # Work on a snapshot: close() may remove the channel from the map
    # (asyncore.dispatcher.close() does).
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            if err.errno != errno.EBADF and not ignore_all:
                raise
        except (asyncore.ExitNow, KeyboardInterrupt, SystemExit):
            raise
        except:
            if not ignore_all:
                # clean up the map we were asked to close, which is not
                # necessarily the global one
                map.clear()
                del _tasks[:]
                raise
    map.clear()

    # cancel() removes the call from _tasks: empty the queue first and
    # walk through a copy, so that no call is skipped.
    pending = _tasks[:]
    del _tasks[:]
    for task in pending:
        try:
            if not task.cancelled:
                task.cancel()
        except (asyncore.ExitNow, KeyboardInterrupt, SystemExit):
            raise
        except:
            if not ignore_all:
                raise
def loop(timeout=0.001, use_poll=False, map=None, count=None):
    """Start asyncore and scheduler loop.
    Use this as replacement of the original asyncore.loop() function.

    - timeout: seconds each polling round may wait for
    - use_poll: use asyncore.poll2 instead of asyncore.poll, provided
      that the select module has poll()
    - map: the dictionary of channels to watch; asyncore.socket_map is
      used if omitted
    - count: maximum number of rounds to run; if None the loop runs
      until there are neither channels nor scheduled calls left
    """
    if use_poll and hasattr(asyncore.select, 'poll'):
        poll_fun = asyncore.poll2
    else:
        poll_fun = asyncore.poll
    if map is None:
        map = asyncore.socket_map
    while (map or _tasks) and (count is None or count > 0):
        if map:
            poll_fun(timeout, map)
        elif timeout and timeout > 0:
            # Only scheduled calls are left.  The asyncore pollers do
            # nothing (and do not wait) when the map is empty, so sleep
            # here rather than spinning at full speed until the next
            # call is due.
            time.sleep(timeout)
        _scheduler()
        if count is not None:
            count -= 1
if __name__ == '__main__':

    # ==============================================================
    # schedule a function
    # ==============================================================
    def foo():
        print("I'm called after 2.5 seconds")

    CallLater(2.5, foo)
    #loop()

    # ==============================================================
    # call a function every second
    # ==============================================================
    def bar():
        print("I'm called every second")

    CallEvery(1, bar)
    #loop()

    # ==============================================================
    # scheduled functions can be reset, delayed or cancelled
    # ==============================================================
    fun = CallLater(1, foo)
    fun.reset()
    fun.delay(1.5)
    fun.cancel()

    # ==============================================================
    # example of a basic asyncore client shutting down if
    # server does not reply for more than 3 seconds
    # ==============================================================
    import socket

    class UselessClient(asyncore.dispatcher):
        # seconds of silence tolerated before disconnecting
        timeout = 3

        def __init__(self, address):
            asyncore.dispatcher.__init__(self)
            # chunks received so far; it must exist before any read or
            # close event is dispatched
            self.in_buffer = []
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect(address)
            self.callback = CallLater(self.timeout, self.handle_timeout)

        def handle_timeout(self):
            print("no response from server; disconnecting")
            self.close()

        def handle_connect(self):
            print("connected")

        def writable(self):
            # nothing is ever sent: write events only matter until the
            # connection is established
            return not self.connected

        def handle_read(self):
            # reset timeout on data received
            self.callback.reset()
            data = self.recv(8192)
            self.in_buffer.append(data)

        def handle_close(self):
            if self.in_buffer:
                # recv() returns bytes on Python 3 (str on Python 2)
                print(b"".join(self.in_buffer))
            self.close()

        def handle_error(self):
            # let the exception being handled propagate
            raise

        def close(self):
            # the timeout makes no sense once the connection is closed
            if not self.callback.cancelled:
                self.callback.cancel()
            asyncore.dispatcher.close(self)

    UselessClient(('google.com', 80))

    # ==============================================================
    # close this demo after 5 seconds
    # ==============================================================
    CallLater(5, close_all)
    #loop()

    # ==============================================================
    # finally, start the loop to take care of all the functions
    # (and connections) scheduled so far
    # ==============================================================
    loop()
|