spring integration-TCP服务器和TCP客户端的注解配置
integration
HACS gives you a powerful UI to handle downloads of all your custom needs.
项目地址:https://gitcode.com/gh_mirrors/in/integration
免费下载资源
·
spring integration的基本概念请参考概述和核心。
tcp&udp模块请参考tcp&udp。
一个基本的spring integration模块有以下几个组件:
- 入站通道适配器
- 入站通道
- 消息处理器
- 网关
- 出站通道
- 出站通道适配器
其中前三个是消息消费和处理过程,后三个是消息生产和发送过程。
TCP服务器配置
- 使用网关(同步,需要应答)
/**
 * TCP server configuration built around an inbound gateway
 * (synchronous: the handler's return value is sent back as the reply).
 */
@EnableIntegration
@Configuration
public class TcpServerConfig {

    /** Listening port, read from the {@code tcp.port} property. */
    @Value("${tcp.port}")
    private int port;

    /**
     * Inbound channel: requests received by the gateway are placed here.
     *
     * @return a new {@code DirectChannel}
     */
    @Bean
    public MessageChannel serverIn() {
        return new DirectChannel();
    }

    /**
     * Server connection factory bound to the configured port.
     *
     * @return a {@code TcpNetServerConnectionFactory} for {@code port}
     */
    @Bean
    public AbstractServerConnectionFactory serverCF() {
        AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(this.port);
        return factory;
    }

    /**
     * Inbound gateway that forwards requests to the {@code serverIn} channel.
     *
     * @param connectionFactory the server connection factory the gateway uses
     * @return the configured gateway
     */
    @Bean
    public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
        TcpInboundGateway gateway = new TcpInboundGateway();
        gateway.setRequestChannelName("serverIn");
        gateway.setConnectionFactory(connectionFactory);
        return gateway;
    }

    /**
     * Message handler for {@code serverIn}. Simple business logic: upper-case the request.
     *
     * @param in the request text
     * @return the upper-cased text; the return value is the reply
     */
    @ServiceActivator(inputChannel = "serverIn")
    public String upCase(String in) {
        String reply = in.toUpperCase();
        return reply;
    }
}
客户端发送过来的每条消息都必须以\r\n结尾,才能被解析为单独的一条消息。参考默认的序列化器ByteArrayCrLfSerializer。
- 使用适配器(异步,不需应答)
@EnableIntegration
@Configuration
public class TcpServerConfig {
private final TcpGateway tcpGateway;
@Value("${tcp.port}")
private int port;
public TcpConfig(TcpGateway tcpGateway) { //注入网关,用于发送消息
this.tcpGateway = tcpGateway;
}
@Bean
public MessageChannel serverIn() { //入站通道
return new DirectChannel();
}
@Bean
public AbstractServerConnectionFactory serverCF() {
return new TcpNetServerConnectionFactory(this.port);
}
@Bean
public TcpReceivingChannelAdapter tcpInAdapter(AbstractServerConnectionFactory connectionFactory) { //入站适配器
TcpReceivingChannelAdapter inGate = new TcpReceivingChannelAdapter();
inGate.setConnectionFactory(connectionFactory);
inGate.setOutputChannelName("serverIn");
return inGate;
}
@ServiceActivator(inputChannel = "serverIn") //消息处理器
public void upCase(Message<String> in) { //不直接用String或byte[]的原因是消息头的ip_connectionId字段,用来表明应答由哪个连接发送
System.err.println(in.getHeaders().get("ip_connectionId") + "=" + in.getPayload());
Message<String> reply = MessageBuilder.createMessage(in.getPayload().toUpperCase(), in.getHeaders()); //同样简单逻辑:转成大写应答
tcpGateway.send(reply);
}
@Bean
public MessageChannel serverOut() { //出站通道
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "serverOut")
public MessageHandler tcpOutAdapter(AbstractServerConnectionFactory connectionFactory) { //出站适配器
TcpSendingMessageHandler outGate = new TcpSendingMessageHandler();
outGate.setConnectionFactory(connectionFactory);
return outGate;
}
@MessagingGateway(defaultRequestChannel = "serverOut") //出站消息网关
public interface TcpGateway {
void send(Message<String> out);
}
}
需要注意的是:
- 入站适配器和出站适配器用【同一个】【服务端】连接工厂
- 每个消息负载都必须以\r\n结尾,才能被划分为单独的一条消息
- 应答消息的消息头必须包含ip_connectionId,不然不知道用哪个连接发送
TCP客户端配置
- 使用网关(同步)
主动发布型客户端
@EnableIntegration
@Configuration
public class TcpClientConfig implements ApplicationListener<ApplicationEvent> {
private final TcpGateway tcpGateway;
@Value("${tcp.host}")
private String host;
@Value("${tcp.port}")
private int port;
public TcpClientConfig(TcpGateway tcpGateway) {
this.tcpGateway = tcpGateway;
}
@Bean
public AbstractClientConnectionFactory clientCF() {
return new TcpNetClientConnectionFactory(this.host, this.port);
}
@MessagingGateway(defaultRequestChannel = "clientOut")
public interface TcpGateway {
String send(String out);
}
@Bean
public MessageChannel clientOut() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "clientOut")
public MessageHandler tcpOutGateway(AbstractClientConnectionFactory connectionFactory) {
TcpOutboundGateway inGate = new TcpOutboundGateway();
inGate.setConnectionFactory(connectionFactory);
inGate.setOutputChannelName("replyTcp");
return inGate;
}
@Bean
public MessageChannel replyTcp() {
return new DirectChannel();
}
@Transformer(inputChannel = "replyTcp") //应答默认是字节数组,需要转换,和TcpGateway.send()返回值一致
public String convertReply(byte[] bytes) {
return new String(bytes);
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent) {
String reply = tcpGateway.send("first"); //同步
System.err.println("reply:" + reply);
}
}
被动接收型客户端
@EnableIntegration
@Configuration
public class TcpClientConfig implements ApplicationListener<ApplicationEvent> {
@Value("${tcp.host}")
private String host;
@Value("${tcp.port}")
private int port;
@Bean
public AbstractClientConnectionFactory clientCF() {
return new TcpNetClientConnectionFactory(this.host, this.port);
}
@Bean
public TcpInboundGateway tcpInGateway(AbstractClientConnectionFactory connectionFactory) {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setClientMode(true); //配置客户端模式
inGate.setConnectionFactory(connectionFactory);
inGate.setRequestChannelName("clientIn");
inGate.setReplyChannelName("replyTcp");
return inGate;
}
@Bean
public MessageChannel clientIn() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "clientIn")
public String clientInput(String in) {
System.err.println("in: " + in);
return in.toUpperCase(); //同步
}
@Bean
public MessageChannel replyTcp() {
return new DirectChannel();
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof TcpConnectionOpenEvent) {
TcpConnection connection = (TcpConnection) event.getSource();
System.err.println("connection:" + connection.getConnectionId());
}
}
- 使用适配器(异步)
/**
 * TCP client configuration built from a receiving channel adapter and a sending
 * message handler (asynchronous), both switched to client mode and both given
 * the same client connection factory.
 */
