Netty 架构设计 - 事件驱动模型详解(初学者版)

用最简单的方式理解 Netty 的核心设计思想

前言

很多初学者看 Netty 的文章时,会被各种专业术语搞晕:事件驱动、责任链、ChannelPipeline…这些到底是什么?

本文用生活化的例子,帮你彻底理解 Netty 的设计原理。看完后你会发现:Netty 的设计其实很简单,就是把复杂的网络编程变成了"搭积木"


一、传统网络编程的痛点

问题场景

假设你要开发一个聊天服务器,需要处理:

  1. 接收客户端连接
  2. 读取客户端消息
  3. 解析消息内容
  4. 处理业务逻辑
  5. 返回响应
  6. 记录日志
  7. 处理异常

传统写法:所有逻辑混在一起

// 传统 BIO 写法(伪代码)
while (true) {
    Socket socket = serverSocket.accept(); // 接收连接
    
    try {
        // 读取数据
        byte[] data = socket.read();
        
        // 解析数据
        String message = new String(data);
        
        // 处理业务
        String response = handleBusiness(message);
        
        // 返回响应
        socket.write(response.getBytes());
        
        // 记录日志
        log.info("处理完成");
        
    } catch (Exception e) {
        // 处理异常
        log.error("出错了", e);
    }
}

问题

  • 代码混乱,难以维护
  • 功能耦合,难以扩展
  • 一个线程处理一个连接,性能差

二、Netty 的解决方案:事件驱动 + 责任链

核心思想

Netty 把网络编程变成了**“事件响应""流水线处理”**:

  1. 事件驱动:不是你主动问"有数据吗?“,而是"有数据了会通知你”
  2. 责任链模式:把处理逻辑拆成多个小模块,像流水线一样依次处理

生活化类比

传统方式:你是个全能员工

客户来了 → 你接待
客户说话 → 你听
客户需求 → 你处理
客户付款 → 你收钱
客户离开 → 你送客

一个人干所有事,累死了!

Netty 方式:你是个流水线工厂

客户来了 → 前台接待(Handler 1)
客户说话 → 翻译员翻译(Handler 2)
客户需求 → 业务员处理(Handler 3)
客户付款 → 财务收钱(Handler 4)
客户离开 → 保安送客(Handler 5)

每个人只做自己的事,专业高效!


三、事件驱动模型详解

什么是事件?

在 Netty 中,事件就是网络操作的各个阶段

常见事件类型

入站事件(Inbound Events)- 数据进来

就像客户来你店里:

事件 生活场景 Netty 中的含义
channelRegistered 客户进门登记 Channel 注册到 EventLoop
channelActive 客户坐下准备点餐 连接建立成功
channelRead 客户开始说话 收到数据
channelReadComplete 客户说完了 数据读取完毕
channelInactive 客户起身要走 连接断开
exceptionCaught 客户闹事 发生异常
出站事件(Outbound Events)- 数据出去

就像你主动做事:

事件 生活场景 Netty 中的含义
bind 开门营业 绑定端口
connect 主动拜访客户 发起连接
write 给客户说话 写数据到缓冲区
flush 确保客户听到 把缓冲区数据发出去
disconnect 送客 断开连接
close 关门 关闭 Channel

事件驱动的工作流程

传统方式(主动轮询):
你:有客户吗?(查询)
门:没有
你:有客户吗?(查询)
门:没有
你:有客户吗?(查询)
门:有!
你:好的,我去接待

事件驱动方式(被动通知):
你:我在这等着,有客户叫我
--- 你可以做其他事 ---
门:有客户来了!(事件通知)
你:好的,我去接待

代码对比

传统方式

// 不停地问
while (true) {
    if (有数据()) {
        处理数据();
    }
    Thread.sleep(100); // 浪费 CPU
}

事件驱动方式

// 注册事件处理器,等通知
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 有数据时自动调用
        处理数据(msg);
    }
});

四、核心组件详解

1. Channel(通道)

是什么:Channel 就是一个连接,代表客户端和服务器之间的通信管道。

