Netty 实现客户端和服务端的双向通信

后端主动向前端发送消息的实现方式

  • websocket : 客户端和服务端的双向通信,基于ws协议
  • sse: 实现服务端向客户端主动发送消息,基于http协议
  • netty: 服务端和客户端的双向通信,支持tcp,ws 等 并发性能很强

BIO/NIO/AIO

  • bio: 同步阻塞IO
  • NIO: 同步非阻塞IO
  • AIO: 异步非阻塞IO

同步与异步: 关注的是消息通知的机制。

  • 同步: 调用者需要主动等待或轮询结果。
  • 异步: 被调用者通过回调、事件通知等方式,主动通知调用者结果。

阻塞和非阻塞: 关注的是调用者的状态

  • 调用结果返回前,调用者线程被挂起,不能做其他事。
  • 调用结果返回前,调用者线程可以立即返回(去做其他事),不会被挂起。

Netty

netty是一个异步的,基于事件驱动的网络应用框架

用于快速开发可维护的,高性能

Netty基于Reactor模式

Reactor模式是一种典型的事件驱动+非阻塞I/O(NIO)的高并发网络编程模型,核心是利用I/O多路复用集中监听事件,并通过事件分发器调用对应的事件处理器完成业务逻辑,从而避免为每个连接创建线程的高开销

IO多路复用是一种高效的网络编程技术,允许单个线程同时监控多个文件描述符的状态。当某个描述符就绪时,程序可以立即对其进行读写操作。这种机制广泛应用于高并发场景,如网络服务器和数据库系统。

应用场景:

  • 物联网
  • 高并发互联网系统
  • 分布式系统: RPC的通信框架,DUbbo,Zookeeper,RocketMQ
  • 游戏行业

技术特征:

  • 高并发:基于 NIO(非阻塞 IO)开发,相比于 BIO(阻塞 IO),并发性能大大提高。
  • 传输快:依赖于零拷贝特性,减少不必要的内存拷贝,实现更高效的传输。
  • 封装好:封装了 NIO 操作的很多细节,提供了易于使用的调用接口。
  • 扩展性强:通过 ChannelHandler 可以灵活地扩展通信框架。
  • 性能优异:与其他 NIO 框架相比,Netty 的综合性能最优。
  • 运行稳定:修复了所有已知的 NIO bug,让开发人员可以专注于业务本身。

核心组件:

  • Bootstrap:Netty 提供的一个工厂类,可以通过它来完成 Netty 的客户端或服务端组件的组装,以及程序的初始化和启动。Netty 提供了两个引导类:ServerBootstrap 用于服务端,Bootstrap 用于客户端。
  • Boss Group:一个或多个 EventLoop,专门负责接收客户端连接(处理 Accept 事件)。它接收到连接后,会将连接注册到 Worker Group。
  • Worker Group:一组 EventLoop,负责处理连接的读写 I/O 事件以及执行 ChannelHandler 中的业务逻辑
  • EventLoop:本质是一个死循环,用于监听 I/O 事件和分配任务。一个 Channel 生命周期内的所有操作(连接、读、写)都由同一个 EventLoop 线程处理,保证了线程安全性 。
  • Channel:代表一个网络连接(Socket)。Netty 提供了 NioServerSocketChannel(服务端)、NioSocketChannel(客户端)等实现 。
  • ChannelHandler:业务逻辑的容器。例如处理入站数据的 ChannelInboundHandler 和处理出站数据的 ChannelOutboundHandler 。
  • ChannelPipeline:一个双向链表,负责将多个 ChannelHandler 串联起来。数据流经 Pipeline 时会被每个 Handler 处理

Netty服务端工作流程图

在这里插入图片描述

简单演示:

