This is a simple implementation of the observer design pattern. Acting as a registration hub, it fires events when requested. It also offers a gevent.Timeout-like interface for situations where you need to handle the event in the same greenlet. Example:
e = Observer()
ev = e.wait('kill')
try:
gevent.sleep(3)
except FiredEvent:
print 'Fired!'
else:
print 'Not Fired!'
finally:
ev.cancel()
But remember: if you use the subscribe method, the event callback will be executed in another greenlet.
__author__ = "Andrey Nikishaev"
__email__ = "creotiv@gmail.com"
import gevent
from gevent import core
from gevent.hub import getcurrent
from gevent.event import Event
from gevent.pool import Pool
import functools
def wrap(method, *args, **kargs):
    """Return a plain function that calls ``method`` with pre-bound arguments.

    ``None`` is passed through unchanged.  Any positional or keyword
    arguments given here are bound up front with ``functools.partial``;
    arguments supplied when the returned function is called are forwarded
    after them.
    """
    if method is None:
        return None

    # Only pay for a partial when there is actually something to bind.
    target = functools.partial(method, *args, **kargs) if (args or kargs) else method

    def wrapper(*call_args, **call_kargs):
        return target(*call_args, **call_kargs)

    return wrapper
class FiredEvent(Exception):
    """Exception thrown into a greenlet registered through ``Observer.wait``
    when the awaited event is fired."""
class Event(object):
    """One registration of a callback under an event name.

    ``events`` is the owning hub; it must expose an ``_events`` mapping of
    lower-cased event name -> list of ``Event`` objects.  ``name`` is stored
    lower-cased, and ``callback`` is the callable spawned by ``run``.
    """

    def __init__(self, events, name, callback):
        self.events = events
        self.name = name.lower()
        self.callback = callback

    def unsubscribe(self):
        """Remove this registration from the owning hub.

        Returns ``False`` when the hub has no entry at all for this event
        name, ``True`` otherwise (including the case where this registration
        had already been removed from the list).
        """
        registry = self.events._events
        if self.name not in registry:
            return False
        try:
            registry[self.name].remove(self)
        except ValueError:
            # Already removed (e.g. consumed by a fire) - nothing to do.
            pass
        return True

    def cancel(self):
        """Alias for ``unsubscribe`` that discards the return value."""
        self.unsubscribe()

    def run(self):
        """Run the callback in a newly spawned greenlet."""
        gevent.spawn(self.callback)

    def __del__(self):
        # Best-effort cleanup.  If __init__ failed part-way (for example a
        # non-string name), the attributes unsubscribe() reads are missing;
        # swallow that instead of emitting "Exception ignored in __del__".
        try:
            self.unsubscribe()
        except AttributeError:
            pass
class Observer(object):
    """Singleton registration hub that maps event names to registrations.

    Event names are case-insensitive: they are lower-cased before being used
    as keys of ``_events``.  Registrations are one-shot - ``fire`` removes
    every registration it dispatches.
    """

    def __new__(cls, *args):
        # Every construction returns the same shared instance.
        if not hasattr(cls, '_instance'):
            cls._instance = object.__new__(cls)
            cls._instance._events = {}
        return cls._instance

    def _register(self, name, callback):
        """Create an ``Event`` for ``callback`` and store it under the
        lower-cased ``name``."""
        # Normalise once so the key that is checked, created and appended to
        # is always the same one (the raw name used to be used for storage,
        # which lost or broke registrations made with mixed-case names).
        key = name.lower()
        ev = Event(self, name, callback)
        self._events.setdefault(key, []).append(ev)
        return ev

    def subscribe(self, name, callback):
        """Register ``callback`` for event ``name``.

        Returns the ``Event`` handle, which can be used to unsubscribe.
        """
        return self._register(name, callback)

    def fire(self, name):
        """Dispatch and remove every registration stored for ``name``.

        Each registration's ``run`` is started in its own greenlet, in
        registration order.  Returns ``False`` when nothing is registered
        under ``name``, ``True`` otherwise.
        """
        pending = self._events.get(name.lower())
        if not pending:
            return False
        while pending:
            gevent.spawn(pending.pop(0).run)
        return True

    def wait(self, name):
        """Register the current greenlet to receive ``FiredEvent`` when
        ``name`` is fired.

        Returns the ``Event`` handle; call its ``cancel`` once the wait is
        over.
        """
        return self._register(name, wrap(getcurrent().throw, FiredEvent))
if __name__ == '__main__':
    # Testing
    # Manual demo: three greenlets share the singleton Observer.  One waits
    # for 'kill', one subscribes a callback to 'kill', and one fires 'kill'
    # after two seconds.

    def in_another_greenlet():
        # Callback for the subscribe demo; prints the greenlet it runs in.
        print '001',getcurrent()

    def test_subscribe():
        # Subscribes a callback to 'kill', then sleeps while the event is
        # fired from fire_event().
        e = Observer()
        print '000',getcurrent()
        # The callback is stored as an attribute on the current greenlet and
        # that attribute is what gets subscribed.
        getcurrent().in_another_greenlet = in_another_greenlet
        b = e.subscribe('kill',getcurrent().in_another_greenlet)
        gevent.sleep(5)
        print 'END'
        b.unsubscribe()

    def test_wait():
        # Waits up to three seconds for 'kill'; FiredEvent is raised inside
        # this greenlet if the event is fired while it sleeps.
        e = Observer()
        ev = e.wait('kill')
        try:
            gevent.sleep(3)
        except FiredEvent:
            print 'Fired!'
        else:
            print 'Not Fired!'
        finally:
            # Always drop the registration, fired or not.
            ev.cancel()

    def fire_event():
        # Fires 'kill' after two seconds, i.e. before either sleeper above
        # is due to wake up on its own.
        e2 = Observer()
        gevent.sleep(2)
        e2.fire('kill')

    p = Pool()
    p.spawn(test_wait)
    p.spawn(test_subscribe)
    p.spawn(fire_event)
    p.join()
|