生活类比:电话线

  • 你和朋友打电话,电话线就是 Channel
  • 可以通过它发送消息(write)
  • 可以通过它接收消息(read)
  • 可以挂断(close)

常用方法

Channel channel = ...;

// 写数据(不会立即发送)
channel.write("Hello");

// 刷新(真正发送)
channel.flush();

// 写并刷新(常用)
channel.writeAndFlush("Hello");

// 关闭连接
channel.close();

// 获取远程地址
SocketAddress address = channel.remoteAddress();

代码示例

public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 连接建立时
        Channel channel = ctx.channel();
        System.out.println("新连接:" + channel.remoteAddress());
        
        // 发送欢迎消息
        channel.writeAndFlush("欢迎光临!");
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 收到消息时
        System.out.println("收到消息:" + msg);
        
        // 回复消息
        ctx.channel().writeAndFlush("收到:" + msg);
    }
}

2. ChannelHandler(事件处理器)

是什么:ChannelHandler 是处理事件的工作人员。

生活类比:餐厅的各个岗位

  • 前台(接待 Handler):负责接待客户
  • 服务员(点餐 Handler):负责记录订单
  • 厨师(烹饪 Handler):负责做菜
  • 收银员(结账 Handler):负责收钱

两大类型

ChannelInboundHandler(入站处理器)

处理进来的数据和事件:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    
    // 连接建立
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("连接建立了");
    }
    
    // 收到数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("收到数据:" + msg);
        
        // 传递给下一个处理器
        ctx.fireChannelRead(msg);
    }
    
    // 数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("数据读完了");
        ctx.flush(); // 刷新缓冲区
    }
    
    // 连接断开
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("连接断开了");
    }
    
    // 发生异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("出错了:" + cause.getMessage());
        ctx.close(); // 关闭连接
    }
}
ChannelOutboundHandler(出站处理器)

处理出去的数据和操作:

public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    
    // 写数据
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("准备发送:" + msg);
        
        // 可以修改数据
        String newMsg = "【已加密】" + msg;
        
        // 传递给下一个处理器
        ctx.write(newMsg, promise);
    }
    
    // 连接操作
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, 
                       SocketAddress localAddress, ChannelPromise promise) {
        System.out.println("准备连接:" + remoteAddress);
        ctx.connect(remoteAddress, localAddress, promise);
    }
    
    // 关闭操作
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("准备关闭连接");
        ctx.close(promise);
    }
}

3. ChannelPipeline(处理器链)

是什么:ChannelPipeline 是一条流水线,里面按顺序排列着多个 Handler。

生活类比:餐厅的服务流程

客户进门 → 前台接待 → 服务员点餐 → 厨师做菜 → 服务员上菜 → 收银结账 → 保安送客
         ↓          ↓          ↓         ↓         ↓         ↓
      Handler1   Handler2   Handler3  Handler4  Handler5  Handler6

工作原理

入站事件(数据进来):从头到尾
┌─────────────────────────────────────────┐
│  Pipeline                                │
│  ┌────┐  ┌────┐  ┌────┐  ┌────┐        │
│  │ H1 │→ │ H2 │→ │ H3 │→ │ H4 │        │
│  └────┘  └────┘  └────┘  └────┘        │
└─────────────────────────────────────────┘
   数据 →  解码  →  业务  →  响应

出站事件(数据出去):从尾到头
┌─────────────────────────────────────────┐
│  Pipeline                                │
│  ┌────┐  ┌────┐  ┌────┐  ┌────┐        │
│  │ H1 │← │ H2 │← │ H3 │← │ H4 │        │
│  └────┘  └────┘  └────┘  └────┘        │
└─────────────────────────────────────────┘
   发送 ←  编码  ←  业务  ←  数据

代码示例

// 配置 Pipeline
ChannelPipeline pipeline = channel.pipeline();

// 添加入站处理器(按顺序执行)
pipeline.addLast("decoder", new StringDecoder());      // 1. 解码
pipeline.addLast("logger", new LoggingHandler());      // 2. 日志
pipeline.addLast("business", new BusinessHandler());   // 3. 业务

