Java并发编程:ConcurrentLinkedQueue设计与实现
一、概述
在 Java 并发编程中,队列是一种极其常用的数据结构。JDK 提供了两类并发安全队列:
-
阻塞队列(如
LinkedBlockingQueue):使用锁(ReentrantLock)实现,当队列空或满时线程会阻塞。 -
非阻塞队列(如
ConcurrentLinkedQueue):基于 CAS(Compare And Swap)无锁算法,通过不断重试而非挂起线程来实现线程安全。
ConcurrentLinkedQueue 是一个无界、线程安全、非阻塞的队列,底层采用单向链表结构。它遵循 FIFO(先进先出)原则,不允许 null 元素。在高并发场景下,其性能通常优于阻塞队列,因为避免了线程上下文切换的开销。
适用场景:生产者-消费者模型、任务队列、事件处理等对吞吐量要求较高且不希望线程阻塞的场景。
二、核心数据结构
1. Node 节点类
队列的每个元素被包装成一个 Node 对象:
private static class Node<E> {
volatile E item; // 节点存储的元素
volatile Node<E> next; // 后继指针
Node(E item) {
// 使用 Unsafe 直接设置 item 字段,避免指令重排序问题
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
// 原子地替换 item 值
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
// 延迟设置 next(允许其他线程稍后才看到这个修改,但性能更好)
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
// 原子地设置 next 指针
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe 相关静态代码块(获取字段偏移量)
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
关键点解释:
-
volatile修饰item和next:保证多线程环境下的可见性,一个线程修改后,其他线程立刻看到最新值。 -
Unsafe提供的 CAS 操作:casItem和casNext是原子操作,用于实现无锁并发修改。CAS 操作会检查预期值是否匹配,匹配则更新为新值,整个过程是原子的(通过 CPU 指令cmpxchg实现)。 -
lazySetNext:使用putOrderedObject,它仍然保证了最终一致性,但允许延迟写(StoreStore 屏障,不强制立即冲刷到主内存),性能优于volatile写,常用于引用替换后不再依赖旧引用的情况。
2. 头尾指针
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
// 初始化时 head 和 tail 都指向一个 item = null 的哨兵节点
head = tail = new Node<E>(null);
}
-
哨兵节点:
head和tail最初指向一个空节点(item = null)。这样做简化了边界条件处理,避免频繁判断头尾是否为空。 -
volatile修饰:保证head和tail引用的可见性,任何线程都能及时看到队列的头尾变化。
三、offer 操作 —— 入队原理
offer(E e) 方法用于在队列末尾添加一个元素。由于队列无界,它总是返回 true(除非参数为 null 抛出异常)。该方法不会阻塞线程,而是通过自旋 + CAS 不断尝试追加节点。
源码逐段分析
public boolean offer(E e) {
checkNotNull(e); // (1) 不允许 null 元素
final Node<E> newNode = new Node<>(e); // (2) 创建新节点
for (Node<E> t = tail, p = t;;) { // (3) p 用于遍历,t 保存最初的 tail
Node<E> q = p.next; // (4) 获取 p 的后继节点
if (q == null) { // (5) 如果 p 是最后一个节点
if (p.casNext(null, newNode)) { // (6) CAS 将 p.next 从 null 设置为 newNode
if (p != t) // (7) 如果 p 不是最初的 tail,则尝试更新 tail 指针
casTail(t, newNode);
return true;
}
}
else if (p == q) { // (8) 遇到哨兵自引用节点(由 poll 产生)
p = (t != (t = tail)) ? t : head; // (9) 重新定位:使用新 tail 或 head
}
else { // (10) 正常情况,继续向后遍历
p = (p != t && t != (t = tail)) ? t : q;
}
}
}
详细流程
场景1:单线程首次添加
初始队列:head 和 tail 指向哨兵节点(item=null, next=null)。
head/tail → 哨兵节点 (null) ,p = t = 哨兵,q = p.next = null
-
进入
if (q == null),执行p.casNext(null, newNode)成功。 -
此时
p == t(都指向哨兵),不执行casTail,直接返回true。 -
队列状态:哨兵节点 → newNode(item1) → null,
head仍指向哨兵,tail仍指向哨兵(延迟更新)。
为什么要延迟更新 tail? 因为每次添加都 CAS 更新
tail会有额外开销。允许tail滞后(最多一个节点),减少 CAS 竞争,提升吞吐量。
场景2:多线程竞争追加
假设当前队列已有节点:哨兵 → node1 → null,tail 仍指向哨兵(滞后状态)。
线程 A 和线程 B 同时调用 offer:
-
线程 A 执行
for循环,t = tail(哨兵),p = t,q = p.next = node1。q != null且p != q,进入else分支,p = q(即 p 移动到 node1)。 -
此时线程 B 也进入循环,但可能线程 A 已经将 node1 的
next修改?实际上,线程 A 尚未修改成功。我们看 CAS 竞争:
线程 A 继续循环,发现 p = node1,q = p.next = null,于是尝试 p.casNext(null, newNodeA),成功。此时 p != t,所以线程 A 继续尝试 casTail(t, newNodeA) 将 tail 从哨兵更新为 newNodeA。
线程 B 此时可能还在遍历:它看到的 tail 可能已被线程 A 更新为 newNodeA,也可能还是哨兵。最终线程 B 会找到真正的尾节点(newNodeA),然后在其后追加 newNodeB。
关键点:CAS 保证了同时只有一个线程能成功修改 next 指针,失败的线程会重新读取最新的 tail 并继续尝试,直到成功。这个过程是无锁的自旋,不会阻塞线程。
场景3:遇到自引用节点(p == q)
当执行 poll 移除元素后,可能产生节点的 next 指向自身的情况(自引用)。此时 p == q,说明当前节点是已经被逻辑删除的节点。代码会重新定位:如果 tail 已被其他线程更新则使用新 tail,否则使用 head(从头开始找有效节点)。这样保证遍历不会陷入死循环。
四、poll 操作 —— 出队原理
poll() 方法从队列头部获取并移除一个元素,如果队列为空则返回 null。同样采用自旋 + CAS 实现。
源码逐段分析
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item; // (1) 获取当前节点的值
if (item != null && p.casItem(item, null)) { // (2) CAS 将 item 设为 null
if (p != h) // (3) 如果 p 不是原来的 head,则更新 head
updateHead(h, ((q = p.next) != null) ? q : p);
return item; // (4) 返回被移除的值
}
else if ((q = p.next) == null) { // (5) 如果后继为 null,说明队列为空
updateHead(h, p);
return null;
}
else if (p == q) // (6) 遇到自引用,跳转到外层循环重新开始
continue restartFromHead;
else // (7) 继续向后遍历
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p)) // 原子地将 head 从 h 更新为 p
h.lazySetNext(h); // 将原 head 的 next 指向自身(自引用,便于 GC)
}
流程
场景1:队列非空,正常移除
初始队列:哨兵 → node1(item1) → node2(item2) → null,head 指向哨兵。
-
进入内层循环,
h = head = 哨兵,p = h,item = p.item = null。 -
item != null不成立,进入else if ((q = p.next) == null),这里q = node1不为空,继续。 -
else if (p == q)不成立,执行p = q,现在p = node1。 -
继续循环:
item = node1.item = item1(非空),尝试p.casItem(item1, null)成功。 -
由于
p != h,进入updateHead(h, p):-
h是哨兵,p是 node1。casHead(哨兵, node1)成功,将head指向 node1。 -
然后
h.lazySetNext(h),将哨兵的next指向自身(哨兵 → 哨兵),变成自引用节点。
-
-
返回
item1。
此时队列状态:head 指向 node1(item1 已被置为 null),node1 的 next 仍指向 node2,哨兵节点被孤立,等待 GC。
为什么设置自引用? 将已移除的节点的
next指向自身,有助于垃圾回收,并作为“删除标志”,让遍历代码能识别并跳过。
场景2:队列为空时并发竞态
假如队列只有一个哨兵节点,且 head = tail = 哨兵,next = null。线程 A 调用 poll:
-
p = head = 哨兵,item = null,(q = p.next) == null,进入else if,执行updateHead(h, p)(但h == p,casHead不会执行),返回null。
如果此时线程 B 正在执行 offer,线程 A 可能在判断 q == null 时,线程 B 还没来得及将 newNode 链接到 next,因此线程 A 返回 null;但如果线程 B 已经 CAS 成功,那么线程 A 会看到 q != null,继续遍历并成功移除元素。这种弱一致性是并发队列的正常行为。
场景3:遇到自引用节点(p == q)
当某个节点的 next 指向自身(由 updateHead 产生),遍历到该节点时 p == q,会触发 continue restartFromHead,跳回最外层循环,重新获取最新的 head,避免死循环。
五、关键设计总结
1. 为什么不用锁?
-
锁的代价:线程阻塞和唤醒涉及用户态/内核态切换,上下文开销大。
-
CAS + 自旋:在低到中度竞争下,通过 CPU 指令原子操作和循环重试,性能远高于锁。虽然自旋会占用 CPU,但对于短操作(如入队/出队)非常高效。
2. head 和 tail 的延迟更新
-
入队时并不总是立即更新
tail,允许tail滞后一个节点。这样做减少了 CAS 竞争,因为tail是共享变量,每次都更新会引发缓存一致性流量。 -
出队时
head也不是每次立即更新,而是当p != h时才更新(即“跳两步”策略)。这进一步减少了 CAS 操作。
3. 哨兵节点与自引用
-
哨兵节点:避免了对
head/tail为null的特殊处理,统一代码逻辑。 -
自引用:移除了节点的
next指向自身,既可以帮助 GC,又作为“无效节点”标记,让遍历算法能够跳过。
4. 弱一致性与 size() 的不精确性
ConcurrentLinkedQueue 的迭代器是弱一致性的,它不会抛出 ConcurrentModificationException,但迭代时可能看不到其他线程添加的新元素。同理,size() 方法需要遍历整个链表,在遍历过程中队列可能被修改,因此返回值只是一个估计值,不适合用于精确判断队列是否为空,应使用 isEmpty()。
六、性能与适用场景
优势
-
高吞吐量:无锁设计,减少了线程切换和锁争用。
-
无界:不会因为队列满而阻塞生产者(但需注意内存无限增长风险)。
-
低延迟:CAS 操作通常在几十纳秒到几百纳秒之间。
注意事项
-
不适合需要精确容量控制和阻塞等待的场景(此时应选
LinkedBlockingQueue或ArrayBlockingQueue)。 -
大量自旋可能造成 CPU 飙升(尤其是在极高竞争下),但实际中
ConcurrentLinkedQueue表现优秀。
典型应用
-
线程池的任务队列(如
ForkJoinPool内部使用类似设计)。 -
事件处理框架中的无锁事件队列。
-
高并发日志记录(异步日志缓冲)。
七、结语
ConcurrentLinkedQueue 是基于 CAS 的非阻塞并发队列的经典实现。通过 volatile 保证可见性,Unsafe CAS 保证原子性,精妙的自旋和延迟更新策略,实现了高性能的线程安全队列。理解它的原理,不仅能帮助我们正确使用它,更能加深对无锁编程、ABA 问题(这里未涉及,但 CAS 需注意)、内存屏障等底层并发概念的认识。
在实际开发中,根据场景选择合适的并发队列:需要阻塞功能选择 BlockingQueue,追求极致吞吐且能容忍弱一致性,则 ConcurrentLinkedQueue 是绝佳选择。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)