#!/usr/bin/python """Simple message bus implemented on top of a shared sqlite3 database. We use a single, shared table for messages. Each message has a unique, incrementing id. When a program connects to the message bus, it reads the highest id that's in the table - after that it'll "receive" messages that's been sent after it started. Each user has a name and messages can be addressed using standard glob notation to "broadcast" messages. I.e there can be a client1, client2, client3 and then a message can be addressed to "client*". Note that each send/recv will hit the database, so it's prudent to only to so at intervals, e.g if waiting for a response to a message there should be a microsleep in between calls to recv(). """ # standard import cPickle import fnmatch import time # mostly here for documentation purposes, not used in code. CREATE_TABLE = ('create table if not exists mbus (id integer primary key ' 'autoincrement, source, dest, data blob)') # read all messages higher than the specified id RECV_MESSAGES = 'select * from mbus where id > ? order by id asc' SEND_MESSAGE = 'insert into mbus values (NULL, ?, ?, ?)' # used at startup to find FIND_LAST_ID = 'select max(id) from mbus' class MBus: def __init__ (self, db, name): self.db = db self.name = name self.seen = self._find_last_id() self.mbox = [] # PRIVATE def _find_last_id (self): return self.db.execute(FIND_LAST_ID, ()).fetchone()[0] or 1 def _poll (self): """Fetch new messages from database and append to mailbox. """ for row in list(self.db.execute(RECV_MESSAGES, (self.seen,))): self.seen, source, dest, blob = row if source != self.name and fnmatch.fnmatch(self.name, dest): tag, data = cPickle.loads(str(blob)) self.mbox.append((self.seen, source, tag, data)) def _filter (self, tag, func): """Remove and return matching messages from mailbox and retain the rest. """ mbox = [] for t in self.mbox: if fnmatch.fnmatch(t[2], tag) and func(t): yield t else: mbox.append(t) self.mbox = mbox # PUBLIC def recv (self, tag='*', func=lambda _: True, wait=5, sleep=0.5): end = time.time() + wait while True: self._poll() for t in self._filter(tag, func): yield t if time.time() > end: break time.sleep(sleep) def send (self, dest, tag, **kwargs): data = (tag, kwargs) rowid = self.db.execute(SEND_MESSAGE, (self.name, dest, cPickle.dumps(data))).lastrowid return rowid # PBULIC API def connect (db, name): """Create a MBus and populate module environment with it's global methods. The common case is that we connect to only one message bus. To avoid passing around a message bus object we can instead simply do: import mbus mbus.connect(db, 'client') mbus.send('*', 'ping') for rowid, source, tag, data in mbus.recv(tag='pong'): pass """ g = globals() m = MBus(db, name) g['send'] = m.send g['recv'] = m.recv # TESTING if __name__ == '__main__': import os import sys import sqlite3 p = 'test.db' c = not os.path.exists(p) db = sqlite3.connect(p, isolation_level=None) if c: db.execute(CREATE_TABLE) if sys.argv[1] == 'server': mb = MBus(db, 'server') while True: for _, source, _, data in mb.recv(tag='ping'): mb.send(source, 'pong', pid=os.getpid()) sys.exit(0) else: mb = MBus(db, 'client') mb.send('server', 'ping') for _, source, _, data in mb.recv(tag='pong'): print 'received pong from pid', data['pid']