// 添加出站处理器(逆序执行)
pipeline.addLast("encoder", new StringEncoder());      // 编码

完整示例

public class PipelineDemo {
    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                
                // 入站处理器链
                pipeline.addLast("1-接待", new Handler1());
                pipeline.addLast("2-翻译", new Handler2());
                pipeline.addLast("3-业务", new Handler3());
                
                // 出站处理器链
                pipeline.addLast("4-编码", new Handler4());
            }
        });
    }
    
    // Handler 1:接待
    static class Handler1 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("Handler1: 客户来了,接待中...");
            ctx.fireChannelRead(msg); // 传给下一个
        }
    }
    
    // Handler 2:翻译
    static class Handler2 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("Handler2: 翻译客户的话...");
            String translated = "【已翻译】" + msg;
            ctx.fireChannelRead(translated); // 传给下一个
        }
    }
    
    // Handler 3:业务处理
    static class Handler3 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("Handler3: 处理业务 - " + msg);
            
            // 处理完后,发送响应(触发出站)
            ctx.writeAndFlush("处理完成");
        }
    }
    
    // Handler 4:编码(出站)
    static class Handler4 extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            System.out.println("Handler4: 编码响应 - " + msg);
            ctx.write("【已编码】" + msg, promise);
        }
    }
}

执行流程

收到数据 "Hello"
  ↓
Handler1: 客户来了,接待中...
  ↓
Handler2: 翻译客户的话...(变成 "【已翻译】Hello")
  ↓
Handler3: 处理业务 - 【已翻译】Hello
  ↓ (调用 writeAndFlush,触发出站)
Handler4: 编码响应 - 处理完成(变成 "【已编码】处理完成")
  ↓
发送 "【已编码】处理完成"

4. ChannelHandlerContext(上下文)

是什么:ChannelHandlerContext 是 Handler 的工作台,提供了操作 Channel 和 Pipeline 的能力。

生活类比:服务员的工作台

  • 可以拿到客户信息(Channel)
  • 可以通知下一个同事(fireChannelRead)
  • 可以给客户发消息(write)
  • 可以查看整个流程(Pipeline)

常用方法

public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 1. 获取 Channel
        Channel channel = ctx.channel();
        
        // 2. 获取 Pipeline
        ChannelPipeline pipeline = ctx.pipeline();
        
        // 3. 传递给下一个入站处理器
        ctx.fireChannelRead(msg);
        
        // 4. 写数据(从当前位置开始,经过后续的出站处理器)
        ctx.write("Hello");
        
        // 5. 写数据(从 Pipeline 尾部开始,经过所有出站处理器)
        ctx.channel().write("Hello");
        
        // 6. 关闭连接
        ctx.close();
    }
}

ctx.write() vs channel.write() 的区别

Pipeline: [H1] → [H2] → [H3] → [H4]
                  ↑
                当前位置

ctx.write("Hello")
  → 从 H2 开始往前找出站处理器:H1
  
channel.write("Hello")  
  → 从 H4 开始往前找出站处理器:H4 → H3 → H2 → H1

示例

public class ContextDemo extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("收到消息:" + msg);
        
        // 方式1:通过 ctx 写(推荐,性能更好)
        ctx.writeAndFlush("响应1");
        
        // 方式2:通过 channel 写
        ctx.channel().writeAndFlush("响应2");
        
        // 传递给下一个处理器
        ctx.fireChannelRead(msg);
    }
}

5. Future 和 Promise

是什么:Future 是异步操作的结果占位符,Promise 是可写的 Future。

生活类比:取餐号

  • 你点餐后拿到取餐号(Future)
  • 号码上写着"88号"(将来的结果)
  • 你可以拿着号去做其他事
  • 做好了会叫号通知你

ChannelFuture 示例

// 发起连接(异步操作)
ChannelFuture future = bootstrap.connect("localhost", 8080);

// 方式1:添加监听器(推荐)
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) {
        if (f.isSuccess()) {
            System.out.println("连接成功");
            f.channel().writeAndFlush("Hello");
        } else {
            System.out.println("连接失败:" + f.cause());
        }
    }
});

