Problem

Redisson version: 3.12.5
When using Redisson's lock API, if the connection to Redis is interrupted, the caller hangs forever.

Sample code:

// 1. start the Redis server
// 2. initialize RedissonClient
RedissonClient redissonClient = ...
// 3. stop the Redis server
// The connection is now broken, so lock() blocks forever
redissonClient.getLock(key).lock();

Output:

2020-08-20 00:26:49 [main] INFO  org.redisson.Version - Redisson 3.12.5
2020-08-20 00:26:50 [redisson-netty-2-9] INFO  o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
2020-08-20 00:26:50 [redisson-netty-2-10] INFO  o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
2020-08-20 00:26:51 [redisson-timer-4-1] WARN  io.netty.util.HashedWheelTimer - An exception was thrown by TimerTask.
java.lang.NullPointerException: cause
	at io.netty.util.internal.ObjectUtil.checkNotNull(ObjectUtil.java:33)
	at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:606)
	at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
	at org.redisson.misc.RedissonPromise.tryFailure(RedissonPromise.java:96)
	at org.redisson.command.RedisExecutor$2.run(RedisExecutor.java:228)
	at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
	at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
	at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Although the error stack trace is printed, the program does not exit; it just hangs.
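
A hang like this is awkward to reproduce in an automated check, because the check itself blocks. Below is a minimal sketch, using only the JDK, of running a possibly hanging call on a daemon thread with a timeout. HangGuard, finishesWithin and the CountDownLatch stand-in are hypothetical names for illustration; no Redisson call is made here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class HangGuard {

    /** Runs the task on a daemon thread; returns true if it finished within the timeout. */
    static boolean finishesWithin(Runnable task, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "hang-guard");
            t.setDaemon(true); // a stuck task must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker; only helps if the call is interruptible
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for a lock() call that never returns.
        CountDownLatch neverReleased = new CountDownLatch(1);
        Runnable hangingCall = () -> {
            try {
                neverReleased.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("hanging call finished: " + finishesWithin(hangingCall, 200));
        System.out.println("normal call finished: " + finishesWithin(() -> { }, 200));
    }
}
```

In a real reproduction the stand-in would be replaced by the lock() call from the sample code. Whether that call reacts to the interrupt sent by cancel(true) is not something this sketch assumes, which is why the worker is a daemon thread.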

Analysis

Following the stack trace leads to:

// org.redisson.command.RedisExecutor
private void scheduleRetryTimeout(RFuture<RedisConnection> connectionFuture, RPromise<R> attemptPromise) {
	...
	if (attempt == attempts) {
		attemptPromise.tryFailure(exception);
		return;
	}
	...
}

Debugging shows that exception can be null here under certain conditions, which is why the NPE stack trace is printed.
Looking at the rest of the method:

// org.redisson.command.RedisExecutor
private void scheduleRetryTimeout(RFuture<RedisConnection> connectionFuture, RPromise<R> attemptPromise) {
	...
	if (connectionFuture.cancel(false)) {
		if (exception == null) {
			...
		}
	} else {
		if (connectionFuture.isSuccess()) {
			if (writeFuture == null || !writeFuture.isDone()) {
				...
			}

			if (writeFuture.isSuccess()) {
				return;
			} 
			// an else branch is missing here, so no exception is created when the write failed
		}
	}
	...
}

The code only handles the write-success case and does nothing when the write fails. Patch the source by adding:

...
		    if (writeFuture.isSuccess()) {
		        return;
		    } else if (exception == null) {
		        exception = new RedisException("===== Write failed.");
		    }
...
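
The effect of the missing branch can be shown without Netty. The sketch below uses the JDK's CompletableFuture as a stand-in for DefaultPromise (an assumption for illustration only): both reject a null cause with a NullPointerException, so the failure is never recorded and anyone waiting on the future keeps waiting. NullCauseDemo, failAndCheck and orDefault are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class NullCauseDemo {

    /** Tries to fail the future with the given cause; returns true if the future ended up completed. */
    static boolean failAndCheck(CompletableFuture<String> future, Throwable cause) {
        try {
            future.completeExceptionally(cause); // a null cause is rejected with NullPointerException
        } catch (NullPointerException e) {
            System.out.println("completion attempt threw: " + e);
        }
        return future.isDone();
    }

    /** Same idea as the patch above: never pass a null cause on. */
    static Throwable orDefault(Throwable cause) {
        return cause != null ? cause : new IllegalStateException("Write failed.");
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> unguarded = new CompletableFuture<>();
        System.out.println("unguarded completed: " + failAndCheck(unguarded, null));
        try {
            unguarded.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("waiter on the unguarded future timed out");
        }

        CompletableFuture<String> guarded = new CompletableFuture<>();
        System.out.println("guarded completed: " + failAndCheck(guarded, orDefault(null)));
    }
}
```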

Run it again; output:

2020-08-20 00:41:02 [main] INFO  org.redisson.Version - Redisson 3.12.5
2020-08-20 00:41:03 [redisson-netty-2-9] INFO  o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
2020-08-20 00:41:03 [redisson-netty-2-13] INFO  o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379

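There is no error this time, but the program still hangs, so the problem is not that simple.

The next step is to take a thread dump and see where it is stuck.

From the context, the blocked spot is: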

"main" #1 prio=5 os_prio=0 tid=0x00007f4878011000 nid=0x7ae4 in Object.wait() [0x00007f487f94f000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x00000000da0ca0d0> (a io.netty.util.concurrent.ImmediateEventExecutor$ImmediatePromise)
	at java.lang.Object.wait(Object.java:502)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:247)
	- locked <0x00000000da0ca0d0> (a io.netty.util.concurrent.ImmediateEventExecutor$ImmediatePromise)
	at org.redisson.misc.RedissonPromise.await(RedissonPromise.java:110)
	at org.redisson.misc.RedissonPromise.await(RedissonPromise.java:35)
	at org.redisson.command.CommandAsyncService.get(CommandAsyncService.java:139)
	at org.redisson.RedissonObject.get(RedissonObject.java:90)
	at org.redisson.RedissonLock.tryAcquire(RedissonLock.java:226)
	at org.redisson.RedissonLock.lock(RedissonLock.java:180)
	at org.redisson.RedissonLock.lock(RedissonLock.java:152)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
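
A dump like the one above is usually taken with an external tool, but the same information is available in-process through the JDK's ThreadMXBean. The following is a minimal sketch under that assumption; WaitingThreads and dumpWaiting are hypothetical names, and the "stuck-waiter" thread only imitates a thread parked in Object.wait().

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

class WaitingThreads {

    /** Returns name, awaited object and stack of every thread currently in WAITING state. */
    static String dumpWaiting() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        for (ThreadInfo info : bean.dumpAllThreads(false, false)) {
            if (info.getThreadState() == Thread.State.WAITING) {
                out.append('"').append(info.getThreadName()).append("\" waiting on ")
                   .append(info.getLockName()).append('\n');
                for (StackTraceElement frame : info.getStackTrace()) {
                    out.append("\tat ").append(frame).append('\n');
                }
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        Object monitor = new Object();
        Thread stuck = new Thread(() -> {
            synchronized (monitor) {
                try {
                    while (true) {
                        monitor.wait(); // nobody ever notifies this monitor
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "stuck-waiter");
        stuck.setDaemon(true);
        stuck.start();
        while (stuck.getState() != Thread.State.WAITING) {
            Thread.sleep(10); // wait until the thread has actually entered wait()
        }
        System.out.print(dumpWaiting());
    }
}
```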

The dump shows that DefaultPromise.await() has called Object.wait().
Patch the source to add debug output:

// io.netty.util.concurrent.DefaultPromise
@Override
public Promise<V> await() throws InterruptedException {
	...
	synchronized (this) {
		while (!isDone()) {
			incWaiters();
			try {
				System.out.println("before wait: " + this + ", " + this.waiters);
				wait();
			} finally {
				decWaiters();
				System.out.println("after wait: " + this + ", " + this.waiters);
			}
		}
	}
	return this;
}

Run again; output:

2020-08-20 00:51:11 [main] INFO  org.redisson.Version - Redisson 3.12.5
2020-08-20 00:51:11 [redisson-netty-2-12] INFO  o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
2020-08-20 00:51:11 [redisson-netty-2-13] INFO  o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
before wait: ImmediateEventExecutor$ImmediatePromise@420745d7(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@420745d7(success), 0
before wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(success: 1), 0
before wait: ImmediateEventExecutor$ImmediatePromise@5c09d180(incomplete), 1

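This shows that waiters is 1 and that the thread has then entered wait().
A thread in wait() is only woken by a notify() or notifyAll() on the same object, and since the object being waited on is this, DefaultPromise must contain a method that wakes the waiting threads.

A search through DefaultPromise finds: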

// io.netty.util.concurrent.DefaultPromise
private synchronized boolean checkNotifyWaiters() {
	if (waiters > 0) {
		notifyAll();
	}
	return listeners != null;
}

Sure enough, there is a notifyAll(), and it is guarded by waiters > 0.
Going back to the NPE stack trace, the call chain is:

// org.redisson.command.RedisExecutor
if (attempt == attempts) {
	attemptPromise.tryFailure(exception);
	return;
}
--->
// org.redisson.misc.RedissonPromise
@Override
public boolean tryFailure(Throwable cause) {
	if (promise.tryFailure(cause)) {
		completeExceptionally(cause);
		return true;
	}
	return false;
}
--->
// io.netty.util.concurrent.DefaultPromise
@Override
public boolean tryFailure(Throwable cause) {
	return setFailure0(cause);
}
->
private boolean setFailure0(Throwable cause) {
	return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
->
private boolean setValue0(Object objResult) {
	if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
			RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
		if (checkNotifyWaiters()) {
			notifyListeners();
		}
		return true;
	}
	return false;
}
->
private synchronized boolean checkNotifyWaiters() {
	if (waiters > 0) {
		notifyAll();
	}
	return listeners != null;
}

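According to the debug output above, waiters is greater than 0, so after a write failure the chain should eventually reach notifyAll(), yet the waiting thread is not woken.

You might suspect that the following condition evaluates to false, so that notifyAll() is never reached: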

// io.netty.util.concurrent.DefaultPromise
private boolean setValue0(Object objResult) {
	if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
			RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
		...
	}
}

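In fact, even with this check removed, the waiting thread is still not woken.
At this point you should start to suspect that kind of problem. (Which kind? Take a guess, hah~~)

What follows is a long cycle of reading source code and debugging, omitted here.

The method to focus on is: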

// org.redisson.RedissonLock
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
	CommandBatchService executorService = createCommandBatchService();
	RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
	if (!(commandExecutor instanceof CommandBatchService)) {    
		executorService.executeAsync();
	}
	return result;
}

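The await() method of the result returned here is what eventually gets called, and it leaves the program suspended forever. It corresponds to the last line of the debug output above: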

before wait: ImmediateEventExecutor$ImmediatePromise@5c09d180(incomplete), 1

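executorService.executeAsync(), on the other hand, eventually goes through the call chain discussed above because of the write failure.
Patch the source to add more debug output: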

// org.redisson.RedissonLock
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
	CommandBatchService executorService = createCommandBatchService();
	RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
	System.out.println("======== main result: " + result);
	if (!(commandExecutor instanceof CommandBatchService)) {    
		RFuture<BatchResult<?>> rs = executorService.executeAsync();
		System.out.println("======== async result: " + rs);
	}
	return result;
}


// io.netty.util.concurrent.DefaultPromise
private boolean setFailure0(Throwable cause) {
	// Print here as well to make the problem easier to see; the cause message is the one set when fixing the NPE earlier
	if (cause != null && "===== Write failed.".equals(cause.getMessage())) {
		System.out.println("========== setFailure0: " + this + ", waiters: " + waiters);
	}
	return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}

Restart; output:

2020-08-20 01:48:39 [main] INFO  org.redisson.Version - Redisson 3.12.5
2020-08-20 01:48:40 [redisson-netty-2-14] INFO  o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
2020-08-20 01:48:40 [redisson-netty-2-11] INFO  o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
before wait: ImmediateEventExecutor$ImmediatePromise@420745d7(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@420745d7(success), 0
before wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(success: 1), 0
======== main result: RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete)]
======== async result: RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@88a8218(incomplete)]
before wait: ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete), 1
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@385ecb2e(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@7d5c5241(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@88a8218(incomplete), waiters: 0

Now it gets interesting.

  1. main result (@2c0f7678) and async result (@88a8218) are two different RedissonPromise objects.
  2. main result (@2c0f7678) entered wait() with waiters == 1.
  3. async result (@88a8218) also reached setFailure0() because of the write failure, but its waiters == 0, so notifyAll() is not called.

That is the problem: wait() and notifyAll() are called on two different objects.
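
The mismatch can be reproduced in isolation. The sketch below uses a deliberately simplified promise, not Netty's DefaultPromise, that keeps the same waiters counter and the same "notifyAll() only if waiters > 0" guard. TwoPromisesDemo, MiniPromise and waiterWakesUp are hypothetical names, and a timed wait replaces the unbounded one so the demo terminates.

```java
class TwoPromisesDemo {

    /** Simplified stand-in for a promise: one result slot, one monitor, one waiters counter. */
    static final class MiniPromise {
        private Object result;  // null until completed
        private int waiters;    // threads currently blocked in await()

        /** Blocks until completed or until the timeout elapses; returns true if completed. */
        synchronized boolean await(long timeoutMillis) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (result == null) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                waiters++;
                try {
                    wait(remaining);
                } finally {
                    waiters--;
                }
            }
            return true;
        }

        /** Records the failure and wakes only threads waiting on this very instance. */
        synchronized boolean tryFailure(Throwable cause) {
            if (result != null) {
                return false;
            }
            result = cause;
            if (waiters > 0) {
                notifyAll();
            }
            return true;
        }
    }

    /** Fails 'completed' from another thread and reports whether a waiter on 'awaited' wakes up. */
    static boolean waiterWakesUp(MiniPromise awaited, MiniPromise completed) throws InterruptedException {
        Thread timer = new Thread(() -> completed.tryFailure(new IllegalStateException("Write failed.")));
        timer.start();
        boolean woke = awaited.await(300);
        timer.join();
        return woke;
    }

    public static void main(String[] args) throws InterruptedException {
        MiniPromise mainResult = new MiniPromise();
        MiniPromise asyncResult = new MiniPromise();
        System.out.println("different objects, waiter woke up: " + waiterWakesUp(mainResult, asyncResult));

        MiniPromise shared = new MiniPromise();
        System.out.println("same object, waiter woke up: " + waiterWakesUp(shared, shared));
    }
}
```

With two separate instances the waiter only returns because its timeout expires; with an unbounded wait(), as in the dump, it would never return.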

Solution

Patch the source:

// org.redisson.RedissonLock
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
	CommandBatchService executorService = createCommandBatchService();
	RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
	System.out.println("======== main result: " + result);
	if (!(commandExecutor instanceof CommandBatchService)) {
		RFuture<BatchResult<?>> rs = executorService.executeAsync();
		System.out.println("======== async result: " + rs);
		// The unchecked casts below may be problematic; refine them as needed
		rs.onComplete((v, e) -> {
			if (e == null) {
				((RPromise) result).trySuccess(v);
			} else {
				((RPromise) result).tryFailure(e);
			}
		});
	}
	return result;
}

Restart; output:

2020-08-20 02:06:46 [main] INFO  org.redisson.Version - Redisson 3.12.5
2020-08-20 02:06:47 [redisson-netty-2-13] INFO  o.r.c.pool.MasterConnectionPool - 5 connections initialized for localhost/127.0.0.1:16379
2020-08-20 02:06:47 [redisson-netty-2-14] INFO  o.r.c.p.MasterPubSubConnectionPool - 1 connections initialized for localhost/127.0.0.1:16379
before wait: ImmediateEventExecutor$ImmediatePromise@420745d7(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@420745d7(success), 0
before wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(incomplete), 1
after wait: ImmediateEventExecutor$ImmediatePromise@7e11ab3d(success: 1), 0
======== main result: RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete)]
======== async result: RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@88a8218(incomplete)]
before wait: ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete), 1
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@68bf51eb(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@2dbdc298(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@88a8218(incomplete), waiters: 0
========== setFailure0: ImmediateEventExecutor$ImmediatePromise@2c0f7678(incomplete), waiters: 1
after wait: ImmediateEventExecutor$ImmediatePromise@2c0f7678(failure: org.redisson.client.RedisException: ===== Write failed.), 0

org.redisson.client.RedisException: ===== Write failed.

	at org.redisson.command.RedisExecutor$2.run(RedisExecutor.java:215)
	at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
	at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
	at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

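As the output shows, after async result (@88a8218) goes through setFailure0(), main result (@2c0f7678) follows with its own setFailure0(). The main thread then returns from wait() and prints "after wait …", waiters drops back to 0, and the write-failure exception is finally thrown to the caller.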
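
The forwarding idea behind the patch can be sketched with plain JDK types. CompletableFuture and whenComplete are used here as stand-ins for RPromise and onComplete (an assumption for illustration, not Redisson's API), and ForwardCompletion and forward are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

class ForwardCompletion {

    /** Forwards the outcome of 'source' to 'target', whichever way 'source' completes. */
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, error) -> {
            if (error == null) {
                target.complete(value);              // no effect if target is already completed
            } else {
                target.completeExceptionally(error); // releases anyone blocked on target
            }
        });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> mainResult = new CompletableFuture<>();
        CompletableFuture<String> asyncResult = new CompletableFuture<>();
        forward(asyncResult, mainResult);

        // Simulates the write failure arriving on the batch future only.
        asyncResult.completeExceptionally(new IllegalStateException("Write failed."));

        try {
            mainResult.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            System.out.println("waiter released with: " + e.getCause());
        }
    }
}
```

In this sketch both futures carry the same type. In the patch above the batch future and the result future carry different types, which is presumably what the warning about the casts refers to.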
