Active MQ的使用
Active MQ 是Apache出品,最流行的,能力强劲的开源消息总线。
一、Active MQ特性:
1、多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4、通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5、支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6、支持通过JDBC和journal提供高速的消息持久化
7、从设计上保证了高性能的集群,客户端-服务器,点对点
8、支持Ajax
9、支持与Axis的整合
10、可以很容易的调用内嵌JMS provider,进行测试
二、消息中间件
接下来让我们了解下消息中间件(MOM:Message Orient middleware)的概念:
1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
2. 负责建立网络通信的通道,进行数据的可靠传送。
3. 保证数据不重发,不丢失
4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务
三、Active MQ使用实例
/**
* 消息队列辅助类
* 提供从消息队列里读取、写入
* mq地址正式环境从配置文件里读取
*
*/
public class ActiveMqTookit {
private Connection connection = null;
// private String subject = "TOOL.DEFAULT"; "tcp://localhost:61616"
public static String mqBroker = "tcp://localhost:61616/";//mq地址
public static String spiderStatus="spiderStatus";//爬虫状态subject
protected BrokerService brokerService;
/**
* 以下为正式环境数据库配置文件读取
*/
// try {
// InputStream is =
// JdbcUtil.class.getClassLoader().getResourceAsStream("classpath*:mq.properties");
// Properties pro = new Properties();
// pro.load(is);
// mqBroker = pro.getProperty("activemq.url");
// } catch (IOException e1) {
// // TODO Auto-generated catch block
// e1.printStackTrace();
// }
/**
* 服务端active 写入消息队列
*
* @param msgList
* @param subject
* @param socketStr
*/
public void writeMsgToActiveMq(ArrayList<String> msgList, String subject,
String socketStr) {
try {
ActiveMQConnectionFactory connectionFactory = null;
Session session = null;
session = getSession(socketStr, connectionFactory, session);
// 创建destination
Destination destination = session.createQueue(subject);
// 创建producer
MessageProducer producer = session.createProducer(destination);
// 设置JMS的持久性
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (String s : msgList) {
TextMessage message = session.createTextMessage(s);
// 发生消息message
producer.send(message);
// 关闭资源
message.clearProperties();
}
session.close();
connection.stop();
connection.close();
System.out.println("msgList的长度是:"+msgList.size());
System.out.println("关闭资源。。。。");
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 服务端active json数据类型 写入消息队列
* @param msgJsonList
* @param subject
* @param socketStr
*/
public void writeMsgToActiveMqByJson(ArrayList<JSONObject> msgJsonList, String subject,
String socketStr){
ArrayList<String> msgList=new ArrayList<String>();
for(JSONObject obj:msgJsonList){
msgList.add(obj.toString());
}
writeMsgToActiveMq(msgList,subject,socketStr);
}
/**
* 服务端active json数据类型 写入消息队列
* @param msgJsonList
* @param subject
* @param socketStr
*/
public void writeMsgToActiveMqByJson(JSONArray array, String subject,
String socketStr){
ArrayList<String> msgList=new ArrayList<String>();
for(int i=0;i<array.size();i++){
msgList.add(array.getJSONObject(i).toString());
}
writeMsgToActiveMq(msgList,subject,socketStr);
}
/**
* 获取消息队列session对象
*
* @param socketStr
* @param connectionFactory
* @param session
* @return
*/
public Session getSession(String socketStr,
ActiveMQConnectionFactory connectionFactory, Session session) {
try {
if (null == connectionFactory) {
connectionFactory = new ActiveMQConnectionFactory(socketStr);
}
// 创建connection
if (null == connection) {
connection = connectionFactory.createConnection();
}
// 创建session,设置消息确认机制
if (null == session) {
connection.start();
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
}
return session;
} catch (JMSException e) {
e.printStackTrace();
JdbcUtil.executeSql("insert into spi_u_exceptionlog(exceptionType,description ) values ('activateMQ未启动','activateMQ未启动,请启动activateMQ')");
}
return null;
}
/**
* 服务端active 读取消息队列
* @param readCount
* @param socketUrl
* @param subject
* @return
*/
public ArrayList<String> readMsgFromActiveMq(int readCount,
String socketUrl, String subject) {
ArrayList<String> msgList = new ArrayList<String>();
ActiveMQConnectionFactory connectionFactory = null;
Session session = null;
try {
// 创建session
session = getSession(socketUrl, connectionFactory, session);
// 创建destination
Destination destination = session.createQueue(subject);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < readCount; i++) {
Message msg = consumer.receive(3000);
if(msg==null){
break;
}
String str = ((TextMessage) msg).getText();
msgList.add(str);
msg.clearProperties();
}
return msgList;
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
session.close();
connection.stop();
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
return null;
}
/**
* 从mq中读取数据
* @param spider
* @param readCount
* @return
*/
public static ArrayList<String> readFromMq(String subject,int readCount){
ActiveMqTookit activeMqTookit = new ActiveMqTookit();
ArrayList<String> resultList = activeMqTookit.readMsgFromActiveMq(readCount, ActiveMqTookit.mqBroker, subject);
return resultList;
}
/**
* 写入数据到mq
* @param spider
* @param readCount
* @return
*/
public static void writeToMq(JSONArray jsonArray,String subject){
ActiveMqTookit activeMqTookit = new ActiveMqTookit();
activeMqTookit.writeMsgToActiveMqByJson(jsonArray, subject,ActiveMqTookit.mqBroker);
}
public static void writeToMq(JSONObject object,String subject){
ActiveMqTookit activeMqTookit = new ActiveMqTookit();
ArrayList<String> msgList = new ArrayList<String>();
msgList.add(object.toString());
activeMqTookit.writeMsgToActiveMq(msgList, subject, ActiveMqTookit.mqBroker);
}
}
更多推荐
所有评论(0)