Netty源码分析---SingleThreadEventLoop与EventLoop关系详解
·
SingleThreadEventLoop 与 EventLoop 关系详解
在 NettyServer 中调用 b.bind(port).sync() 后,会来到:
// SingleThreadEventLoop
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise); // 这里的 this 是什么?
return promise;
}
然后在 AbstractChannel 中:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 这里进行赋值
AbstractChannel.this.eventLoop = eventLoop;
// 后面调用
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
问题:
- eventLoop 是 SingleThreadEventLoop 实例吗?
- SingleThreadEventLoop 实例是个什么东西?
- 为什么后面是 SingleThreadEventLoop 去调用 eventLoop.execute() 方法?
答案详解
1. eventLoop 是 SingleThreadEventLoop 实例吗?
不完全是!准确地说,eventLoop 是 NioEventLoop 实例。
让我们看继承关系:
NioEventLoop
↓ extends
SingleThreadEventLoop
↓ extends
SingleThreadEventExecutor
↓ extends
AbstractScheduledEventExecutor
↓ extends
AbstractEventExecutor
↓ implements
EventLoop (接口)
关键点:
NioEventLoop是最终的实现类SingleThreadEventLoop是它的父类EventLoop是接口
所以:
- 传入的 this 是 NioEventLoop 实例(因为调用 register 方法的对象是 NioEventLoop)
- 赋值给 eventLoop 变量的也是 NioEventLoop 实例
- 但由于多态,它可以用 EventLoop 接口类型来引用
2. SingleThreadEventLoop/NioEventLoop 实例是什么东西?
从代码流程看创建过程
// 第一步:创建 NioEventLoopGroup(线程池)
EventLoopGroup parentGroup = new NioEventLoopGroup();
// 内部会创建多个 NioEventLoop 实例
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, ...) {
children = new EventExecutor[nThreads]; // 线程数组
for (int i = 0; i < nThreads; i++) {
// 创建每个 NioEventLoop
children[i] = newChild(executor, args);
}
}
// NioEventLoopGroup 中的 newChild 方法
@Override
protected EventLoop newChild(Executor executor, Object... args) {
return new NioEventLoop(this, executor, selectorProvider, ...);
}
NioEventLoop 的本质
NioEventLoop 是一个"单线程的线程池",它具有以下特点:
-
它是一个 Executor(线程池)
- 可以提交任务执行
- 内部有一个 taskQueue 用于存放任务
-
它只有一个线程
- 内部有一个 Thread 实例(懒加载,第一次提交任务时创建)
- 所有任务都在这一个线程中执行
-
它有一个 Selector
- 用于处理 NIO 的 IO 事件
- 可以注册多个 Channel
-
它的核心工作
protected void run() { for (;;) { // 1. select 操作,监听 IO 事件 select(wakenUp.getAndSet(false)); // 2. 处理 IO 事件 processSelectedKeys(); // 3. 处理任务队列中的任务 runAllTasks(); } }
形象比喻
NioEventLoopGroup(餐厅)
├── NioEventLoop 1(服务员1 + 他的工作台)
│ ├── Thread(服务员本人)
│ ├── Selector(监控多个桌子的设备)
│ ├── taskQueue(待办任务清单)
│ └── 多个 Channel(多张桌子)
│
├── NioEventLoop 2(服务员2 + 他的工作台)
│ ├── Thread
│ ├── Selector
│ ├── taskQueue
│ └── 多个 Channel
│
└── ...
3. 为什么是 SingleThreadEventLoop 去调用 execute() 方法?
代码调用链
// 1. 在 SingleThreadEventLoop 的 register 方法中
@Override
public ChannelFuture register(final ChannelPromise promise) {
// this 是 NioEventLoop 实例
promise.channel().unsafe().register(this, promise);
return promise;
}
// 2. 进入 AbstractChannel.AbstractUnsafe 的 register 方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// eventLoop 就是上面传入的 this(NioEventLoop 实例)
AbstractChannel.this.eventLoop = eventLoop;
// 判断当前线程是否是 EventLoop 的线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// 不是,则提交任务到 EventLoop
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
为什么要调用 execute()?
这是 Netty 线程模型的核心设计!
原因 1:线程安全
// 当前执行 register 的线程:main 线程(或其他业务线程)
// Channel 绑定的 EventLoop 的线程:NioEventLoop 的线程
// 为了保证线程安全,所有对 Channel 的操作都必须在它绑定的 EventLoop 线程中执行
原因 2:避免并发问题
// 如果不这样做,可能出现:
// - main 线程在操作 Channel
// - EventLoop 线程也在操作 Channel
// - 导致并发问题
// 通过 execute() 提交任务,保证所有操作都在同一个线程中串行执行
execute() 方法做了什么?
// SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
// 1. 判断当前线程是否是 EventLoop 的线程
boolean inEventLoop = inEventLoop();
// 2. 将任务添加到任务队列
addTask(task);
// 3. 如果不是 EventLoop 线程提交的任务
if (!inEventLoop) {
// 启动 EventLoop 的线程(如果还没启动)
startThread();
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
// 使用 ThreadPerTaskExecutor 创建线程
executor.execute(new Runnable() {
@Override
public void run() {
// 将新创建的线程设置为 NioEventLoop 的线程
thread = Thread.currentThread();
// 执行 NioEventLoop 的 run 方法(死循环)
SingleThreadEventExecutor.this.run();
}
});
}
完整流程图
main 线程 NioEventLoop 线程
| |
|--b.bind(port) |
| |
|--initAndRegister() |
| |
|--group.register(channel) |
| |
|--next() 选择一个 NioEventLoop |
| |
|--eventLoop.register(promise) |
| (此时 eventLoop 是 NioEventLoop) |
| |
|--unsafe.register(this, promise) |
| (this 是 NioEventLoop 实例) |
| |
|--AbstractChannel.eventLoop = eventLoop|
| (将 NioEventLoop 绑定到 Channel) |
| |
|--eventLoop.execute(task) |
| (提交 register0 任务) |
| |
|--startThread() |
| (启动 NioEventLoop 线程) |
| |
| |--Thread 被创建
| |
| |--run() 死循环开始
| |
| |--从 taskQueue 取出任务
| |
| |--执行 register0(promise)
| |
| |--doRegister()
| | (JDK 底层注册)
| |
| |--pipeline.fireChannelRegistered()
| |
|<--promise.setSuccess()---------------|
| (通知 main 线程注册完成) |
关键理解点
1. 类型关系
NioEventLoop eventLoop = ...;
// 以下都是合法的,因为继承关系
EventLoop el1 = eventLoop; // 接口类型
SingleThreadEventLoop el2 = eventLoop; // 父类类型
EventExecutor el3 = eventLoop; // 更上层的接口
2. 为什么叫 SingleThreadEventLoop?
// 因为它是一个"单线程的事件循环"
public class SingleThreadEventLoop {
private Thread thread; // 只有一个线程
protected void run() {
for (;;) { // 事件循环
// 处理 IO 事件
// 处理任务
}
}
}
3. execute() 的作用
// 作用1:线程安全
eventLoop.execute(() -> {
// 这里的代码一定在 EventLoop 的线程中执行
// 避免了多线程并发问题
});
// 作用2:懒加载线程
// 第一次调用 execute() 时,才会创建 Thread 实例
// 这样可以节省资源
// 作用3:任务队列
// 如果 EventLoop 线程正在处理 IO 事件
// 新提交的任务会进入队列等待
// 保证了任务的顺序执行
总结
-
eventLoop 是 NioEventLoop 实例,不是 SingleThreadEventLoop 实例(SingleThreadEventLoop 是父类)
-
NioEventLoop 是一个"单线程的线程池":
- 有一个 Thread 实例
- 有一个 Selector 实例
- 有一个 taskQueue 任务队列
- 可以处理 IO 事件和普通任务
-
调用 execute() 的原因:
- 保证线程安全(所有操作在同一线程)
- 懒加载线程(第一次提交任务时创建)
- 任务队列化(保证顺序执行)
-
这是 Netty 的核心设计:
- 一个 Channel 绑定一个 NioEventLoop
- 该 Channel 的所有操作都在这个 NioEventLoop 的线程中执行
- 避免了锁竞争,提高了性能
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)