之前生产环境遇到过一个Redisson RFairLock引发的线程hang住的问题,当时导致了P0级别的故障,这里记录一下当时分析的过程
Jstack分析堆栈
首先通过 jstack 分析一下线程(由于涉及到公司内部代码,没有放出完整的堆栈),发现线程卡死在 Redisson 的源码中:
"scheduler-7" #362 prio=5 os_prio=0 tid=0x00007f58961c0800 nid=0x2624e waiting on condition [0x00007f528b5f3000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006e8496a68> (a java.util.concurrent.Semaphore$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
at java.util.concurrent.Semaphore.acquireUninterruptibly(Semaphore.java:335)
at org.redisson.RedissonLock.lock(RedissonLock.java:133)
at org.redisson.RedissonLock.lock(RedissonLock.java:77)
接下来分析 org.redisson.RedissonLock.lock(RedissonLock.java:133) 这部分代码:
// Blocking acquire loop of RedissonLock (quoted from the Redisson source).
// Tries tryAcquire once; a null result means the lock was taken. Otherwise it
// subscribes to the lock's notification entry and loops: retry tryAcquire, and
// if the lock is still busy wait on the entry's latch, either for at most
// `ttl` ms (ttl >= 0) or with no timeout at all (ttl < 0).
// leaseTime / unit: forwarded unchanged to tryAcquire.
// interruptibly: when true, waits use interruptible calls and an
//   InterruptedException propagates to the caller.
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// Lock is busy: subscribe so this thread can be woken when it may retry.
// NOTE(review): pubSub.timeout presumably bounds how long the subscription
// may take to complete - its implementation is not shown here.
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
// ttl >= 0: bounded wait of at most ttl ms, then loop and retry.
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
// Non-interruptible mode: retry the same bounded wait once.
// NOTE(review): this retry sits outside the try, so a second
// interrupt here propagates even though interruptibly is false.
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
// ttl < 0: wait with NO timeout until the latch is released.
if (interruptibly) {
entry.getLatch().acquire();
} else {
// The hung thread is parked here (Semaphore.acquireUninterruptibly
// at RedissonLock.java:133 in the jstack output). getLatch() is a
// Semaphore with 0 available permits at this point, so the thread
// blocks indefinitely until something releases the latch
// (presumably an unlock notification on the subscribed channel).
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
// Always drop the subscription, whether the lock was acquired or an
// exception escaped the loop.
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}从上面代码来看看,在循环获取锁的过程中,执行redis lua脚本返回的ttl < 0 导致持续阻塞
(诊断到这里时,心里其实已经大概有数:大概率是 Redisson 的 bug 或者用法不当导致的。为了尽快恢复环境,先删掉了 Redis 中残留的锁,然后马上重启了环境,具体的 bug 等后面有空再慢慢分析)
RFairLock核心lua脚本分析
继续分析tryAcquire内的核心lua脚本
// Excerpt of the RFairLock acquire path (EVAL_LONG branch): the Lua script
// below is passed to evalWriteAsync. Script arguments, from the last lines:
//   KEYS[1] = getName()         lock hash (holder id -> hold count)
//   KEYS[2] = threadsQueueName  list of waiting thread ids
//   KEYS[3] = timeoutSetName    sorted set: waiting thread id -> deadline (ms)
//   ARGV[1] = internalLockLeaseTime   ARGV[2] = getLockName(threadId)
//   ARGV[3] = wait                    ARGV[4] = currentTime
// The script returns nil when the lock is acquired or re-entered; otherwise a
// ttl in ms that the caller waits on before retrying.
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads
// (waiters at the head of the queue whose deadline has already passed)
"while true do " +
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
// check if the lock can be acquired now
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// check if the lock is already held, and this is a re-entry
// (this refreshes KEYS[1]'s expiry but leaves every score in KEYS[3] as is)
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// the lock cannot be acquired
// check if the thread is already in the queue
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
// NOTE: this thread's score was fixed when it enqueued and is not moved
// when the holder's lease is extended (the re-entry branch above only
// pexpires KEYS[1]), so this expression can go negative while KEYS[1]
// still exists. In the lock() loop quoted earlier, a negative ttl leads
// to the untimed acquireUninterruptibly() wait.
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
// the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
// threadWaitTime
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), wait, currentTime);
}为了方便阅读,keys和argv替换为了变量名继续分析
-- Readable rendering of the RFairLock acquire script above, with KEYS/ARGV
-- replaced by descriptive names:
--   lockKey           = KEYS[1]  lock hash: holder thread id -> hold count
--   threadsQueueKey   = KEYS[2]  list of waiting thread ids (head = next in line)
--   timeoutSetKey     = KEYS[3]  sorted set: waiting thread id -> absolute deadline (ms)
--   leaseTimeMillis   = ARGV[1]
--   currentThreadId   = ARGV[2]
--   waitTimeMillis    = ARGV[3]
--   currentTimeMillis = ARGV[4]
-- Returns nil when the lock is acquired or re-entered; otherwise a number the
-- Java lock() loop uses as the time in ms to wait before retrying.
-- remove stale threads
-- Pop waiters at the head of the queue whose deadline has already passed
-- (this drops abandoned waiters; it does not touch the lock itself).
while true do
local firstThreadId2 = redis.call('lindex', threadsQueueKey, 0);
if firstThreadId2 == false then break end
-- NOTE(review): if the queue holds an id with no score in timeoutSetKey
-- (e.g. after keys were cleaned up by hand), zscore yields false and
-- tonumber(false) is nil, so the comparison below raises a script error.
local timeout =
tonumber(redis.call('zscore', timeoutSetKey, firstThreadId2));
if timeout <= tonumber(currentTimeMillis) then
-- remove the item from the queue and timeout set
-- NOTE we do not alter any other timeout
redis.call('zrem', timeoutSetKey, firstThreadId2);
redis.call('lpop', threadsQueueKey);
else
break
end
end
-- check if the lock can be acquired now
-- Acquirable when the lock key does not exist AND (the queue key does not
-- exist OR this thread is at the head of the queue).
-- Nothing in this script sets an expiry on threadsQueueKey, so the queue can
-- outlive the lock; hence the extra check that this thread is at the head.
if (redis.call('exists', lockKey) == 0) and
((redis.call('exists', threadsQueueKey) == 0) or
(redis.call('lindex', threadsQueueKey, 0) == currentThreadId)) then
-- remove this thread from the queue and timeout set
-- (clean up this thread's bookkeeping before taking the lock)
redis.call('lpop', threadsQueueKey);
redis.call('zrem', timeoutSetKey, currentThreadId);
-- decrease timeouts for all waiting in the queue
-- timeoutSetKey stores absolute deadlines, not TTLs, and each waiter's
-- deadline was built as "previous waiter's deadline + waitTimeMillis".
-- Once the thread ahead takes the lock, every remaining deadline is pulled
-- in by one waitTimeMillis.
local keys = redis.call('zrange', timeoutSetKey, 0, -1);
for i = 1, #keys, 1 do
redis.call('zincrby', timeoutSetKey, -tonumber(waitTimeMillis), keys[i]);
end
-- acquire the lock and set the TTL for the lease
-- (lock acquired: hold count starts at 1)
redis.call('hset', lockKey, currentThreadId, 1);
redis.call('pexpire', lockKey, leaseTimeMillis);
return nil;
end
-- check if the lock is already held, and this is a re-entry
-- Re-entry: increment the hold count and refresh the lease.
-- NOTE: this extends lockKey's expiry but leaves every deadline in
-- timeoutSetKey unchanged, so queued threads' deadlines can fall behind the
-- real lock expiry (see the negative return further down).
if redis.call('hexists', lockKey, currentThreadId) == 1 then
redis.call('hincrby', lockKey, currentThreadId, 1);
redis.call('pexpire', lockKey, leaseTimeMillis);
return nil;
end
-- the lock cannot be acquired
-- check if the thread is already in the queue
-- KEY BUG LOCATION: this thread is already queued (it is retrying), so its
-- remaining wait is estimated from its stored deadline.
local timeout = redis.call('zscore', timeoutSetKey, currentThreadId);
if timeout ~= false then
-- the real timeout is the timeout of the prior thread
-- in the queue, but this is approximately correct, and
-- avoids having to traverse the queue
-- BUG: the stored deadline is fixed when the thread enqueues. Anything that
-- extends lockKey's lease without touching timeoutSetKey (e.g. the re-entry
-- branch above) leaves it stale. Once currentTimeMillis passes
-- (timeout - waitTimeMillis), this returns a negative number even though
-- lockKey still exists. Example with a 10s lease and wait time W (W > 1s):
--   1. thread A locks at t=0s; the lock expires at t=10s
--   2. thread B queues at t=1s: ttl = pttl = 9s, deadline = 9s + W + 1s = 10s + W
--   3. thread A re-enters at t=9s: the lock now expires at t=19s,
--      and B's deadline is still 10s + W
--   4. thread B retries at t=11s: returns (10s + W) - W - 11s = -1s
-- A negative value sends the Java caller into the untimed
-- acquireUninterruptibly() wait, which is where the hung thread was parked.
return timeout - tonumber(waitTimeMillis) - tonumber(currentTimeMillis);
end
-- add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
-- the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
-- threadWaitTime
-- Append this thread to the tail of the queue and record its deadline.
local lastThreadId = redis.call('lindex', threadsQueueKey, -1);
-- ttl = tail thread's deadline - now; if the queue is empty or this thread is
-- already the tail, fall back to the lock's remaining pttl.
-- NOTE(review): Redis PTTL is negative when lockKey is missing or has no
-- expiry - confirm that cannot happen on this path.
local ttl;
if lastThreadId ~= false and lastThreadId ~= currentThreadId then
ttl = tonumber(redis.call('zscore', timeoutSetKey, lastThreadId)) -
tonumber(currentTimeMillis);
else
ttl = redis.call('pttl', lockKey);
end
-- deadline = ttl + wait + now, i.e. (in the tail branch) the tail thread's
-- deadline + waitTimeMillis.
local timeout = ttl + tonumber(waitTimeMillis) + tonumber(currentTimeMillis);
-- Only push onto the list when zadd reports a newly added member.
if redis.call('zadd', timeoutSetKey, timeout, currentThreadId) == 1 then
redis.call('rpush', threadsQueueKey, currentThreadId);
end
return ttl;
解决方案
目前此问题已在3.31.1中修复,可以直接升级redisson版本,这里也看一下官方是怎么改的
Fixed - RFairLock doesn't calculate remaining ttl properly before nex… · redisson/redisson@f08dfd8

其实就是改了剩余时间的计算方式:不再用 timeoutSet 中记录的(可能已经过时的)超时时间减去当前时间,而是直接返回锁 key 的剩余有效期(pttl)。这样返回值始终跟随锁的真实过期时间,不会因为锁被重入续期而提前变成负数。