SingleThreadEventExecutor.execute() 方法详解

本文详细介绍 SingleThreadEventExecutor 类中的 execute() 方法,这是 Netty 线程模型的核心实现,负责任务的提交和线程的启动。

一、方法概述

1.1 方法签名

// SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
    // 实现代码
}

1.2 方法作用

execute() 方法是 Executor 接口的实现,主要职责:

  1. 接收任务:接收用户提交的任务
  2. 添加到队列:将任务添加到 taskQueue
  3. 启动线程:如果线程还未启动,启动 EventLoop 线程
  4. 唤醒线程:如果线程阻塞在 select() 上,唤醒它

1.3 调用场景

// 场景1:注册 Channel
eventLoop.execute(() -> register0(promise));

// 场景2:用户提交任务
channel.eventLoop().execute(() -> {
    System.out.println("在 EventLoop 中执行");
});

// 场景3:写数据
channel.writeAndFlush(msg);  // 内部会调用 execute

// 场景4:定时任务
eventLoop.schedule(() -> {
    System.out.println("定时任务");
}, 5, TimeUnit.SECONDS);

二、完整源码分析

2.1 execute() 方法完整代码

@Override
public void execute(Runnable task) {
    // 1. 参数校验
    if (task == null) {
        throw new NullPointerException("task");
    }

    // 2. 判断当前线程是否是 EventLoop 线程
    boolean inEventLoop = inEventLoop();
    
    // 3. 将任务添加到任务队列
    addTask(task);
    
    // 4. 如果不是 EventLoop 线程,需要启动线程
    if (!inEventLoop) {
        startThread();
        
        // 5. 如果 EventLoop 已经关闭,尝试移除任务并拒绝
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // 任务队列不支持移除操作,只能继续
            }
            if (reject) {
                reject();
            }
        }
    }

    // 6. 如果需要,唤醒阻塞的线程
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

2.2 执行流程图

execute(task) 开始
    ↓
参数校验(task != null)
    ↓
inEventLoop() - 判断当前线程是否是 EventLoop 线程
    ↓
addTask(task) - 添加任务到队列
    ↓
判断:是否在 EventLoop 线程中?
    ├─ 是 → 跳过启动线程
    │
    └─ 否 → startThread() - 启动线程(如果未启动)
            ↓
        判断:EventLoop 是否已关闭?
            ├─ 是 → 移除任务并拒绝
            └─ 否 → 继续
    ↓
判断:是否需要唤醒线程?
    ├─ 是 → wakeup() - 唤醒阻塞的线程
    └─ 否 → 结束

三、逐步详解

步骤 1:参数校验

if (task == null) {
    throw new NullPointerException("task");
}

作用:

  • 防止空指针异常
  • 快速失败(Fail-Fast)原则

步骤 2:判断当前线程是否是 EventLoop 线程

boolean inEventLoop = inEventLoop();
inEventLoop() 方法的实现
// SingleThreadEventExecutor
@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

关键点:

  • this.thread 是 EventLoop 的工作线程
  • 比较当前线程和 EventLoop 线程是否是同一个
  • 这个判断非常重要,决定了后续的执行路径
为什么需要这个判断?

场景 1:在 EventLoop 线程中调用

// 在 EventLoop 线程中
channel.eventLoop().execute(() -> {
    // 此时 inEventLoop() = true
    // 不需要启动线程,直接添加到队列即可
});

场景 2:在其他线程中调用

// 在 main 线程中
public static void main(String[] args) {
    EventLoop eventLoop = new NioEventLoop(...);
    
    // 此时 inEventLoop() = false
    // 需要启动 EventLoop 线程
    eventLoop.execute(() -> {
        System.out.println("任务执行");
    });
}

两种场景的区别:

场景1(inEventLoop = true):
    当前线程 = EventLoop 线程
    ↓
    直接添加任务到队列
    ↓
    线程会在事件循环中取出任务执行

