Active MQ 是Apache出品,最流行的,能力强劲的开源消息总线。

一、Active MQ特性:

1、多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4、通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5、支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6、支持通过JDBC和journal提供高速的消息持久化
7、从设计上保证了高性能的集群,客户端-服务器,点对点
8、支持Ajax
9、支持与Axis的整合
10、可以很容易的调用内嵌JMS provider,进行测试

二、消息中间件

接下来让我们了解下消息中间件(MOM:Message Orient middleware)的概念: 
1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块; 
2. 负责建立网络通信的通道,进行数据的可靠传送。 
3. 保证数据不重发,不丢失 
4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

三、Active MQ使用实例

/**
 * 消息队列辅助类 
 * 提供从消息队列里读取、写入
 * mq地址正式环境从配置文件里读取
 * 
 */
public class ActiveMqTookit {
	
	private   Connection connection = null;

	// private String subject = "TOOL.DEFAULT"; "tcp://localhost:61616"

	public static String mqBroker = "tcp://localhost:61616/";//mq地址
	public static String spiderStatus="spiderStatus";//爬虫状态subject
	protected BrokerService brokerService;
	
	/**
	 * 以下为正式环境数据库配置文件读取
	 */
	// try {
	// InputStream is =
	// JdbcUtil.class.getClassLoader().getResourceAsStream("classpath*:mq.properties");
	// Properties pro = new Properties();
	// pro.load(is);
	// mqBroker = pro.getProperty("activemq.url");
	// } catch (IOException e1) {
	// // TODO Auto-generated catch block
	// e1.printStackTrace();
	// }	
				
		
	/**
	 * 服务端active 写入消息队列
	 * 
	 * @param msgList
	 * @param subject
	 * @param socketStr
	 */
	public void writeMsgToActiveMq(ArrayList<String> msgList, String subject,
			String socketStr) {
		try {
			ActiveMQConnectionFactory connectionFactory = null;
			Session session = null;
			session = getSession(socketStr, connectionFactory, session);
			// 创建destination
			Destination destination = session.createQueue(subject);
			// 创建producer
			MessageProducer producer = session.createProducer(destination);
			// 设置JMS的持久性
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			for (String s : msgList) {
				TextMessage message = session.createTextMessage(s);
				// 发生消息message
				producer.send(message);
				// 关闭资源
				message.clearProperties();
			}
			session.close();
			connection.stop();
			connection.close();
			System.out.println("msgList的长度是:"+msgList.size());
			System.out.println("关闭资源。。。。");
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 *  服务端active json数据类型  写入消息队列
	 * @param msgJsonList
	 * @param subject
	 * @param socketStr
	 */
	public void writeMsgToActiveMqByJson(ArrayList<JSONObject> msgJsonList, String subject,
			String socketStr){
		ArrayList<String> msgList=new ArrayList<String>();
		for(JSONObject obj:msgJsonList){
			msgList.add(obj.toString());
		}
		writeMsgToActiveMq(msgList,subject,socketStr);
	}
	
	/**
	 *  服务端active json数据类型  写入消息队列
	 * @param msgJsonList
	 * @param subject
	 * @param socketStr
	 */
	public  void writeMsgToActiveMqByJson(JSONArray array, String subject,
			String socketStr){
		ArrayList<String> msgList=new ArrayList<String>();
		for(int i=0;i<array.size();i++){
			msgList.add(array.getJSONObject(i).toString());
		}
		writeMsgToActiveMq(msgList,subject,socketStr);
	}

	/**
	 * 获取消息队列session对象
	 * 
	 * @param socketStr
	 * @param connectionFactory
	 * @param session
	 * @return
	 */
	public Session getSession(String socketStr,
			ActiveMQConnectionFactory connectionFactory, Session session) {
		try {
			if (null == connectionFactory) {
				connectionFactory = new ActiveMQConnectionFactory(socketStr);
			}
			// 创建connection
			if (null == connection) {
				connection = connectionFactory.createConnection();
				
			}
			// 创建session,设置消息确认机制
			if (null == session) {
				connection.start();
				session = connection.createSession(false,
						Session.AUTO_ACKNOWLEDGE);
			}
			return session;
		} catch (JMSException e) {
			e.printStackTrace();
			JdbcUtil.executeSql("insert into spi_u_exceptionlog(exceptionType,description ) values ('activateMQ未启动','activateMQ未启动,请启动activateMQ')");
		}
		return null;
	}

	/**
	 * 服务端active 读取消息队列
	 * @param readCount
	 * @param socketUrl
	 * @param subject
	 * @return
	 */
	public   ArrayList<String> readMsgFromActiveMq(int readCount,
			String socketUrl, String subject) {
		ArrayList<String> msgList = new ArrayList<String>();
		ActiveMQConnectionFactory connectionFactory = null;
		Session session = null;
		try {
			// 创建session
			session = getSession(socketUrl, connectionFactory, session);
			// 创建destination
			Destination destination = session.createQueue(subject);
			MessageConsumer consumer = session.createConsumer(destination);
			for (int i = 0; i < readCount; i++) {
				Message msg = consumer.receive(3000);
				if(msg==null){
					break;
				}
				String str = ((TextMessage) msg).getText();
				msgList.add(str);
				msg.clearProperties();
			}
			return msgList;
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			try {
				session.close();
				connection.stop();
				connection.close();
			} catch (JMSException ex) {
				ex.printStackTrace();
			}
		}
		return null;
	}
	
	/**
	 * 从mq中读取数据
	 * @param spider
	 * @param readCount
	 * @return
	 */
	public static ArrayList<String> readFromMq(String subject,int readCount){
		ActiveMqTookit activeMqTookit = new ActiveMqTookit();
		ArrayList<String> resultList = activeMqTookit.readMsgFromActiveMq(readCount, ActiveMqTookit.mqBroker, subject);
		return resultList;
	}
	
	/**
	 * 写入数据到mq
	 * @param spider
	 * @param readCount
	 * @return
	 */
	public static void writeToMq(JSONArray jsonArray,String subject){
		ActiveMqTookit activeMqTookit = new ActiveMqTookit();
		activeMqTookit.writeMsgToActiveMqByJson(jsonArray, subject,ActiveMqTookit.mqBroker);
	}
	
	public static void writeToMq(JSONObject object,String subject){
		ActiveMqTookit activeMqTookit = new ActiveMqTookit();
		ArrayList<String> msgList = new ArrayList<String>();
		msgList.add(object.toString());
		activeMqTookit.writeMsgToActiveMq(msgList, subject, ActiveMqTookit.mqBroker);
	}
	
}
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