Welcome, guest | Sign In | My Account | Store | Cart

Let several (unix) processes communicate via a shared sqlite database.

Python, 137 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/python

"""Simple message bus implemented on top of a shared sqlite3 database.

We use a single, shared table for messages. Each message has a unique,
incrementing id. When a program connects to the message bus, it reads the
highest id that's in the table - after that it'll "receive" messages that's been
sent after it started.

Each user has a name and messages can be addressed using standard glob notation
to "broadcast" messages. I.e there can be a client1, client2, client3 and then a
message can be addressed to "client*".

Note that each send/recv will hit the database, so it's prudent to only to so at
intervals, e.g if waiting for a response to a message there should be a
microsleep in between calls to recv().
"""

# standard
import cPickle
import fnmatch
import time



# mostly here for documentation purposes, not used in code.
CREATE_TABLE  = ('create table if not exists mbus (id integer primary key '
                 'autoincrement, source, dest, data blob)')

# read all messages higher than the specified id
RECV_MESSAGES = 'select * from mbus where id > ? order by id asc'

SEND_MESSAGE  = 'insert into mbus values (NULL, ?, ?, ?)'

# used at startup to find 
FIND_LAST_ID  = 'select max(id) from mbus'



    
class MBus:

    def __init__ (self, db, name):
        self.db    = db
        self.name  = name
        self.seen  = self._find_last_id()
        self.mbox  = []

    # PRIVATE
    def _find_last_id (self):
        return self.db.execute(FIND_LAST_ID, ()).fetchone()[0] or 1

    def _poll (self):
        """Fetch new messages from database and append to mailbox.
        """
        for row in list(self.db.execute(RECV_MESSAGES, (self.seen,))):
            self.seen, source, dest, blob = row
            if source != self.name and fnmatch.fnmatch(self.name, dest):
                tag, data = cPickle.loads(str(blob))
                self.mbox.append((self.seen, source, tag, data))

    def _filter (self, tag, func):
        """Remove and return matching messages from mailbox and retain the rest.
        """
        mbox = []
        for t in self.mbox:
            if fnmatch.fnmatch(t[2], tag) and func(t):
                yield t
            else:
                mbox.append(t)
        self.mbox = mbox

    # PUBLIC
    def recv (self, tag='*', func=lambda _: True, wait=5, sleep=0.5):
        end = time.time() + wait
        while True:
            self._poll()
            for t in self._filter(tag, func):
                yield t
            if time.time() > end:
                break
            time.sleep(sleep)

    def send (self, dest, tag, **kwargs):
        data  = (tag, kwargs)
        rowid = self.db.execute(SEND_MESSAGE, (self.name, dest,
                                               cPickle.dumps(data))).lastrowid
        return rowid
    


# PBULIC API    

def connect (db, name):
    """Create a MBus and populate module environment with it's global methods.

    The common case is that we connect to only one message bus. To avoid passing
    around a message bus object we can instead simply do:

    import mbus

    mbus.connect(db, 'client')
    mbus.send('*', 'ping')
    for rowid, source, tag, data in mbus.recv(tag='pong'):
        pass
    """
    g = globals()
    m = MBus(db, name)
    g['send'] = m.send
    g['recv'] = m.recv



# TESTING
    
if __name__ == '__main__':
    import os
    import sys
    import sqlite3

    p  = 'test.db'
    c  = not os.path.exists(p)
    db = sqlite3.connect(p, isolation_level=None)
    if c:
        db.execute(CREATE_TABLE)

    if sys.argv[1] == 'server':
        mb = MBus(db, 'server')
        while True:
            for _, source, _, data in mb.recv(tag='ping'):
                mb.send(source, 'pong', pid=os.getpid())
                sys.exit(0)
    else:
        mb = MBus(db, 'client')
        mb.send('server', 'ping')
        for _, source, _, data in mb.recv(tag='pong'):
            print 'received pong from pid', data['pid']

Any db2 database could be used (at a guess only the SQL statements needs to change) but the code has only been tested with sqlite3.

Sqlite isn't really suited for the many-writers scenario but version 3 is _much_ better at avoiding writer starvation and as long as the number of messages is fairly low it works great.

It was developed as a way of synchronizing a number of independent Unix processes.