场景2(inEventLoop = false):
    当前线程 ≠ EventLoop 线程
    ↓
    添加任务到队列
    ↓
    需要启动 EventLoop 线程(如果未启动)
    ↓
    EventLoop 线程会从队列中取出任务执行

步骤 3:将任务添加到任务队列

addTask(task);
addTask() 方法的实现
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    
    // 尝试添加到队列
    if (!offerTask(task)) {
        // 如果队列满了,执行拒绝策略
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
taskQueue 是什么?
// SingleThreadEventExecutor
private final Queue<Runnable> taskQueue;

protected SingleThreadEventExecutor(...) {
    // 创建任务队列,默认容量 16
    taskQueue = newTaskQueue(this.maxPendingTasks);
}

protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // 使用 MPSC 队列(Multiple Producer Single Consumer)
    return new MpscUnboundedArrayQueue<Runnable>(maxPendingTasks);
}

taskQueue 的特点:

  1. MPSC 队列

    • Multiple Producer:多个线程可以同时提交任务
    • Single Consumer:只有 EventLoop 线程消费任务
    • 针对这种场景优化,性能更好
  2. 无界队列

    • MpscUnboundedArrayQueue 是无界的
    • 初始容量是 16,但可以动态扩容
    • 不会因为队列满而阻塞
  3. 线程安全

    • 使用 CAS 操作保证线程安全
    • 避免使用锁,性能更好
任务队列的作用
多个线程提交任务
    ↓
    ├─ 线程1: execute(task1)
    ├─ 线程2: execute(task2)
    └─ 线程3: execute(task3)
    ↓
所有任务进入 taskQueue
    ↓
    [task1, task2, task3, ...]
    ↓
EventLoop 线程从队列中取出任务执行
    ↓
    task1 -> task2 -> task3 -> ...

关键点:

  • 保证任务的顺序性
  • 实现多线程到单线程的转换
  • 避免并发问题

步骤 4:启动线程(如果需要)

if (!inEventLoop) {
    startThread();
    // ... 后续处理
}

这是 execute() 方法中最核心的部分,负责启动 EventLoop 线程。

startThread() 方法详解
private void startThread() {
    // 检查状态是否是 ST_NOT_STARTED
    if (state == ST_NOT_STARTED) {
        // 使用 CAS 操作,保证只启动一次
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                // 如果启动失败,恢复状态
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

关键点:

  1. 状态检查
if (state == ST_NOT_STARTED)
  • 只有在未启动状态才会尝试启动
  • 避免重复启动
  1. CAS 操作
STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)
  • 原子操作,保证线程安全
  • 即使多个线程同时调用 execute(),也只有一个线程能成功启动
  • 其他线程会失败,但不会报错,只是不执行启动逻辑
  1. 状态定义
// SingleThreadEventExecutor
private static final int ST_NOT_STARTED = 1;  // 未启动
private static final int ST_STARTED = 2;      // 已启动
private static final int ST_SHUTTING_DOWN = 3; // 正在关闭
private static final int ST_SHUTDOWN = 4;      // 已关闭
private static final int ST_TERMINATED = 5;    // 已终止
为什么使用 CAS?

场景:多个线程同时提交第一个任务

时刻 T1:
    线程1: execute(task1) -> state = ST_NOT_STARTED -> 尝试 CAS
    线程2: execute(task2) -> state = ST_NOT_STARTED -> 尝试 CAS
    线程3: execute(task3) -> state = ST_NOT_STARTED -> 尝试 CAS

时刻 T2:
    线程1: CAS 成功 -> state = ST_STARTED -> 执行 doStartThread()
    线程2: CAS 失败 -> 不执行 doStartThread()
    线程3: CAS 失败 -> 不执行 doStartThread()

结果:
    只有线程1 启动了 EventLoop 线程
    线程2 和线程3 只是添加了任务到队列

如果不使用 CAS 会怎样?

// 错误的实现
if (state == ST_NOT_STARTED) {
    state = ST_STARTED;  // 不是原子操作
    doStartThread();
}
  • 可能多个线程都通过了 if 检查
  • 导致多次调用 doStartThread()
  • 创建多个线程,破坏单线程模型

步骤 5:doStartThread() - 真正启动线程

private void doStartThread() {
    assert thread == null;
    
    // 使用 executor 创建并启动线程
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 1. 保存线程引用
            thread = Thread.currentThread();
            
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            
            try {
                // 2. 执行事件循环(这是核心!)
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 3. 清理工作(后面详细介绍)
                // ...
            }
        }
    });
}
关键点 1:executor 是什么?
// 在 NioEventLoopGroup 创建时设置
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