// 方式2:阻塞等待(不推荐)
future.sync(); // 等待连接完成
if (future.isSuccess()) {
    System.out.println("连接成功");
}

链式操作

bootstrap.connect("localhost", 8080)
    .addListener(connectFuture -> {
        if (connectFuture.isSuccess()) {
            System.out.println("1. 连接成功");
            
            // 发送数据
            connectFuture.channel().writeAndFlush("Hello")
                .addListener(writeFuture -> {
                    if (writeFuture.isSuccess()) {
                        System.out.println("2. 发送成功");
                        
                        // 关闭连接
                        writeFuture.channel().close()
                            .addListener(closeFuture -> {
                                System.out.println("3. 关闭成功");
                            });
                    }
                });
        }
    });

五、责任链模式详解

什么是责任链模式?

定义:把请求沿着一条链传递,链上的每个节点都可以处理请求。

生活场景:请假审批流程

员工请假 → 组长审批 → 经理审批 → 总监审批 → 批准

每个人只负责自己的部分:

  • 组长:1天以内的假,我批
  • 经理:3天以内的假,我批
  • 总监:7天以内的假,我批
  • 更多天:需要老板批

Netty 中的责任链

Pipeline 就是责任链

pipeline.addLast("解码器", new StringDecoder());
pipeline.addLast("日志", new LoggingHandler());
pipeline.addLast("业务", new BusinessHandler());
pipeline.addLast("编码器", new StringEncoder());

数据流转

收到字节数据
  ↓
解码器:把字节转成字符串
  ↓
日志:记录日志
  ↓
业务:处理业务逻辑
  ↓
编码器:把字符串转成字节
  ↓
发送字节数据

责任链的优点

  1. 解耦:每个 Handler 只关心自己的事
  2. 灵活:可以随意增删 Handler
  3. 复用:Handler 可以在不同的 Pipeline 中复用

示例

// 场景1:简单服务器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new BusinessHandler());
pipeline.addLast(new StringEncoder());

// 场景2:加上日志
pipeline.addLast(new StringDecoder());
pipeline.addLast(new LoggingHandler());  // 新增
pipeline.addLast(new BusinessHandler());
pipeline.addLast(new StringEncoder());

// 场景3:加上权限验证
pipeline.addLast(new StringDecoder());
pipeline.addLast(new AuthHandler());     // 新增
pipeline.addLast(new LoggingHandler());
pipeline.addLast(new BusinessHandler());
pipeline.addLast(new StringEncoder());

实战示例:HTTP 服务器

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        
        // 1. HTTP 解码器
        pipeline.addLast("httpDecoder", new HttpRequestDecoder());
        
        // 2. HTTP 聚合器(把多个消息聚合成一个完整的 HTTP 请求)
        pipeline.addLast("httpAggregator", new HttpObjectAggregator(65536));
        
        // 3. HTTP 编码器
        pipeline.addLast("httpEncoder", new HttpResponseEncoder());
        
        // 4. 业务处理器
        pipeline.addLast("httpHandler", new HttpServerHandler());
    }
}

// 业务处理器
class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
        // 处理 HTTP 请求
        String uri = request.uri();
        System.out.println("收到请求:" + uri);
        
        // 构造响应
        String content = "Hello, Netty!";
        FullHttpResponse response = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1,
            HttpResponseStatus.OK,
            Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)
        );
        
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.length());
        
        // 发送响应
        ctx.writeAndFlush(response);
    }
}

处理流程

客户端发送 HTTP 请求
  ↓
HttpRequestDecoder:解析 HTTP 协议
  ↓
HttpObjectAggregator:聚合成完整请求
  ↓
HttpServerHandler:处理业务逻辑
  ↓
HttpResponseEncoder:编码成 HTTP 响应
  ↓
发送给客户端

六、完整实战示例

场景:聊天服务器

需求:

  1. 客户端连接时,发送欢迎消息
  2. 收到客户端消息,广播给所有人
  3. 客户端断开时,通知其他人
  4. 记录所有操作日志

服务端代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.concurrent.ConcurrentHashMap;

