Netty 线程模型学习记:从懵逼到入门

这是一个小白的学习记录
边学边练,把踩过的坑都记下来


为啥要学线程模型?

上回学了核心组件,感觉自己对 Netty 有点理解了。但要想让 Netty 跑起来更快,必须理解它的线程模型。

后来才发现,Netty 的线程模型还真有点复杂,不学还真玩不转。

一、Reactor 模式是啥?

我理解的 Reactor 模式

Reactor 模式就是一种事件处理模式,用事件驱动的方式处理并发请求。简单来说,就是有一个分发器,收到事件后分发给相应的处理器。

三种 Reactor 模式

1. 单线程 Reactor 模式

结构:一个线程既要接受连接,又要处理业务逻辑

优点

  • 实现简单
  • 没有多线程竞争问题

缺点

  • 无法充分利用多核 CPU
  • 一个连接阻塞了,其他连接都受影响

适用场景:连接少、业务简单的场景

单线程Reactor

Reactor
Single Thread

Acceptor

Handler

Client1

Client2

2. 多线程 Reactor 模式

结构:一个线程接受连接,多个线程处理业务

优点

  • 能利用多核 CPU
  • 一个连接阻塞不影响其他连接

缺点

  • 实现复杂了点
  • 有线程竞争问题

适用场景:连接多、业务复杂的场景

多线程Reactor

Reactor
Acceptor Thread

Acceptor

Worker Thread Pool

Handler1

Handler2

Client1

Client2

3. 主从多线程 Reactor 模式

结构:一个主线程池接受连接,一个从线程池处理业务

优点

  • 充分利用多核 CPU
  • 接受连接和处理业务分离
  • 单个连接阻塞不影响其他连接

缺点

  • 实现更复杂了

适用场景:高并发、高性能的场景(Netty 用的就是这个)

主从多线程Reactor

Main Reactor Pool
Acceptor Threads

Acceptor1

Acceptor2

Sub Reactor Pool
Worker Threads

Handler1

Handler2

Handler...

Client1

Client2

Client...

二、Netty 的线程模型

Netty 用的是主从多线程 Reactor 模式

  • bossGroup:主线程池,负责接受客户端连接
  • workerGroup:从线程池,负责处理客户端的 I/O 操作

线程池配置

// 创建主线程池,用于接受连接
// 通常设置为 1 个线程,因为单个线程就能处理很多连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);

// 创建从线程池,用于处理业务
// 默认线程数是 CPU 核心数的 2 倍
EventLoopGroup workerGroup = new NioEventLoopGroup();

我踩过的线程数配置坑

刚开始我觉得线程数越多越好,就把 bossGroup 设成了 8(我电脑 8 核),结果 CPU 使用率飙升,程序反而变慢了。

后来才知道,bossGroup 只负责接受连接,根本不需要那么多线程。1 个线程就够了!

教训:bossGroup 线程数设为 1,workerGroup 用默认值就好。

三、线程数最佳实践

  1. bossGroup 线程数

    • 通常设为 1 个线程
    • 如果你是多核 CPU,想充分利用的话,可以设为 CPU 核心数
  2. workerGroup 线程数

    • 默认值:CPU 核心数的 2 倍
    • I/O 密集型应用:可以设为 CPU 核心数的 2-4 倍
    • 计算密集型应用:设为 CPU 核心数就够了

四、业务线程池的重要性

为啥需要业务线程池?

如果在 ChannelHandler 里直接处理耗时业务,会阻塞 I/O 线程,导致其他连接得不到处理。

怎么用业务线程池?

// 创建业务线程池
ExecutorService businessExecutor = Executors.newFixedThreadPool(50);