// ThreadPerTaskExecutor 的实现
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    @Override
    public void execute(Runnable command) {
        // 每次都创建新线程
        threadFactory.newThread(command).start();
    }
}

重要理解:

  • executor 不是线程池,而是线程创建器
  • 每次调用 execute() 都会创建一个新线程
  • 但是 doStartThread() 只会被调用一次
  • 所以每个 EventLoop 只会创建一个线程
关键点 2:保存线程引用
thread = Thread.currentThread();

作用:

  • 保存当前线程的引用
  • 后续 inEventLoop() 方法会用到这个引用
  • 用于判断当前线程是否是 EventLoop 线程

时序:

doStartThread() 调用
    ↓
executor.execute(runnable)
    ↓
创建新线程并启动
    ↓
新线程执行 runnable.run()
    ↓
thread = Thread.currentThread()  // 保存新线程的引用
    ↓
后续所有操作都在这个线程中执行
关键点 3:执行事件循环
SingleThreadEventExecutor.this.run();

这是最核心的一行代码!

  • run() 方法是抽象方法,由子类实现
  • 对于 NioEventLoop,这个方法是一个无限循环
  • 循环中会处理 IO 事件和任务队列中的任务

NioEventLoop.run() 的简化版本:

@Override
protected void run() {
    for (;;) {  // 无限循环
        try {
            // 1. select IO 事件
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    break;
            }

            // 2. 处理 IO 事件
            processSelectedKeys();

            // 3. 处理任务队列中的任务
            runAllTasks();

        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

事件循环的工作流程:

线程启动
    ↓
进入 run() 方法
    ↓
开始无限循环
    ↓
    ┌─────────────────┐
    │                 │
    │  1. select()    │ ← 阻塞等待 IO 事件
    │     ↓           │
    │  2. process IO  │ ← 处理 IO 事件
    │     ↓           │
    │  3. run tasks   │ ← 处理任务队列
    │     ↓           │
    └─────┬───────────┘
          │
          └─ 循环继续

步骤 6:finally 块 - 清理工作

finally {
    // 1. 设置状态为 SHUTTING_DOWN
    for (;;) {
        int oldState = state;
        if (oldState >= ST_SHUTTING_DOWN || 
            STATE_UPDATER.compareAndSet(this, oldState, ST_SHUTTING_DOWN)) {
            break;
        }
    }

    // 2. 检查是否正常关闭
    if (success && gracefulShutdownStartTime == 0) {
        logger.error("Buggy EventExecutor implementation; " +
            "confirmShutdown() must be called before run() terminates.");
    }

    // 3. 执行剩余任务和关闭钩子
    try {
        for (;;) {
            if (confirmShutdown()) {
                break;
            }
        }
    } finally {
        try {
            // 4. 清理资源
            cleanup();
        } finally {
            // 5. 移除 FastThreadLocal
            FastThreadLocal.removeAll();

            // 6. 设置状态为 TERMINATED
            STATE_UPDATER.set(this, ST_TERMINATED);
            threadLock.release();
            
            // 7. 检查是否有未执行的任务
            if (!taskQueue.isEmpty()) {
                logger.warn("An event executor terminated with " +
                    "non-empty task queue (" + taskQueue.size() + ')');
            }

            // 8. 设置 terminationFuture 为成功
            terminationFuture.setSuccess(null);
        }
    }
}
清理步骤详解

1. 设置状态为 SHUTTING_DOWN

for (;;) {
    int oldState = state;
    if (oldState >= ST_SHUTTING_DOWN || 
        STATE_UPDATER.compareAndSet(this, oldState, ST_SHUTTING_DOWN)) {
        break;
    }
}
  • 使用 CAS 循环,确保状态更新成功
  • 如果已经是 SHUTTING_DOWN 或更高状态,直接退出

2. 检查是否正常关闭

if (success && gracefulShutdownStartTime == 0) {
    logger.error("Buggy EventExecutor implementation...");
}
  • success = true 表示 run() 方法正常返回
  • gracefulShutdownStartTime == 0 表示没有调用优雅关闭
  • 这是一个 bug,应该先调用 shutdownGracefully() 再退出循环

3. 执行剩余任务

for (;;) {
    if (confirmShutdown()) {
        break;
    }
}
  • confirmShutdown() 会执行队列中剩余的任务
  • 确保所有任务都被执行完
  • 这是优雅关闭的关键

4. 清理资源

cleanup();
  • 由子类实现,清理特定资源
  • 对于 NioEventLoop,会关闭 Selector

5. 移除 FastThreadLocal

FastThreadLocal.removeAll();
  • 清理线程本地变量
  • 避免内存泄漏

6. 设置最终状态

STATE_UPDATER.set(this, ST_TERMINATED);
terminationFuture.setSuccess(null);
  • 设置状态为 TERMINATED
  • 通知等待终止的线程

步骤 7:关闭状态检查

回到 execute() 方法:

if (!inEventLoop) {
    startThread();
    
    // 如果 EventLoop 已经关闭,尝试移除任务并拒绝
    if (isShutdown()) {
        boolean reject = false;
        try {
            if (removeTask(task)) {
                reject = true;
            }
        } catch (UnsupportedOperationException e) {
            // 任务队列不支持移除操作
        }
        if (reject) {
            reject();
        }
    }
}
为什么需要这个检查?

场景:EventLoop 正在关闭

时刻 T1:
    线程1: 调用 shutdownGracefully()
    ↓
    state = ST_SHUTTING_DOWN

时刻 T2:
    线程2: 调用 execute(task)
    ↓
    addTask(task) 成功(任务已加入队列)
    ↓
    startThread() 不执行(已启动)
    ↓
    isShutdown() = true
    ↓
    尝试移除任务
    ↓
    如果移除成功,执行拒绝策略

目的:

  • 防止在关闭过程中接收新任务
  • 如果任务已经加入队列,尝试移除
  • 如果移除成功,执行拒绝策略
isShutdown() 的实现
@Override
public boolean isShutdown() {
    return state >= ST_SHUTDOWN;
}
reject() 的实现
protected final void reject(Runnable task) {
    rejectedExecutionHandler.rejected(task, this);
}

// 默认的拒绝策略
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    @Override
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
        throw new RejectedExecutionException();
    }
};

