代码分析
Redisson中RMapCache的实现是RedissonMapCache,先看RedissonMapCache构造器,里面有一个EvictionScheduler用于实现清理任务调度
public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
String name, RedissonClient redisson, MapCacheOptions<K, V> options, WriteBehindService writeBehindService) {
super(commandExecutor, name, redisson, options, writeBehindService);
this.timeoutSetName = getTimeoutSetName(getRawName());
this.idleSetName = getIdleSetName(getRawName());
this.lastAccessTimeSetName = getLastAccessTimeSetName(getRawName());
this.optionsName = getOptionsName(getRawName());
//注册清理任务,Redisson调用该构造器的时候,会传入
if (evictionScheduler != null) {
evictionScheduler.schedule(getRawName(), timeoutSetName, idleSetName,
getExpiredChannelName(), lastAccessTimeSetName, options,
commandExecutor.getConnectionManager().getSubscribeService().getPublishCommand());
}
this.evictionScheduler = evictionScheduler;
this.publishCommand = commandExecutor.getConnectionManager().getSubscribeService().getPublishCommand();
}
查看EvictionScheduler中关联代码,这里可以看到具体的清理任务是MapCacheEvictionTask
/**
 * Registers a {@link MapCacheEvictionTask} for the given map cache.
 *
 * @param name                  name of the cache; also used as the task key
 * @param timeoutSetName        key name handed to the task as its timeout set
 * @param maxIdleSetName        key name handed to the task as its max-idle set
 * @param expiredChannelName    channel name handed to the task for expiry messages
 * @param lastAccessTimeSetName key name handed to the task as its last-access set
 * @param options               cache options, may be {@code null}
 * @param publishCommand        publish command name handed to the task
 */
public void schedule(String name, String timeoutSetName, String maxIdleSetName,
                     String expiredChannelName, String lastAccessTimeSetName,
                     MapCacheOptions<?, ?> options, String publishCommand) {
    // Without options the "remove empty task" behaviour stays disabled.
    boolean removeEmpty = options != null && options.isRemoveEmptyEvictionTask();

    Supplier<EvictionTask> taskFactory = () -> new MapCacheEvictionTask(
            name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName,
            executor, removeEmpty, this, publishCommand);

    // Hand over to addTask, which creates and starts the task.
    addTask(name, taskFactory);
}
/**
 * Creates the task through {@code supplier} and starts it, keyed by {@code name}.
 * The work happens inside {@code computeIfAbsent}, so the supplier is only used
 * when that call decides a value has to be computed for the key.
 */
private void addTask(String name, Supplier<EvictionTask> supplier) {
    // Scheduling starts here; the actual timer logic lives in EvictionTask.
    tasks.computeIfAbsent(name, key -> {
        EvictionTask created = supplier.get();
        created.schedule();
        return created;
    });
}

MapCacheEvictionTask继承EvictionTask,里面主要实现了execute方法,通过lua脚本去清理redis中的数据,定时任务的调度在父类EvictionTask中

/**
 * Eviction task for one map cache. Each run executes a Lua script that selects
 * members of the timeout and max-idle sorted sets whose score lies between 0 and
 * the current time, publishes a message per selected entry, and removes the
 * selected keys from the sorted sets and from the cache hash.
 */
public class MapCacheEvictionTask extends EvictionTask {
// Cache hash name; passed to the script as KEYS[1].
private final String name;
// Sorted set scanned first by the script; KEYS[2].
private final String timeoutSetName;
// Sorted set scanned second by the script; KEYS[3].
private final String maxIdleSetName;
// Channel the script publishes per-entry messages to; KEYS[4].
private final String expiredChannelName;
// Sorted set from which removed keys are also deleted; KEYS[5].
private final String lastAccessTimeSetName;
// Latch key guarding the script body via SETNX; KEYS[6].
private final String executeTaskOnceLatchName;
// When true, execute() additionally checks whether the hash became empty.
private boolean removeEmpty;
// Scheduler this task unregisters itself from when the hash is empty.
private EvictionScheduler evictionScheduler;
// Name of the Redis command the script invokes to publish; ARGV[5].
private String publishCommand;
/**
 * Stores the key names and collaborators used by {@link #execute()} and derives
 * the latch key name from the cache name.
 */
public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName,
String expiredChannelName, String lastAccessTimeSetName, CommandAsyncExecutor executor,
boolean removeEmpty, EvictionScheduler evictionScheduler, String publishCommand) {
super(executor);
this.name = name;
this.timeoutSetName = timeoutSetName;
this.maxIdleSetName = maxIdleSetName;
this.expiredChannelName = expiredChannelName;
this.lastAccessTimeSetName = lastAccessTimeSetName;
this.executeTaskOnceLatchName = RedissonObject.prefixName("redisson__execute_task_once_latch", name);
this.removeEmpty = removeEmpty;
this.evictionScheduler = evictionScheduler;
this.publishCommand = publishCommand;
}
/** Returns the cache name this task works on. */
@Override
String getName() {
return name;
}
/**
 * Runs one eviction pass.
 *
 * @return a stage completing with -1 when the latch key already exists, otherwise
 *         with the number of keys selected from both sorted sets
 */
