ActiveMQ and NMS

Getting Started

ActiveMQ can be connected to in various ways. In this post I’m going to concentrate on NMS for .NET integration.

For the sample code I’ve created a console application and, using NuGet, added the Apache.NMS and Apache.NMS.ActiveMQ packages.

To begin with we need to start ActiveMQ, either locally or remotely. If you’re running it remotely, make a note of the connection string; for now I’ll assume the ActiveMQ server was started locally.

The first thing our client application must do is create the connection and session objects that allow us to interact with ActiveMQ. This is done using the following code, which is pretty self-explanatory:

IConnectionFactory factory = new ConnectionFactory();
using (IConnection connection = factory.CreateConnection())
{
   connection.Start();
   using(ISession session = connection.CreateSession())
   {
      // do something with the session
   }
}

Note: both the connection and session are disposable, so don’t forget to clean them up, either directly by calling Close() or indirectly via a using statement.

The above code does not specify the endpoint for the connection. To handle a remote server (or one not on the default port on the local machine) we can replace the ConnectionFactory() line with the following

Uri connecturi = new Uri("activemq:tcp://myserver:61616");
IConnectionFactory factory = new NMSConnectionFactory(connecturi);

Topics and Queues

Best to read the ActiveMQ documentation for a full explanation.

But basically a topic works on a publish and subscribe pattern: one or more subscriptions may exist to a topic and all of them will receive each published message. A queue, by contrast, follows load balancer semantics: each message is delivered to just one of the available consumers, and it stays on the queue until a consumer has acknowledged it.
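
To make the difference concrete, here is a minimal sketch that attaches two consumers to a topic and two to a queue, then sends one message to each. It assumes an already open session on a started connection; the class name and the destination names DEMO.TOPIC and DEMO.QUEUE are hypothetical, picked only for this illustration.

```csharp
using System;
using Apache.NMS;

static class TopicVersusQueue
{
    // Assumes 'session' was created on a connection that has been started.
    // DEMO.TOPIC and DEMO.QUEUE are hypothetical destination names.
    public static void Run(ISession session)
    {
        ITopic topic = session.GetTopic("DEMO.TOPIC");
        IQueue queue = session.GetQueue("DEMO.QUEUE");

        // Consumers are created before sending, because a plain topic
        // subscriber only sees messages published while it is subscribed.
        using (IMessageConsumer topicA = session.CreateConsumer(topic))
        using (IMessageConsumer topicB = session.CreateConsumer(topic))
        using (IMessageConsumer queueA = session.CreateConsumer(queue))
        using (IMessageConsumer queueB = session.CreateConsumer(queue))
        using (IMessageProducer topicProducer = session.CreateProducer(topic))
        using (IMessageProducer queueProducer = session.CreateProducer(queue))
        {
            topicProducer.Send(session.CreateTextMessage("sent to the topic"));
            queueProducer.Send(session.CreateTextMessage("sent to the queue"));

            TimeSpan wait = TimeSpan.FromSeconds(2);

            // Receive returns null if nothing arrives within the timeout.
            Console.WriteLine("topicA received: " + (topicA.Receive(wait) != null));
            Console.WriteLine("topicB received: " + (topicB.Receive(wait) != null));
            Console.WriteLine("queueA received: " + (queueA.Receive(wait) != null));
            Console.WriteLine("queueB received: " + (queueB.Receive(wait) != null));
        }
    }
}
```

Against a default broker I’d expect both topic consumers to report a message and only one of the two queue consumers to do so, although which queue consumer wins is up to the broker.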

Enumerating Topics and Queues

The following is pretty much taken from the sample code accompanying the NMS source; I’ve removed the code for setting up the connection/session.

public const string QUEUE_ADVISORY_DESTINATION = "ActiveMQ.Advisory.Queue";
public const string TOPIC_ADVISORY_DESTINATION = "ActiveMQ.Advisory.Topic";
public const string TEMPQUEUE_ADVISORY_DESTINATION = "ActiveMQ.Advisory.TempQueue";
public const string TEMPTOPIC_ADVISORY_DESTINATION = "ActiveMQ.Advisory.TempTopic";

public const string ALLDEST_ADVISORY_DESTINATION = QUEUE_ADVISORY_DESTINATION + "," +
                       TOPIC_ADVISORY_DESTINATION + "," +
                       TEMPQUEUE_ADVISORY_DESTINATION + "," +
                       TEMPTOPIC_ADVISORY_DESTINATION;

private void Enumerate(string destination, Action<DestinationInfo> action)
{
   IDestination dest = session.GetTopic(destination);
   using (IMessageConsumer consumer = session.CreateConsumer(dest))
   {
      IMessage advisory;

      while ((advisory = consumer.Receive(TimeSpan.FromMilliseconds(2000))) != null)
      {
         ActiveMQMessage am = advisory as ActiveMQMessage;
         // && short-circuits, so DataStructure is never read when am is null
         if (am != null && am.DataStructure != null)
         {
            DestinationInfo info = am.DataStructure as DestinationInfo;
            if (info != null)
            {
               action(info);
            }
         }
      }
   }			
}

public void EnumerateQueues()
{
   Console.WriteLine("Listing all Queues on Broker:");
   Enumerate(QUEUE_ADVISORY_DESTINATION, info => 
                Console.WriteLine("   Queue: " + info.Destination));
   Console.WriteLine("Listing Complete.");
}

public void EnumerateTopics()
{
   Console.WriteLine("Listing all Topics on Broker:");
   Enumerate(TOPIC_ADVISORY_DESTINATION, info => 
                Console.WriteLine("   Topic: " + info.Destination));
   Console.WriteLine("Listing Complete.");
}

public void EnumerateDestinations()
{
   Console.WriteLine("Listing all Destinations on Broker:");
   Enumerate(ALLDEST_ADVISORY_DESTINATION, info =>
   {
      string destType = info.Destination.IsTopic ? "Topic" : "Queue";
      destType = info.Destination.IsTemporary ? "Temporary" + destType : destType;
      Console.WriteLine("   " + destType + ": " + info.Destination);
   });
   Console.WriteLine("Listing Complete.");
}

I’ve also added the code for EnumerateDestinations, which allows us to enumerate both Queues and Topics.

Note the use of advisory messages; see Advisory Messages in the ActiveMQ documentation.

Connection strings

Sample of alternate connection strings

activemq:tcp://activemqhost:61616
stomp:tcp://activemqhost:61613
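
As a rough sketch of how one of these strings is used, the snippet below passes it to NMSConnectionFactory, which picks the provider from the scheme prefix. The class name is hypothetical and activemqhost is just the placeholder host from the samples above. I’m also assuming the provider package matching the scheme is installed (Apache.NMS.ActiveMQ for activemq:, and presumably Apache.NMS.Stomp for stomp:).

```csharp
using System;
using Apache.NMS;

static class ConnectWithUri
{
    public static void Main(string[] args)
    {
        // Take the connection string from the command line, falling back
        // to the placeholder sample (replace activemqhost with a real host).
        string brokerUri = args.Length > 0
            ? args[0]
            : "activemq:tcp://activemqhost:61616";

        // The scheme prefix (activemq:, stomp:) selects the NMS provider.
        IConnectionFactory factory = new NMSConnectionFactory(new Uri(brokerUri));

        using (IConnection connection = factory.CreateConnection())
        using (ISession session = connection.CreateSession())
        {
            connection.Start();
            Console.WriteLine("Connected using " + brokerUri);
        }
    }
}
```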

Creating a producer

IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");
using (IMessageProducer producer = session.CreateProducer(destination))
{
   connection.Start();
   producer.DeliveryMode = MsgDeliveryMode.Persistent;
   producer.RequestTimeout = TimeSpan.FromSeconds(10);

   // create message
   ITextMessage request = session.CreateTextMessage("Hello World!");
   request.NMSCorrelationID = "abc";
   request.Properties["myKey"] = "Cheddar";

   producer.Send(request);
}

Creating a consumer

IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");
using (IMessageConsumer consumer = session.CreateConsumer(destination))
{
   connection.Start();  
   consumer.Listener += msg => 
   {
      ITextMessage message = msg as ITextMessage;
      if(message != null)
      {
         Console.WriteLine("Message Id: " + message.NMSMessageId);
         Console.WriteLine("Message text " + message.Text);
      }
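   };

   // keep the consumer alive until Enter is pressed;
   // leaving the using block disposes it and stops delivery
   Console.ReadLine();
}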

The above consumer code works in an event-driven way and thus can be used as an async consumer. An alternative is to request data from the consumer synchronously (Receive() blocks until a message arrives), in the following manner

ITextMessage message = consumer.Receive() as ITextMessage;
if(message != null)
{
   Console.WriteLine("Message Id: " + message.NMSMessageId);
   Console.WriteLine("Message text " + message.Text);
}

Note that ActiveMQ queues and topics can be created simply by creating a destination that uses them. In the above case we’re creating a queue named FOO.BAR, as in the following

queue://FOO.BAR

Instead of using SessionUtil.GetDestination(session, "queue://FOO.BAR") we can also use session.GetQueue("FOO.BAR") and/or session.GetTopic("FOO.BAR") or session.GetDestination("queue://FOO.BAR") and/or session.GetDestination("topic://FOO.BAR").

Note: if using SessionUtil.GetDestination(session, "FOO.BAR"), i.e. without the queue/topic prefix, the call defaults to a queue.

Don’t forget to cleanup

As mentioned earlier, one thing to remember is that the session and connection instances need to be disposed of when no longer required, preferably by calling the Close() method directly.
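
For completeness, here is a minimal sketch of the explicit Close() approach, using try/finally in place of the using statements shown at the start of the post. It assumes a broker running locally on the default port, and the class name is hypothetical.

```csharp
using Apache.NMS;
using Apache.NMS.ActiveMQ;

static class CleanupSketch
{
    public static void Main()
    {
        // Default factory, assuming ActiveMQ is running locally.
        IConnectionFactory factory = new ConnectionFactory();
        IConnection connection = factory.CreateConnection();
        ISession session = null;
        try
        {
            connection.Start();
            session = connection.CreateSession();

            // do something with the session
        }
        finally
        {
            // Close the session first, then the connection that owns it.
            if (session != null)
            {
                session.Close();
            }
            connection.Close();
        }
    }
}
```

The finally block runs even if the work in the try block throws, so the connection isn’t left open against the broker.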