步骤 8:唤醒阻塞的线程

if (!addTaskWakesUp && wakesUpForTask(task)) {
    wakeup(inEventLoop);
}
为什么需要唤醒?

场景:EventLoop 线程阻塞在 select() 上

EventLoop 线程:
    ↓
    select()  ← 阻塞等待 IO 事件
    ↓
    (可能阻塞很长时间)

其他线程:
    ↓
    execute(task)  ← 提交任务
    ↓
    任务已加入队列,但 EventLoop 线程还在阻塞
    ↓
    需要唤醒 EventLoop 线程
    ↓
    wakeup()
    ↓
EventLoop 线程:
    ↓
    select() 返回
    ↓
    处理任务队列中的任务
addTaskWakesUp 是什么?
// SingleThreadEventExecutor
private final boolean addTaskWakesUp;

protected SingleThreadEventExecutor(..., boolean addTaskWakesUp, ...) {
    this.addTaskWakesUp = addTaskWakesUp;
}

// NioEventLoop 的构造
NioEventLoop(...) {
    super(parent, executor, false, ...);  // addTaskWakesUp = false
}

含义:

  • addTaskWakesUp = true:添加任务会自动唤醒线程
  • addTaskWakesUp = false:添加任务不会自动唤醒,需要手动唤醒

