Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python
# JMS Topic Listener
# FB - 201012116
import time
import threading
import pyactivemq
from pyactivemq import ActiveMQConnectionFactory
# from xml.dom.minidom import parseString # for pretty XML print
# import inspect # for inspect

receivedTotal = 0
receivedMessages = []
receivedTopics = []

class MessageListener(pyactivemq.MessageListener):
    def __init__(self, topic):
        pyactivemq.MessageListener.__init__(self)
        self.topic = topic

    def onMessage(self, message):
        global receivedTotal
        global receivedMessages
        global receivedTopics
        receivedTotal += 1
        # receivedMessages.append(message.text)
        receivedMessages.append(message)
        receivedTopics.append(self.topic.name)        

class JmsTopicListener( threading.Thread ):
    def __init__(self, brokerUrl, username, password, topics, sleep):
        threading.Thread.__init__(self)
        self.sleep = sleep
        global receivedTotal
        global receivedMessages
        global receivedTopics
        self.receivedTotal = 0
        self.receivedMessages = []
        self.receivedTopics = []
        self.f = ActiveMQConnectionFactory(brokerUrl)
        self.f.username = username;
        self.f.password = password;
        self.conn = self.f.createConnection()
        self.session = self.conn.createSession()

        # JMS filter
        messageSelector = ""
        # messageSelector = "destination = 'myDestination'"

        # create a consumer for each topic
        self.tops = []
        self.consumers = []
        self.listeners = []

        i = 0
        for name in topics:
            self.tops.append(self.session.createTopic(name))
            self.consumers.append(self.session.createConsumer(self.tops[i], \
                messageSelector))
            self.listeners.append(MessageListener(self.tops[i]))
            self.consumers[i].messageListener = self.listeners[i]
            i += 1

    def run ( self ):
        self.conn.start()
        time.sleep(self.sleep)
        self.conn.close()

    def getReceivedTotal(self):
        return receivedTotal

    def getReceivedTopics(self):
        return receivedTopics
    
    def getReceivedMessages(self):
        return receivedMessages

# MAIN
if __name__ == "__main__":

    brokerUrl = 'tcp://myPC:61616'
    username = 'myUsername'
    password = 'myPassword'
    topics = ['/topic/myFirstTopic', '/topic/mySecondTopic']
    sleep = 30 # listen messages for 30 seconds

    j = JmsTopicListener(brokerUrl, username, password, topics, sleep)
    j.start()
    j.join() # wait until the thread finished running
    n = j.getReceivedTotal()
    t = j.getReceivedTopics()
    m = j.getReceivedMessages()
    print "Number of messages: " + str(n)
    print

    for k in range(n):
        print "message # " + str(k + 1)
        print
        print "topic: " + t[k]
        print
        # print m[k]
        # print parseString(m[k].text).toprettyxml()
        # print

        message = m[k]
        # print inspect.getmembers(message)
        # print

        print "JMS header:"
        print
        print "correlationID: " + str(message.correlationID)
        print "deliveryMode: "  + str(message.deliveryMode)
        print "destination: "   + str(message.destination)
        print "expiration: "    + str(message.expiration)
        print "messageID: "     + str(message.messageID)
        print "priority: "      + str(message.priority)
        print "redelivered: "   + str(message.redelivered)
        print "replyTo: "       + str(message.replyTo)
        print "timestamp: "     + str(message.timestamp)
        print "type: "          + str(message.type)
        print

        try:
            print "Special message properties:"
            print
            print "Topic: " + str(message.getStringProperty("Topic"))
            print "Priority: " + str(message.getStringProperty("Priority"))
            print "RequestId: " + str(message.getStringProperty("RequestId"))
            print "TimeToLive: " + str(message.getStringProperty("TimeToLive"))
            print
        except Exception as e:
            # print e
            # print
            pass

        print "message text:"
        print message.text

History