
A simple example of setting up a distributed publish/subscribe system. It shows how to create a central exchange service to which all participating processes connect. Services can then set themselves up as publishers, and other services can subscribe to what is being published.

Python, 70 lines
# The exchange process which everything connects to.

import netsvc
import signal
 
dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)
 
exchange = netsvc.Exchange(netsvc.EXCHANGE_SERVER)
exchange.listen(11111)
 
dispatcher.run()

# Service which periodically publishes information.

import netsvc
import signal
import random
 
class Publisher(netsvc.Service):
 
  def __init__(self):
    netsvc.Service.__init__(self,"SEED")
    self._count = 0
    time = netsvc.DateTime()
    data = { "time": time }
    self.publishReport("init",data,-1)
    self.startTimer(self.publish,1,"1")
 
  def publish(self,name):
    self._count = self._count + 1
    time = netsvc.DateTime()
    value = int(0xFFFF*random.random())
    data = { "time": time, "count": self._count, "value": value }
    self.publishReport("next",data)
    self.startTimer(self.publish,1,"1")
 
dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)
 
exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect("localhost",11111,5)
 
publisher = Publisher()
 
dispatcher.run()

# Service which subscribes to published information.

import netsvc
import signal
 
class Subscriber(netsvc.Service):
 
  def __init__(self):
    netsvc.Service.__init__(self)
    self.monitorReports(self.seed,"SEED","next")
 
  def seed(self,service,subjectName,content):
    print("%s - %s" % (content["time"],content["value"]))
 
dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)
 
exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect("localhost",11111,5)
 
subscriber = Subscriber()
 
dispatcher.run()

This recipe can form the basis of many kinds of application, from instant messaging, to alarm systems that monitor IT or telco network equipment, to stock market data feeds.

Partly because Python is used at various levels, and partly because of how the underlying architecture is designed, one should not expect to be able to pass a complete stock market feed through this. Such applications generally deal with only a subset of that data anyway.

The "netsvc" module comes as part of OSE, which is available from:

http://ose.sourceforge.net

This example shows only a small subset of the functionality available in OSE. Other middleware-like functionality, such as a system for message-oriented request/reply, is also available.
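
For readers who want to see the routing idea in isolation, here is a minimal in-process sketch of publish/subscribe keyed on a service name and a subject, mirroring the "SEED"/"next" pairing used in the recipe. It does not use OSE and is not how OSE is implemented: MiniExchange, subscribe and publish are hypothetical names invented for this illustration, everything runs in a single process, and subjects are matched by exact string comparison, which is a simplifying assumption.

```python
# Illustrative stand-in only. None of these names come from OSE/netsvc;
# MiniExchange, subscribe() and publish() are hypothetical.

class MiniExchange(object):

    def __init__(self):
        # Maps (service name, subject) to the callbacks registered for it.
        self._subscribers = {}

    def subscribe(self, service_name, subject, callback):
        # Register interest in one subject published by one named service.
        key = (service_name, subject)
        self._subscribers.setdefault(key, []).append(callback)

    def publish(self, service_name, subject, content):
        # Deliver content to every matching callback; return how many ran.
        delivered = 0
        for callback in self._subscribers.get((service_name, subject), []):
            callback(service_name, subject, content)
            delivered += 1
        return delivered


received = []

def seed(service, subject, content):
    # Same callback shape as the recipe's Subscriber.seed method.
    received.append((subject, content["value"]))

exchange = MiniExchange()
exchange.subscribe("SEED", "next", seed)

exchange.publish("SEED", "init", {"value": 0})   # nobody subscribed to "init"
exchange.publish("SEED", "next", {"value": 42})  # delivered to seed()
```

In the recipe itself this bookkeeping is handled by OSE across process boundaries; the sketch only shows why the subscriber sees "next" reports from "SEED" and nothing else.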