Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/python

"""Simple message bus implemented on top of a shared sqlite3 database.

We use a single, shared table for messages. Each message has a unique,
incrementing id. When a program connects to the message bus, it reads the
highest id that's in the table - after that it'll "receive" messages that have
been sent after it started.

Each user has a name and messages can be addressed using standard glob notation
to "broadcast" messages. I.e. there can be a client1, client2, client3 and then
a message can be addressed to "client*".

Note that each send/recv will hit the database, so it's prudent to only do so at
intervals, e.g. if waiting for a response to a message there should be a
microsleep in between calls to recv().
"""

# standard
import cPickle
import fnmatch
import time



# Highest id currently in the table (NULL when the table is empty). Read once
# at startup so that only messages sent afterwards are received.
FIND_LAST_ID = 'select max(id) from mbus'

# Every message with an id above the given one, oldest first.
RECV_MESSAGES = 'select * from mbus where id > ? order by id asc'

# Append one message; NULL lets sqlite pick the next id. The three parameters
# are source, dest and data, in that order.
SEND_MESSAGE = 'insert into mbus values (NULL, ?, ?, ?)'

# Schema of the single shared message table. MBus itself never creates the
# table; the self-test at the bottom of this file executes this statement.
CREATE_TABLE = (
    'create table if not exists mbus '
    '(id integer primary key autoincrement, source, dest, data blob)'
)



    
class MBus(object):
    """A named endpoint on a message bus kept in a shared sqlite3 table.

    db   -- open database connection providing execute(); the 'mbus' table
            must already exist.
    name -- name of this endpoint. Messages whose destination glob matches
            the name are received, except those sent under the same name.

    Attributes:
    seen -- id of the newest table row that has already been read.
    mbox -- messages received but not yet handed out by recv(), stored as
            (id, source, tag, data) tuples in arrival order.
    """

    def __init__ (self, db, name):
        self.db = db
        self.name = name
        self.seen = self._find_last_id()
        self.mbox = []

    # PRIVATE
    def _find_last_id (self):
        """Return the highest message id in the table, or 0 if it is empty.

        max(id) is NULL (None) for an empty table. Autoincrement ids start
        at 1, so the fallback has to be 0: with a fallback of 1 the
        'id > ?' query used by _poll() would skip the first message ever
        sent on a fresh bus.
        """
        return self.db.execute(FIND_LAST_ID, ()).fetchone()[0] or 0

    def _poll (self):
        """Fetch new messages from database and append to mailbox.

        Every row newer than self.seen advances self.seen to its id, but only
        rows sent by somebody else and addressed to a glob matching our name
        are unpickled and stored.
        """
        # Materialise the result before processing any row.
        rows = list(self.db.execute(RECV_MESSAGES, (self.seen,)))
        for rowid, source, dest, blob in rows:
            self.seen = rowid
            if source != self.name and fnmatch.fnmatch(self.name, dest):
                # NOTE(security): unpickling can execute code chosen by
                # whoever wrote the row. Only share the database with
                # processes you trust.
                tag, data = cPickle.loads(str(blob))
                self.mbox.append((rowid, source, tag, data))

    def _filter (self, tag, func):
        """Remove and yield matching messages from mailbox, retain the rest.

        tag  -- glob pattern the message tag has to match.
        func -- predicate called with the message tuple.

        A message is taken out of the mailbox *before* it is yielded, so the
        mailbox is correct even when the consumer stops iterating early.
        Rebuilding the mailbox only after the loop would leave delivered
        messages in it in that case and deliver them a second time.
        """
        index = 0
        while index < len(self.mbox):
            message = self.mbox[index]
            if fnmatch.fnmatch(message[2], tag) and func(message):
                del self.mbox[index]
                yield message
            else:
                index += 1

    # PUBLIC
    def recv (self, tag='*', func=lambda _: True, wait=5, sleep=0.5):
        """Yield incoming messages as (id, source, tag, data) tuples.

        tag   -- glob pattern the message tag has to match.
        func  -- predicate called with the message tuple; a false result
                 leaves the message in the mailbox.
        wait  -- seconds to keep polling. The generator does not stop at the
                 first message, it keeps polling until the time is up.
        sleep -- seconds to pause between two polls of the database.

        Messages that do not match stay in the mailbox for later calls.
        """
        end = time.time() + wait
        while True:
            self._poll()
            for message in self._filter(tag, func):
                yield message
            if time.time() > end:
                break
            time.sleep(sleep)

    def send (self, dest, tag, **kwargs):
        """Post a message and return the id of the inserted row.

        dest   -- glob pattern matched against the names of the receivers.
        tag    -- message tag; receivers can select on it in recv().
        kwargs -- payload, pickled together with the tag and handed to the
                  receiver as a dict.

        No commit is issued here. Presumably the connection is expected to be
        in autocommit mode, as in the self-test in this file, which opens it
        with isolation_level=None.
        """
        payload = cPickle.dumps((tag, kwargs))
        cursor = self.db.execute(SEND_MESSAGE, (self.name, dest, payload))
        return cursor.lastrowid
    


# PUBLIC API

def connect (db, name):
    """Create a MBus and expose its send/recv as functions of this module.

    Most programs talk to exactly one message bus. Rather than handing a bus
    object around, they can connect once and then use the module itself:

    import mbus

    mbus.connect(db, 'client')
    mbus.send('*', 'ping')
    for rowid, source, tag, data in mbus.recv(tag='pong'):
        pass

    Calling connect() again rebinds send and recv to the new bus.
    """
    bus = MBus(db, name)
    globals().update(send=bus.send, recv=bus.recv)



# TESTING
    
if __name__ == '__main__':
    # Self-test. Start one process with the argument 'server', then a second
    # one with any other argument (or none) to act as the client. The client
    # sends a ping and prints the pid from the server's pong.
    import os
    import sys
    import sqlite3

    # isolation_level=None selects autocommit, so an insert made by one
    # process is visible to the other without an explicit commit.
    db = sqlite3.connect('test.db', isolation_level=None)

    # The statement is 'create table if not exists', so run it on every
    # start. Creating the table only when the file is missing would leave a
    # pre-existing but empty test.db without a table.
    db.execute(CREATE_TABLE)

    # Default to client mode instead of failing with IndexError when no
    # argument is given.
    mode = sys.argv[1] if len(sys.argv) > 1 else 'client'

    if mode == 'server':
        mb = MBus(db, 'server')
        while True:
            for _, source, _, data in mb.recv(tag='ping'):
                # Answer the first ping and stop.
                mb.send(source, 'pong', pid=os.getpid())
                sys.exit(0)
    else:
        mb = MBus(db, 'client')
        mb.send('server', 'ping')
        for _, source, _, data in mb.recv(tag='pong'):
            # Single pre-formatted argument: same output as the print
            # statement, and valid syntax on Python 2 and 3 alike.
            print('received pong from pid %s' % data['pid'])

History