一、概述

在 Java 并发编程中,队列是一种极其常用的数据结构。JDK 提供了两类并发安全队列:

  • 阻塞队列(如 LinkedBlockingQueue):使用锁(ReentrantLock)实现,当队列空或满时线程会阻塞。

  • 非阻塞队列(如 ConcurrentLinkedQueue):基于 CAS(Compare And Swap)无锁算法,通过不断重试而非挂起线程来实现线程安全。

ConcurrentLinkedQueue 是一个无界、线程安全、非阻塞的队列,底层采用单向链表结构。它遵循 FIFO(先进先出)原则,不允许 null 元素。在高并发场景下,其性能通常优于阻塞队列,因为避免了线程上下文切换的开销。

适用场景:生产者-消费者模型、任务队列、事件处理等对吞吐量要求较高且不希望线程阻塞的场景。


二、核心数据结构

1. Node 节点类

队列的每个元素被包装成一个 Node 对象:

private static class Node<E> {
    volatile E item;          // 节点存储的元素
    volatile Node<E> next;    // 后继指针

    Node(E item) {
        // 使用 Unsafe 直接设置 item 字段,避免指令重排序问题
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        // 原子地替换 item 值
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        // 延迟设置 next(允许其他线程稍后才看到这个修改,但性能更好)
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        // 原子地设置 next 指针
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe 相关静态代码块(获取字段偏移量)
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

关键点解释:

  • volatile 修饰 item 和 next:保证多线程环境下的可见性,一个线程修改后,其他线程立刻看到最新值。

  • Unsafe 提供的 CAS 操作casItem 和 casNext 是原子操作,用于实现无锁并发修改。CAS 操作会检查预期值是否匹配,匹配则更新为新值,整个过程是原子的(通过 CPU 指令 cmpxchg 实现)。

  • lazySetNext:使用 putOrderedObject,它仍然保证了最终一致性,但允许延迟写(StoreStore 屏障,不强制立即冲刷到主内存),性能优于 volatile 写,常用于引用替换后不再依赖旧引用的情况。

2. 头尾指针

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
    // 初始化时 head 和 tail 都指向一个 item = null 的哨兵节点
    head = tail = new Node<E>(null);
}
  • 哨兵节点head 和 tail 最初指向一个空节点item = null)。这样做简化了边界条件处理,避免频繁判断头尾是否为空。

  • volatile 修饰:保证 head 和 tail 引用的可见性,任何线程都能及时看到队列的头尾变化。


三、offer 操作 —— 入队原理

offer(E e) 方法用于在队列末尾添加一个元素。由于队列无界,它总是返回 true(除非参数为 null 抛出异常)。该方法不会阻塞线程,而是通过自旋 + CAS 不断尝试追加节点。

源码逐段分析

public boolean offer(E e) {
    checkNotNull(e);                    // (1) 不允许 null 元素
    final Node<E> newNode = new Node<>(e); // (2) 创建新节点

    for (Node<E> t = tail, p = t;;) {   // (3) p 用于遍历,t 保存最初的 tail
        Node<E> q = p.next;              // (4) 获取 p 的后继节点

        if (q == null) {                 // (5) 如果 p 是最后一个节点
            if (p.casNext(null, newNode)) { // (6) CAS 将 p.next 从 null 设置为 newNode
                if (p != t)             // (7) 如果 p 不是最初的 tail,则尝试更新 tail 指针
                    casTail(t, newNode);
                return true;
            }
        }
        else if (p == q) {               // (8) 遇到哨兵自引用节点(由 poll 产生)
            p = (t != (t = tail)) ? t : head; // (9) 重新定位:使用新 tail 或 head
        }
        else {                           // (10) 正常情况,继续向后遍历
            p = (p != t && t != (t = tail)) ? t : q;
        }
    }
}

详细流程

场景1:单线程首次添加

初始队列:head 和 tail 指向哨兵节点(item=null, next=null)。

