思考的心情
思考的心情
发布于 2025-04-28 / 28 阅读
0

Redisson RMapCache定时清理任务源码分析

代码分析

Redisson中RMapCache的实现是RedissonMapCache,先看RedissonMapCache构造器,里面有一个EvictionScheduler用于实现清理任务调度

    public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
                            String name, RedissonClient redisson, MapCacheOptions<K, V> options, WriteBehindService writeBehindService) {
        super(commandExecutor, name, redisson, options, writeBehindService);
        this.timeoutSetName = getTimeoutSetName(getRawName());
        this.idleSetName = getIdleSetName(getRawName());
        this.lastAccessTimeSetName = getLastAccessTimeSetName(getRawName());
        this.optionsName = getOptionsName(getRawName());
        
        //注册清理任务,Redisson调用该构造器的时候,会传入
        if (evictionScheduler != null) {
            evictionScheduler.schedule(getRawName(), timeoutSetName, idleSetName,
                    getExpiredChannelName(), lastAccessTimeSetName, options,
                    commandExecutor.getConnectionManager().getSubscribeService().getPublishCommand());
        }
        this.evictionScheduler = evictionScheduler;
        this.publishCommand = commandExecutor.getConnectionManager().getSubscribeService().getPublishCommand();
    }

查看EvictionScheduler中关联代码,这里可以看到具体的清理任务是MapCacheEvictionTask

    public void schedule(String name, String timeoutSetName, String maxIdleSetName,
                         String expiredChannelName, String lastAccessTimeSetName, MapCacheOptions<?, ?> options,
                         String publishCommand) {
        boolean removeEmpty;
        if (options != null) {
            removeEmpty = options.isRemoveEmptyEvictionTask();
        } else {
            removeEmpty = false;
        }
        // 开始调度task
        addTask(name, () -> new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName,
                executor, removeEmpty, this, publishCommand));
    }

    private void addTask(String name, Supplier<EvictionTask> supplier) {
        //调度开始,下面看EvictionTask中的实现
        tasks.computeIfAbsent(name, k -> {
            EvictionTask task = supplier.get();
            task.schedule();
            return task;
        });
    }

MapCacheEvictionTask继承EvictionTask,里面主要实现了execute方法,通过lua脚本去清理redis中的数据,定时任务的调度在父类EvictionTask中

public class MapCacheEvictionTask extends EvictionTask {

    private final String name;
    private final String timeoutSetName;
    private final String maxIdleSetName;
    private final String expiredChannelName;
    private final String lastAccessTimeSetName;
    private final String executeTaskOnceLatchName;
    private boolean removeEmpty;

    private EvictionScheduler evictionScheduler;

    private String publishCommand;

