Welcome, guest | Sign In | My Account | Store | Cart

This is simple implementation of the observer design pattern. Acting as a registration hub, it fires events when requested. Also i have gevent.Timeout like interface in situations when you need to run event-method in the same greenlet. Example:

e = Observer()
ev = e.wait('kill')
try:
    gevent.sleep(3)
except FiredEvent:
    print 'Fired!'
else:
    print 'Not Fired!'
finally:
    ev.cancel()

But rememeber, if you are using subscribe method, event-method will be executed in another greenlet.

Python, 119 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
__author__ = "Andrey Nikishaev"
__email__ = "creotiv@gmail.com"

import gevent
from gevent import core
from gevent.hub import getcurrent
from gevent.event import Event
from gevent.pool import Pool
import functools
 
def wrap(method, *args, **kargs):
    if method is None:
        return None
    if args or kargs:
        method = functools.partial(method, *args, **kargs)
    def wrapper(*args, **kargs):
        return method(*args, **kargs)
    return wrapper
 
class FiredEvent(Exception):
    pass
 
class Event(object):
 
    def __init__(self,events,name,callback):
        self.events = events
        self.name = name.lower()
        self.callback = callback
 
    def unsubscribe(self):
        if not self.events._events.has_key(self.name):
            return False
        try:
            del self.events._events[self.name][self.events._events[self.name].index(self)]
        except:
            pass
        return True
 
    def cancel(self):
        self.unsubscribe()
 
    def run(self):
        gevent.spawn(self.callback)
 
    def __del__(self):
        self.unsubscribe()
 
class Observer(object):
 
    def __new__(cls,*args):
        if not hasattr(cls,'_instance'):
            cls._instance = object.__new__(cls)
            cls._instance._events = {}
        return cls._instance
 
    def subscribe(self,name,callback):
        if not self._events.has_key(name.lower()):
            self._events[name] = []
        ev = Event(self,name,callback)
        self._events[name].append(ev)
        return ev
 
    def fire(self,name):
        try:
            ev = self._events[name.lower()].pop(0)
        except:
            return False
        while ev:
            gevent.spawn(ev.run)
            try:
                ev = self._events[name.lower()].pop(0)
            except:
                break
        return True
 
    def wait(self,name):
        if not self._events.has_key(name.lower()):
            self._events[name] = []
        ev = Event(self,name,wrap(getcurrent().throw,FiredEvent))
        self._events[name].append(ev)
        return ev
 
if __name__ == '__main__': 
    # Testing
    def in_another_greenlet():
        print '001',getcurrent()
     
    def test_subscribe():
        e = Observer()
        print '000',getcurrent()
        getcurrent().in_another_greenlet = in_another_greenlet
        b = e.subscribe('kill',getcurrent().in_another_greenlet)
        gevent.sleep(5)
        print 'END'
        b.unsubscribe()
     
    def test_wait():
        e = Observer()
        ev = e.wait('kill')
        try:
            gevent.sleep(3)
        except FiredEvent:
            print 'Fired!'
        else:
            print 'Not Fired!'
        finally:
            ev.cancel()
     
    def fire_event():
        e2 = Observer()
        gevent.sleep(2)
        e2.fire('kill')
     
    p = Pool()
    p.spawn(test_wait)
    p.spawn(test_subscribe)
    p.spawn(fire_event)
     
    p.join()