对于 NioEventLoop:

  • addTaskWakesUp = false
  • 需要手动调用 wakeup() 唤醒
wakesUpForTask(task) 的实现
protected boolean wakesUpForTask(Runnable task) {
    return true;  // 默认所有任务都需要唤醒
}
wakeup(inEventLoop) 的实现
// NioEventLoop
@Override
protected void wakeup(boolean inEventLoop) {
    // 如果不是在 EventLoop 线程中,且需要唤醒
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();  // 唤醒阻塞在 select() 上的线程
    }
}

关键点:

  1. 只在非 EventLoop 线程中唤醒
if (!inEventLoop)
  • 如果当前就是 EventLoop 线程,不需要唤醒
  • 因为线程正在运行,不可能阻塞
  1. 使用 CAS 避免重复唤醒
wakenUp.compareAndSet(false, true)
  • 避免多个线程重复调用 selector.wakeup()
  • selector.wakeup() 是有开销的
  1. 调用 Selector.wakeup()
selector.wakeup();
  • 这是 JDK NIO 的方法
  • 会唤醒阻塞在 select() 上的线程

四、完整的执行流程

4.1 第一次调用 execute() 的流程

【主线程】
    ↓
execute(task1)
    ↓
inEventLoop() = false(当前是主线程)
    ↓
addTask(task1)  ← 任务加入队列
    ↓
startThread()
    ↓
state = ST_NOT_STARTED
    ↓
CAS(ST_NOT_STARTED -> ST_STARTED) 成功
    ↓
doStartThread()
    ↓
executor.execute(runnable)
    ↓
【创建新线程:EventLoop-1】
    ↓
thread = Thread.currentThread()  ← 保存线程引用
    ↓
进入事件循环:run()
    ↓
    ┌─────────────────┐
    │  for (;;) {     │
    │    select()     │ ← 等待 IO 事件
    │    processIO()  │ ← 处理 IO
    │    runTasks()   │ ← 执行 task1
    │  }              │
    └─────────────────┘

4.2 后续调用 execute() 的流程

【其他线程】
    ↓
execute(task2)
    ↓
inEventLoop() = false(不是 EventLoop 线程)
    ↓
addTask(task2)  ← 任务加入队列
    ↓
startThread()
    ↓
state = ST_STARTED(已启动)
    ↓
不执行启动逻辑
    ↓
wakeup()  ← 唤醒 EventLoop 线程
    ↓
【EventLoop 线程】
    ↓
select() 返回(被唤醒)
    ↓
runTasks()  ← 执行 task2

4.3 在 EventLoop 线程中调用 execute()

【EventLoop 线程】
    ↓
execute(task3)
    ↓
inEventLoop() = true(当前就是 EventLoop 线程)
    ↓
addTask(task3)  ← 任务加入队列
    ↓
不执行 startThread()(已在 EventLoop 中)
    ↓
不执行 wakeup()(线程正在运行)
    ↓
继续事件循环
    ↓
runTasks()  ← 执行 task3

五、关键时序图

5.1 多线程提交任务的时序