    public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName,
                                String expiredChannelName, String lastAccessTimeSetName, CommandAsyncExecutor executor,
                                boolean removeEmpty, EvictionScheduler evictionScheduler, String publishCommand) {
        super(executor);
        this.name = name;
        this.timeoutSetName = timeoutSetName;
        this.maxIdleSetName = maxIdleSetName;
        this.expiredChannelName = expiredChannelName;
        this.lastAccessTimeSetName = lastAccessTimeSetName;
        this.executeTaskOnceLatchName = RedissonObject.prefixName("redisson__execute_task_once_latch", name);
        this.removeEmpty = removeEmpty;
        this.evictionScheduler = evictionScheduler;
        this.publishCommand = publishCommand;
    }

    @Override
    String getName() {
        return name;
    }

    @Override
    CompletionStage<Integer> execute() {
        int latchExpireTime = Math.min(delay, 30);
        RFuture<Integer> expiredFuture = executor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
                "if redis.call('setnx', KEYS[6], ARGV[4]) == 0 then "
                 + "return -1;"
              + "end;"
              + "redis.call('expire', KEYS[6], ARGV[3]); "
               +"local expiredKeys1 = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                + "for i, key in ipairs(expiredKeys1) do "
                    + "local v = redis.call('hget', KEYS[1], key); "
                    + "if v ~= false then "
                        + "local t, val = struct.unpack('dLc0', v); "
                        + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
                        + "local listeners = redis.call(ARGV[5], KEYS[4], msg); "
                        + "if (listeners == 0) then "
                            + "break;"
                        + "end; "
                    + "end;"
                + "end;"
                + "for i=1, #expiredKeys1, 5000 do "
                    + "redis.call('zrem', KEYS[5], unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); "
                    + "redis.call('zrem', KEYS[3], unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); "
                    + "redis.call('zrem', KEYS[2], unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); "
                    + "redis.call('hdel', KEYS[1], unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); "
                + "end; "
              + "local expiredKeys2 = redis.call('zrangebyscore', KEYS[3], 0, ARGV[1], 'limit', 0, ARGV[2]); "
              + "for i, key in ipairs(expiredKeys2) do "
                  + "local v = redis.call('hget', KEYS[1], key); "
                  + "if v ~= false then "
                      + "local t, val = struct.unpack('dLc0', v); "
                      + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
                      + "local listeners = redis.call(ARGV[5], KEYS[4], msg); "
                      + "if (listeners == 0) then "
                          + "break;"
                      + "end; "
                  + "end;"
              + "end;"
              + "for i=1, #expiredKeys2, 5000 do "
                  + "redis.call('zrem', KEYS[5], unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); "
                  + "redis.call('zrem', KEYS[3], unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); "
                  + "redis.call('zrem', KEYS[2], unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); "
                  + "redis.call('hdel', KEYS[1], unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); "
              + "end; "
              + "return #expiredKeys1 + #expiredKeys2;",
              Arrays.asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executeTaskOnceLatchName),
              System.currentTimeMillis(), keysLimit, latchExpireTime, 1, publishCommand);

        if (removeEmpty) {
            CompletionStage<Integer> r = expiredFuture.thenCompose(removed -> {
                RFuture<Integer> s = executor.readAsync(name, IntegerCodec.INSTANCE, RedisCommands.HLEN, name);
                return s.thenCompose(size -> {
                    if (size == 0) {
                        evictionScheduler.remove(name);
                        RFuture<Long> f = executor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.DEL, name);
                        return f.thenApply(res -> {
                            return removed;
                        });
                    }
                    return CompletableFuture.completedFuture(removed);
                });
            });
            return new CompletableFutureWrapper<>(r);
        }
        return expiredFuture;
    }

}

核心lua脚本分析(已替换argv和keys为具体变量)

# 加锁
if redis.call('setnx', executeTaskOnceLatchName, 1) == 0 then 
    return -1;
end;
redis.call('expire', executeTaskOnceLatchName, latchExpireTime); 

# 写入超时zscore(时间戳从0到curTime),默认的keysLimit是100
local expiredKeys1 = redis.call('zrangebyscore', timeoutSetName, 0, currentTimeMillis, 'limit', 0, keysLimit); 
for i, key in ipairs(expiredKeys1) do 
    local v = redis.call('hget', cacheName, key); 
    if v ~= false then 
        local t, val = struct.unpack('dLc0', v); 
        local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); 
        local listeners = redis.call(publishCommand, expiredChannelName, msg); 
        if (listeners == 0) then 
            break;
        end; 
    end;
end;
# lua脚本大数组分块,5000一组处理,不过由于keyLimit默认100,这里默认情况下只会走到100
for i=1, #expiredKeys1, 5000 do 
    redis.call('zrem', lastAccessTimeSetName, unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); 
    redis.call('zrem', maxIdleSetName, unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); 
    redis.call('zrem', timeoutSetName, unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); 
    redis.call('hdel', cacheName, unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); 
end; 

