Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python

"""
A heapq-based scheduler for asyncore.

Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
License: MIT
"""


import sys
import time
import heapq
import asyncore
import traceback
import errno

_tasks
= []

class CallLater(object):
   
"""Calls a function at a later time.

    It can be used to asynchronously schedule a call within the polling
    loop without blocking it. The instance returned is an object that
    can be used to cancel or reschedule the call.
    """


   
def __init__(self, seconds, target, *args, **kwargs):
       
"""
         - (int) seconds: the number of seconds to wait
         - (obj) target: the callable object to call later
         - args: the arguments to call it with
         - kwargs: the keyword arguments to call it with; a special
           '_errback' parameter can be passed: it is a callable
           called in case target function raises an exception.
        """

       
assert callable(target), "%s is not callable" % target
       
assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \
                                           
"to 0 seconds" % seconds
       
self._delay = seconds
       
self._target = target
       
self._args = args
       
self._kwargs = kwargs
       
self._errback = kwargs.pop('_errback', None)
       
self._repush = False
       
# seconds from the epoch at which to call the function
       
self.timeout = time.time() + self._delay
       
self.cancelled = False
        heapq
.heappush(_tasks, self)

   
def __le__(self, other):
       
return self.timeout <= other.timeout

   
def call(self):
       
"""Call this scheduled function."""
       
assert not self.cancelled, "Already cancelled"
       
try:
           
try:
               
self._target(*self._args, **self._kwargs)
           
except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
               
raise
           
except:
               
if self._errback is not None:
                   
self._errback()
               
else:
                   
raise
       
finally:
           
if not self.cancelled:
               
self.cancel()

   
def reset(self):
       
"""Reschedule this call resetting the current countdown."""
       
assert not self.cancelled, "Already cancelled"
       
self.timeout = time.time() + self._delay
       
self._repush = True

   
def delay(self, seconds):
       
"""Reschedule this call for a later time."""
       
assert not self.cancelled, "Already cancelled."
       
assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \
                                           
"to 0 seconds" % seconds
       
self._delay = seconds
        newtime
= time.time() + self._delay
       
if newtime > self.timeout:
           
self.timeout = newtime
           
self._repush = True
       
else:
           
# XXX - slow, can be improved
           
self.timeout = newtime
            heapq
.heapify(_tasks)

   
def cancel(self):
       
"""Unschedule this call."""
       
assert not self.cancelled, "Already cancelled"
       
self.cancelled = True
       
del self._target, self._args, self._kwargs, self._errback
       
if self in _tasks:
            pos
= _tasks.index(self)
           
if pos == 0:
                heapq
.heappop(_tasks)
           
elif pos == len(_tasks) - 1:
                _tasks
.pop(pos)
           
else:
                _tasks
[pos] = _tasks.pop()
                heapq
._siftup(_tasks, pos)


class CallEvery(CallLater):
   
"""Calls a function every x seconds.
    It accepts the same arguments as CallLater and shares the same API.
    """


   
def call(self):
       
# call this scheduled function and reschedule it right after
       
assert not self.cancelled, "Already cancelled"
        exc
= False
       
try:
           
try:
               
self._target(*self._args, **self._kwargs)
           
except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
                exc
= True
               
raise
           
except:
               
if self._errback is not None:
                   
self._errback()
               
else:
                    exc
= True
                   
raise
       
finally:
           
if not self.cancelled:
               
if exc:
                   
self.cancel()
               
else:
                   
self.timeout = time.time() + self._delay
                    heapq
.heappush(_tasks, self)


def _scheduler():
   
"""Run the scheduled functions due to expire soonest (if any)."""
    now
= time.time()
   
while _tasks and now >= _tasks[0].timeout:
        call
= heapq.heappop(_tasks)
       
if call._repush:
            heapq
.heappush(_tasks, call)
            call
._repush = False
           
continue
       
try:
            call
.call()
       