head/tail → 哨兵节点 (null) ,p = t = 哨兵,q = p.next = null

  • 进入 if (q == null),执行 p.casNext(null, newNode) 成功。

  • 此时 p == t(都指向哨兵),不执行 casTail,直接返回 true

  • 队列状态:哨兵节点 → newNode(item1) → null,head 仍指向哨兵,tail 仍指向哨兵(延迟更新)。

为什么要延迟更新 tail? 因为每次添加都 CAS 更新 tail 会有额外开销。允许 tail 滞后(最多一个节点),减少 CAS 竞争,提升吞吐量。

场景2:多线程竞争追加

假设当前队列已有节点:哨兵 → node1 → null,tail 仍指向哨兵(滞后状态)。

线程 A 和线程 B 同时调用 offer

  • 线程 A 执行 for 循环,t = tail(哨兵),p = tq = p.next = node1q != null 且 p != q,进入 else 分支,p = q(即 p 移动到 node1)。

  • 此时线程 B 也进入循环,但可能线程 A 已经将 node1 的 next 修改?实际上,线程 A 尚未修改成功。我们看 CAS 竞争:

线程 A 继续循环,发现 p = node1q = p.next = null,于是尝试 p.casNext(null, newNodeA),成功。此时 p != t,所以线程 A 继续尝试 casTail(t, newNodeA) 将 tail 从哨兵更新为 newNodeA

线程 B 此时可能还在遍历:它看到的 tail 可能已被线程 A 更新为 newNodeA,也可能还是哨兵。最终线程 B 会找到真正的尾节点(newNodeA),然后在其后追加 newNodeB

关键点:CAS 保证了同时只有一个线程能成功修改 next 指针,失败的线程会重新读取最新的 tail 并继续尝试,直到成功。这个过程是无锁的自旋,不会阻塞线程。

场景3:遇到自引用节点(p == q

当执行 poll 移除元素后,可能产生节点的 next 指向自身的情况(自引用)。此时 p == q,说明当前节点是已经被逻辑删除的节点。代码会重新定位:如果 tail 已被其他线程更新则使用新 tail,否则使用 head(从头开始找有效节点)。这样保证遍历不会陷入死循环。


四、poll 操作 —— 出队原理

poll() 方法从队列头部获取并移除一个元素,如果队列为空则返回 null。同样采用自旋 + CAS 实现。

源码逐段分析

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;                       // (1) 获取当前节点的值
            if (item != null && p.casItem(item, null)) { // (2) CAS 将 item 设为 null
                if (p != h)                         // (3) 如果 p 不是原来的 head,则更新 head
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;                        // (4) 返回被移除的值
            }
            else if ((q = p.next) == null) {         // (5) 如果后继为 null,说明队列为空
                updateHead(h, p);
                return null;
            }
            else if (p == q)                         // (6) 遇到自引用,跳转到外层循环重新开始
                continue restartFromHead;
            else                                     // (7) 继续向后遍历
                p = q;
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))    // 原子地将 head 从 h 更新为 p
        h.lazySetNext(h);           // 将原 head 的 next 指向自身(自引用,便于 GC)
}

流程

场景1:队列非空,正常移除

初始队列:哨兵 → node1(item1) → node2(item2) → null,head 指向哨兵。

  • 进入内层循环,h = head = 哨兵p = hitem = p.item = null

  • item != null 不成立,进入 else if ((q = p.next) == null),这里 q = node1 不为空,继续。

  • else if (p == q) 不成立,执行 p = q,现在 p = node1

  • 继续循环:item = node1.item = item1(非空),尝试 p.casItem(item1, null) 成功。

  • 由于 p != h,进入 updateHead(h, p)

    • h 是哨兵,p 是 node1。casHead(哨兵, node1) 成功,将 head 指向 node1。

    • 然后 h.lazySetNext(h),将哨兵的 next 指向自身(哨兵 → 哨兵),变成自引用节点。

  • 返回 item1

此时队列状态:head 指向 node1(item1 已被置为 null),node1 的 next 仍指向 node2,哨兵节点被孤立,等待 GC。

