最近有幸,公司让我研究了spring-integration,对于这个spring出品的功能强大的工具,功能繁多且复杂。写此博客分享一下心得,也为记录一下最近研究这么久的知识点。理解的不够深,如果有错误的地方,希望各位朋友能批评指出。

一、what

    首先,什么是spring-integration?研究之初,对这根管道有些迷惑,这是队列?这个activeMQ有啥区别?待研究了一段时间之后,才发现,spring-integration越来越像曾经做过的esb组件。那么spring-integration到底是什么呢?

    官网给出的解释是,spring-integration是一个功能强大的EIP(Enterprise Integration Patterns),即企业集成模式。对,spring-integration是一个集大成者。就我自己的理解,集成了众多功能的它,是一种便捷的事件驱动消息框架用来在系统之间做消息传递的。

二、why

    那么,我们为什么用它呢?spring-integration的官网上,给出了以下说法

     spring-integration的目标

  •     提供一个简单的模型来实现复杂的企业集成解决方案
  •     为基于spring的应用添加异步的、消息驱动的行为
  •     让更多的Spring用户来使用他

    看这种解释,我的直观感觉是:啥玩意?不懂啊!接着看到spring-integration的原则

  •     组件之间应该是松散的,模块性的易测的
  •     应用框架应该强迫分离业务逻辑和集成逻辑
  •     扩展节点应该有更好的抽象和可以再使用的能力   

    感觉,这个应该说的是解耦吧。另外看了下其他人的理解,如果你的系统处在各个系统的中间,需要JMS交互,又需要Database/Redis/MongoDB,还需要监听Tcp/UDP等,还有固定的文件转移,分析。还面对着时不时的更改需求的风险。那么,它再适合不过了。

三、how

    那么,重点来了,如何使用呢?在介绍之前,先简单的介绍几个名词。

1.Message

    Message是它的基础构件和核心,所有的流程都围绕着Message运转,如图所示

    Message,就是所说的消息体,用来承载传输的信息用的。Message分为两部分,header和payload。header是头部信息,用来存储传输的一些特性属性参数。payload是用来装载数据的,他可以携带的任何Object对象,放什么都行,随你 。

2.MessageChannel

    消息管道,生产者生产一个消息到channel,消费者从channel消费一个消息,所以channel可以对消息组件解耦,并且提供一个方便的拦截功能和监控功能。

    对于MessageChannel,有以下几种

    (1).PublishSubscribeChannel

    发布订阅式通道形式,多用于消息广播形式,发送给所有已经订阅了的用户。在3.x版本之前,订阅者如果是0,启动会报错或者发送的时候报错。在4.x版本后,订阅者是0,则仍然会返回true。当然,可以配置最小订阅者数量(min-subscribers)

    (2).QueueChannel

    队列模式通道,最常用的形式。与发布订阅通道不同,此通道实现点对点式的传输方式,管道内部是队列方式,可以设置管道的容量,如果内部的消息已经达到了最大容量,则会阻塞住,直到队列有时间,或者发送的消息被超时处理。

    (3).PriorityChannel

    优先级队列通道,我的理解为QueueChannel的升级版,可以无视排队,根据设置的优先级直接插队。(壕无人性)

    (4).RendezvousChannel

    前方施工,禁止通行!这个是一个强行阻塞的通道,当消息进入通道后,通道禁止通行,直到消息在对方通道receive()后,才能继续使用。

    (5).DirectChannel

    最简单的点对点通道方式,一个简单的单线程通道。是spring-integration的默认通道类型

    (6).ExecutorChannel

    多线程通道模式,开启多线程执行点对点通道形式。这个通道博主还未研究,不敢多说........

3.Message Endpoint

    消息的终点,或者我称他为消息节点,在channel你不能操作消息,只能在endpoint操作。对于常用的消息节点,有以下几种

    (1).Transformer 

    解释者,转换者,翻译者,怎么理解都可以。作用是可以将消息转为你想要的类型。可以将xml形式转换成string类型。

<!-- transformer转换器 -->
<int:channel id="transformerInChannel"/>
<int:transformer input-channel="transformerInChannel" output-channel="transformerOutChannel"
                     expression="payload.name.toUpperCase() + '- [' + T(java.lang.System).currentTimeMillis() + ']'"/>
