SingleThreadEventLoop 与 EventLoop 关系详解

NettyServer 中调用 b.bind(port).sync() 后,会来到:

// SingleThreadEventLoop
@Override 
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);  // 这里的 this 是什么?
    return promise;
}

然后在 AbstractChannel 中:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 这里进行赋值
    AbstractChannel.this.eventLoop = eventLoop;
    
    // 后面调用
    eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
    });
}

问题:

  1. eventLoop 是 SingleThreadEventLoop 实例吗?
  2. SingleThreadEventLoop 实例是个什么东西?
  3. 为什么后面是 SingleThreadEventLoop 去调用 eventLoop.execute() 方法?

答案详解

1. eventLoop 是 SingleThreadEventLoop 实例吗?

不完全是!准确地说,eventLoop 是 NioEventLoop 实例。

让我们看继承关系:

NioEventLoop 
    ↓ extends
SingleThreadEventLoop 
    ↓ extends
SingleThreadEventExecutor 
    ↓ extends
AbstractScheduledEventExecutor
    ↓ extends
AbstractEventExecutor
    ↓ implements
EventLoop (接口)

关键点:

  • NioEventLoop 是最终的实现类
  • SingleThreadEventLoop 是它的父类
  • EventLoop 是接口

所以:

  • 传入的 this 是 NioEventLoop 实例(因为调用 register 方法的对象是 NioEventLoop)
  • 赋值给 eventLoop 变量的也是 NioEventLoop 实例
  • 但由于多态,它可以用 EventLoop 接口类型来引用

2. SingleThreadEventLoop/NioEventLoop 实例是什么东西?

从代码流程看创建过程
// 第一步:创建 NioEventLoopGroup(线程池)
EventLoopGroup parentGroup = new NioEventLoopGroup();

// 内部会创建多个 NioEventLoop 实例
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, ...) {
    children = new EventExecutor[nThreads];  // 线程数组
    
    for (int i = 0; i < nThreads; i++) {
        // 创建每个 NioEventLoop
        children[i] = newChild(executor, args);
    }
}

// NioEventLoopGroup 中的 newChild 方法
@Override
protected EventLoop newChild(Executor executor, Object... args) {
    return new NioEventLoop(this, executor, selectorProvider, ...);
}
NioEventLoop 的本质

NioEventLoop 是一个"单线程的线程池",它具有以下特点:

  1. 它是一个 Executor(线程池)

    • 可以提交任务执行
    • 内部有一个 taskQueue 用于存放任务
  2. 它只有一个线程

    • 内部有一个 Thread 实例(懒加载,第一次提交任务时创建)
    • 所有任务都在这一个线程中执行
  3. 它有一个 Selector

    • 用于处理 NIO 的 IO 事件
    • 可以注册多个 Channel
  4. 它的核心工作

    protected void run() {
        for (;;) {
            // 1. select 操作,监听 IO 事件
            select(wakenUp.getAndSet(false));
            
            // 2. 处理 IO 事件
            processSelectedKeys();
            
            // 3. 处理任务队列中的任务
            runAllTasks();
        }
    }
    
形象比喻
NioEventLoopGroup(餐厅)
    ├── NioEventLoop 1(服务员1 + 他的工作台)
    │       ├── Thread(服务员本人)
    │       ├── Selector(监控多个桌子的设备)
    │       ├── taskQueue(待办任务清单)
    │       └── 多个 Channel(多张桌子)
    │
    ├── NioEventLoop 2(服务员2 + 他的工作台)
    │       ├── Thread
    │       ├── Selector
    │       ├── taskQueue
    │       └── 多个 Channel
    │
    └── ...

3. 为什么是 SingleThreadEventLoop 去调用 execute() 方法?

代码调用链
// 1. 在 SingleThreadEventLoop 的 register 方法中
@Override 
public ChannelFuture register(final ChannelPromise promise) {
    // this 是 NioEventLoop 实例
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// 2. 进入 AbstractChannel.AbstractUnsafe 的 register 方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // eventLoop 就是上面传入的 this(NioEventLoop 实例)
    AbstractChannel.this.eventLoop = eventLoop;
    
    // 判断当前线程是否是 EventLoop 的线程
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // 不是,则提交任务到 EventLoop
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }
}
为什么要调用 execute()?

这是 Netty 线程模型的核心设计!

原因 1:线程安全

// 当前执行 register 的线程:main 线程(或其他业务线程)
// Channel 绑定的 EventLoop 的线程:NioEventLoop 的线程