except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
           
raise
       
except:
           
print traceback.format_exc()

def close_all(map=None, ignore_all=False):
   
"""Close all scheduled functions and opened sockets."""
   
if map is None:
        map
= asyncore.socket_map
   
for x in map.values():
       
try:
            x
.close()
       
except OSError, x:
           
if x[0] == errno.EBADF:
               
pass
           
elif not ignore_all:
               
raise
       
except (asyncore.ExitNow, KeyboardInterrupt, SystemExit):
           
raise
       
except:
           
if not ignore_all:
                asyncore
.socket_map.clear()
               
del _tasks[:]
               
raise
    map
.clear()

   
for x in _tasks:
       
try:
           
if not x.cancelled:
                x
.cancel()
       
except (asyncore.ExitNow, KeyboardInterrupt, SystemExit):
           
raise
       
except:
           
if not ignore_all:
               
del _tasks[:]
               
raise
   
del _tasks[:]


def loop(timeout=0.001, use_poll=False, map=None, count=None):
   
"""Start asyncore and scheduler loop.
    Use this as replacement of the original asyncore.loop() function.
    """

   
if use_poll and hasattr(asyncore.select, 'poll'):
        poll_fun
= asyncore.poll2
   
else:
        poll_fun
= asyncore.poll
   
if map is None:
        map
= asyncore.socket_map
   
if count is None:
       
while (map or _tasks):
            poll_fun
(timeout, map)
            _scheduler
()
   
else:
       
while (map or _tasks) and count > 0:
            poll_fun
(timeout, map)
            _scheduler
()
            count
-= 1


if __name__ == '__main__':

   
# ==============================================================
   
# schedule a function
   
# ==============================================================

   
def foo():
       
print "I'm called after 2.5 seconds"

   
CallLater(2.5, foo)
   
#loop()


   
# ==============================================================
   
# call a function every second
   
# ==============================================================

   
def bar():
       
print "I'm called every second"

   
CallEvery(1, bar)
   
#loop()

   
# ==============================================================
   
# scheduled functions can be resetted, delayed or cancelled
   
# ==============================================================

    fun
= CallLater(1, foo)
    fun
.reset()
    fun
.delay(1.5)
    fun
.cancel()

   
# ==============================================================
   
# example of a basic asyncore client shutting down if
   
# server does not reply for more than 3 seconds
   
# ==============================================================

   
import socket

   
class UselessClient(asyncore.dispatcher):

        timeout
= 3

       
def __init__(self, address):
            asyncore
.dispatcher.__init__(self)
           
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
           
self.connect(address)
           
self.callback = CallLater(self.timeout, self.handle_timeout)

       
def handle_timeout(self):
           
print "no response from server; disconnecting"
           
self.close()

       
def handle_connect(self):
           
print "connected"

       
def writable(self):
           
return not self.connected

       
def handle_read(self):
           
# reset timeout on data received
           
self.callback.reset()
            data
= self.recv(8192)
           
self.in_buffer.append(data)

       
def handle_close(self):
           
if self.in_buffer:
               
print "".join(self.in_buffer)
           
self.close()

       
def handle_error(self):
           
raise

       
def close(self):
           
if not self.callback.cancelled:
               
self.callback.cancel()
            asyncore
.dispatcher.close(self)

   
UselessClient(('google.com', 80))

   
# ==============================================================
   
# close this demo after 5 seconds
   
# ==============================================================

   
CallLater(5, close_all)
   
#loop()


   
# ==============================================================
   
# finally, start the loop to take care of all the functions
   
# (and connections) scheduled so far
   
# ==============================================================
    loop
()

Diff to Previous Revision

--- revision 4 2011-07-25 23:20:29
+++ revision 5 2011-07-25 23:42:21
@@ -4,6 +4,7 @@
 A heapq
-based scheduler for asyncore.
 
 
Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
+License: MIT
 """
 
 import sys

History