<int:channel id="transformerOutChannel">
   <int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="transformerOutChannel" ref="receiveServiceImpl" method="helloTransformer">
   <int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>

    (2).Filter

    过滤器,顾名思义,过滤用的,用来判断一个消息是否应该被传输。用我的理解看,他就是spring-integration里面的if语句。

<!-- Filter过滤器 -->
<int:channel id="filterAChannel"/>
<int:filter input-channel="filterAChannel" output-channel="filterBChannel" expression="payload.name.equals('haha')"/>
<int:channel id="filterBChannel"/>
<int:service-activator input-channel="filterBChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>

    (3).Router 

    路由器,用来管理一个消息应该被发送到哪个channel中。相当于JAVA里面的switch case语句吧。判断条件很多,可是使用header里面的参数具体值(比如header里面有个定义为testRouter的参数,数值为A,那么消息经过路由会发送到判断为A的通道内,后面使用中再详细讲解)

    

    (4).Service Activator 

    我称他为服务激活器,是一个连接服务实例到消息系统的通用端点。对于服务激活器,可能是因为我理解的不够全面,我总是将他和通道适配器搞混,因为我自己测试发现,激活器和适配器都可以作为一个消息出通道的节点。

    (5).Channel Adapter

    通道适配器是将消息通道连接到某个其他系统或传输的端点。通道适配器可以是入站或出站。通常情况下,通道适配器将在消息与从其他系统(文件,HTTP请求,JMS消息等)接收或发送的任何对象或资源之间进行映射。

    (6).Channel Bridge

    通道桥梁,用来作为管道之间进行通信使用的,常用情景为:在一个输入管道,将管道的内容发送到另外N个管道输出,配置方式如下

    <!-- bridge -->
    <int:channel id="bridgeSendChannel"/>
    <int:bridge input-channel="bridgeSendChannel" output-channel="bridgeReceiveAChannel"/>
    <int:channel id="bridgeReceiveAChannel"/>
    <int:bridge input-channel="bridgeReceiveAChannel" output-channel="bridgeReceiveBChannel"/>
    <int:channel id="bridgeReceiveBChannel">
        <int:queue/>
    </int:channel>
    <int:outbound-channel-adapter channel="bridgeReceiveBChannel"
                                  expression="@receiveServiceImpl.helloBridge(payload.name,payload.age)">
        <int:poller fixed-delay="0"/>
    </int:outbound-channel-adapter>

    另外还有Splitter(分解器),Aggregator(聚合器)等。对于其他的消息节点,博主还没有做过多研究,就不再次误人子弟了。后续会将未研究到的一一补上。

4.Channel Interceptor

    管道拦截器,能够以非常优雅,非常温柔的方式捕获管道传递之间的节点。对于拦截器,spring-integration给了我们六种节点

分别是发送前,邮寄后,发送成功后,接收前,接收后,接受成功后。可以分别在不同的节点进行操作。

四、use(demo地址在本文最后)

下面使用到的Test类为

import lombok.Data;

/**
 * 普通测试dto
 * @author lin
 */
@Data
public class Test {

    private String name;

    private String age;
}

(1)普通方式

xml配置,这里配置了一个通道helloWorldChannel,配置了个接收激活点,即接收方的地址为helloServiceImpl里面的hello方法。(其中ref指对应接收的类名,method指类里面接收的方法)

    <!-- 测试dto模式传输 -->
    <int:channel id="testChannel"/>
    <int:service-activator input-channel="testChannel" ref="receiveServiceImpl" method="hello"/>

发送方Service里面

    /**
     * 测试传输dto
     */
    @Override
    public void testDto() {
        System.out.println("testDto方法");
        Test test = new Test();
        test.setName("testDto");
        test.setAge("18");
        testChannel.send(MessageBuilder.withPayload(test).build());
    }

接收方Service里面

    @Override
    public void hello(Test test) {
        System.out.println(test.getName() + " " + test.getAge());
    }

(2)普通多参数方式

xml配置,这里通过获取payload里面的具体参数来传参的形式

    <!-- 测试多参数传递 -->
    <int:channel id="moreParamChannel"/>
    <int:service-activator input-channel="moreParamChannel"
                           expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>

