{"id":415,"date":"2013-05-29T14:52:22","date_gmt":"2013-05-29T14:52:22","guid":{"rendered":"http:\/\/putridparrot.com\/blog\/?p=415"},"modified":"2013-05-29T15:19:58","modified_gmt":"2013-05-29T15:19:58","slug":"activemq-and-nms","status":"publish","type":"post","link":"https:\/\/putridparrot.com\/blog\/activemq-and-nms\/","title":{"rendered":"ActiveMQ and NMS"},"content":{"rendered":"<p><strong>Getting Started<\/strong><\/p>\n<p>ActiveMQ can be connected to in various ways. In this post I&#8217;m going to concentrate on NMS for .NET integration.<\/p>\n<p>For the sample code I&#8217;ve created a console application and, using NuGet, added the packages named Apache.NMS and Apache.NMS.ActiveMQ.<\/p>\n<p>To begin with we need to start ActiveMQ, either locally or remotely. If you&#8217;re running it remotely, make sure you&#8217;ve noted down the connection string; for now I&#8217;ll assume the ActiveMQ server was started locally.<\/p>\n<p>The first thing our client application must do is create both the connection and the session objects that allow us to interact with ActiveMQ. This is done using the following code, which is pretty self-explanatory:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nIConnectionFactory factory = new ConnectionFactory();\r\nusing (IConnection connection = factory.CreateConnection())\r\n{\r\n   connection.Start();\r\n   using(ISession session = connection.CreateSession())\r\n   {\r\n      \/\/ do something with the session\r\n   }\r\n}\r\n<\/pre>\n<p>Note: both the connection and the session are disposable, so don&#8217;t forget to dispose of them, either directly by calling Close() or indirectly with a using statement.<\/p>\n<p>The above code does not specify the endpoint for the connection. To handle a remote server (or one not on the default port on the local machine) we can use the following code as a replacement for the ConnectionFactory() code:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nUri 
connecturi = new Uri(&quot;activemq:tcp:\/\/myserver:61616&quot;);\r\nIConnectionFactory factory = new NMSConnectionFactory(connecturi);\r\n<\/pre>\n<p><strong>Topics and Queues<\/strong><\/p>\n<p>It&#8217;s best to read the <a href=\"http:\/\/activemq.apache.org\/how-does-a-queue-compare-to-a-topic.html\" title=\"Topics and Queues\" target=\"_blank\">ActiveMQ documentation<\/a> for a full explanation.<\/p>\n<p>Basically, a topic works on a publish and subscribe pattern: any number of subscribers may exist for a topic and all of them receive each published message. A queue, by contrast, follows load balancer semantics: each message is delivered to exactly one consumer, and a message that is not acknowledged before its consumer closes is redelivered to another consumer.<\/p>\n<p><strong>Enumerating Topics and Queues<\/strong><\/p>\n<p>The following is pretty much taken from the code accompanying the NMS source; I&#8217;ve removed the code for setting up the connection\/session.<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\npublic const string QUEUE_ADVISORY_DESTINATION = &quot;ActiveMQ.Advisory.Queue&quot;;\r\npublic const string TOPIC_ADVISORY_DESTINATION = &quot;ActiveMQ.Advisory.Topic&quot;;\r\npublic const string TEMPQUEUE_ADVISORY_DESTINATION = &quot;ActiveMQ.Advisory.TempQueue&quot;;\r\npublic const string TEMPTOPIC_ADVISORY_DESTINATION = &quot;ActiveMQ.Advisory.TempTopic&quot;;\r\n\r\npublic const string ALLDEST_ADVISORY_DESTINATION = QUEUE_ADVISORY_DESTINATION + &quot;,&quot; +\r\n                       TOPIC_ADVISORY_DESTINATION + &quot;,&quot; +\r\n                       TEMPQUEUE_ADVISORY_DESTINATION + &quot;,&quot; +\r\n                       TEMPTOPIC_ADVISORY_DESTINATION;\r\n\r\nprivate void Enumerate(string destination, Action&lt;DestinationInfo&gt; action)\r\n{\r\n   IDestination dest = session.GetTopic(destination);\r\n   using (IMessageConsumer consumer = session.CreateConsumer(dest))\r\n   {\r\n      IMessage advisory;\r\n\r\n      while ((advisory = 
consumer.Receive(TimeSpan.FromMilliseconds(2000))) != null)\r\n      {\r\n         ActiveMQMessage am = advisory as ActiveMQMessage;\r\n         if (am != null &amp;&amp; am.DataStructure != null)\r\n         {\r\n            DestinationInfo info = am.DataStructure as DestinationInfo;\r\n            if (info != null)\r\n            {\r\n               action(info);\r\n            }\r\n         }\r\n      }\r\n   }\r\n}\r\n\r\npublic void EnumerateQueues()\r\n{\r\n   Console.WriteLine(&quot;Listing all Queues on Broker:&quot;);\r\n   Enumerate(QUEUE_ADVISORY_DESTINATION, info =&gt; \r\n                Console.WriteLine(&quot;   Queue: &quot; + info.Destination));\r\n   Console.WriteLine(&quot;Listing Complete.&quot;);\r\n}\r\n\r\npublic void EnumerateTopics()\r\n{\r\n   Console.WriteLine(&quot;Listing all Topics on Broker:&quot;);\r\n   Enumerate(TOPIC_ADVISORY_DESTINATION, info =&gt; \r\n                Console.WriteLine(&quot;   Topic: &quot; + info.Destination));\r\n   Console.WriteLine(&quot;Listing Complete.&quot;);\r\n}\r\n\r\npublic void EnumerateDestinations()\r\n{\r\n   Console.WriteLine(&quot;Listing all Destinations on Broker:&quot;);\r\n   Enumerate(ALLDEST_ADVISORY_DESTINATION, info =&gt;\r\n   {\r\n      string destType = info.Destination.IsTopic ? &quot;Topic&quot; : &quot;Queue&quot;;\r\n      destType = info.Destination.IsTemporary ? 
&quot;Temporary&quot; + destType : destType;\r\n      Console.WriteLine(&quot;   &quot; + destType + &quot;: &quot; + info.Destination);\r\n   });\r\n   Console.WriteLine(&quot;Listing Complete.&quot;);\r\n}\r\n<\/pre>\n<p>I&#8217;ve also added the code for EnumerateDestinations, which allows us to enumerate both queues and topics (including temporary ones) in one call.<\/p>\n<p>Note the use of advisory messages; see <a href=\"http:\/\/activemq.apache.org\/advisory-message.html\" title=\"Advisory Messages\" target=\"_blank\">Advisory Messages<\/a>.<\/p>\n<p><strong>Connection strings<\/strong><\/p>\n<p>Sample of alternate connection strings:<\/p>\n<p>activemq:tcp:\/\/activemqhost:61616<br \/>\nstomp:tcp:\/\/activemqhost:61613<\/p>\n<p><strong>Creating a producer<\/strong><\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nIDestination destination = SessionUtil.GetDestination(session, &quot;queue:\/\/FOO.BAR&quot;);\r\nusing (IMessageProducer producer = session.CreateProducer(destination))\r\n{\r\n   connection.Start();\r\n   producer.DeliveryMode = MsgDeliveryMode.Persistent;\r\n   producer.RequestTimeout = TimeSpan.FromSeconds(10);\r\n\r\n   \/\/ create message\r\n   ITextMessage request = session.CreateTextMessage(&quot;Hello World!&quot;);\r\n   request.NMSCorrelationID = &quot;abc&quot;;\r\n   request.Properties&#x5B;&quot;myKey&quot;] = &quot;Cheddar&quot;;\r\n\r\n   producer.Send(request);\r\n}\r\n<\/pre>\n<p><strong>Creating a consumer<\/strong><\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nIDestination destination = SessionUtil.GetDestination(session, &quot;queue:\/\/FOO.BAR&quot;);\r\nusing (IMessageConsumer consumer = session.CreateConsumer(destination))\r\n{\r\n   connection.Start();\r\n   consumer.Listener += msg =&gt; \r\n   {\r\n      ITextMessage message = msg as ITextMessage;\r\n      if(message != null)\r\n      {\r\n         Console.WriteLine(&quot;Message Id: &quot; + message.NMSMessageId);\r\n         Console.WriteLine(&quot;Message 
text &quot; + message.Text);\r\n      }\r\n   };\r\n\r\n   \/\/ block here so the consumer isn&#8217;t disposed before any messages arrive\r\n   Console.ReadLine();\r\n}\r\n<\/pre>\n<p>The above consumer code works in an event driven way and thus can be used as an async consumer. An alternative is to request data from the consumer synchronously, in the following manner:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nITextMessage message = consumer.Receive() as ITextMessage;\r\nif(message != null)\r\n{\r\n   Console.WriteLine(&quot;Message Id: &quot; + message.NMSMessageId);\r\n   Console.WriteLine(&quot;Message text &quot; + message.Text);\r\n}\r\n<\/pre>\n<p>Note that ActiveMQ queues and topics can be created simply by creating a destination that uses them. In the above case we&#8217;re creating a queue named FOO.BAR, as in the following:<\/p>\n<p>queue:\/\/FOO.BAR<\/p>\n<p>Instead of using SessionUtil.GetDestination(session, &#8220;queue:\/\/FOO.BAR&#8221;) we can also use session.GetQueue(&#8220;FOO.BAR&#8221;) and\/or session.GetTopic(&#8220;FOO.BAR&#8221;) or session.GetDestination(&#8220;queue:\/\/FOO.BAR&#8221;) and\/or session.GetDestination(&#8220;topic:\/\/FOO.BAR&#8221;).<\/p>\n<p>Note: if using SessionUtil.GetDestination(session, &#8220;FOO.BAR&#8221;), i.e. without the queue\/topic prefix, the call defaults to a queue.<\/p>\n<p><strong>Don&#8217;t forget to clean up<\/strong><\/p>\n<p>As mentioned earlier, remember that the session and connection instances need to be disposed of when no longer required, preferably by calling the Close() method directly.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Getting Started ActiveMQ can be connected to in various ways. In this post I&#8217;m going to concentrate on NMS for .NET integration. For the sample code I&#8217;ve created a console application and using nu-get have added the packages named Apache.NMS and Apache.NMS.ActiveMQ. 
To begin with we need to run up ActiveMQ either locally or remotely [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_jetpack_memberships_contains_paid_content":false,"footnotes":""},"categories":[41],"tags":[],"class_list":["post-415","post","type-post","status-publish","format-standard","hentry","category-activemq"],"jetpack_sharing_enabled":true,"jetpack_featured_media_url":"","_links":{"self":[{"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/posts\/415","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/comments?post=415"}],"version-history":[{"count":9,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/posts\/415\/revisions"}],"predecessor-version":[{"id":421,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/posts\/415\/revisions\/421"}],"wp:attachment":[{"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/media?parent=415"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/categories?post=415"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/tags?post=415"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}