// 为了保证线程安全,所有对 Channel 的操作都必须在它绑定的 EventLoop 线程中执行

原因 2:避免并发问题

// 如果不这样做,可能出现:
// - main 线程在操作 Channel
// - EventLoop 线程也在操作 Channel
// - 导致并发问题

// 通过 execute() 提交任务,保证所有操作都在同一个线程中串行执行
execute() 方法做了什么?
// SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
    // 1. 判断当前线程是否是 EventLoop 的线程
    boolean inEventLoop = inEventLoop();
    
    // 2. 将任务添加到任务队列
    addTask(task);
    
    // 3. 如果不是 EventLoop 线程提交的任务
    if (!inEventLoop) {
        // 启动 EventLoop 的线程(如果还没启动)
        startThread();
    }
}

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

private void doStartThread() {
    // 使用 ThreadPerTaskExecutor 创建线程
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 将新创建的线程设置为 NioEventLoop 的线程
            thread = Thread.currentThread();
            
            // 执行 NioEventLoop 的 run 方法(死循环)
            SingleThreadEventExecutor.this.run();
        }
    });
}

完整流程图

main 线程                          NioEventLoop 线程
   |                                      |
   |--b.bind(port)                        |
   |                                      |
   |--initAndRegister()                   |
   |                                      |
   |--group.register(channel)             |
   |                                      |
   |--next() 选择一个 NioEventLoop         |
   |                                      |
   |--eventLoop.register(promise)         |
   |    (此时 eventLoop 是 NioEventLoop)   |
   |                                      |
   |--unsafe.register(this, promise)      |
   |    (this 是 NioEventLoop 实例)        |
   |                                      |
   |--AbstractChannel.eventLoop = eventLoop|
   |    (将 NioEventLoop 绑定到 Channel)   |
   |                                      |
   |--eventLoop.execute(task)             |
   |    (提交 register0 任务)              |
   |                                      |
   |--startThread()                       |
   |    (启动 NioEventLoop 线程)           |
   |                                      |
   |                                      |--Thread 被创建
   |                                      |
   |                                      |--run() 死循环开始
   |                                      |
   |                                      |--从 taskQueue 取出任务
   |                                      |
   |                                      |--执行 register0(promise)
   |                                      |
   |                                      |--doRegister()
   |                                      |   (JDK 底层注册)
   |                                      |
   |                                      |--pipeline.fireChannelRegistered()
   |                                      |
   |<--promise.setSuccess()---------------|
   |    (通知 main 线程注册完成)            |

关键理解点

1. 类型关系

NioEventLoop eventLoop = ...;

// 以下都是合法的,因为继承关系
EventLoop el1 = eventLoop;              // 接口类型
SingleThreadEventLoop el2 = eventLoop;  // 父类类型
EventExecutor el3 = eventLoop;          // 更上层的接口

2. 为什么叫 SingleThreadEventLoop?

// 因为它是一个"单线程的事件循环"
public class SingleThreadEventLoop {
    private Thread thread;  // 只有一个线程
    
    protected void run() {
        for (;;) {  // 事件循环
            // 处理 IO 事件
            // 处理任务
        }
    }
}

3. execute() 的作用

// 作用1:线程安全
eventLoop.execute(() -> {
    // 这里的代码一定在 EventLoop 的线程中执行
    // 避免了多线程并发问题
});

// 作用2:懒加载线程
// 第一次调用 execute() 时,才会创建 Thread 实例
// 这样可以节省资源

// 作用3:任务队列
// 如果 EventLoop 线程正在处理 IO 事件
// 新提交的任务会进入队列等待
// 保证了任务的顺序执行

总结

  1. eventLoop 是 NioEventLoop 实例,不是 SingleThreadEventLoop 实例(SingleThreadEventLoop 是父类)

  2. NioEventLoop 是一个"单线程的线程池"

    • 有一个 Thread 实例
    • 有一个 Selector 实例
    • 有一个 taskQueue 任务队列
    • 可以处理 IO 事件和普通任务
  3. 调用 execute() 的原因

    • 保证线程安全(所有操作在同一线程)
    • 懒加载线程(第一次提交任务时创建)
    • 任务队列化(保证顺序执行)
  4. 这是 Netty 的核心设计

    • 一个 Channel 绑定一个 NioEventLoop
    • 该 Channel 的所有操作都在这个 NioEventLoop 的线程中执行
    • 避免了锁竞争,提高了性能
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