服务端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // 1. 创建线程组: boss 处理连接,worker 处理读写
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 单线程处理连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认线程数为 CPU核心数*2
        try {
            // 2. 创建服务端启动助手
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // 3. 指定 Channel 类型
                    .option(ChannelOption.SO_BACKLOG, 128) // 4. 设置 TCP 参数,等待队列大小
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 保持长连接
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 5. 配置 Pipeline
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            // 添加自定义的业务处理器
                            p.addLast(new EchoServerHandler());
                        }
                    });

            // 6. 绑定端口并启动服务 (同步等待)
            ChannelFuture f = b.bind(port).sync();
            System.out.println("服务器启动,监听端口: " + port);

            // 7. 等待服务端监听通道关闭
            f.channel().closeFuture().sync();
        } finally {
            // 8. 优雅关闭线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(8080).start();
    }
}
服务端ChannelHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

// 继承 ChannelInboundHandlerAdapter 来处理入站 I/O 事件
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    // 当有数据可读时被调用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("服务器收到: " + in.toString(CharsetUtil.UTF_8));

        // 将收到的消息写回客户端(注意:此时只是写入缓冲区,并未真正发送)
        ctx.write(in);
        // ctx.flush();
        // ctx.writeAndFlush(in); // 写入并刷新缓冲区
    }

    // 当数据读取完成时被调用
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // 将缓冲区中的数据刷新到 Socket 并关闭通道
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    // 处理异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
客户端:
package com.yp.app1;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class EchoClient {
    // 服务端ip和端口
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new EchoClientHandler()); // 添加客户端处理器
                        }
                    });

            // 连接服务端
            ChannelFuture f = b.connect(host, port).sync();

            // 等待连接关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭EventLoopGroup,释放资源
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("127.0.0.1", 8080).start();
    }
}
客户端Handler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    // 当与服务器连接建立成功后,立即发送消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8));
    }

    // 接收服务器的响应
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        System.out.println("客户端收到: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

在这里插入图片描述

入站和出站

入站: 读数据,需要进行数据的解码

出站: 写数据,进行数据的编码

入站(Inbound):数据从外部进入系统。

流向:Socket -> ChannelPipeline -> 你的业务逻辑。

本质:读数据。

就像你家的信箱,邮递员(外部)把信塞进来,你(业务逻辑)去取信。

出站(Outbound):数据从系统发送到外部。

流向:你的业务逻辑 -> ChannelPipeline -> Socket。

本质:写数据。

就像你写好信要寄出去,你把信(业务逻辑)投进邮筒,邮递员(网络)取走送走

在 Netty 中,处理这两种事件的组件分别继承自不同的接口或类:

● 入站处理器(Inbound Handler)

○ 通常继承 ChannelInboundHandlerAdapter 或实现 ChannelInboundHandler,属于数据解码操作

○ 核心方法:channelRead()(有数据可读时触发)、channelActive()(连接建立时触发)、exceptionCaught()(异常时触发)等。

● 出站处理器(Outbound Handler)

○ 通常继承 ChannelOutboundHandlerAdapter 或实现 ChannelOutboundHandler。属于数据编码操作

○ 核心方法:write()(将数据写回客户端时触发)、flush()(刷新缓冲区时触发)、connect()、disconnect() 等。

Pipeline 的执行顺序

通过addLast()方法,向ChannelPipeline中添加入站和出站处理器

在 ChannelPipeline 中,入站处理器和出站处理器的执行方向是相反的。

假设我们把处理器像链表一样排列在 Pipeline 中:

Head <–> 入站处理器A <–> 入站处理器B <–> 出站处理器C <–> 出站处理器D <–> Tail

img

入站事件(从Head流向Tail)

Socket 收到数据(入站)。

  • 数据从 Head 开始,依次经过所有入站处理器(A -> B)。
  • 注意,它不会经过出站处理器(C、D),除非你在某个入站处理器里手动触发出站事件。
  • 到达 Tail 后,通常意味着数据解析完成,交给业务逻辑。

出站事件(从Tail流向Head)

  • 业务代码调用 ctx.writeAndFlush()(出站)。
  • 数据从 Tail 开始向前传播,依次经过所有出站处理器(D -> C)。
  • 注意,它不会反向经过入站处理器(B、A)。
  • 最终到达 Head,由底层的 Socket 将数据发送出去。

入站处理器:调用 ctx.fireChannelRead() 将事件向后(Tail方向)传递。

出站处理器:调用 ctx.write() 或 ctx.channel().write() 将事件向前(Head方向)传递。

定义3个相同的入站处理器

InBoundHandler1、InBoundHandler2、InBoundHandler3

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class InBoundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("InBoundHandler1 read: " + data);
        // If do not call fireChannelRead, the next ChannelHandler will not exec.
        ctx.fireChannelRead(msg);
        System.out.println("exit InBoundHandler1 channelRead");
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InBoundHandler1 exception");
    }
}
定义3个相同的出站处理器