主线程          线程2           EventLoop线程      taskQueue
  |              |                  |                |
  | execute(t1)  |                  |                |
  |------------->|                  |                |
  |              |                  |                |
  | addTask(t1)  |                  |                |
  |------------------------------------>| [t1]       |
  |              |                  |                |
  | startThread()|                  |                |
  |------------->|                  |                |
  |              | 创建线程         |                |
  |              |----------------->|                |
  |              |                  | run() 开始     |
  |              |                  |                |
  |              | execute(t2)      |                |
  |              |----------------->|                |
  |              |                  |                |
  |              | addTask(t2)      |                |
  |              |-------------------->| [t1, t2]    |
  |              |                  |                |
  |              | wakeup()         |                |
  |              |----------------->|                |
  |              |                  | select() 返回  |
  |              |                  |                |
  |              |                  | runTasks()     |
  |              |                  |<---------------|
  |              |                  | 执行 t1        |
  |              |                  | 执行 t2        |
  |              |                  |                |

5.2 线程启动的详细时序

调用线程                    executor                EventLoop线程
    |                          |                          |
    | execute(task)            |                          |
    |                          |                          |
    | startThread()            |                          |
    |                          |                          |
    | doStartThread()          |                          |
    |                          |                          |
    | executor.execute(r)      |                          |
    |------------------------->|                          |
    |                          |                          |
    |                          | newThread(r).start()     |
    |                          |------------------------->|
    |                          |                          |
    | 返回                     |                          | thread = currentThread
    |<-------------------------|                          |
    |                          |                          |
    |                          |                          | run() 开始
    |                          |                          |
    |                          |                          | for (;;) {
    |                          |                          |   select()
    |                          |                          |   processIO()
    |                          |                          |   runTasks()
    |                          |                          | }
    |                          |                          |

六、重要概念辨析

6.1 inEventLoop 的重要性

为什么需要判断 inEventLoop?

原因 1:避免死锁

// 如果不判断,可能导致死锁
eventLoop.execute(() -> {
    // 在 EventLoop 线程中
    eventLoop.execute(() -> {
        // 如果这里也调用 startThread()
        // 会尝试启动新线程,但线程已经在运行
        // 可能导致问题
    });
});

原因 2:性能优化

// 在 EventLoop 线程中
if (inEventLoop()) {
    // 直接添加任务,不需要唤醒
    addTask(task);
} else {
    // 需要唤醒线程
    addTask(task);
    wakeup();
}

原因 3:保证顺序性

// 在 EventLoop 线程中
channel.write(msg1);  // 立即加入队列
channel.write(msg2);  // 立即加入队列
// 保证 msg1 在 msg2 之前

// 在其他线程中
channel.write(msg1);  // 加入队列,可能需要等待
channel.write(msg2);  // 加入队列,可能需要等待
// 顺序由队列保证

6.2 线程启动的时机

延迟启动的优势

优势 1:节省资源

// 创建 EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup(8);
// 此时 8 个线程都还没有创建

// 只有在使用时才创建
group.register(channel1);  // 创建线程1
group.register(channel2);  // 创建线程2
// 如果只用了 2 个,就只创建 2 个线程

优势 2:避免空转

// 如果在构造时就启动所有线程
// 没有任务的线程会一直空转,浪费 CPU
for (;;) {
    select();  // 没有 Channel 注册,一直返回 0
    runTasks();  // 没有任务,什么都不做
}

优势 3:灵活性

// 可以在任何时候开始使用
EventLoopGroup group = new NioEventLoopGroup();
// ... 做一些其他初始化工作 ...
// 需要时才开始使用
group.execute(task);  // 此时才启动线程

6.3 taskQueue 的设计

为什么使用 MPSC 队列?

场景分析:

生产者(多个):
    - 主线程
    - 业务线程池
    - 其他 EventLoop 线程
    ↓
    都可能调用 execute() 提交任务
    ↓
消费者(单个):
    - EventLoop 线程
    ↓
    只有这一个线程消费任务

MPSC 队列的优势:

  1. 针对性优化:专门为多生产者单消费者场景设计
  2. 无锁化:使用 CAS 操作,避免锁竞争
  3. 高性能:比通用的并发队列性能更好

对比其他队列:

LinkedBlockingQueue:
    - 使用锁保证线程安全
    - 生产者和消费者都需要竞争锁
    - 性能较差