@EnableIntegration
@Configuration
public class TcpClientConfig implements ApplicationListener<ApplicationEvent> {

    /** Messaging gateway used to send messages onto the {@code clientOut} channel. */
    private final TcpGateway tcpGateway;

    /** Remote host, read from the {@code tcp.host} property. */
    @Value("${tcp.host}")
    private String host;

    /** Remote port, read from the {@code tcp.port} property. */
    @Value("${tcp.port}")
    private int port;

    /**
     * @param tcpGateway the messaging gateway used to send messages
     */
    public TcpClientConfig(TcpGateway tcpGateway) {
        this.tcpGateway = tcpGateway;
    }

    /**
     * Inbound channel: messages received by the inbound adapter are placed here.
     *
     * @return a new {@code DirectChannel}
     */
    @Bean
    public MessageChannel clientIn() {
        return new DirectChannel();
    }

    /**
     * Connection factory; note that this is the <em>client</em> connection factory.
     *
     * @return a {@code TcpNetClientConnectionFactory} for {@code host}:{@code port}
     */
    @Bean
    public AbstractClientConnectionFactory clientCF() {
        return new TcpNetClientConnectionFactory(this.host, this.port);
    }

    /**
     * Inbound adapter in client mode that publishes received messages to {@code clientIn}.
     *
     * @param connectionFactory the client connection factory the adapter uses
     * @return the configured inbound adapter
     */
    @Bean
    public TcpReceivingChannelAdapter tcpInAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter receiver = new TcpReceivingChannelAdapter();
        receiver.setConnectionFactory(connectionFactory);
        receiver.setClientMode(true); // turn on client-mode
        receiver.setOutputChannelName("clientIn");
        return receiver;
    }

    /**
     * Message handler for {@code clientIn}: prints the connection id header and the payload.
     *
     * @param in the received message
     */
    @ServiceActivator(inputChannel = "clientIn")
    public void upCase(Message<String> in) {
        Object connectionId = in.getHeaders().get("ip_connectionId");
        System.err.println(connectionId + "=" + in.getPayload());
    }

    /** Messaging gateway whose default request channel is {@code clientOut}. */
    @MessagingGateway(defaultRequestChannel = "clientOut")
    public interface TcpGateway {

        /**
         * Sends a message to the outbound channel.
         *
         * @param out the message to send
         */
        void send(Message<String> out);
    }

    /**
     * Outbound channel: messages sent through {@link TcpGateway} are placed here.
     *
     * @return a new {@code DirectChannel}
     */
    @Bean
    public MessageChannel clientOut() {
        return new DirectChannel();
    }

    /**
     * Outbound adapter in client mode consuming {@code clientOut}.
     *
     * @param connectionFactory the client connection factory the handler uses
     * @return the configured sending handler
     */
    @Bean
    @ServiceActivator(inputChannel = "clientOut")
    public MessageHandler tcpOutAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
        sender.setClientMode(true);
        sender.setConnectionFactory(connectionFactory);
        return sender;
    }

    /**
     * Listens for the connection-open event and sends a first message that carries
     * the id of the opened connection in its {@code ip_connectionId} header.
     *
     * @param event any application event; only {@code TcpConnectionOpenEvent} is acted on
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (!(event instanceof TcpConnectionOpenEvent)) {
            return;
        }
        TcpConnection opened = (TcpConnection) event.getSource();
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("ip_connectionId", opened.getConnectionId());
        Message<String> greeting = MessageBuilder.createMessage("first", new MessageHeaders(headerValues));
        tcpGateway.send(greeting);
    }
}
注意:
- 设置clientMode为true
- 使用【客户端】连接工厂
- 入站适配器和出站适配器使用同一个连接工厂
- 监听连接打开事件
GitHub 加速计划 / in / integration
4.97 K
1.24 K
下载
HACS gives you a powerful UI to handle downloads of all your custom needs.
最近提交(Master分支:2 个月前 )
8d999fb4
3 个月前
3cfbe3da
Co-authored-by: Erik Montnemery <erik@montnemery.com> 3 个月前
更多推荐
已为社区贡献1条内容
所有评论(0)