OutBoundHandler1、OutBoundHandler2、OutBoundHandler3

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class OutBoundHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandler1 write");
        ctx.write(msg);
        System.out.println("Exit OutBoundHandler1 write");
    }
}

粘包和半包

在基于TCP的通信中,半包是指一次读取到的只是一个完整消息的一部分,而粘包是一次读取到多个消息连在一起。

由于TCP是无边界的字节流协议,这种现象很常见。

演示代码:

服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws InterruptedException {
        EchoServer echoServer = new EchoServer(9999);
        System.out.println("服务器即将启动");
        echoServer.start();
    }

    public void start() throws InterruptedException {
        EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
        try {
            ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
            b.group(group)/*将线程组传入*/
                    .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .localAddress(new InetSocketAddress(port))/*指定服务器监听的端口*/
                    /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
                    所以下面这段代码的作用就是为这个子channel增加handle*/
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(serverHandler);/*添加到该子channel的pipeline的尾部*/
                        }
                    });
            ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
            System.out.println("服务器启动完成,等待客户端的连接和数据.....");
            f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
        } finally {
            group.shutdownGracefully().sync();/*优雅关闭线程组*/
        }
    }

}

服务端handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

// 同一个实例可以被多次添加到一个或者多个ChannelPipeline中
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 服务端读取到网络数据后的处理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("服务器接收信息:[" + request
                + "] 计数值:" + counter.incrementAndGet());
        String resp = "Hello," + request + ". Welcome"
                + System.getProperty("line.separator");
        // 向客户端返回信息
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));

    }

    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoClient {

    private final int port;
    private final String host;

    public EchoClient(int port, String host) {
        this.port = port;
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
        try {
            final Bootstrap b = new Bootstrap();
            /*客户端启动必须*/
            b.group(group)/*将线程组传入*/
                    .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .remoteAddress(new InetSocketAddress(host, port))/*配置要连接服务器的ip地址和端口*/
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient(9999, "127.0.0.1").start();
    }
}

客户端handler

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 客户端读取到网络数据后的处理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("客户端收到[" + msg.toString(CharsetUtil.UTF_8)
                + "] 计数值:" + counter.incrementAndGet());
    }

    /*** 客户端被通知channel活跃后,做事*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "张三 李四 王五 赵四" + System.getProperty("line.separator");
        for (int i = 0; i < 100; i++) {
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
输出:

在这里插入图片描述

解决方案

对于粘包和拆包问题,常见的解决方案有四种:

  • 客户端在发送数据包的时候,每个包都固定长度大小。如果客户端发送的数据长度不足指定字节的长度,则通过补充空格的方式补全到指定长度,Netty提供了FixedLengthFrameDecoder方式进行处理
  • 客户端在每个包的末尾使用固定的分隔符,例如\r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的\r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包。Netty提供了两个编解码的类,LineBasedFrameDecoder(根据换行符)和DelimiterBasedFrameDecoder(自定义分隔符)进行处理
  • 将消息分为消息头和消息体,在消息头中存储整个消息的长度,只有读取到足够长度的消息,才认为是读到了一个完整的消息,一般使用LengthFieldBasedFrameDecoder与LengthFieldPrepender配合处理
  • 对于较复杂的协议信息,可以通过自定义协议进行粘包和拆包的处理。比如可以通过继承LengthFieldBasedFrameDecoder和LengthFieldPrepender来实现粘包和拆包的处理,也可以通过实现MessageToByteEncoder和ByteToMessageDecoder进行处理
下面通过使用分隔符,在server,和client添加如下代码,同时

在这里插入图片描述

注意,指定泛型要注意,匹配
在这里插入图片描述

输出:

在这里插入图片描述

实际操作中,会使用自定义格式这种方式:

自定义消息(协议)


public class Message {
    private int magicNumber;      // 魔数
    private byte version;         // 版本号
    private byte serializerType;  // 序列化类型
    private byte command;         // 指令类型
    private int length;           // 数据长度
    private String content;       // 实际内容


    public Message() {
        this.magicNumber = 0xCAFEBABE;
        this.version = 1;
        this.serializerType = 0;  // 默认使用字符串直接传输
    }


    public Message(byte command, String content) {
        this();
        this.command = command;
        this.content = content;
        this.length = content != null ? content.getBytes().length : 0;
    }


    // Getters and Setters
    public int getMagicNumber() {
        return magicNumber;
    }


    public void setMagicNumber(int magicNumber) {
        this.magicNumber = magicNumber;
    }


    public byte getVersion() {
        return version;
    }


    public void setVersion(byte version) {
        this.version = version;
    }


    public byte getSerializerType() {
        return serializerType;
    }


    public void setSerializerType(byte serializerType) {
        this.serializerType = serializerType;
    }


    public byte getCommand() {
        return command;
    }


    public void setCommand(byte command) {
        this.command = command;
    }


    public int getLength() {
        return length;
    }


    public void setLength(int length) {
        this.length = length;
    }


    public String getContent() {
        return content;
    }


    public void setContent(String content) {
        this.content = content;
        this.length = content != null ? content.getBytes().length : 0;
    }


    @Override
    public String toString() {
        return "Message{" +
                "magicNumber=" + Integer.toHexString(magicNumber) +
                ", version=" + version +
                ", command=" + command +
                ", length=" + length +
                ", content='" + content + '\'' +
                '}';
    }
}

自定义编码器和解码器

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.nio.charset.StandardCharsets;

public class MessageDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * 参数说明:
     * maxFrameLength: 最大帧长度,防止内存溢出
     * lengthFieldOffset: 长度字段的偏移量,魔数(4) + 版本(1) + 序列化类型(1) + 指令(1) = 7
     * lengthFieldLength: 长度字段本身占用的字节数,数据长度字段占4字节
     * lengthAdjustment: 长度调整值,数据长度字段之后还有数据内容,所以不需要调整,为0
     * initialBytesToStrip: 解码后跳过的字节数,我们保留完整头部用于解析,所以为0
     */
    public MessageDecoder() {
        super(1024 * 1024,  // 最大帧长度 1MB
                7,            // 长度字段偏移量:魔数4 + 版本1 + 序列化类型1 + 指令1 = 7
                4,            // 长度字段长度:数据长度字段占4字节
                0,            // 长度调整值
                0);           // 初始跳过字节数
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // 调用父类方法,如果数据不够或者数据不完整,会返回null
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }

        try {
            // 读取魔数
            int magicNumber = frame.readInt();
            if (magicNumber != 0xCAFEBABE) {
                throw new RuntimeException("魔数校验失败: " + Integer.toHexString(magicNumber));
            }

            // 读取版本号
            byte version = frame.readByte();

            // 读取序列化类型
            byte serializerType = frame.readByte();

            // 读取指令类型
            byte command = frame.readByte();

            // 读取数据长度
            int length = frame.readInt();

            // 读取数据内容
            String content = "";
            if (length > 0) {
                byte[] contentBytes = new byte[length];
                frame.readBytes(contentBytes);
                content = new String(contentBytes, StandardCharsets.UTF_8);
            }

            // 构建Message对象
            Message message = new Message();
            message.setMagicNumber(magicNumber);
            message.setVersion(version);
            message.setSerializerType(serializerType);
            message.setCommand(command);
            message.setLength(length);
            message.setContent(content);

            System.out.println("解码器 - 接收到消息: " + message);
            return message;

        } finally {
            // 释放frame
            frame.release();
        }
    }
}