ConcurrentLinkedQueue:
    - 使用 CAS 操作
    - 但没有针对 MPSC 场景优化
    - 性能中等

MpscUnboundedArrayQueue:
    - 专门为 MPSC 场景设计
    - 使用 CAS + 数组
    - 性能最好

七、常见问题

Q1: 为什么 execute() 方法不会阻塞?

答案:

public void execute(Runnable task) {
    addTask(task);  // 只是添加到队列,不会阻塞
    startThread();  // 只是启动线程,不会等待
    wakeup();       // 只是唤醒,不会等待
    // 立即返回
}
  • execute() 只是将任务加入队列,不会等待任务执行
  • 任务的执行是异步的,由 EventLoop 线程负责
  • 调用线程可以立即返回,继续做其他事情

Q2: 如果 taskQueue 满了会怎样?

答案:

protected void addTask(Runnable task) {
    if (!offerTask(task)) {
        reject(task);  // 执行拒绝策略
    }
}
  • 默认使用 MpscUnboundedArrayQueue,是无界队列
  • 理论上不会满,会动态扩容
  • 如果使用有界队列,满了会执行拒绝策略
  • 默认拒绝策略是抛出 RejectedExecutionException

Q3: 多个线程同时调用 execute() 会怎样?

答案:

线程1: execute(task1)
线程2: execute(task2)
线程3: execute(task3)
    ↓
所有任务都会加入 taskQueue
    ↓
    [task1, task2, task3]
    ↓
只有一个线程能成功启动 EventLoop 线程(通过 CAS)
    ↓
EventLoop 线程按顺序执行任务
    ↓
task1 -> task2 -> task3
  • 多个线程可以同时提交任务(MPSC 队列保证线程安全)
  • 但只有一个线程能启动 EventLoop 线程(CAS 保证)
  • 任务按照加入队列的顺序执行

Q4: 为什么需要 wakeup()?

答案:

场景:EventLoop 线程阻塞在 select() 上

// EventLoop 线程
for (;;) {
    select();  // 可能阻塞很长时间(如 1 秒)
    processIO();
    runTasks();  // 如果不唤醒,任务要等 1 秒后才能执行
}

如果不唤醒:

  • 任务已经加入队列,但 EventLoop 线程还在阻塞
  • 任务要等到 select() 超时或有 IO 事件才能执行
  • 延迟可能很大

唤醒后:

  • selector.wakeup() 会立即让 select() 返回
  • EventLoop 线程可以立即处理任务
  • 延迟很小

Q5: inEventLoop() 为什么这么重要?

答案:

场景 1:避免不必要的操作

if (inEventLoop()) {
    // 当前就是 EventLoop 线程
    // 不需要启动线程
    // 不需要唤醒线程
} else {
    // 当前不是 EventLoop 线程
    // 需要启动线程(如果未启动)
    // 需要唤醒线程(如果阻塞)
}

场景 2:保证线程安全

if (inEventLoop()) {
    // 可以直接操作 Channel,不需要加锁
    channel.write(msg);
} else {
    // 需要提交任务到 EventLoop
    eventLoop.execute(() -> channel.write(msg));
}

场景 3:性能优化

// 在 EventLoop 线程中
for (int i = 0; i < 1000; i++) {
    channel.write(msg);  // 直接操作,很快
}

// 在其他线程中
for (int i = 0; i < 1000; i++) {
    channel.write(msg);  // 每次都要提交任务,较慢
}

Q6: 为什么使用 CAS 而不是 synchronized?

答案:

CAS 的优势:

  1. 无锁化:不需要获取锁,避免线程阻塞
  2. 性能好:在竞争不激烈时,性能远好于 synchronized
  3. 公平性:所有线程都有机会成功,不会饿死

synchronized 的劣势:

// 如果使用 synchronized
synchronized (this) {
    if (state == ST_NOT_STARTED) {
        state = ST_STARTED;
        doStartThread();
    }
}
  • 所有线程都要竞争锁
  • 即使线程只是添加任务,也要等待锁
  • 性能较差