@Override
CompletionStage<Integer> execute() {
// The latch expires after the current delay, capped at 30 (passed to EXPIRE as ARGV[3]).
int latchExpireTime = Math.min(delay, 30);
RFuture<Integer> expiredFuture = executor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
// Take the latch; if it is already set, skip this run and report -1.
"if redis.call('setnx', KEYS[6], ARGV[4]) == 0 then "
+ "return -1;"
+ "end;"
+ "redis.call('expire', KEYS[6], ARGV[3]); "
// First pass: up to ARGV[2] members of KEYS[2] with score in [0, ARGV[1]].
+"local expiredKeys1 = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "for i, key in ipairs(expiredKeys1) do "
+ "local v = redis.call('hget', KEYS[1], key); "
+ "if v ~= false then "
// The stored value is unpacked with format 'dLc0'; only val goes into the message.
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
+ "local listeners = redis.call(ARGV[5], KEYS[4], msg); "
// Stop publishing as soon as the publish call reports zero listeners.
+ "if (listeners == 0) then "
+ "break;"
+ "end; "
+ "end;"
+ "end;"
// Remove the selected keys in chunks of 5000 arguments per call.
+ "for i=1, #expiredKeys1, 5000 do "
+ "redis.call('zrem', KEYS[5], unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); "
+ "redis.call('zrem', KEYS[3], unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1)))); "
+ "end; "
// Second pass: same steps, selecting from KEYS[3] instead of KEYS[2].
+ "local expiredKeys2 = redis.call('zrangebyscore', KEYS[3], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "for i, key in ipairs(expiredKeys2) do "
+ "local v = redis.call('hget', KEYS[1], key); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
+ "local listeners = redis.call(ARGV[5], KEYS[4], msg); "
+ "if (listeners == 0) then "
+ "break;"
+ "end; "
+ "end;"
+ "end;"
+ "for i=1, #expiredKeys2, 5000 do "
+ "redis.call('zrem', KEYS[5], unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); "
+ "redis.call('zrem', KEYS[3], unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2)))); "
+ "end; "
// Result: total number of keys selected in both passes.
+ "return #expiredKeys1 + #expiredKeys2;",
// KEYS[1..6] in the order the script indexes them.
Arrays.asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executeTaskOnceLatchName),
// ARGV[1..5]: current time in millis, key limit, latch expiry, latch value, publish command.
System.currentTimeMillis(), keysLimit, latchExpireTime, 1, publishCommand);
if (removeEmpty) {
// After the script finishes, check the hash size; the eviction result is passed through unchanged.
CompletionStage<Integer> r = expiredFuture.thenCompose(removed -> {
RFuture<Integer> s = executor.readAsync(name, IntegerCodec.INSTANCE, RedisCommands.HLEN, name);
return s.thenCompose(size -> {
if (size == 0) {
// Hash is empty: unregister this task from the scheduler and issue DEL on the cache key.
evictionScheduler.remove(name);
RFuture<Long> f = executor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.DEL, name);
return f.thenApply(res -> {
return removed;
});
}
return CompletableFuture.completedFuture(removed);
});
});
return new CompletableFutureWrapper<>(r);
}
return expiredFuture;
}
}核心lua脚本分析(已替换argv和keys为具体变量)
-- Acquire the run-once latch; if it is already set, skip this run.
if redis.call('setnx', executeTaskOnceLatchName, 1) == 0 then
    return -1;
end;
redis.call('expire', executeTaskOnceLatchName, latchExpireTime);
-- TTL expiry: select members of timeoutSetName with score between 0 and the
-- current time; at most keysLimit members (the article notes the default is 100).
local expiredKeys1 = redis.call('zrangebyscore', timeoutSetName, 0, currentTimeMillis, 'limit', 0, keysLimit);
for i, key in ipairs(expiredKeys1) do
    local v = redis.call('hget', cacheName, key);
    if v ~= false then
        local t, val = struct.unpack('dLc0', v);
        local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val);
        local listeners = redis.call(publishCommand, expiredChannelName, msg);
        if (listeners == 0) then
            break;
        end;
    end;
end;
-- Large argument lists are processed in chunks of 5000; with keysLimit at its
-- default of 100 a single iteration handles all selected keys.
for i=1, #expiredKeys1, 5000 do
    redis.call('zrem', lastAccessTimeSetName, unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1))));
    redis.call('zrem', maxIdleSetName, unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1))));
    redis.call('zrem', timeoutSetName, unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1))));
    redis.call('hdel', cacheName, unpack(expiredKeys1, i, math.min(i+4999, table.getn(expiredKeys1))));
