#!/usr/bin/env python
# JMS Topic Listener
# FB - 201012116
import time
import threading
import pyactivemq
from pyactivemq import ActiveMQConnectionFactory
# from xml.dom.minidom import parseString # for pretty XML print
# import inspect # for inspect
receivedTotal = 0
receivedMessages = []
receivedTopics = []
class MessageListener(pyactivemq.MessageListener):
def __init__(self, topic):
pyactivemq.MessageListener.__init__(self)
self.topic = topic
def onMessage(self, message):
global receivedTotal
global receivedMessages
global receivedTopics
receivedTotal += 1
# receivedMessages.append(message.text)
receivedMessages.append(message)
receivedTopics.append(self.topic.name)
class JmsTopicListener( threading.Thread ):
def __init__(self, brokerUrl, username, password, topics, sleep):
threading.Thread.__init__(self)
self.sleep = sleep
global receivedTotal
global receivedMessages
global receivedTopics
self.receivedTotal = 0
self.receivedMessages = []
self.receivedTopics = []
self.f = ActiveMQConnectionFactory(brokerUrl)
self.f.username = username;
self.f.password = password;
self.conn = self.f.createConnection()
self.session = self.conn.createSession()
# JMS filter
messageSelector = ""
# messageSelector = "destination = 'myDestination'"
# create a consumer for each topic
self.tops = []
self.consumers = []
self.listeners = []
i = 0
for name in topics:
self.tops.append(self.session.createTopic(name))
self.consumers.append(self.session.createConsumer(self.tops[i], \
messageSelector))
self.listeners.append(MessageListener(self.tops[i]))
self.consumers[i].messageListener = self.listeners[i]
i += 1
def run ( self ):
self.conn.start()
time.sleep(self.sleep)
self.conn.close()
def getReceivedTotal(self):
return receivedTotal
def getReceivedTopics(self):
return receivedTopics
def getReceivedMessages(self):
return receivedMessages
# MAIN
if __name__ == "__main__":
brokerUrl = 'tcp://myPC:61616'
username = 'myUsername'
password = 'myPassword'
topics = ['/topic/myFirstTopic', '/topic/mySecondTopic']
sleep = 30 # listen messages for 30 seconds
j = JmsTopicListener(brokerUrl, username, password, topics, sleep)
j.start()
j.join() # wait until the thread finished running
n = j.getReceivedTotal()
t = j.getReceivedTopics()
m = j.getReceivedMessages()
print "Number of messages: " + str(n)
print
for k in range(n):
print "message # " + str(k + 1)
print
print "topic: " + t[k]
print
# print m[k]
# print parseString(m[k].text).toprettyxml()
# print
message = m[k]
# print inspect.getmembers(message)
# print
print "JMS header:"
print
print "correlationID: " + str(message.correlationID)
print "deliveryMode: " + str(message.deliveryMode)
print "destination: " + str(message.destination)
print "expiration: " + str(message.expiration)
print "messageID: " + str(message.messageID)
print "priority: " + str(message.priority)
print "redelivered: " + str(message.redelivered)
print "replyTo: " + str(message.replyTo)
print "timestamp: " + str(message.timestamp)
print "type: " + str(message.type)
print
try:
print "Special message properties:"
print
print "Topic: " + str(message.getStringProperty("Topic"))
print "Priority: " + str(message.getStringProperty("Priority"))
print "RequestId: " + str(message.getStringProperty("RequestId"))
print "TimeToLive: " + str(message.getStringProperty("TimeToLive"))
print
except Exception as e:
# print e
# print
pass
print "message text:"
print message.text