Spring Integration 实例讲解
简介
最近学习到的工具,资料很少,但还是要记录下自己目前的理解,官方的说发就不说了网上都一样,这里我说点自己的理解,其实spring integeration就类似一个水电系统,总闸,各层楼的控制,分流,聚合,过滤,沉淀,消毒,排污这里的每一个环节都类似一个系统服务,可能是jms,可能是redis,可能是MongoDB,可能是Tcp/UDP,可能是job,可能是我们系统服务的任何一个模块
那么Spring Integration扮演的角色就是将这些功能能够连接起来组成一个完整的服务系统,实现企业系统的集成的解决方案 。就像管道一样将各个模块连接到一起,管道能够连接到千家万户需要很多零件水表,分头管,水龙头,管道开关等等这些就是Spring Integration的主要组件
组件介绍
Spring Integration 主要有Message、Channel、Message EndPoint组成
-
Message
-
Message是用来在不同部分之间传递的数据,类似于水
-
Channel
-
在消息系统中,消息发送者发送消息到通道(Channel),消息接受者从通道(Channel)接收消息。
-
channel就好像一个管子,生产者生产一个消息到channel,消费者从channel消费一个消息,所以channel可以对消息组件解耦,并且提供一个方便的拦截功能和监控功能。这里channel分两种,一种是point-to-point点对点的,一种是publish-subscribe发布订阅形式的,这个和JMS是一样的。(所有消息都是这样的)。
-
Spring Integration中,轮询通道(Pollable Channels)具有在一个队列中缓冲消息的能力。缓冲的优势在于它能够调节接入消息流量,从而防止系统负荷过载。然而,正如其名称所示,这也增加了一些复杂性,只有配置了轮询器后,一个消息消费者才能从这个通道中接收消息。另外来说,订阅通道(Subscribable Channel)要求连接它的消费者依从简单的消息驱动模式
-
QueueChannel实现内部包装了一个队列(需要给出轮训器),DirectChannel有着“点对点”的语义,由于DirectChannel是最简单的一种通道,不需要诸如“调度”、“轮询器线程管理”等方面的任何附加配置,所以在Spring Integration中,该类型通道被认作为默认的消息通道。详细的信道说明文档(点击下载)
-
链接:https://pan.baidu.com/s/1OnLfH31g-sARExnvfADiOQ
提取码:e1aw -
消息终端(Message EndPoin)
-
转换接入的请求到服务层调用,然后转换服务层返回值到接出的响应(endpoint)
-
消息端点(Message EndPoint)是真正处理消息的(Message)组件,它还可以控制通道的路由。就像分水管,电表,水龙头开关,总闸什么的下面我们来揭开神秘的面纱
(1)Channel Adapter
通道适配器(Channel Adapter)是一种连接外部系统或传输协议的端点(EndPoint),可以分为入站(inbound)和出站(outbound)。
通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。
Spring Integration内置了如下的适配器:就像各个地方的水都能接到管子里适配了
RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI
Twitter、XMPP、WebServices(SOAP、REST)、WebSocket
(2) Gateway
消息网关(Gateway)类似于Adapter,但是提供了双向的请求/返回集成方式,也分为入站(inbound)和出站(outbound)。
Spring Integration 对响应的Adapter都提供了Gateway。
(3) Service Activator
Service Activator 可调用Spring的Bean来处理消息,并将处理后的结果输出到指定的消息通道。就是将输入和输出连接好这样就可以运作了
(4) Router(类似分水管,将水送往不同地方)
路由(Router) 可根据消息体内容(Payload Type Router)、消息头的值(Header Value Router) 以及定义好的接收表(Recipient List Router) 作为条件,来决定消息传递到的通道。
(5) Filter(就像工业用水和饮用水,需要过滤到不同地方)
过滤器(Filter) 类似于路由(Router),不同的是过滤器不决定消息路由到哪里,而是决定消息是否可以传递给消息通道。
(6) Splitter(饮用水也要分成多个管口给多人饮用)
拆分器(Splitter)将消息拆分为几个部分单独处理,拆分器处理的返回值是一个集合或者数组。
(7) Aggregator(将废水集中起来处理)
聚合器(Aggregator)与拆分器相反,它接收一个java.util.List作为参数,将多个消息合并为一个消息。
(8) Transformer(就类似于净化水的程序一样)
转换器(Transformer)是对获得的消息进行一定的转换处理(如数据格式转换).
实例演示
买饮料
- 配置文件如下:
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans:beans xmlns="http://www.springframework.org/schema/integration"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:beans="http://www.springframework.org/schema/beans"
5 xmlns:context="http://www.springframework.org/schema/context"
6 xmlns:stream="http://www.springframework.org/schema/integration/stream"
7 xsi:schemaLocation="http://www.springframework.org/schema/beans
8 http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
9 http://www.springframework.org/schema/context
10 http://www.springframework.org/schema/context/spring-context-2.5.xsd
11 http://www.springframework.org/schema/integration
12 http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
13 http://www.springframework.org/schema/integration/stream
14 http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd">
15
16 <!-- 开启 Annotation支持 -->
17 <annotation-config />
18 <!-- 设置Spring 扫描 Annotation 包路径 -->
19 <context:component-scan base-package="org.springframework.integration.samples.cafe.annotation" />
20 <!-- 配置一个GateWay组件,提供消息的发送和接收。接口Cafe,提供一个void placeOrder(Order order);方法
21 该方法标记了@Gateway(requestChannel="orders"), 实现向orders队列实现数据的发送
22 -->
23 <gateway id="cafe" service-interface="org.springframework.integration.samples.cafe.Cafe" />
24 <!-- 订单Channel -->
25 <channel id="orders" />
26 <!-- 饮料订单Channel,处理饮料的类别 -->
27 <channel id="drinks" />
28 <!-- 冷饮生产Channel 最大待处理的数据量为 10-->
29 <channel id="coldDrinks">
30 <queue capacity="10" />
31 </channel>
32 <!-- 热饮生产Channel 最大待处理的数据量为 10-->
33 <channel id="hotDrinks">
34 <queue capacity="10" />
35 </channel>
36 <!-- 定义最终进行生产的消息队列 -->
37 <channel id="preparedDrinks" />
38 <!-- 定义一个 stream 适配器,接收 deliveries队列的消息后,直接输出到屏幕-->
39 <stream:stdout-channel-adapter id="deliveries" />
40
41 </beans:beans>
- 我们来看一下整体服务是怎么启动的
首先我们来看一下CafeDemo这个类,它触发下定单操作
public class CafeDemo {
2
3 public static void main(String[] args) {
4 //加载Spring 配置文件 "cafeDemo.xml"
5 AbstractApplicationContext context = null;
6 if(args.length > 0) {
7 context = new FileSystemXmlApplicationContext(args);
8 }
9 else {
10 context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11 }
12 //取得 Cafe实列
13 Cafe cafe = (Cafe) context.getBean("cafe");
14 //准备 发送100条消息(订单)
15 for (int i = 1; i <= 100; i++) {
16 Order order = new Order(i);
17 // 一杯热饮 参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
18 order.addItem(DrinkType.LATTE, 2, false);
19 // 一杯冷饮 参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
20 order.addItem(DrinkType.MOCHA, 3, true);
21 //下发订单,把消息发给 orders 队列
22 cafe.placeOrder(order);
23 }
24 }
25
26 }
下面是Cafe接口的源代码
public interface Cafe {
//定义GateWay, 把消息发送到 orders 队列, Message的payLoad属性,保存 order参数值
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
OrderSplitter 源代码
1 //设置成Spring-integration组件
2 @MessageEndpoint
3 public class OrderSplitter {
4
5 //实现Splitter模式, 接收 orders队列的消息,调用orderSplitter Bean的split方法,进行消息的分解
6 //并把分解后的消息,发送到drinks队列
7 @Splitter(inputChannel="orders", outputChannel="drinks")
8 public List<OrderItem> split(Order order) {
9 return order.getItems();
10 }
11
12 }
OrderSplitter.split把消息拆分后,变成多个消息,发送到drinks队列.由drinkRouter进行消息的接收。
1 //设置成Spring-integration组件
2 @MessageEndpoint
3 public class DrinkRouter {
4
5 //实现Router模式,接收 drinks队列的消息, 并触发 drinkRouter Bean的 resolveOrderItemChannel方法
6 //由在 resolveOrderItemChannel该方法的返回值(String--队列名称)表示把消息路由到那个队列上(coldDrinks或hotDrinks)
7 @Router(inputChannel="drinks")
8 public String resolveOrderItemChannel(OrderItem orderItem) {
9 return (orderItem.isIced()) ? "coldDrinks" : "hotDrinks";
10 }
11
12 }
下面看一下,如果是一杯冷饮,则消息发送到 coldDrinks队列
如果是一杯热饮,则消息发送到 hotDrinks队列
接下来看coldDrinks, hotDrink 的队列由谁来监听:
查看源代码后,是由Barista.java来处理
1 //设置成Spring-integration组件
2 @Component
3 public class Barista {
4
5 private long hotDrinkDelay = 5000;
6
7 private long coldDrinkDelay = 1000;
8
9 private AtomicInteger hotDrinkCounter = new AtomicInteger();
10
11 private AtomicInteger coldDrinkCounter = new AtomicInteger();
12
13
14 public void setHotDrinkDelay(long hotDrinkDelay) {
15 this.hotDrinkDelay = hotDrinkDelay;
16 }
17
18 public void setColdDrinkDelay(long coldDrinkDelay) {
19 this.coldDrinkDelay = coldDrinkDelay;
20 }
21
22 //配置接收"hotDrinks"队列,处理后,把结果发给队列prepareColdDrink
23 @ServiceActivator(inputChannel="hotDrinks", outputChannel="preparedDrinks")
24 public Drink prepareHotDrink(OrderItem orderItem) {
25 try {
26 Thread.sleep(this.hotDrinkDelay);
27 System.out.println(Thread.currentThread().getName()
28 + " prepared hot drink #" + hotDrinkCounter.incrementAndGet() + " for order #"
29 + orderItem.getOrder().getNumber() + ": " + orderItem);
30 return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
31 orderItem.getShots());
32 } catch (InterruptedException e) {
33 Thread.currentThread().interrupt();
34 return null;
35 }
36 }
37
38 //配置接收"coldDrinks"队列,处理后,把结果发给队列prepareColdDrink
39 @ServiceActivator(inputChannel="coldDrinks", outputChannel="preparedDrinks")
40 public Drink prepareColdDrink(OrderItem orderItem) {
41 try {
42 Thread.sleep(this.coldDrinkDelay);
43 System.out.println(Thread.currentThread().getName()
44 + " prepared cold drink #" + coldDrinkCounter.incrementAndGet() + " for order #"
45 + orderItem.getOrder().getNumber() + ": " + orderItem);
46 return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
47 orderItem.getShots());
48 } catch (InterruptedException e) {
49 Thread.currentThread().interrupt();
50 return null;
51 }
52 }
53
54 }
接下来,已经把订单需要生产的饮料已经完成,现在可以交给服务员(waier)交给客人了。
这里使用的aggregate模式,让服务器等待这个订单的所有饮料生产完后的,交给客户.
1 //设置成Spring-integration组件
2 @MessageEndpoint
3 public class Waiter {
4
5 //配置 aggregator模式。
6 @Aggregator(inputChannel = "preparedDrinks", outputChannel = "deliveries", timeout = 5 * 60 * 1000)
7 public Delivery prepareDelivery(List<Drink> drinks) {
8 return new Delivery(drinks);
9 }
10
11 }
12
最后我们使用一个 stream channel adaptor把订单生产完成的饮料输出。
<!-- 定义一个 stream 适配器,接收 deliveries队列的消息后,直接输出到屏幕-->
<stream:stdout-channel-adapter id="deliveries"/>
jms做一个简单的示例。
一个简单的Bean
public class JmsMessageBean implements Serializable {
private String name = null;
private Integer age = null;
private Date birthday = null;
private List<String> manbers = null;
//...getter and setter
}
一个MessageHandler:
public class JmsMessageCustomerHandler implements MessageHandler {
public JmsMessageCustomerHandler() {
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//输出.
Object obj = message.getPayload();
if(obj == null) {
System.out.println("null");
} else if(obj instanceof String) {
System.out.println(obj);
} else if(obj instanceof JmsMessageBean) {
JmsMessageBean bean = (JmsMessageBean)obj;
System.out.println(ReflectionToStringBuilder.reflectionToString(bean));
} else {
System.out.println(ReflectionToStringBuilder.reflectionToString(message));
}
}
}
一个消息的中转,因为我不是把消息传递给下一个系统,我只是把它简单的输出。
public class JmsMessageCustomerTransformer implements Transformer {
public JmsMessageCustomerTransformer() {
}
@Override
public Message<?> transform(Message<?> message) {
//不做任何事,原样返回
return message;
}
}
`<?xml version="1.0" encoding="UTF-8"?>
<!-- jms 连接工厂 -->
<bean id="activeMQJmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="activeMQJmsFactory" />
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>
<!-- jms Topic -->
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic"
autowire="constructor">
<constructor-arg value="jmstopic" />
</bean>
<bean id="messageConverter" class="net.dintegration.jms.JmsMessageConverter" />
<bean id="messageHander" class="net.dintegration.handler.JmsMessageCustomerHandler" />
<bean id="messageTransformer" class="net.dintegration.transformer.JmsMessageCustomerTransformer" />
<!-- jms 模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="myTopic" />
<property name="messageConverter" ref="messageConverter" />
</bean>
<integration:channel id="jmsinchannel"/>
<integration:channel id="jmsoutchannel" />
<jms:inbound-channel-adapter id="jmsIn" destination="myTopic" channel="jmsinchannel" jms-template="jmsTemplate">
<integration:poller fixed-rate="30000"/>
</jms:inbound-channel-adapter>
<integration:transformer ref="messageTransformer"
input-channel="jmsinchannel" output-channel="jmsoutchannel" />
<integration:service-activator ref="messageHander" input-channel="jmsoutchannel" />
`
测试类:
public static void main(String[] args) {
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("jmsintegration/jmsIntSubscribeContext.xml");
context.start();
System.out.println("Please type something and hit return");
}
我在本机开启了一个ActiveMQ,上述测试类运行了之后线程会阻塞,JDK不会退出。使之接收JMS消息。
我另写了一个测试类发送了一条消息。如:
@Test
public void testSendJmsMessage() throws Exception {
System.out.println("=============================================");
JmsMessageBean bean = new JmsMessageBean();
bean.setAge(23);
bean.setBirthday(new Date());
bean.setManbers(Arrays.asList("123", "234", "345"));
bean.setName("Jms");
publisher.sendMessage(bean);
}
接收消息如下:
net.dintegration.jms.JmsMessageBean@49c54f01[name=Jms,age=23,birthday=Thu Nov 01 20:19:35 CST 2012,manbers=[123, 234, 345]]
实例三集成JDBC
一个系统向表写数据,另一个系统定时的扫描新加入的数据,然后把新加入的数据提取出来,做一些处理。然后更新标志或者转移。
我创建一个这样的数据库。DDL SQL如:
CREATE TABLE PUBLIC.PUBLIC.ATTR_MESSAGE (
ATT_CODE VARCHAR(20) NOT NULL,
PARENT_CODE VARCHAR(20),
ATT_TEXT VARCHAR(100),
SEQ NUMERIC(8, 0),
OPT_DATE DATE,
MARK VARCHAR(1) DEFAULT ‘N’,
PRIMARY KEY(ATT_CODE)
);
如上面的表结构,我从别的数据库提取了几个列创建一个表。 MARK就是一个标志列,当新加入的数据为N, 处理后的会置成Y。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xmlns:hdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
http://www.springframework.org/schema/integration/jdbc
http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.1.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.1.xsd">
<bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:jdbc/jdbc.properties</value>
</list>
</property>
</bean>
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="${jdbc.driverClass}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="jdbcMessageHandler" class="net.dintegration.handler.JdbcMessageHandler" />
<integration:channel id="jdbcinchannel" />
<hdbc:inbound-channel-adapter channel="jdbcinchannel" data-source="dataSource"
query="SELECT ATT_CODE, PARENT_CODE, ATT_TEXT, SEQ, OPT_DATE, MARK FROM ATTR_MESSAGE WHERE MARK = 'N'"
update="UPDATE ATTR_MESSAGE SET MARK = 'Y' WHERE ATT_CODE IN (:ATT_CODE)">
<integration:poller fixed-rate="10000">
<integration:transactional />
</integration:poller>
</hdbc:inbound-channel-adapter>
<integration:service-activator input-channel="jdbcinchannel" ref="jdbcMessageHandler"/>
</beans>
`请你注意其中的:
query=“SELECT ATT_CODE, PARENT_CODE, ATT_TEXT, SEQ, OPT_DATE, MARK FROM ATTR_MESSAGE WHERE MARK = ‘N’”
update=“UPDATE ATTR_MESSAGE SET MARK = ‘Y’ WHERE ATT_CODE IN (:ATT_CODE)”
它做作用就是把表ATTR_MESSAGE中MARK=‘N’的数据过滤出来, 放到jdbcMessageHandler中处理,然后按照提取时的 ATT_CODE分别把标志位 MARK置成Y。`
如上,我们只需要编写一个 jdbcMessageHandler处理我们的数据就好,其他的一切都让Spring Integration为我们做好了。
public class JdbcMessageHandler implements MessageHandler {
private static Log log = LogFactory.getLog(JdbcMessageHandler.class);
public JdbcMessageHandler() {
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object obj = message.getPayload();
//分别按照各种样式输出obj
if(obj == null) {
log.info("null");
} else if(obj instanceof String) {
log.info(obj);
}else if(obj instanceof List) {
List bean = (List)obj;
log.info(bean);
} else {
log.info(ReflectionToStringBuilder.reflectionToString(message));
}
}
}
OK。我向建立的表中插入2条数据, 然后测试。测试类:
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("jdbc/jdbcIntegrationContext.xml");
context.start(); //让线程在这里阻塞,防止JVM退出
}
测试log如:
11-26 19:27:18 [INFO] [support.DefaultLifecycleProcessor(334)] Starting beans in phase 2147483647
11-26 19:27:19 [INFO] [handler.JdbcMessageHandler(49)] [{ATT_CODE=123456, PARENT_CODE=Root, ATT_TEXT=测试数据, SEQ=1, OPT_DATE=14:17:47, MARK=N}, {ATT_CODE=234567, PARENT_CODE=123456, ATT_TEXT=test, SEQ=2, OPT_DATE=14:20:41, MARK=N}]
很明显它读到了2条数据输出了。请注意,我在Spring中配置的integration:poller fixed-rate=“10000”,也就是说每10秒中扫描一次ATTR_MESSAGE表.我再次用一个SQL把刚处理过的数据置成N。如: UPDATE attr_message set mark = ‘N’
它也再次输出了日志,如:
11-26 19:30:18 [INFO] [handler.JdbcMessageHandler(49)] [{ATT_CODE=123456, PARENT_CODE=Root, ATT_TEXT=测试数据, SEQ=1, OPT_DATE=14:17:47, MARK=N}, {ATT_CODE=234567, PARENT_CODE=123456, ATT_TEXT=test, SEQ=2, OPT_DATE=14:20:41, MARK=N}]
它又读取了MARK为N的数据。就这样几乎不写任何多余的代码就实现了我上面提到的场景。而我们需要做的,仅仅写一个MessageHandler处理我们的数据。
那么他的扩展性呢?
如果你仔细看了,你就发现
<integration:channel id=“jdbcinchannel” />
<integration:service-activator input-channel=“jdbcinchannel” ref=“jdbcMessageHandler”/>
这样的代码在上一个例子JMS也曾出现过类似的.如:
<integration:channel id="jmsinchannel"/>
<integration:channel id="jmsoutchannel" />
<jms:inbound-channel-adapter id="jmsIn" destination="myTopic" channel="jmsinchannel" jms-template="jmsTemplate">
<integration:poller fixed-rate="30000"/>
</jms:inbound-c
hannel-adapter>
<integration:transformer ref="messageTransformer"
input-channel="jmsinchannel" output-channel="jmsoutchannel" />
<integration:service-activator ref="messageHander" input-channel="jmsoutchannel" />
总结:Spring Integration就是通过像上面类似的方式把任何的数据通过管道一样的把数据导向下一个需要的地方。inbound-channel-adapter是在把一个jms的输入源绑定到 jmsinchannel上, transformer使用这个输入源转给 jmsoutchannel, jmsoutchannel又是下一个消费者的输入源。假如增加了从文件系统读取文件到jmsoutchannel呢?或者还有Tcp获得的数据到jmsoutchannel呢?因为inbound-channel-adapter可以把任何输入绑定到jmsinchannel。再加一个inbound-channel-adapter不就可以了
更多推荐
所有评论(0)