Let several (unix) processes communicate via a shared sqlite database.
#!/usr/bin/python
"""Simple message bus implemented on top of a shared sqlite3 database.
We use a single, shared table for messages. Each message has a unique,
incrementing id. When a program connects to the message bus, it reads the
highest id that's in the table - after that it'll "receive" messages that's been
sent after it started.
Each user has a name and messages can be addressed using standard glob notation
to "broadcast" messages. I.e there can be a client1, client2, client3 and then a
message can be addressed to "client*".
Note that each send/recv will hit the database, so it's prudent to only to so at
intervals, e.g if waiting for a response to a message there should be a
microsleep in between calls to recv().
"""
# standard
import cPickle
import fnmatch
import time

# mostly here for documentation purposes, not used in code.
CREATE_TABLE = ('create table if not exists mbus (id integer primary key '
                'autoincrement, source, dest, data blob)')

# read all messages higher than the specified id
RECV_MESSAGES = 'select * from mbus where id > ? order by id asc'
SEND_MESSAGE = 'insert into mbus values (NULL, ?, ?, ?)'
# used at startup to find the id of the newest existing message
FIND_LAST_ID = 'select max(id) from mbus'
class MBus:
    def __init__ (self, db, name):
        self.db = db
        self.name = name
        self.seen = self._find_last_id()
        self.mbox = []

    # PRIVATE
    def _find_last_id (self):
        # max(id) is NULL on an empty table; start from 0 so the first
        # message (id 1) isn't skipped.
        return self.db.execute(FIND_LAST_ID, ()).fetchone()[0] or 0
    def _poll (self):
        """Fetch new messages from database and append to mailbox.
        """
        for row in list(self.db.execute(RECV_MESSAGES, (self.seen,))):
            self.seen, source, dest, blob = row
            if source != self.name and fnmatch.fnmatch(self.name, dest):
                tag, data = cPickle.loads(str(blob))
                self.mbox.append((self.seen, source, tag, data))

    def _filter (self, tag, func):
        """Remove and return matching messages from mailbox and retain the rest.
        """
        mbox = []
        for t in self.mbox:
            if fnmatch.fnmatch(t[2], tag) and func(t):
                yield t
            else:
                mbox.append(t)
        self.mbox = mbox
    # PUBLIC
    def recv (self, tag='*', func=lambda _: True, wait=5, sleep=0.5):
        end = time.time() + wait
        while True:
            self._poll()
            for t in self._filter(tag, func):
                yield t
            if time.time() > end:
                break
            time.sleep(sleep)

    def send (self, dest, tag, **kwargs):
        data = (tag, kwargs)
        rowid = self.db.execute(SEND_MESSAGE, (self.name, dest,
                                               cPickle.dumps(data))).lastrowid
        return rowid
# PUBLIC API
def connect (db, name):
    """Create a MBus and populate the module environment with its methods.

    The common case is that we connect to only one message bus. To avoid
    passing around a message bus object we can instead simply do:

        import mbus
        mbus.connect(db, 'client')
        mbus.send('*', 'ping')
        for rowid, source, tag, data in mbus.recv(tag='pong'):
            pass
    """
    g = globals()
    m = MBus(db, name)
    g['send'] = m.send
    g['recv'] = m.recv
# TESTING
if __name__ == '__main__':
    import os
    import sys
    import sqlite3

    p = 'test.db'
    c = not os.path.exists(p)
    db = sqlite3.connect(p, isolation_level=None)
    if c:
        db.execute(CREATE_TABLE)

    if sys.argv[1] == 'server':
        mb = MBus(db, 'server')
        while True:
            for _, source, _, data in mb.recv(tag='ping'):
                mb.send(source, 'pong', pid=os.getpid())
        sys.exit(0)
    else:
        mb = MBus(db, 'client')
        mb.send('server', 'ping')
        for _, source, _, data in mb.recv(tag='pong'):
            print 'received pong from pid', data['pid']
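The test harness above only exercises point-to-point ping/pong. Below is a minimal, hypothetical sketch of the glob addressing described in the docstring; it assumes the listing is saved as mbus.py and that test.db already contains the mbus table. The coordinator/worker names are made up for the example and don't appear in the original code.

import sqlite3
from mbus import MBus

db = sqlite3.connect('test.db', isolation_level=None)

coordinator = MBus(db, 'coordinator')
workers = [MBus(db, 'worker%d' % i) for i in range(3)]

# One send reaches every name matching the glob "worker*".
coordinator.send('worker*', 'task', cmd='rebuild-index')

for w in workers:
    # wait=0 polls just once, which is enough since the message is already stored.
    for _, source, tag, data in w.recv(tag='task', wait=0):
        print w.name, 'got', data['cmd'], 'from', source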
Any DB-API 2 compatible database could be used (at a guess, only the SQL statements need to change), but the code has only been tested with sqlite3.
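As an illustration, here is an untested guess at what the constants might look like under psycopg2 (PostgreSQL): the paramstyle becomes %s, the autoincrement id becomes a serial column, and send() would also need "returning id" because cursor.lastrowid isn't useful with PostgreSQL.

# Untested sketch of the SQL constants for a hypothetical PostgreSQL port.
CREATE_TABLE = ('create table if not exists mbus (id serial primary key, '
                'source text, dest text, data bytea)')
RECV_MESSAGES = 'select * from mbus where id > %s order by id asc'
SEND_MESSAGE = ('insert into mbus (source, dest, data) '
                'values (%s, %s, %s) returning id')
FIND_LAST_ID = 'select max(id) from mbus'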
SQLite isn't really suited to the many-writers scenario, but version 3 is _much_ better at avoiding writer starvation, and as long as the number of messages is fairly low it works great.
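If lock contention does become a problem, two SQLite-level knobs (not used in the listing above) can help: a busy timeout, so a blocked writer retries instead of failing immediately with "database is locked", and, with SQLite 3.7.0 or newer, WAL mode, so readers aren't blocked by the writer. A sketch:

import sqlite3

# timeout makes a blocked writer retry for up to 10 seconds;
# journal_mode=wal is persistent and stored in the database file.
db = sqlite3.connect('test.db', isolation_level=None, timeout=10)
db.execute('pragma journal_mode=wal')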
It was developed as a way of synchronizing a number of independent Unix processes.