思考的心情
Published 2025-05-01

A Redisson fair lock bug that hangs threads forever: a source code analysis

We once hit a production issue where Redisson's RFairLock left threads hung, and it caused a P0 incident. This post records how we analyzed it at the time.

Analyzing the stack with jstack

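First, we inspected the threads with jstack. The full stack is not shown because it contains internal company code. The threads were stuck inside Redisson's source: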

"scheduler-7" #362 prio=5 os_prio=0 tid=0x00007f58961c0800 nid=0x2624e waiting on condition [0x00007f528b5f3000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x00000006e8496a68> (a java.util.concurrent.Semaphore$NonfairSync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
 at java.util.concurrent.Semaphore.acquireUninterruptibly(Semaphore.java:335)
 at org.redisson.RedissonLock.lock(RedissonLock.java:133)
 at org.redisson.RedissonLock.lock(RedissonLock.java:77)

Next, the code behind org.redisson.RedissonLock.lock(RedissonLock.java:133):

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        pubSub.timeout(future);
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }

        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        // Execution ends up here: getLatch() returns a Semaphore with 0 permits, so this call effectively blocks the thread
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

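The code above shows what happened. Inside the retry loop, the Redis Lua script returned ttl < 0, so the thread took the else branch and blocked indefinitely in acquireUninterruptibly().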
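The blocking behavior can be reproduced without Redis. Below is a minimal sketch using only the plain JDK; it is not Redisson code. A Semaphore with 0 permits stands in for entry.getLatch(). The sketch assumes, as in Redisson's pub/sub listener, that only an unlock notification releases a permit, so nothing ever calls release() here. The class and method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Minimal sketch, not Redisson code: the wait step of RedissonLock.lock() reduced to a 0-permit Semaphore.
public class ZeroPermitLatchDemo {

    // Mirrors the branch in lock(): ttl >= 0 -> bounded wait, ttl < 0 -> unbounded wait.
    static void waitForMessage(Semaphore latch, long ttl) throws InterruptedException {
        if (ttl >= 0) {
            latch.tryAcquire(ttl, TimeUnit.MILLISECONDS); // gives up after ttl ms
        } else {
            latch.acquireUninterruptibly();               // parks until release() is called
        }
    }

    // Starts a waiter thread and reports whether it is still parked after observeMillis.
    static boolean stillWaitingAfter(long ttl, long observeMillis) {
        Semaphore latch = new Semaphore(0);   // no permits; nobody ever calls release() here
        Thread waiter = new Thread(() -> {
            try {
                waitForMessage(latch, ttl);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.setDaemon(true);               // let the JVM exit even though the waiter never wakes
        waiter.start();
        try {
            waiter.join(observeMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("ttl = 100  -> still waiting after 1s? " + stillWaitingAfter(100, 1_000));
        System.out.println("ttl = -500 -> still waiting after 1s? " + stillWaitingAfter(-500, 1_000));
    }
}
```

With a positive ttl, the waiter gives up after 100 ms; in the real loop it would then call tryAcquire again. With a negative ttl, the waiter stays parked on Semaphore$NonfairSync, just like the scheduler-7 thread in the jstack output.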

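(By the time the diagnosis reached this point, I already had a fairly good idea of the cause: most likely a Redisson bug or incorrect usage. To restore service as quickly as possible, we first deleted the leftover lock keys in Redis and restarted the environment right away. The detailed bug analysis was left for later.)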

Analysis of the RFairLock core Lua script

Next, let's analyze the core Lua script executed by tryAcquire:

if (command == RedisCommands.EVAL_LONG) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            // remove stale threads
            "while true do " +
                "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                "if firstThreadId2 == false then " +
                    "break;" +
                "end;" +

                "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                "if timeout <= tonumber(ARGV[4]) then " +
                    // remove the item from the queue and timeout set
                    // NOTE we do not alter any other timeout
                    "redis.call('zrem', KEYS[3], firstThreadId2);" +
                    "redis.call('lpop', KEYS[2]);" +
                "else " +
                    "break;" +
                "end;" +
            "end;" +

            // check if the lock can be acquired now
            "if (redis.call('exists', KEYS[1]) == 0) " +
                "and ((redis.call('exists', KEYS[2]) == 0) " +
                    "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +

                // remove this thread from the queue and timeout set
                "redis.call('lpop', KEYS[2]);" +
                "redis.call('zrem', KEYS[3], ARGV[2]);" +

                // decrease timeouts for all waiting in the queue
                "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
                "for i = 1, #keys, 1 do " +
                    "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
                "end;" +

                // acquire the lock and set the TTL for the lease
                "redis.call('hset', KEYS[1], ARGV[2], 1);" +
                "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                "return nil;" +
            "end;" +

            // check if the lock is already held, and this is a re-entry
            "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
                "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
                "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                "return nil;" +
            "end;" +

            // the lock cannot be acquired
            // check if the thread is already in the queue
            "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
            "if timeout ~= false then " +
                // the real timeout is the timeout of the prior thread
                // in the queue, but this is approximately correct, and
                // avoids having to traverse the queue
                "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
            "end;" +

            // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
            // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
            // threadWaitTime
            "local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
            "local ttl;" +
            "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
                "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
            "else " +
                "ttl = redis.call('pttl', KEYS[1]);" +
            "end;" +
            "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
            "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
                "redis.call('rpush', KEYS[2], ARGV[2]);" +
            "end;" +
            "return ttl;",
            Arrays.asList(getName(), threadsQueueName, timeoutSetName),
            internalLockLeaseTime, getLockName(threadId), wait, currentTime);
}

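For readability, the KEYS and ARGV references are replaced with variable names below:

- KEYS[1] = lockKey
- KEYS[2] = threadsQueueKey
- KEYS[3] = timeoutSetKey
- ARGV[1] = leaseTimeMillis
- ARGV[2] = currentThreadId
- ARGV[3] = waitTimeMillis
- ARGV[4] = currentTimeMillis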

-- remove stale threads
-- drop waiting threads at the head of the queue whose timeout has already passed
while true do
    local firstThreadId2 = redis.call('lindex', threadsQueueKey, 0);
    if firstThreadId2 == false then break end
    local timeout =
        tonumber(redis.call('zscore', timeoutSetKey, firstThreadId2));
    if timeout <= tonumber(currentTimeMillis) then
        -- remove the item from the queue and timeout set
        -- NOTE we do not alter any other timeout
        redis.call('zrem', timeoutSetKey, firstThreadId2);
        redis.call('lpop', threadsQueueKey);
    else
        break
    end
end

-- check if the lock can be acquired now
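-- lock is free AND (the queue key does not exist OR the head of the queue is the current thread)
-- the thread-id queue never expires, so we also have to check whether its head is the current thread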
if (redis.call('exists', lockKey) == 0) and
    ((redis.call('exists', threadsQueueKey) == 0) or
        (redis.call('lindex', threadsQueueKey, 0) == currentThreadId)) then
    -- remove this thread from the queue and timeout set
    -- first remove this thread's own queue entries
    redis.call('lpop', threadsQueueKey);
    redis.call('zrem', timeoutSetKey, currentThreadId);
    -- decrease timeouts for all waiting in the queue
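    -- the reasoning here:
    -- timeoutSet stores absolute timestamps, not TTLs,
    -- so once a thread ahead in the queue acquires the lock, its wait time must be subtracted from every remaining entry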
    local keys = redis.call('zrange', timeoutSetKey, 0, -1);
    for i = 1, #keys, 1 do
        redis.call('zincrby', timeoutSetKey, -tonumber(waitTimeMillis), keys[i]);
    end
    -- acquire the lock and set the TTL for the lease
    -- lock acquired
    redis.call('hset', lockKey, currentThreadId, 1);
    redis.call('pexpire', lockKey, leaseTimeMillis);
    return nil;
end
-- check if the lock is already held, and this is a re-entry
-- reentrant acquisition: increment the hold count
if redis.call('hexists', lockKey, currentThreadId) == 1 then
    redis.call('hincrby', lockKey, currentThreadId, 1);
    redis.call('pexpire', lockKey, leaseTimeMillis);
    return nil;
end
-- the lock cannot be acquired
-- check if the thread is already in the queue
-- KEY POINT OF THE BUG!!! the thread re-runs this script while already queued: compute its remaining wait
local timeout = redis.call('zscore', timeoutSetKey, currentThreadId);
if timeout ~= false then
    -- the real timeout is the timeout of the prior thread
    -- in the queue, but this is approximately correct, and
    -- avoids having to traverse the queue
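    -- combined with the Java caller, this logic has a bug: on a retry the thread's timeout in the set is never updated, so this can return a negative number
    -- scenario:
    -- 1. thread 1 tries to lock at 10:00 with a 10s wait; its stored timeout becomes 10:10
    -- 2. thread 1 retries at 10:09 (10s wait again), still unable to get the lock
    -- 3. result: 10:10 - 10s - 10:09 = -9s
    -- a negative value sends the caller into the ttl < 0 branch (acquireUninterruptibly() with no timeout), so the thread can hang forever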
    return timeout - tonumber(waitTimeMillis) - tonumber(currentTimeMillis);
end
-- add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
-- the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
-- threadWaitTime
-- append the thread to the tail of the queue and set its timeout
local lastThreadId = redis.call('lindex', threadsQueueKey, -1);
-- ttl = timeout of the thread at the tail of the queue - current time (the lock's pttl if the queue is empty)
local ttl;
if lastThreadId ~= false and lastThreadId ~= currentThreadId then
    ttl = tonumber(redis.call('zscore', timeoutSetKey, lastThreadId)) -
              tonumber(currentTimeMillis);
else
    ttl = redis.call('pttl', lockKey);
end
-- timeout = ttl + current time + wait time, i.e. effectively the tail thread's timeout + wait time
local timeout = ttl + tonumber(waitTimeMillis) + tonumber(currentTimeMillis);
if redis.call('zadd', timeoutSetKey, timeout, currentThreadId) == 1 then
    redis.call('rpush', threadsQueueKey, currentThreadId);
end
return ttl;
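To see the faulty branch fire end to end, here is a hypothetical single-process Java model of the script above. It makes these substitutions:

- Redis structures are replaced by Java collections.
- The clock is passed in explicitly.
- A renew() method stands in for the watchdog extending the holder's lease.

The lease and wait values (30 s and 5 s) are illustrative, not Redisson defaults.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the RFairLock acquire script; not Redisson code.
public class FairLockScriptModel {
    final Map<String, Integer> lockHash = new HashMap<>(); // lockKey: holder -> hold count
    long lockExpireAt;                                      // absolute expiry of lockKey
    final Deque<String> queue = new ArrayDeque<>();         // threadsQueueKey
    final Map<String, Long> timeoutSet = new HashMap<>();   // timeoutSetKey: thread -> absolute timeout

    boolean lockExists(long now) {
        if (!lockHash.isEmpty() && lockExpireAt <= now) {
            lockHash.clear();                               // the key would have expired in Redis
        }
        return !lockHash.isEmpty();
    }

    // Stands in for the watchdog re-running pexpire on the holder's lock.
    void renew(long lease, long now) {
        lockExpireAt = now + lease;
    }

    // Returns null when the lock is acquired, otherwise the ttl handed back to RedissonLock.lock().
    Long tryAcquire(String threadId, long lease, long wait, long now) {
        // remove stale threads: only head entries whose timeout has already passed
        while (!queue.isEmpty() && timeoutSet.get(queue.peekFirst()) <= now) {
            timeoutSet.remove(queue.pollFirst());
        }
        // lock is free and this thread is first in line (or nobody is waiting)
        if (!lockExists(now) && (queue.isEmpty() || queue.peekFirst().equals(threadId))) {
            queue.pollFirst();
            timeoutSet.remove(threadId);
            timeoutSet.replaceAll((id, t) -> t - wait);     // zincrby -wait for every remaining waiter
            lockHash.put(threadId, 1);
            lockExpireAt = now + lease;
            return null;
        }
        // re-entry by the current holder
        if (lockHash.containsKey(threadId)) {
            lockHash.merge(threadId, 1, Integer::sum);
            lockExpireAt = now + lease;
            return null;
        }
        // already queued: the computation marked as the bug above
        Long timeout = timeoutSet.get(threadId);
        if (timeout != null) {
            return timeout - wait - now;
        }
        // enqueue behind the tail, or behind the lock's remaining TTL if the queue is empty
        String last = queue.peekLast();
        long ttl = (last != null && !last.equals(threadId)) ? timeoutSet.get(last) - now : lockExpireAt - now;
        timeoutSet.put(threadId, ttl + wait + now);
        queue.addLast(threadId);
        return ttl;
    }

    public static void main(String[] args) {
        FairLockScriptModel m = new FairLockScriptModel();
        long lease = 30_000, wait = 5_000;                    // illustrative values
        m.tryAcquire("A", lease, wait, 0);                    // A holds the lock from t=0
        Long queued = m.tryAcquire("B", lease, wait, 1_000);  // B queued at t=1s, stored timeout 35s
        m.renew(lease, 10_000);                               // watchdog keeps extending A's lease
        m.renew(lease, 20_000);
        m.renew(lease, 30_000);
        Long retry = m.tryAcquire("B", lease, wait, 30_500);  // B retries after its bounded wait
        System.out.println("B queued, ttl = " + queued);
        System.out.println("B retry,  ttl = " + retry);
        m.tryAcquire("C", lease, wait, 36_000);               // any later call purges B's stale entry
        System.out.println("B still queued at t=36s? " + m.queue.contains("B"));
    }
}
```

In this model, B's retry returns a negative ttl only while both conditions hold:

- The current time lies between `timeout - wait` and `timeout`.
- The holder still owns the lock; here the renewals keep it alive.

Once B's timeout passes, the next script run silently drops B from the queue, even though B is still parked on the Java side.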

Solution

This issue has been fixed in 3.31.1, so you can simply upgrade Redisson. Let's also look at how the maintainers fixed it:

Fixed - RFairLock doesn't calculate remaining ttl properly before nex… · redisson/redisson@f08dfd8

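In essence, the fix changes how the remaining time is calculated: for a thread that is already queued, the script now directly returns the lock key's remaining TTL.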
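A small sketch contrasts the two calculations, using the numbers from the scenario in the Lua comments (times in ms, with 10:00 taken as t = 0):

- The post-fix version follows the description above: it returns the lock key's remaining TTL.
- Clamping the result at 0 is an assumption of this sketch. It is there because PTTL itself can return negative values: -2 when the key is missing, -1 when it has no expiry.
- The 10:30 lock expiry is a hypothetical value.

```java
// Hypothetical comparison of the value returned to an already-queued thread; not Redisson code.
public class RemainingTtlComparison {

    // Before the fix: derived from the thread's own timeout entry, which is never refreshed.
    static long beforeFix(long timeout, long wait, long now) {
        return timeout - wait - now;
    }

    // After the fix, as described above: the lock key's remaining TTL (what PTTL reports).
    // Clamping at 0 is an assumption of this sketch, so the caller always takes the
    // bounded tryAcquire(ttl) branch instead of the unbounded acquireUninterruptibly().
    static long afterFix(long pttl) {
        return Math.max(0, pttl);
    }

    public static void main(String[] args) {
        long wait = 10_000;          // 10 s wait time from the scenario
        long timeout = 10_000;       // stored timeout 10:10, with 10:00 as t = 0
        long now = 9_000;            // retry at 10:09
        long lockExpireAt = 30_000;  // hypothetical: the holder still owns the lock until 10:30
        System.out.println("before fix: " + beforeFix(timeout, wait, now));
        System.out.println("after fix:  " + afterFix(lockExpireAt - now));
    }
}
```

The old formula yields -9000 ms, the -9 s from the scenario. The TTL-based value stays non-negative while the lock is held, so the waiter only ever sleeps for a bounded time before retrying.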