编码器


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;


import java.nio.charset.StandardCharsets;


public class MessageEncoder extends MessageToByteEncoder<Message> {


    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 写入魔数 (4字节)
        out.writeInt(msg.getMagicNumber());


        // 2. 写入版本号 (1字节)
        out.writeByte(msg.getVersion());


        // 3. 写入序列化类型 (1字节)
        out.writeByte(msg.getSerializerType());


        // 4. 写入指令类型 (1字节)
        out.writeByte(msg.getCommand());


        // 5. 写入数据长度 (4字节)
        byte[] contentBytes = msg.getContent() != null ?
                msg.getContent().getBytes(StandardCharsets.UTF_8) : new byte[0];
        out.writeInt(contentBytes.length);


        // 6. 写入实际数据
        if (contentBytes.length > 0) {
            out.writeBytes(contentBytes);
        }


        System.out.println("编码器 - 发送消息: " + msg + ", 字节数: " + out.readableBytes());
    }
}

客户端

package com.yp.app5;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class LengthFieldBasedClient {

    private final String host;
    private final int port;

    public LengthFieldBasedClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LoggingHandler(LogLevel.DEBUG))
                                    .addLast(new MessageDecoder())
                                    .addLast(new MessageEncoder())
                                    .addLast(new ClientBusinessHandler());
                        }
                    });

            System.out.println("客户端启动,连接服务器 " + host + ":" + port);
            ChannelFuture future = bootstrap.connect(host, port).sync();

            // 获取Channel,用于发送消息
            SocketChannel channel = (SocketChannel) future.channel();

            // 模拟粘包情况:连续发送多个消息
            simulateStickyPackets(channel);

            // 等待连接关闭
            channel.closeFuture().sync();

        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * 模拟粘包:连续发送多个消息,不使用flush
     * 这些消息会在TCP缓冲区中粘在一起,但我们的解码器能正确分割
     */
    private void simulateStickyPackets(SocketChannel channel) {
        System.out.println("开始模拟粘包发送...");

        // 发送5条消息,连续写入但不立即flush
        for (int i = 1; i <= 5; i++) {
            Message message = new Message((byte) 1, "这是第 " + i + " 条消息,用于测试粘包");
            // 使用write,不立即flush,让消息在缓冲区粘在一起
            channel.write(message);
            System.out.println("写入第 " + i + " 条消息到缓冲区");
        }

        // 最后一次性flush,模拟粘包发送
        channel.flush();
        System.out.println("缓冲区数据已刷新,5条消息可能粘在一起发送");

        // 延时一下,再发送另一批消息
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 第二批消息,这次每条都flush(模拟非粘包情况)
        System.out.println("\n开始发送第二批消息(每条独立flush)...");
        for (int i = 6; i <= 8; i++) {
            Message message = new Message((byte) 1, "单独发送的消息 " + i);
            channel.writeAndFlush(message);
            System.out.println("发送第 " + i + " 条消息并立即flush");
            try {
                Thread.sleep(500); // 间隔500ms
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new LengthFieldBasedClient("127.0.0.1", 8080).start();
    }
}

package com.yp.app5;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ClientBusinessHandler extends SimpleChannelInboundHandler<Message> {

    private int responseCount = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        responseCount++;
        System.out.println("客户端收到第 " + responseCount + " 条响应: " + msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接服务端成功,开始发送消息...");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端:

package com.yp.app5;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class LengthFieldBasedServer {

    private final int port;

    public LengthFieldBasedServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    // 添加日志处理器,方便查看原始字节流
                                    .addLast(new LoggingHandler(LogLevel.DEBUG))
                                    // 添加解码器(处理粘包)
                                    .addLast(new MessageDecoder())
                                    // 添加编码器
                                    .addLast(new MessageEncoder())
                                    // 添加业务处理器
                                    .addLast(new ServerBusinessHandler());
                        }
                    });

            System.out.println("服务端启动中,监听端口: " + port);
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("服务端启动成功");

            // 等待服务端关闭
            future.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new LengthFieldBasedServer(8080).start();
    }
}
package com.yp.app5;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ServerBusinessHandler extends SimpleChannelInboundHandler<Message> {

    private int messageCount = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        messageCount++;
        System.out.println("=================================");
        System.out.println("服务端收到第 " + messageCount + " 条消息: " + msg);

        // 根据指令类型处理
        switch (msg.getCommand()) {
            case 1: // 请求
                System.out.println("处理请求: " + msg.getContent());
                // 创建响应消息
                Message response = new Message((byte) 2, "服务端已收到: " + msg.getContent());
                // 发送响应
                ctx.writeAndFlush(response);
                break;
            case 3: // 心跳
                System.out.println("收到心跳消息");
                break;
            default:
                System.out.println("未知指令: " + msg.getCommand());
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接建立: " + ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接断开: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

在这里插入图片描述

自动重连

在分布式系统和网络编程中,网络连接是不可靠的,可能因为以下原因断开:

  • 网络波动:瞬时网络中断
  • 服务端重启:服务器维护或崩溃

本例使用IdleStateHandler机制实现自动重连。在Netty中,IdleStateHandler 是一个用于心跳检测的处理器,它可以监测连接的空闲状态并触发相应的事件。这个机制主要用于检测远程端点是否存活,如果不存活或不活跃,则对空闲的Socket连接进行处理,避免资源浪费。

public IdleStateHandler(
    int readerIdleTimeSeconds,
    int writerIdleTimeSeconds,
    int allIdleTimeSeconds) {



    this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
         TimeUnit.SECONDS);
}

  • readerIdleTimeSeconds:读超时。即当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件
  • writerIdleTimeSeconds: 写超时。即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件
  • allIdleTimeSeconds: 读/写超时。即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件

Handler中一般需要重写userEventTriggereduserEventTriggeredChannelInboundHandlerAdapter 提供的一个回调方法,常与 IdleStateHandler 配合,用于处理读/写空闲事件,实现心跳检测与超时处理。

零拷贝技术

零拷贝(Zero-Copy) 是一种减少数据在内核态与用户态之间不必要拷贝次数的 I/O 优化技术,其核心目标是降低 CPU 占用、减少上下文切换、提升高并发场景下的吞吐性能。

内核态是操作系统内核运行的模式,具有对硬件资源的完全控制权限,可以直接访问CPU、内存、磁盘等资源,并执行特权指令。内核态主要用于内存管理、进程调度、设备驱动等核心功能。

用户态是普通应用程序运行的模式,权限受到限制,无法直接访问硬件资源。用户态程序需要通过系统调用向内核请求服务,例如文件操作、网络通信等。

Netty的零拷贝主要体现在以下五个方面:

技术/策略 零拷贝体现 核心作用
直接内存(Direct Buffers) 使用堆外内存进行Socket读写,避免JVM堆内存到直接内存的二次拷贝。 减少一次用户态内部的内存复制,提升I/O性能。
文件传输(FileRegion) 包装FileChannel.transferTo()方法,支持操作系统级别的零拷贝,数据直接在内核空间从磁盘传输到网络。 彻底避免了用户态和内核态之间的数据复制,极大地提高了大文件传输的效率。
组合缓冲区(CompositeByteBuf) 将多个ByteBuf组合成一个逻辑上的ByteBuf,内部通过对象引用来维护,无需物理拷贝。 消除了合并多个缓冲区时的内存复制操作。
缓冲区切片(Slice)与包装(Wrap) slice()操作将ByteBuf分解为多个共享同一内存区域的子缓冲区;wrappedBuffer()操作将byte[]/ByteBuffer等包装为ByteBuf,共享底层数组。 在拆包、组包或将外部数据转为Netty对象时,避免了数据的内存复制。
内存池(PooledByteBufAllocator) 复用预先分配的直接内存或堆内存,而不是每次使用都重新创建和销毁。 避免了频繁的内存分配和释放带来的性能开销和GC压力。

本文以从磁盘读取数据为例,了解下零拷贝

注意:图片来自网络,https://zhuanlan.zhihu.com/p/1895465665000896082

传统的IO技术实现传输过程:

img

四次拷贝:磁盘到内核空间的内核缓冲区,内核缓冲区到用户缓冲区,用户缓冲区到内核空间的socket缓冲区,内核的socket缓冲区到网卡

还发生了4次用户态与内核态的上下文切换,因为发生了两次系统调用,一次是read(),一次是write(),每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态。

mmap方案:

img

用户缓冲区和内核缓冲区建立映射关系,相当于用户进程直接访问内核缓冲区,中间使用了3次拷贝,4次切换

sendfile+dma方案:

img

内核中的内核缓冲区和socket缓冲区,不进行数据拷贝,直接通过sg-dma将内核缓冲区数据拷贝到网卡,中间发生了2次切换

    |

| 文件传输(FileRegion) | 包装FileChannel.transferTo()方法,支持操作系统级别的零拷贝,数据直接在内核空间从磁盘传输到网络。 | 彻底避免了用户态和内核态之间的数据复制,极大地提高了大文件传输的效率。 |
| 组合缓冲区(CompositeByteBuf) | 将多个ByteBuf组合成一个逻辑上的ByteBuf,内部通过对象引用来维护,无需物理拷贝。 | 消除了合并多个缓冲区时的内存复制操作。 |
| 缓冲区切片(Slice)与包装(Wrap) | slice()操作将ByteBuf分解为多个共享同一内存区域的子缓冲区;wrappedBuffer()操作将byte[]/ByteBuffer等包装为ByteBuf,共享底层数组。 | 在拆包、组包或将外部数据转为Netty对象时,避免了数据的内存复制。 |
| 内存池(PooledByteBufAllocator) | 复用预先分配的直接内存或堆内存,而不是每次使用都重新创建和销毁。 | 避免了频繁的内存分配和释放带来的性能开销和GC压力。 |

本文以从磁盘读取数据为例,了解下零拷贝

注意:图片来自网络,https://zhuanlan.zhihu.com/p/1895465665000896082

传统的IO技术实现传输过程:

[外链图片转存中…(img-33XLYfXS-1777282395833)]

四次拷贝:磁盘到内核空间的内核缓冲区,内核缓冲区到用户缓冲区,用户缓冲区到内核空间的socket缓冲区,内核的socket缓冲区到网卡

还发生了4次用户态与内核态的上下文切换,因为发生了两次系统调用,一次是read(),一次是write(),每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态。

mmap方案:

[外链图片转存中…(img-kJpYsPjO-1777282395833)]

用户缓冲区和内核缓冲区建立映射关系,相当于用户进程直接访问内核缓冲区,中间使用了3次拷贝,4次切换

sendfile+dma方案:

[外链图片转存中…(img-GiAUPWsj-1777282395833)]

内核中的内核缓冲区和socket缓冲区,不进行数据拷贝,直接通过sg-dma将内核缓冲区数据拷贝到网卡,中间发生了2次切换

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