Welcome, guest | Sign In | My Account | Store | Cart

Generic JMS Topic Listener using pyactivemq.

Python, 136 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python
# JMS Topic Listener
# FB - 201012116
import time
import threading
import pyactivemq
from pyactivemq import ActiveMQConnectionFactory
# from xml.dom.minidom import parseString # for pretty XML print
# import inspect # for inspect

receivedTotal = 0
receivedMessages = []
receivedTopics = []

class MessageListener(pyactivemq.MessageListener):
    def __init__(self, topic):
        pyactivemq.MessageListener.__init__(self)
        self.topic = topic

    def onMessage(self, message):
        global receivedTotal
        global receivedMessages
        global receivedTopics
        receivedTotal += 1
        # receivedMessages.append(message.text)
        receivedMessages.append(message)
        receivedTopics.append(self.topic.name)        

class JmsTopicListener( threading.Thread ):
    def __init__(self, brokerUrl, username, password, topics, sleep):
        threading.Thread.__init__(self)
        self.sleep = sleep
        global receivedTotal
        global receivedMessages
        global receivedTopics
        self.receivedTotal = 0
        self.receivedMessages = []
        self.receivedTopics = []
        self.f = ActiveMQConnectionFactory(brokerUrl)
        self.f.username = username;
        self.f.password = password;
        self.conn = self.f.createConnection()
        self.session = self.conn.createSession()

        # JMS filter
        messageSelector = ""
        # messageSelector = "destination = 'myDestination'"

        # create a consumer for each topic
        self.tops = []
        self.consumers = []
        self.listeners = []

        i = 0
        for name in topics:
            self.tops.append(self.session.createTopic(name))
            self.consumers.append(self.session.createConsumer(self.tops[i], \
                messageSelector))
            self.listeners.append(MessageListener(self.tops[i]))
            self.consumers[i].messageListener = self.listeners[i]
            i += 1

    def run ( self ):
        self.conn.start()
        time.sleep(self.sleep)
        self.conn.close()

    def getReceivedTotal(self):
        return receivedTotal

    def getReceivedTopics(self):
        return receivedTopics
    
    def getReceivedMessages(self):
        return receivedMessages

# MAIN
if __name__ == "__main__":

    brokerUrl = 'tcp://myPC:61616'
    username = 'myUsername'
    password = 'myPassword'
    topics = ['/topic/myFirstTopic', '/topic/mySecondTopic']
    sleep = 30 # listen messages for 30 seconds

    j = JmsTopicListener(brokerUrl, username, password, topics, sleep)
    j.start()
    j.join() # wait until the thread finished running
    n = j.getReceivedTotal()
    t = j.getReceivedTopics()
    m = j.getReceivedMessages()
    print "Number of messages: " + str(n)
    print

    for k in range(n):
        print "message # " + str(k + 1)
        print
        print "topic: " + t[k]
        print
        # print m[k]
        # print parseString(m[k].text).toprettyxml()
        # print

        message = m[k]
        # print inspect.getmembers(message)
        # print

        print "JMS header:"
        print
        print "correlationID: " + str(message.correlationID)
        print "deliveryMode: "  + str(message.deliveryMode)
        print "destination: "   + str(message.destination)
        print "expiration: "    + str(message.expiration)
        print "messageID: "     + str(message.messageID)
        print "priority: "      + str(message.priority)
        print "redelivered: "   + str(message.redelivered)
        print "replyTo: "       + str(message.replyTo)
        print "timestamp: "     + str(message.timestamp)
        print "type: "          + str(message.type)
        print

        try:
            print "Special message properties:"
            print
            print "Topic: " + str(message.getStringProperty("Topic"))
            print "Priority: " + str(message.getStringProperty("Priority"))
            print "RequestId: " + str(message.getStringProperty("RequestId"))
            print "TimeToLive: " + str(message.getStringProperty("TimeToLive"))
            print
        except Exception as e:
            # print e
            # print
            pass

        print "message text:"
        print message.text