CAS 的实现:

// 使用 CAS
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
    doStartThread();
}
  • 只有需要启动线程的线程才会执行 CAS
  • 其他线程直接跳过,不会阻塞
  • 性能更好

Q7: EventLoop 线程什么时候会退出?

答案:

正常退出:

// 调用优雅关闭
eventLoop.shutdownGracefully();
    ↓
设置状态为 ST_SHUTTING_DOWNEventLoop 线程检测到状态变化
    ↓
退出事件循环
    ↓
执行 finally 块中的清理工作
    ↓
线程退出

异常退出:

// run() 方法抛出异常
try {
    SingleThreadEventExecutor.this.run();
} catch (Throwable t) {
    logger.warn("Unexpected exception", t);
}
finally {
    // 执行清理工作
}

关键点:

  • 线程不会自动退出,需要显式关闭
  • 优雅关闭会等待任务执行完
  • 异常退出也会执行清理工作

八、总结

8.1 execute() 方法的核心职责

  1. 接收任务:接收用户提交的 Runnable 任务
  2. 加入队列:将任务添加到 taskQueue
  3. 启动线程:如果线程未启动,启动 EventLoop 线程(只启动一次)
  4. 唤醒线程:如果线程阻塞,唤醒它来处理任务

8.2 关键步骤回顾

execute(task)
    ↓
1. 参数校验
    ↓
2. inEventLoop() - 判断当前线程
    ↓
3. addTask(task) - 加入队列
    ↓
4. if (!inEventLoop) {
       startThread() - 启动线程(如果需要)
       检查关闭状态
   }
    ↓
5. wakeup() - 唤醒线程(如果需要)

8.3 线程启动流程

startThread()
    ↓
检查状态:state == ST_NOT_STARTED
    ↓
CAS 操作:ST_NOT_STARTED -> ST_STARTED
    ↓
doStartThread()
    ↓
executor.execute(runnable)
    ↓
创建新线程并启动
    ↓
thread = Thread.currentThread()
    ↓
进入事件循环:run()
    ↓
for (;;) {
    select()      - 等待 IO 事件
    processIO()   - 处理 IO 事件
    runTasks()    - 执行任务队列
}

8.4 重要特性

1. 延迟启动
  • 线程不是在构造时创建,而是在第一次使用时创建
  • 节省资源,避免空转
2. 单线程模型
  • 每个 EventLoop 只有一个线程
  • 通过 CAS 保证只启动一次
  • 避免并发问题
3. 任务队列
  • 使用 MPSC 队列,支持多生产者单消费者
  • 无界队列,不会阻塞
  • 保证任务的顺序性
4. 线程安全
  • 通过 inEventLoop() 判断当前线程
  • 在 EventLoop 线程中可以直接操作,不需要加锁
  • 在其他线程中需要提交任务
5. 唤醒机制
  • 如果线程阻塞在 select() 上,需要唤醒
  • 使用 CAS 避免重复唤醒
  • 保证任务能及时执行

8.5 与其他组件的关系

用户代码
    ↓
调用 execute(task)
    ↓
SingleThreadEventExecutor
    ↓
    ├─ taskQueue(存储任务)
    ├─ thread(工作线程)
    └─ executor(线程创建器)
    ↓
NioEventLoop
    ↓
    ├─ Selector(监听 IO 事件)
    ├─ processSelectedKeys()(处理 IO)
    └─ runAllTasks()(执行任务)

8.6 设计模式

1. 生产者-消费者模式
  • 多个线程生产任务(调用 execute)
  • 一个线程消费任务(EventLoop 线程)
  • 通过队列解耦
2. 单例模式
  • 每个 EventLoop 只有一个线程
  • 通过 CAS 保证只创建一次
3. 模板方法模式
  • execute() 定义了执行流程
  • run() 由子类实现具体逻辑
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