end;
-- Max-idle expiry: same steps as above, selecting from maxIdleSetName.
local expiredKeys2 = redis.call('zrangebyscore', maxIdleSetName, 0, currentTimeMillis, 'limit', 0, keysLimit);
for i, key in ipairs(expiredKeys2) do
    local v = redis.call('hget', cacheName, key);
    if v ~= false then
        local t, val = struct.unpack('dLc0', v);
        local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val);
        local listeners = redis.call(publishCommand, expiredChannelName, msg);
        if (listeners == 0) then
            break;
        end;
    end;
end;
-- Chunked removal, same as for the TTL-expired keys.
for i=1, #expiredKeys2, 5000 do
    redis.call('zrem', lastAccessTimeSetName, unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2))));
    redis.call('zrem', maxIdleSetName, unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2))));
    redis.call('zrem', timeoutSetName, unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2))));
    redis.call('hdel', cacheName, unpack(expiredKeys2, i, math.min(i+4999, table.getn(expiredKeys2))));
end;
return #expiredKeys1 + #expiredKeys2;
继续分析EvictionTask
EvictionTask是一个实现了TimerTask接口的抽象类,由Redisson内部维护的HashedWheelTimer调度执行清理任务,同时根据最近三次的清理数量动态调整清理间隔(具体见代码中的注释)。

/**
 * Base class for timer-driven eviction jobs. After every run the task re-arms
 * itself, and the delay between runs is adapted from the eviction counts of the
 * current run and the two runs before it.
 */
abstract class EvictionTask implements TimerTask {

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Eviction counts of earlier runs; trimmed so that two entries remain after each run. */
    final Deque<Integer> sizeHistory = new LinkedList<>();
    final int minDelay;
    final int maxDelay;
    final int keysLimit;
    /** Delay in seconds before the next run. */
    int delay = 5;
    final CommandAsyncExecutor executor;
    volatile Timeout timeout;

    /** Reads the delay bounds and key limit from the configuration; the first delay is the minimum. */
    EvictionTask(CommandAsyncExecutor executor) {
        this.executor = executor;
        this.minDelay = executor.getServiceManager().getCfg().getMinCleanUpDelay();
        this.maxDelay = executor.getServiceManager().getCfg().getMaxCleanUpDelay();
        this.keysLimit = executor.getServiceManager().getCfg().getCleanUpKeysAmount();
        this.delay = minDelay;
    }

    /** Arms the timer so that this task runs after {@code delay} seconds. */
    public void schedule() {
        timeout = executor.getServiceManager().newTimeout(this, delay, TimeUnit.SECONDS);
    }

    /** Cancels the most recently armed timeout. */
    public void cancel() {
        timeout.cancel();
    }

    /** Performs one eviction pass and reports how many elements were evicted. */
    abstract CompletionStage<Integer> execute();

    /** Returns the name of the object this task cleans up. */
    abstract String getName();

    @Override
    public void run(Timeout timeout) {
        // No further rounds once shutdown has started.
        if (executor.getServiceManager().isShuttingDown()) {
            return;
        }

        execute().whenComplete((evicted, failure) -> {
            if (failure != null) {
                log.error("Unable to evict elements for '{}'", getName(), failure);
            } else {
                log.debug("{} elements evicted. Object name: {}", evicted, getName());
                // A result of -1 is not recorded and does not influence the delay.
                if (evicted != -1) {
                    recordAndAdjustDelay(evicted);
                }
            }
            // Every completion path schedules the next round.
            schedule();
        });
    }

    /** Updates {@code delay} from the last three eviction counts, then records the newest count. */
    private void recordAndAdjustDelay(int evicted) {
        if (sizeHistory.size() == 2) {
            int oldest = sizeHistory.peekFirst();
            int previous = sizeHistory.peekLast();

            // Strictly decreasing counts over three runs: run less often.
            if (oldest > previous && previous > evicted) {
                delay = Math.min(maxDelay, (int) (delay * 1.5));
            }

            // Identical counts over three runs.
            if (oldest == previous && previous == evicted) {
                // Every run hit the key limit: run more often.
                if (evicted >= keysLimit) {
                    delay = Math.max(minDelay, delay / 4);
                }
                // Nothing evicted three times in a row: run less often.
                if (evicted == 0) {
                    delay = Math.min(maxDelay, (int) (delay * 1.5));
                }
            }
            sizeHistory.pollFirst();
        }
        sizeHistory.add(evicted);
    }
}
小结
RMapCache的定时清理任务挺巧妙的,通过最近三次清理数量动态控制清理间隔时间;定时任务调度和Redisson的看门狗机制比较类似,都是在一个任务完成后,按计算出的延迟调度下一轮。