思考的心情
思考的心情
发布于 2025-04-05 / 12 阅读
0

Redisson RMapCache 核心源码解析

功能分析

先来看一下RMapCache提供的功能

  1. 支持闲置过期时间(多久未读)

  2. 支持写入过期时间

  3. 支持最大缓存容量,并支持指定LRU/LFU淘汰策略

  4. 支持分布式回调

Lua脚本分析

核心lua脚本(keys和argv已用变量名替换)

  1. put

local insertable = false; 
local v = redis.call('hget', cacheName, encodeMapKey); 
# 没有值表示新增
if v == false then 
    insertable = true; 
# 已经有值,但是过期了,判定为新增
else 
    local t, val = struct.unpack('dLc0', v); 
    local expireDate = 92233720368547758; 
    local expireDateScore = redis.call('zscore', timeoutSetName, encodeMapKey); 
    if expireDateScore ~= false then 
        expireDate = tonumber(expireDateScore) 
    end; 
    if t ~= 0 then 
        local expireIdle = redis.call('zscore', idleSetName, encodeMapKey); 
        if expireIdle ~= false then 
            expireDate = math.min(expireDate, tonumber(expireIdle)) 
        end; 
    end; 
    if expireDate <= tonumber(currentTimeMillis) then 
        insertable = true; 
    end; 
end; 

# 设定写入超时时间
if tonumber(ttlTimeout) > 0 then 
    redis.call('zadd', timeoutSetName, ttlTimeout, encodeMapKey); 
else 
    redis.call('zrem', timeoutSetName, encodeMapKey); 
end; 

# 设定闲置过期时间
if tonumber(maxIdleTimeout) > 0 then 
    redis.call('zadd', idleSetName, maxIdleTimeout, encodeMapKey); 
else 
    redis.call('zrem', idleSetName, encodeMapKey); 
end; 

# 监听器,回调使用
local hasListeners = redis.call('hget', optionsName, 'has-listeners'); 

# 缓存是否已满的判断,根据配置的淘汰算法,进行淘汰
# last access time
local maxSize = tonumber(redis.call('hget', optionsName, 'max-size'));  +
if maxSize ~= nil and maxSize ~= 0 then  +
    local currentTime = tonumber(currentTimeMillis);  +
    local lastAccessTimeSetName = lastAccessTimeSetName;  +

    local mode = redis.call('hget', optionsName, 'mode');  +
    # 默认按照LRU方式淘汰,此处写入最近过期时间
    if mode == false or mode == 'LRU' then  +
        redis.call('zadd', lastAccessTimeSetName, currentTime, encodeMapKey);  +
    end;  +

    local cacheSize = tonumber(redis.call('hlen', cacheName));  +
    if cacheSize >= maxSize then  +
        local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize);  +
        for index, lruItem in ipairs(lruItems) do  +
            if lruItem and lruItem ~= encodeMapKey then  +
                local lruItemValue = redis.call('hget', cacheName, lruItem);  +
                redis.call('hdel', cacheName, lruItem);  +
                redis.call('zrem', timeoutSetName, lruItem);  +
                redis.call('zrem', idleSetName, lruItem);  +
                redis.call('zrem', lastAccessTimeSetName, lruItem);  +
                if lruItemValue ~= false and hasListeners ~= false then   +
                    local removedChannelName = removedChannelName;  +
                    local ttl, obj = struct.unpack('dLc0', lruItemValue); +
                    local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(obj), obj); +
                    redis.call(publishCommand, removedChannelName, msg);  +
                end;  +
            end;  +
        end;  +
    end;  +

    # LFU 计数+1
    if mode == 'LFU' then  +
        redis.call('zincrby', lastAccessTimeSetName, 1, encodeMapKey);  +
    end;  +

end; 

# 将闲置过期时间拼接后写入value中
local value = struct.pack('dLc0', maxIdleDelta, string.len(encodeMapValue), encodeMapValue); 
redis.call('hset', cacheName, encodeMapKey, value); 

# pubsub 新增事件,实现分布式回调
if insertable == true then 
    if hasListeners ~= false then 
        local msg = struct.pack('Lc0Lc0', string.len(encodeMapKey), encodeMapKey, string.len(encodeMapValue), encodeMapValue); 
        redis.call(publishCommand, createdChannelName, msg); 
    end;
    return nil;
end; 

# pubsub 修改事件,实现分布式回调
local t, val = struct.unpack('dLc0', v); 
if hasListeners ~= false then 
    local msg = struct.pack('Lc0Lc0Lc0', string.len(encodeMapKey), encodeMapKey, string.len(encodeMapValue), encodeMapValue, string.len(val), val); 
    redis.call(publishCommand, updatedChannelName, msg); 
end;

return val,
  1. get

# 判空
local value = redis.call('hget', cacheName, encodeMapKey); 
if value == false then 
    return nil; 
end; 
# 读取过期时间拼接在value中,此处解开
local t, val = struct.unpack('dLc0', value); 
local expireDate = 92233720368547758;  +
local expireDateScore = redis.call('zscore', timeoutSetName, encodeMapKey); 
if expireDateScore ~= false then 
    expireDate = tonumber(expireDateScore) 
end; 

# 更新过期时间
if t ~= 0 then 
    local expireIdle = redis.call('zscore', idleSetName, encodeMapKey); 
    if expireIdle ~= false then 
        if tonumber(expireIdle) > tonumber(currentTimeMillis) then 
            redis.call('zadd', idleSetName, t tonumber(currentTimeMillis), encodeMapKey); 
        end; 
        expireDate = math.min(expireDate, tonumber(expireIdle)) 
    end; 
end; 
# 过期缓存 返回null
if expireDate <= tonumber(currentTimeMillis) then 
    return nil; 
end; 

# 有容量限制的缓存,LRU淘汰策略更新最新时间,LFU则更新计数
local maxSize = tonumber(redis.call('hget', optionsName, 'max-size'));  +
if maxSize ~= nil and maxSize ~= 0 then  +
    local mode = redis.call('hget', optionsName, 'mode');  +
    if mode == false or mode == 'LRU' then  +
        redis.call('zadd', lastAccessTimeSetName, tonumber(currentTimeMillis), encodeMapKey);  +
    else  +
        redis.call('zincrby', lastAccessTimeSetName, 1, encodeMapKey);  +
    end;  +
end; 
return val; 

小结

RedissonMapCache通过三个zscore维护淘汰策略和过期时间,其中idleSet和timeoutSet用于维护闲置过期和写入多久后过期,lastAccessTimeSet则是在指定最大容量后,根据不同淘汰策略写入不同值,其中lru写入最后一次key的读取时间,lfu则写入读取计数。

同时Redisson通过redis的pubsub服务提供分布式回调,本文只写到了插入和修改缓存后的回调,其实缓存过期后也会回调,后面再写一篇RedissonMapCache的缓存过期机制中再详细说明。

从上面看RedissonMapCache提供了不少实用功能,小功能中用用还不错,不过也有一些坑,后面有空再写一篇说说具体有那些坑。