ActiveMQ是什么?

  ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

ActiveMQ特性列表

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

  6. 支持通过JDBC和journal提供高速的消息持久化

  7. 从设计上保证了高性能的集群,客户端-服务器,点对点

  8. 支持Ajax

  9. 支持与Axis的整合

  10. 可以很容易得调用内嵌JMS provider,进行测试

ActiveMQ的竞争者

  其他开源JMS供应商

  jbossmq(jboss 4)

  jboss messaging (jboss 5)

  joram-4.3.21 2006-09-22

  openjms-0.7.7-alpha-3.zip December 26, 2005

  mantamq

  ubermq

  SomnifugiJMS 2005-7-27

  开源的JMS Provider大部分都已经停止发展了,剩下的几个都是找到了东家,和某种J2EE 服务器挂钩,比如jboss mq 与jboss,joram与jonas(objectweb组织),ActiveMQ 与Geronimo(ASF APACHE基金组织),而在这3个之间,从网络底层来看,只有ActiveMQ使用了NIO,单从这个角度来看ActiveMQ在性能上会有一定的优势。

  商业JMS供应商

  IBM WebSphere MQ

  BEA WebLogic JMS

  Oracle AQ

  NonStop Server for Java Message Service(JMS)

  Sun Java System Message Queue

  Sonic jms

  TIBCO Enterprise For JMS

  iLinkMQ (国内)

  TongLink/Q(北京东方通科技)

  现在的商业J2EE 应用服务器大部分都会有JMS Provider的实现,毕竟应用服务器都已经花费不薄,也不在乎在里面送一个JMS Provider了,当然还是有独立的比如IBM WebSphere MQ,Sonic JMS ,前者肯定是商用MQ(这个概念不仅仅是JMS Provier了,只能说JMS 只是它提供的一个应用)中间的巨无霸了。

  从这点来看,ActiveMQ明显的竞争者并不多,因为它是作为独立的开源JMS Provider出现的,很容易被用于多种结构设计中,使用ActiveMQ作为默认JMS Provider的开源项目有ServiceMix,Geronimo.