// 在 ChannelHandler 中使用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 提交到业务线程池处理
    businessExecutor.submit(() -> {
        try {
            // 处理耗时业务逻辑
            String result = handleBusinessLogic(msg);
            // 处理完成后写回客户端
            ctx.channel().writeAndFlush(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}

我踩过的坑

有一次我在 ChannelHandler 里直接调用了一个数据库查询,结果查询慢的时候,整个 Netty 服务器都卡住了。后来用了业务线程池,问题就解决了。

教训:耗时操作一定要放到业务线程池里!

五、线程安全问题

Netty 的线程安全保证

Netty 保证了以下几点:

  1. Channel 操作线程安全:同一个 Channel 的所有操作都由同一个 EventLoop 线程处理,没有并发问题
  2. ChannelPipeline 线程安全:修改 Pipeline 的操作是线程安全的
  3. EventLoop 线程安全:EventLoop 本身是线程安全的

需要注意的线程安全问题

  1. 共享资源:多个 Channel 共享的资源需要考虑线程安全
  2. 业务逻辑:业务逻辑里的共享变量需要考虑线程安全
  3. 外部调用:从外部线程调用 Channel 的方法时需要考虑线程安全

线程安全的最佳实践

  1. 用 ChannelHandlerContext 的方法:比如 ctx.writeAndFlush(),这些方法是线程安全的
  2. 避免共享可变状态:尽量用局部变量,别用实例变量
  3. 用线程安全的集合:比如 ConcurrentHashMap
  4. 必要时用锁:对于必须共享的资源,用锁保护

六、实战:线程模型配置

示例 1:默认线程模型

public class DefaultThreadModelServer {
    public static void main(String[] args) throws Exception {
        // 默认线程模型
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new StringDecoder());
                     ch.pipeline().addLast(new StringEncoder());
                     ch.pipeline().addLast(new ServerHandler());
                 }
             });

            ChannelFuture f = b.bind(8080).sync();
            System.out.println("Server started on port 8080");
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

示例 2:自定义线程数

public class CustomThreadModelServer {
    public static void main(String[] args) throws Exception {
        // 自定义线程数
        int bossThreads = 1; // bossGroup 通常设为 1
        int workerThreads = Runtime.getRuntime().availableProcessors() * 2;
        
        EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreads);
        EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads);

        // 后续代码与示例 1 相同
        // ...
    }
}

示例 3:使用业务线程池

public class BusinessThreadPoolServer {
    private static final ExecutorService BUSINESS_EXECUTOR = Executors.newFixedThreadPool(50);

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new StringDecoder());
                     ch.pipeline().addLast(new StringEncoder());
                     ch.pipeline().addLast(new BusinessHandler());
                 }
             });

            ChannelFuture f = b.bind(8080).sync();
            System.out.println("Server started on port 8080");
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            BUSINESS_EXECUTOR.shutdown();
        }
    }

    static class BusinessHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            // 提交到业务线程池处理
            BUSINESS_EXECUTOR.submit(() -> {
                try {
                    // 模拟耗时操作
                    Thread.sleep(1000);
                    String response = "Processed: " + msg;
                    ctx.writeAndFlush(response);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

七、性能调优建议

  1. 合理配置线程数

    • 根据 CPU 核心数和业务特点配置
    • 不要线程数过多,否则上下文切换开销大
  2. 选择合适的 EventLoopGroup

    • Linux 系统用 EpollEventLoopGroup,性能更好
    • 其他系统用 NioEventLoopGroup
  3. 避免阻塞操作

    • 耗时操作放到业务线程池
    • 不要在 I/O 线程里做耗时的事
  4. 优化连接管理

    • 设置合理的连接超时时间
    • 用空闲检测机制及时关闭空闲连接
  5. 监控线程状态

    • 监控 CPU 使用率
    • 监控线程队列长度
    • 监控线程执行时间

八、我踩过的其他坑

  1. 线程数过多:一开始把线程数设得太多,导致性能下降
  2. 阻塞 I/O 线程:在 ChannelHandler 里直接做耗时操作,导致服务器卡住
  3. 忘记关闭线程池:程序结束时没调用 shutdownGracefully(),导致资源泄漏
  4. 线程安全问题:多个 Channel 共享变量,导致并发问题

验证步骤

1. 测试默认线程模型

java DefaultThreadModelServer

2. 测试业务线程池

java BusinessThreadPoolServer

3. 压力测试

用 JMeter 或 ab 工具测试并发性能。

预期结果:使用业务线程池的服务器在高并发下性能更好,不会因为耗时操作而卡住。

总结

其实 Netty 的线程模型也没那么复杂,就是主从多线程 Reactor 模式。关键是要理解 bossGroup 和 workerGroup 的分工,合理配置线程数,避免阻塞 I/O 线程。

我也是踩了几个坑才明白这些道理的。现在配置线程模型时,心里总算有底了。

肯定有理解不对的地方,欢迎大佬指正。

如果你也是新手,希望这篇笔记能帮到你。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