为什么设置自引用? 将已移除的节点的 next 指向自身,有助于垃圾回收,并作为“删除标志”,让遍历代码能识别并跳过。

场景2:队列为空时并发竞态

假如队列只有一个哨兵节点,且 head = tail = 哨兵next = null。线程 A 调用 poll

  • p = head = 哨兵item = null(q = p.next) == null,进入 else if,执行 updateHead(h, p)(但 h == pcasHead 不会执行),返回 null

如果此时线程 B 正在执行 offer,线程 A 可能在判断 q == null 时,线程 B 还没来得及将 newNode 链接到 next,因此线程 A 返回 null;但如果线程 B 已经 CAS 成功,那么线程 A 会看到 q != null,继续遍历并成功移除元素。这种弱一致性是并发队列的正常行为。

场景3:遇到自引用节点(p == q

当某个节点的 next 指向自身(由 updateHead 产生),遍历到该节点时 p == q,会触发 continue restartFromHead,跳回最外层循环,重新获取最新的 head,避免死循环。


五、关键设计总结

1. 为什么不用锁?

  • 锁的代价:线程阻塞和唤醒涉及用户态/内核态切换,上下文开销大。

  • CAS + 自旋:在低到中度竞争下,通过 CPU 指令原子操作和循环重试,性能远高于锁。虽然自旋会占用 CPU,但对于短操作(如入队/出队)非常高效。

2. head 和 tail 的延迟更新

  • 入队时并不总是立即更新 tail,允许 tail 滞后一个节点。这样做减少了 CAS 竞争,因为 tail 是共享变量,每次都更新会引发缓存一致性流量。

  • 出队时 head 也不是每次立即更新,而是当 p != h 时才更新(即“跳两步”策略)。这进一步减少了 CAS 操作。

3. 哨兵节点与自引用

  • 哨兵节点:避免了对 head/tail 为 null 的特殊处理,统一代码逻辑。

  • 自引用:移除了节点的 next 指向自身,既可以帮助 GC,又作为“无效节点”标记,让遍历算法能够跳过。

4. 弱一致性与 size() 的不精确性

ConcurrentLinkedQueue 的迭代器是弱一致性的,它不会抛出 ConcurrentModificationException,但迭代时可能看不到其他线程添加的新元素。同理,size() 方法需要遍历整个链表,在遍历过程中队列可能被修改,因此返回值只是一个估计值,不适合用于精确判断队列是否为空,应使用 isEmpty()


六、性能与适用场景

优势

  • 高吞吐量:无锁设计,减少了线程切换和锁争用。

  • 无界:不会因为队列满而阻塞生产者(但需注意内存无限增长风险)。

  • 低延迟:CAS 操作通常在几十纳秒到几百纳秒之间。

注意事项

  • 不适合需要精确容量控制和阻塞等待的场景(此时应选 LinkedBlockingQueue 或 ArrayBlockingQueue)。

  • 大量自旋可能造成 CPU 飙升(尤其是在极高竞争下),但实际中 ConcurrentLinkedQueue 表现优秀。

典型应用

  • 线程池的任务队列(如 ForkJoinPool 内部使用类似设计)。

  • 事件处理框架中的无锁事件队列。

  • 高并发日志记录(异步日志缓冲)。


七、结语

ConcurrentLinkedQueue 是基于 CAS 的非阻塞并发队列的经典实现。通过 volatile 保证可见性,Unsafe CAS 保证原子性,精妙的自旋和延迟更新策略,实现了高性能的线程安全队列。理解它的原理,不仅能帮助我们正确使用它,更能加深对无锁编程、ABA 问题(这里未涉及,但 CAS 需注意)、内存屏障等底层并发概念的认识。

在实际开发中,根据场景选择合适的并发队列:需要阻塞功能选择 BlockingQueue,追求极致吞吐且能容忍弱一致性,则 ConcurrentLinkedQueue 是绝佳选择。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