public class ChatServer {
    
    // 保存所有连接的客户端
    private static final ConcurrentHashMap<String, Channel> clients = new ConcurrentHashMap<>();
    
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            
                            // 责任链:按顺序添加处理器
                            pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast("logger", new LogHandler());
                            pipeline.addLast("chat", new ChatHandler());
                        }
                    });
            
            System.out.println("聊天服务器启动在 7397 端口");
            ChannelFuture future = bootstrap.bind(7397).sync();
            future.channel().closeFuture().sync();
            
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    // Handler 1:日志处理器
    static class LogHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("[日志] 新连接:" + ctx.channel().remoteAddress());
            ctx.fireChannelActive(); // 传递给下一个处理器
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("[日志] 收到消息:" + msg);
            ctx.fireChannelRead(msg); // 传递给下一个处理器
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("[日志] 连接断开:" + ctx.channel().remoteAddress());
            ctx.fireChannelInactive(); // 传递给下一个处理器
        }
    }
    
    // Handler 2:聊天业务处理器
    static class ChatHandler extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            String clientId = channel.id().asShortText();
            
            // 保存客户端
            clients.put(clientId, channel);
            
            // 发送欢迎消息
            channel.writeAndFlush("欢迎来到聊天室!你的ID是:" + clientId + "\n");
            
            // 通知其他人
            broadcast(clientId + " 加入了聊天室\n", channel);
            
            System.out.println("当前在线人数:" + clients.size());
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String message = (String) msg;
            String clientId = ctx.channel().id().asShortText();
            
            // 广播消息
            String broadcastMsg = clientId + " 说:" + message + "\n";
            broadcast(broadcastMsg, null);
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            String clientId = ctx.channel().id().asShortText();
            
            // 移除客户端
            clients.remove(clientId);
            
            // 通知其他人
            broadcast(clientId + " 离开了聊天室\n", null);
            
            System.out.println("当前在线人数:" + clients.size());
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.out.println("发生异常:" + cause.getMessage());
            ctx.close();
        }
        
        // 广播消息给所有人(除了排除的 Channel)
        private void broadcast(String message, Channel exclude) {
            clients.values().forEach(channel -> {
                if (channel != exclude) {
                    channel.writeAndFlush(message);
                }
            });
        }
    }
}

客户端代码

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Scanner;

public class ChatClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            
                            pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast("client", new ClientHandler());
                        }
                    });
            
            System.out.println("连接服务器...");
            ChannelFuture future = bootstrap.connect("localhost", 7397);
            
            // 添加连接监听器
            future.addListener((ChannelFutureListener) f -> {
                if (f.isSuccess()) {
                    System.out.println("连接成功!");
                    System.out.println("请输入消息(输入 quit 退出):");
                    
                    // 启动输入线程
                    Channel channel = f.channel();
                    new Thread(() -> {
                        Scanner scanner = new Scanner(System.in);
                        while (scanner.hasNextLine()) {
                            String line = scanner.nextLine();
                            if ("quit".equalsIgnoreCase(line)) {
                                channel.close();
                                break;
                            }
                            channel.writeAndFlush(line + "\n");
                        }
                    }).start();
                } else {
                    System.out.println("连接失败:" + f.cause().getMessage());
                }
            });
            
            future.channel().closeFuture().sync();
            
        } finally {
            group.shutdownGracefully();
        }
    }
    
    // 客户端处理器
    static class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.print(msg); // 打印服务器消息
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.out.println("发生异常:" + cause.getMessage());
            ctx.close();
        }
    }
}

运行效果

服务端输出

聊天服务器启动在 7397 端口
[日志] 新连接:/127.0.0.1:54321
当前在线人数:1
[日志] 新连接:/127.0.0.1:54322
当前在线人数:2
[日志] 收到消息:大家好
[日志] 收到消息:你好
[日志] 连接断开:/127.0.0.1:54321
当前在线人数:1

客户端1输出

连接服务器...
连接成功!
请输入消息(输入 quit 退出):
欢迎来到聊天室!你的ID是:abc123
def456 加入了聊天室
大家好
def456 说:你好
quit