# 读取超时zscore,逻辑和上面一段类似
local expiredKeys2 = redis.call('zrangebyscore', maxIdleSetName, 0, currentTimeMillis, 'limit', 0, keysLimit); 
for i, key in ipairs(expiredKeys2) do 
    local v = redis.call('hget', cacheName, key); 
    if v ~= false then 
        local t, val = struct.unpack('dLc0', v); 
        local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); 
        local listeners = redis.call(publishCommand, expiredChannelName, msg); 
        if (listeners == 0) then 
            break;
        end; 
    end;
end;

# 逻辑和上面写入超时部分类似
for i=1, #expiredKeys2, 5000 do 
    redis.call('zrem', lastAccessTimeSetName, unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); 
    redis.call('zrem', maxIdleSetName, unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); 
    redis.call('zrem', timeoutSetName, unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); 
    redis.call('hdel', cacheName, unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); 
end; 

return #expiredKeys1 #expiredKeys2;

继续分析EvictionTask

EvictionTask实现Runnable的一个抽象类,通过redisson内部维护HashedWheelTimer去运行来执行清理任务,同时根据最近三次的清理情况控制清理间隔(具体见代码中的注释)。

abstract class EvictionTask implements TimerTask {

    private final Logger log = LoggerFactory.getLogger(getClass());
    
    final Deque<Integer> sizeHistory = new LinkedList<>();
    final int minDelay;
    final int maxDelay;
    final int keysLimit;
    
    int delay = 5;

    final CommandAsyncExecutor executor;

    volatile Timeout timeout;

    EvictionTask(CommandAsyncExecutor executor) {
        super();
        this.executor = executor;
        this.minDelay = executor.getServiceManager().getCfg().getMinCleanUpDelay();
        this.maxDelay = executor.getServiceManager().getCfg().getMaxCleanUpDelay();
        this.keysLimit = executor.getServiceManager().getCfg().getCleanUpKeysAmount();
        this.delay = minDelay;
    }

    public void schedule() {
        timeout = executor.getServiceManager().newTimeout(this, delay, TimeUnit.SECONDS);
    }

    public void cancel() {
        timeout.cancel();
    }

    abstract CompletionStage<Integer> execute();
    
    abstract String getName();
    
    @Override
    public void run(Timeout timeout) {
        if (executor.getServiceManager().isShuttingDown()) {
            return;
        }

        CompletionStage<Integer> future = execute();
        future.whenComplete((size, e) -> {
            if (e != null) {
                log.error("Unable to evict elements for '{}'", getName(), e);
                schedule();
                return;
            }

            log.debug("{} elements evicted. Object name: {}", size, getName());
            
            if (size == -1) {
                schedule();
                return;
            }

            if (sizeHistory.size() == 2) {

                // first > last > cur 最近三次清理数量递减,延迟调度
                if (sizeHistory.peekFirst() > sizeHistory.peekLast()
                        && sizeHistory.peekLast() > size) {
                    delay = Math.min(maxDelay, (int) (delay*1.5));
                }

//                    if (sizeHistory.peekFirst() < sizeHistory.peekLast()
//                            && sizeHistory.peekLast() < size) {
//                        prevDelay = Math.max(minDelay, prevDelay/2);
//                    }
                // first == last == cur,最近三次清理数量相同
                if (sizeHistory.peekFirst().intValue() == sizeHistory.peekLast()
                        && sizeHistory.peekLast().intValue() == size) {
                    // 最近三次清理都达到最大限制,增加调度频率
                    if (size >= keysLimit) {
                        delay = Math.max(minDelay, delay/4);
                    }
                    // 最近三次全部为0,减少调度
                    if (size == 0) {
                        delay = Math.min(maxDelay, (int) (delay*1.5));
                    }
                }

                sizeHistory.pollFirst();
            }

            sizeHistory.add(size);
            // 调度下一轮
            schedule();
        });
    }

}

小结

RMapCache的定时清理任务挺巧秒的,通过最近三次清理数量动态控制清理间隔时间,定时任务调度和Redisson的看门狗机制比较类似,都是在一个任务完成后,立刻调度下一轮。