发送方Service里面,将所有的参数通过Map形式装载到payload里面

    /**
     * 测试多参数传输
     */
    @Override
    public void moreParamm() {
        System.out.println("greetMoreParam方法");
        HashMap<String, String> map = new HashMap();
        map.put("name", "moreParam");
        map.put("age", "18");
        helloWorldMoreParamChannel.send(MessageBuilder.withPayload(map).build());
    }

接收方Service里面

    @Override
    public void helloMoreParam(String name, String age) {
        System.out.println(name + " " + age);
    }

(3)JMS方式

xml配置,这里配置了个MQ,将消息放入mq中进行传递

    <!-- 测试Mq配置-->
    <int:channel id="topicChannel"/>
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://127.0.0.1:61616?trace=true&keepAlive=true</value>
        </property>
        <property name="useAsyncSend" value="true"/>
    </bean>
    <int-jms:outbound-channel-adapter channel="topicChannel" destination-name="topic.myTopic" pub-sub-domain="true"/>
    <int:channel id="listenerChannel"/>
    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter" channel="listenerChannel"
                                            destination-name="topic.myTopic" pub-sub-domain="true"/>
    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage"/>

发送方Service里面

    /**
     * 使用mq进行传输发送方法
     */
    @Override
    public void send() {
        HashMap<String,Object> map = new HashMap<>();
        map.put("name","MqService");
        map.put("age","18");
        topicChannel.send(MessageBuilder.withPayload(map).build());
    }

接收方Service里面

public void processMessage(HashMap<String,Object> map) {
        System.out.println("MessageListener::::::Received message: " + map.toString());
    }

(4)订阅方式

xml配置,这里配置了两个订阅者,订阅者分别是两个方法

    <!-- 测试订阅发布 -->
    <!--min-subscribers=""参数为预期最小订阅者,如果必须有订阅者,则这里填写最少数;默认值为0-->
    <int:publish-subscribe-channel id="pubsubChannel"/>
    <int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveOne">
    </int:outbound-channel-adapter>
    <int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveTwo">
    </int:outbound-channel-adapter>

发送方Service里面

    @Override
    public void pubsubSend() {
        Test test = new Test();
        test.setName("pubsubSend");
        test.setAge("18");
        publishSubscribeChannel.send(MessageBuilder.withPayload(test).build());
    }

接收方Service里面

    @Override
    public void helloReceiveOne(Test test){
        System.out.println("One:"+test.getName()+" "+test.getAge());
    }

    @Override
    public void helloReceiveTwo(Test test){
        System.out.println("Two:"+test.getName()+" "+test.getAge());
    }

(5)router方式

xml配置,这里配置了一个入口通道,当消息进入入口后,通过判断header里面的'tsetHeader'参数的值,如果值为A,则进入routerAChannel通道,如果为B则进入routerBChannel通道。进入通道后分别进入两者的接收方法中。其中两种方法用了传递类,和多参数传递的形式。

    <!-- 测试路由 -->
    <!-- 路由入口 -->
    <int:channel id="routingChannel">
        <int:queue/>
    </int:channel>
    <!-- 路由器 -->
    <int:header-value-router input-channel="routingChannel" header-name="testHeader">
        <int:poller fixed-delay="0"/>
        <int:mapping value="A" channel="routerAChannel"/>
        <int:mapping value="B" channel="routerBChannel"/>
    </int:header-value-router>
    <!-- 路由出口 -->
    <int:channel id="routerAChannel">
        <int:queue/>
    </int:channel>
    <int:outbound-channel-adapter channel="routerAChannel" ref="receiveServiceImpl" method="helloRouterTest">
        <int:poller fixed-delay="0"/>
    </int:outbound-channel-adapter>
    <int:channel id="routerBChannel">
        <int:queue/>
    </int:channel>
    <int:outbound-channel-adapter channel="routerBChannel"
                                  expression="@receiveServiceImpl.helloRouterMap(payload.name,payload.age)">
        <int:poller fixed-delay="0"/>
    </int:outbound-channel-adapter>