客户端2输出

连接服务器...
连接成功!
请输入消息(输入 quit 退出):
欢迎来到聊天室!你的ID是:def456
abc123 说:大家好
你好
abc123 离开了聊天室

代码解析

事件驱动体现

// 不需要轮询,事件发生时自动调用
@Override
public void channelActive(ChannelHandlerContext ctx) {
    // 连接建立时自动调用
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 收到数据时自动调用
}

责任链体现

pipeline.addLast("decoder", new StringDecoder());  // 1. 解码
pipeline.addLast("encoder", new StringEncoder());  // 2. 编码
pipeline.addLast("logger", new LogHandler());      // 3. 日志
pipeline.addLast("chat", new ChatHandler());       // 4. 业务

// 数据流转:
// 收到数据 → decoder → logger → chat
// 发送数据 → chat → encoder → 网络

异步非阻塞体现

// 连接操作立即返回,不阻塞
ChannelFuture future = bootstrap.connect("localhost", 7397);

// 添加监听器,连接完成后回调
future.addListener((ChannelFutureListener) f -> {
    if (f.isSuccess()) {
        System.out.println("连接成功!");
    }
});

七、核心设计思想总结

1. 事件驱动(Event-Driven)

传统方式:主动轮询

while (true) {
    if (有数据()) {
        处理();
    }
}

Netty 方式:被动通知

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 有数据时自动调用
}

优势

  • 不浪费 CPU(不需要轮询)
  • 响应及时(事件发生立即处理)
  • 代码清晰(每个事件对应一个方法)

2. 责任链模式(Chain of Responsibility)

核心思想:把复杂的处理逻辑拆分成多个小模块,像流水线一样依次处理。

类比

传统方式:一个人干所有活
你 → 接待、翻译、处理、编码、发送(累死)

Netty 方式:流水线分工
接待员 → 翻译员 → 业务员 → 编码员 → 发送员(高效)

优势

  • 解耦:每个 Handler 只关心自己的事
  • 灵活:可以随意增删 Handler
  • 复用:Handler 可以在不同场景复用

3. 异步非阻塞(Asynchronous Non-blocking)

阻塞方式

connect();  // 等待连接完成(阻塞)
write();    // 等待写入完成(阻塞)

Netty 方式

ChannelFuture future = connect();  // 立即返回
future.addListener(f -> {
    // 连接完成后回调
});

优势

  • 不阻塞线程
  • 提高并发能力
  • 充分利用系统资源

4. 组件化设计

Netty 把网络编程的各个部分都抽象成了组件:

组件 作用 类比
Channel 连接通道 电话线
EventLoop 事件循环 服务员
ChannelHandler 事件处理器 各个岗位的员工
ChannelPipeline 处理器链 流水线
ByteBuf 数据容器 购物车
Future 异步结果 取餐号

优势

  • 职责清晰
  • 易于理解
  • 方便扩展

八、学习路线建议

第一阶段:理解核心概念

  1. 事件驱动:理解"被动通知"的思想
  2. Channel:理解连接的抽象
  3. Handler:理解事件处理器
  4. Pipeline:理解责任链

第二阶段:动手实践

  1. 写一个简单的 Echo 服务器(回显服务器)
  2. 写一个聊天服务器
  3. 写一个 HTTP 服务器
  4. 写一个文件传输服务器

第三阶段:深入源码

  1. 研究 EventLoop 的实现
  2. 研究 Pipeline 的实现
  3. 研究 ByteBuf 的实现
  4. 研究编解码器的实现

第四阶段:性能优化

  1. 理解零拷贝
  2. 理解内存池
  3. 理解对象池
  4. 理解线程模型

九、常见问题解答

Q1:为什么要用事件驱动?

A:传统的轮询方式会浪费 CPU,而事件驱动是"有事才干活",效率更高。

// 轮询方式(浪费 CPU)
while (true) {
    if (有数据()) {  // 一直问
        处理();
    }
    Thread.sleep(100);
}

// 事件驱动(高效)
@Override
public void channelRead(...) {
    // 有数据时才调用
}