安装ActiveMQ

  首先去http://activemq.apache.org/download.html 下载最新版本4.1.0release, 解压apache-activemq-4.1-incubator.zip(或者apache-activemq-4.1-incubator.tar.gz)目录如下:

  +bin (windows下面的bat和unix/linux下面的sh)

  +conf (activeMQ配置目录,包含最基本的activeMQ配置文件)

  +data (默认是空的)

  +docs (index,replease版本里面没有文档,-.-b不知道为啥不带)

  +example (几个例子

  +lib (activemMQ使用到的lib)

  -apache-activemq-4.1-incubator.jar (ActiveMQ的binary)

  -LICENSE.txt

  -NOTICE.txt

  -README.txt

  -user-guide.html

  你可以使用bin/activemq.bat(activemq) 启动,如果一切顺利,你就会看见类似下面的信息:

  

运行信息

 

  几个小提示

  1. 这个仅仅是最基础的ActiveMQ的配置,很多地方都没有配置因此不要直接使用这个配置用于生产系统

  2. 有的时候由于端口被占用,导致ActiveMQ错误,ActiveMQ可能需要以下端口1099(JMX),61616(默认的TransportConnector)

  3. 如果没有物理网卡,或者MS的LoopBackAdpater Multicast会报一个错误

测试你的ActiveMQ

  由于ActiveMQ是一个独立的jms provider,所以我们不需要其他任何第三方服务器就可以马上做我们的测试了.编译example目录下面的程序 ProducerTool/ConsumerTool 是JMS参考里面提到的典型应用,Producer产生消息,Consumer消费消息,而且这个例子还可以加入参数帮助你测试刚才启动的本地 ActiveMQ或者是远程的ActiveMQ

  ProducerTool broker的地址,默认的是tcp://localhost:61616

  [true|flase] 是否使用topic,默认是false

  [subject] subject的名字,默认是TOOL.DEFAULT

  [durabl] 是否持久化消息,默认是false

  [messagecount] 发送消息数量,默认是10

  [messagesize] 消息长度,默认是255

  [clientID] durable为true的时候,需要配置clientID

  [timeToLive] 消息存活时间

  [sleepTime] 发送消息中间的休眠时间

  [transacte] 是否采用事务

  ConsumerTool broker的地址,默认的是tcp://localhost:61616

  [true|flase] 是否使用topic,默认是false

  [subject] subject的名字,默认是TOOL.DEFAULT

  [durabl] 是否持久化消息,默认是false

  [maxiumMessages] 接受最大消息数量,0表示不限制

  [clientID] durable为true的时候,需要配置clientID

  [transacte] 是否采用事务

  [sleepTime] 接受消息中间的休眠时间,默认是0,onMeesage方法不休眠

  [receiveTimeOut] 接受超时

  我们可以这样使用:

  先启动activeMQ,再打开两个命令窗口,都进入D:/activemq/example,一个运行:ant consumer,一个运行:ant producer,如果成功发送/接收了消息就OK了。
****************************************************************************************

 

C#中通过NMS调用ActiveMQ
2008年08月05日 星期二 16:17

   1.下载ActiveMQ
    下载地址:http://activemq.apache.org/nms/download.html
    运行 启动 :
    在解压缩的路径下 输入bin/activemq
    (查看状态 : netstat -an|find "61616")

    2.下载NMS
    下载地址:http://www.springframework.net/downloads/Spring.Messaging.Nms/
    我们需要用到 Spring.Core
              ActiveMQ
             NMS
             Spring.Messaging.NMS
   几个DLL

3.监听代码
  

using System;

using System.Collections.Generic;

using System.Text;

using System.Threading;

 

using ActiveMQ;

using Spring.Messaging.Nms;

using Spring.Messaging.Nms.Listener;

 

namespace ListenerConsole

{

    class Program

    {

        private const string URI = "tcp://localhost:61616";

        private const string DESTINATION = "test.queue";

 

        static void Main(string[] args)

        {

            try

            {

                ConnectionFactory connectionFactory = new ConnectionFactory(URI);

 

                using (SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer())

                {

                    listenerContainer.ConnectionFactory = connectionFactory;

                    listenerContainer.DestinationName = DESTINATION;

                    listenerContainer.MessageListener = new Listener();

                    listenerContainer.AfterPropertiesSet();

                    Console.WriteLine("Listener started.");

                    Console.WriteLine("Press <ENTER> to exit.");

                    Console.ReadLine();

                }

            }

            catch (Exception ex)

            {

                Console.WriteLine(ex);

                Console.WriteLine("Press <ENTER> to exit.");

                Console.Read();

            }

 

        }

    }

}

using System;

using Spring.Messaging.Nms;

using NMS;

namespace ListenerConsole

{

    class Listener : IMessageListener

    {

        public Listener()

        {

            Console.WriteLine("Listener created.rn");

        }

        #region IMessageListener Members

 

        public void OnMessage(NMS.IMessage message)

        {

            ITextMessage textMessage = message as ITextMessage;

            Console.WriteLine(textMessage.Text);

        }

 

        #endregion

    }

}

4.发送代码
    using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using Spring.Messaging.Nms;
using NMS;
using ActiveMQ.Commands;


namespace SenderConsole
{
    class Program
    {

       static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            using (IConnection connection = factory.CreateConnection())
            {
                Console.WriteLine("Created a connection!");

                ISession session = connection.CreateSession();

                ActiveMQTopic destination =(ActiveMQTopic) session.GetTopic("DTS");

                IMessageProducer producer = session.CreateProducer(destination);
                producer.Persistent = true;

                // lets send a message
                ITextMessage request = session.CreateTextMessage("DTS_SUCCESS");               

                producer.Send(destination, request);

                // lets consume a message
                //ActiveMQTextMessage message = (ActiveMQTextMessage)consumer.Receive();
                //if (message == null)
                //{
                //    Console.WriteLine("No message received!");
                //}
                //else
                //{
                //    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                //    Console.WriteLine("Received message with text: " + message.Text);
                //}
            }

            Console.ReadLine();

        }
    }
}

****************************************************************************************

 

http://d.download.csdn.net/down/1887348/ltyong

 

****************************************************************************************

 

You’ve probably heard of Java Message Service (JMS). It’s a standard Java API for creating, sending, receiving and reading messages. ActiveMQ, an Apache project, is an open source message broker that supports JMS 1.1. In addition to supporting JMS, it supports other protocols that allow clients to be written in a variety of languages. Look at this page for more information. Sounds good, doesn’t it? Let’s try it out using .NET, C# and Visual Studio 2005.

Download and install the JDK

ActiveMQ is written in Java, so you’ll need a Java Runtime Environment (JRE) to be able to run it. The Java Development Kit (JDK) comes with extra utilities that you’ll find useful later on. I used the Java Development Kit SE 6 update 1, which you can find here.

Download and install ActiveMQ

Get the latest release of ActiveMQ from the downloads section. I used version 4.1.1. Once you have downloaded the zip file, extract the contents to a suitable folder. To test the installation, open a command prompt and use the cd command to set the current folder to be the installation folder (in which you extracted the ActiveMQ files.) Then type the following command:

bin/activemq

All being well, you should see a number of lines of information – the last of which should be something like:

INFO  ActiveMQ JMS Message Broker (localhost, ID:your-PC-51222-1140729837569-0:0) has started

The installation notes on the ActiveMQ site point out that there are working directories that get created relative to the current working folder. This means that for ActiveMQ to work correctly, you must start it from the installation folder. To double check, start a new command prompt and type the following command:

netstat -an|find "61616"

The response should be something like the following:

  TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING

When you want to stop ActiveMQ, enter <CTRL+C>. For now leave it running.

Download Spring.Messaging.NMS

NMS is the .NET Messaging API. Spring .NET has a messaging library built on NMS. It’s under development, but you can get it here. I used version 20070320-1632. Extract the files from the zip file to somewhere sensible.

The Listener

Create a new console application. I called mine ListenerConsole. You need to add 4 references:

  1. Spring.Core
  2. ActiveMQ
  3. NMS
  4. Spring.Messaging.NMS

These dlls can all be found in the bin/net/2.0/debug folder of Spring.Messaging.NMS. Here’s the code for the Program class in the listener console:

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

using ActiveMQ;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Listener;

namespace ListenerConsole
{
    class Program
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory connectionFactory = new ConnectionFactory(URI);

                using (SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer())
                {
                    listenerContainer.ConnectionFactory = connectionFactory;
                    listenerContainer.DestinationName = DESTINATION;
                    listenerContainer.MessageListener = new Listener();
                    listenerContainer.AfterPropertiesSet();
                    Console.WriteLine("Listener started.");
                    Console.WriteLine("Press <ENTER> to exit.");
                    Console.ReadLine();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                Console.WriteLine("Press <ENTER> to exit.");
                Console.Read();
            }

        }
    }
}

The interesting part of the code is the creation and set up of the SimpleMessageListenerContainer. ConnectionFactory is from the ActiveMQ namespace and implements the IConnectionFactory interface defined in NMS. The Uri of the message broker is passed to the constructor. SimpleMessageListenerContainer is part of Spring.Messaging.NMS (in the Listener namespace.) Note that SimpleMessageListenerContainer implements IDisposable. The missing part of this puzzle is the Listener class. Create a new class in the project, call it Listener and insert the following code:

using System;
using Spring.Messaging.Nms;
using NMS;
namespace ListenerConsole
{
    class Listener : IMessageListener
    {
        public Listener()
        {
            Console.WriteLine("Listener created.rn");
        }
        #region IMessageListener Members

        public void OnMessage(NMS.IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            Console.WriteLine(textMessage.Text);
        }

        #endregion
    }
}

IMessageListener is defined in Spring.Messaging.NMS. Build and run the console.

Start jconsole

The JDK comes with a utility called jconsole. You’ll find it in the bin folder. So, launch a command prompt and cd to the bin folder of the JDK. Then type:

jconsole

This will launch the Java Monitoring & Management Console. To connect to the running instance of ActiveMQ, select Remote Process as shown below:

jconsole_activemq

Enter the following in the Remote Process text box: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi It’s easier to work out what to enter here than you might think. If you go back to the command prompt in which you launched ActiveMQ and look towards the top of the output, you’ll find a line that reads:

INFO  ManagementContext - JMX consoles can connect to service:jmx:ri:///jndi/rmi://localhost:1099/jmxrmi

Click Connect. Select the MBeans tab. Navigate to org.apache.activemq->localhost->Queue->test.queue->Operations->sendTextMessage as shown below:

Sending a text message from jconsole.

In the text box next to the sendTextMessage button, enter some text. I entered (somewhat unimaginatively) “Hello”. Now, click sendTextMessage. In the .NET console window for the listener, you should see the text you entered. So what just happened? jconsole put a message on the queue using JMS and our console application read it using NMS. Why not send yourself a couple more messages?

The Sender

To complete our foray into messaging with ActiveMQ, let’s create an application that can send messages. Create another Console Application. I called mine SenderConsole. You may have started to notice a pattern in my naming conventions. Add the same references you added to the listener console. Here’s the code for the sender console:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using Spring.Messaging.Nms;

namespace SenderConsole
{
    class Program
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            ConnectionFactory connectionFactory = new ConnectionFactory(URI);
            NmsTemplate template = new NmsTemplate(connectionFactory);
            template.ConvertAndSend(DESTINATION, "Hello from the sender.");
        }
    }
}

Run the sender console and you should find that the message “Hello from the sender.” has appeared in the console window of the listener.

Conclusion

Sending and receiving messages using ActiveMQ is enabled by NMS and Spring.Messaging.NMS. We’ve seen how to create a simple set up using .NET that can be extended for real-world needs. JMS is no longer the preserve of Java developers.

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