发送方Service里面

    @Override
    public void routerA(String name, String age) {
        Test test = new Test();
        test.setAge(age);
        test.setName(name);
        routingChannel.send(MessageBuilder.withPayload(test).setHeader("testHeader", "A").build());
    }

    @Override
    public void routerB(String name, String age) {
        HashMap<String,String> map = new HashMap<>();
        map.put("name", name);
        map.put("age", age);
        routingChannel.send(MessageBuilder.withPayload(map).setHeader("testHeader", "B").build());
    }

接收方Service里面

    @Override
    public void helloRouterTest(Test test){
        System.out.println("routerA方法");
        System.out.println("helloRouterTest:"+test.getName()+" "+test.getAge());
    }

    @Override
    public void helloRouterMap(String name,String age){
        System.out.println("routerB方法");
        System.out.println("helloRouterMap:"+name+" "+age);
    }

(6)网关方式

xml配置,在这里面配置了一个接口类,当调用这个接口的方法时,就会进入网关配置的通道

    <!-- 网关通道口模式,dto -->
    <int:channel id="getWayChannel">
        <int:queue/>
    </int:channel>
    <int:gateway service-interface="com.lin.integration.service.interfaces.UseGetWaySender" id="helloGetWaySender"
                 default-request-channel="getWayChannel"/>
    <int:outbound-channel-adapter channel="getWayChannel" ref="receiveServiceImpl" method="hello">
        <int:poller fixed-delay="0"></int:poller>
    </int:outbound-channel-adapter>

    <!-- 网关通道口模式,多参数传递 -->
    <int:channel id="getWayMoreParamChannel">
        <int:queue/>
    </int:channel>
    <int:gateway service-interface="com.lin.integration.service.interfaces.MoreParamSender" id="getWayMoreParamSender"
                 default-request-channel="getWayMoreParamChannel"/>
    <int:outbound-channel-adapter channel="getWayMoreParamChannel"
                                  expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)">
        <int:poller fixed-delay="0"></int:poller>
    </int:outbound-channel-adapter>

网关interface里面

public interface UseGetWaySender {

    void sendMessage(Test test);
	
}
public interface MoreParamSender {

    void sendMessage(Map map);
	
}

发送方Service里面

    /**
     * 测试网关dto
     */
    @Override
    public void getWay() {
        Test test = new Test();
        test.setAge("18");
        test.setName("getWay");
        useGetWaySender.sendMessage(test);
    }

    /**
     * 测试网关多参数
     */
    @Override
    public void getWayMoreParam() {
        HashMap<String, String> map = new HashMap();
        map.put("name", "getWayMoreParam");
        map.put("age", "18");
        moreParamSender.sendMessage(map);
    }

 (7)全局拦截器

拦截器中,将需要拦截的管道进行拦截,拦截之后就会对这个管道的发送端,接收端进行拦截,拦截的接口在上文已经提到过,拦截的配置如下

    <!-- 全局拦截器 -->
    <int:channel-interceptor pattern="testInterceptorChannel" order="3" ref="countingChannelInterceptor">
    </int:channel-interceptor>
    <int:channel id="testInterceptorChannel"/>
    <int:service-activator input-channel="testInterceptorChannel" ref="receiveServiceImpl" method="hello"/>

    

    对于近期的spring-integration研究,这些只是“初探”,如此好的一个框架模式,我也将在今后进行深入研究,会将文章进行补充,希望各位对于我文章里面的不足与错误的地方进行批评指出,从而能互相交流研究,多谢。

参考文献:

https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/

https://www.aliyun.com/jiaocheng/301276.html

https://blog.csdn.net/xiayutai1/article/details/53302652?locationNum=4&fps=1

http://www.iteye.com/topic/744524

https://blog.csdn.net/slivefox/article/details/3740541

https://my.oschina.net/zhzhenqin/blog/86586

http://www.importnew.com/16538.html

demo码云地址(10.21更新,增加了java dsl):

DoUbLE_tree/spring-integration-mydemo

GitHub 加速计划 / in / integration
4.97 K
1.24 K
下载
HACS gives you a powerful UI to handle downloads of all your custom needs.
最近提交(Master分支:2 个月前 )
8d999fb4 3 个月前
3cfbe3da Co-authored-by: Erik Montnemery <erik@montnemery.com> 3 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