Q2:为什么要用责任链?

A:把复杂的逻辑拆分成多个小模块,每个模块只做一件事,代码更清晰,更容易维护。

// 不用责任链(混乱)
void handle(data) {
    解码(data);
    验证(data);
    处理(data);
    编码(data);
    发送(data);
}

// 用责任链(清晰)
pipeline.addLast(new 解码器());
pipeline.addLast(new 验证器());
pipeline.addLast(new 处理器());
pipeline.addLast(new 编码器());

Q3:Handler 什么时候用 Inbound,什么时候用 Outbound?

A

  • Inbound:处理进来的数据(读、连接建立、异常等)
  • Outbound:处理出去的操作(写、连接、关闭等)
// 处理收到的数据 → Inbound
class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(...) { }
}

// 处理发送的数据 → Outbound
class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(...) { }
}

Q4:ctx.write() 和 channel.write() 有什么区别?

A

  • ctx.write():从当前 Handler 开始往前找出站处理器
  • channel.write():从 Pipeline 尾部开始找出站处理器
Pipeline: [H1][H2][H3][H4]
                  ↑
                当前位置

ctx.write("Hello")      → 从 H2 往前:H1
channel.write("Hello")  → 从 H4 往前:H4H3H2H1

建议:一般用 ctx.write(),性能更好。

Q5:为什么要用 Future?

A:因为 Netty 是异步的,操作立即返回,但结果还没出来。Future 就是"结果的占位符",可以设置回调,结果出来后自动通知你。

// 同步方式(阻塞)
connect();  // 等待连接完成
write();    // 等待写入完成

// 异步方式(不阻塞)
ChannelFuture future = connect();  // 立即返回
future.addListener(f -> {
    // 连接完成后回调
    write();
});

Q6:一个 Channel 可以有多个 Handler 吗?

A:可以!而且这正是 Netty 的设计精髓。一个 Pipeline 可以有多个 Handler,形成责任链。

pipeline.addLast("解码", new StringDecoder());
pipeline.addLast("日志", new LoggingHandler());
pipeline.addLast("验证", new AuthHandler());
pipeline.addLast("业务", new BusinessHandler());
pipeline.addLast("编码", new StringEncoder());

Q7:Handler 可以复用吗?

A

  • 无状态 Handler:可以复用(加 @Sharable 注解)
  • 有状态 Handler:不能复用(每个 Channel 一个实例)
// 无状态,可以复用
@Sharable
class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(...) {
        // 不保存任何状态
    }
}

// 有状态,不能复用
class MyHandler extends ChannelInboundHandlerAdapter {
    private int count = 0;  // 有状态
    
    @Override
    public void channelRead(...) {
        count++;  // 会被多个 Channel 共享,出问题
    }
}

十、总结

Netty 的核心设计思想

  1. 事件驱动:不是你问"好了吗",而是"好了"会通知你
  2. 责任链:把复杂的处理拆成多个小模块,像流水线一样
  3. 异步非阻塞:操作立即返回,不等待结果
  4. 组件化:每个部分都是独立的组件,职责清晰

核心组件关系图

┌─────────────────────────────────────────────────┐
│                   Channel                        │
│  (连接通道,代表一个客户端连接)                    │
│                                                  │
│  ┌────────────────────────────────────────┐    │
│  │         ChannelPipeline                 │    │
│  │  (处理器链,责任链模式)                   │    │
│  │                                          │    │
│  │  ┌────────┐  ┌────────┐  ┌────────┐   │    │
│  │  │Handler1│→ │Handler2│→ │Handler3│   │    │
│  │  │(解码)  │  │(日志)  │  │(业务)  │   │    │
│  │  └────────┘  └────────┘  └────────┘   │    │
│  │                                          │    │
│  └────────────────────────────────────────┘    │
│                                                  │
│  ChannelFuture (异步操作结果)                    │
└─────────────────────────────────────────────────┘

记忆口诀

  • Channel:一个连接
  • Handler:一个员工
  • Pipeline:一条流水线
  • Event:一个通知
  • Future:一个取餐号
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