Redis を使用して固定ウィンドウを実装するのは比較的簡単です。主な理由は、同時に存在する固定ウィンドウが 1 つだけであるためです。ウィンドウに入るときに、pexpire
コマンドを使用して有効期限をウィンドウ時間サイズに設定し、有効期限とともにウィンドウが期限切れになるようにします。同時に、incr を使用します。
コマンドを使用してウィンドウ数を増やします。
counter==1
の場合にウィンドウの有効期限を設定する必要があるため、アトミック性を確保するために、単純な Lua
スクリプト実装を使用します。
const fixedWindowLimiterTryAcquireRedisScript = ` -- ARGV[1]: 窗口时间大小 -- ARGV[2]: 窗口请求上限 local window = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) -- 获取原始值 local counter = tonumber(redis.call("get", KEYS[1])) if counter == nil then counter = 0 end -- 若到达窗口请求上限,请求失败 if counter >= limit then return 0 end -- 窗口值+1 redis.call("incr", KEYS[1]) if counter == 0 then redis.call("pexpire", KEYS[1], window) end return 1 `
Redis の hash
を使用して各小さなウィンドウの数を保存し、各リクエストですべての ## が保存されます。 #有効なウィンドウ数 が
count まで累積され、
hdel を使用して無効なウィンドウを削除し、最終的にウィンドウの総数が上限を超えているかどうかを判断します。
hash の走査であり、時間計算量は O(N)、N は小さなウィンドウの数です。したがって、小さな窓はあまり多くしないほうがよいでしょう。
package redis import ( "context" "errors" "github.com/go-redis/redis/v8" "time" ) // FixedWindowLimiter 固定窗口限流器 type FixedWindowLimiter struct { limit int // 窗口请求上限 window int // 窗口时间大小 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewFixedWindowLimiter(client *redis.Client, limit int, window time.Duration) (*FixedWindowLimiter, error) { // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if window%time.Millisecond != 0 { return nil, errors.New("the window uint must not be less than millisecond") } return &FixedWindowLimiter{ limit: limit, window: int(window / time.Millisecond), client: client, script: redis.NewScript(fixedWindowLimiterTryAcquireRedisScript), }, nil } func (l *FixedWindowLimiter) TryAcquire(ctx context.Context, resource string) error { success, err := l.script.Run(ctx, l.client, []string{resource}, l.window, l.limit).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }
const slidingWindowLimiterTryAcquireRedisScriptHashImpl = ` -- ARGV[1]: 窗口时间大小 -- ARGV[2]: 窗口请求上限 -- ARGV[3]: 当前小窗口值 -- ARGV[4]: 起始小窗口值 local window = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) local currentSmallWindow = tonumber(ARGV[3]) local startSmallWindow = tonumber(ARGV[4]) -- 计算当前窗口的请求总数 local counters = redis.call("hgetall", KEYS[1]) local count = 0 for i = 1, #(counters) / 2 do local smallWindow = tonumber(counters[i * 2 - 1]) local counter = tonumber(counters[i * 2]) if smallWindow < startSmallWindow then redis.call("hdel", KEYS[1], smallWindow) else count = count + counter end end -- 若到达窗口请求上限,请求失败 if count >= limit then return 0 end -- 若没到窗口请求上限,当前小窗口计数器+1,请求成功 redis.call("hincrby", KEYS[1], currentSmallWindow, 1) redis.call("pexpire", KEYS[1], window) return 1 `
list を使用して時間計算量を最適化できます。リストの構造は次のとおりです:
[counter, smallWindow1, count1, smallWindow2, count2, smallWindow3, count3...]
#5. 長さが 1 より大きい場合、最後から 2 番目の要素
を取得します7.counter 1
8.Return success
package redis import ( "context" "errors" "github.com/go-redis/redis/v8" "time" ) // SlidingWindowLimiter 滑动窗口限流器 type SlidingWindowLimiter struct { limit int // 窗口请求上限 window int64 // 窗口时间大小 smallWindow int64 // 小窗口时间大小 smallWindows int64 // 小窗口数量 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewSlidingWindowLimiter(client *redis.Client, limit int, window, smallWindow time.Duration) ( *SlidingWindowLimiter, error) { // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if window%time.Millisecond != 0 || smallWindow%time.Millisecond != 0 { return nil, errors.New("the window uint must not be less than millisecond") } // 窗口时间必须能够被小窗口时间整除 if window%smallWindow != 0 { return nil, errors.New("window cannot be split by integers") } return &SlidingWindowLimiter{ limit: limit, window: int64(window / time.Millisecond), smallWindow: int64(smallWindow / time.Millisecond), smallWindows: int64(window / smallWindow), client: client, script: redis.NewScript(slidingWindowLimiterTryAcquireRedisScriptHashImpl), }, nil } func (l *SlidingWindowLimiter) TryAcquire(ctx context.Context, resource string) error { // 获取当前小窗口值 currentSmallWindow := time.Now().UnixMilli() / l.smallWindow * l.smallWindow // 获取起始小窗口值 startSmallWindow := currentSmallWindow - l.smallWindow*(l.smallWindows-1) success, err := l.script.Run( ctx, l.client, []string{resource}, l.window, l.limit, currentSmallWindow, startSmallWindow).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }
アルゴリズムはすべて操作
list先頭または末尾であるため、時間計算量は O(1 )リーキーバケットアルゴリズム
を使用してこれら 2 つの値を保存します。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:plain;">const slidingWindowLimiterTryAcquireRedisScriptListImpl = `
-- ARGV[1]: 窗口时间大小
-- ARGV[2]: 窗口请求上限
-- ARGV[3]: 当前小窗口值
-- ARGV[4]: 起始小窗口值
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local currentSmallWindow = tonumber(ARGV[3])
local startSmallWindow = tonumber(ARGV[4])
-- 获取list长度
local len = redis.call("llen", KEYS[1])
-- 如果长度是0,设置counter,长度+1
local counter = 0
if len == 0 then
redis.call("rpush", KEYS[1], 0)
redis.call("pexpire", KEYS[1], window)
len = len + 1
else
-- 如果长度大于1,获取第二第个元素
local smallWindow1 = tonumber(redis.call("lindex", KEYS[1], 1))
counter = tonumber(redis.call("lindex", KEYS[1], 0))
-- 如果该值小于起始小窗口值
if smallWindow1 < startSmallWindow then
local count1 = redis.call("lindex", KEYS[1], 2)
-- counter-第三个元素的值
counter = counter - count1
-- 长度-2
len = len - 2
-- 删除第二第三个元素
redis.call("lrem", KEYS[1], 1, smallWindow1)
redis.call("lrem", KEYS[1], 1, count1)
end
end
-- 若到达窗口请求上限,请求失败
if counter >= limit then
return 0
end
-- 如果长度大于1,获取倒数第二第一个元素
if len > 1 then
local smallWindown = tonumber(redis.call("lindex", KEYS[1], -2))
-- 如果倒数第二个元素小窗口值大于等于当前小窗口值
if smallWindown >= currentSmallWindow then
-- 把倒数第二个元素当成当前小窗口(因为它更新),倒数第一个元素值+1
local countn = redis.call("lindex", KEYS[1], -1)
redis.call("lset", KEYS[1], -1, countn + 1)
else
-- 否则,添加新的窗口值,添加新的计数(1),更新过期时间
redis.call("rpush", KEYS[1], currentSmallWindow, 1)
redis.call("pexpire", KEYS[1], window)
end
else
-- 否则,添加新的窗口值,添加新的计数(1),更新过期时间
redis.call("rpush", KEYS[1], currentSmallWindow, 1)
redis.call("pexpire", KEYS[1], window)
end
-- counter + 1并更新
redis.call("lset", KEYS[1], 0, counter + 1)
return 1
`</pre><div class="contentsignin">ログイン後にコピー</div></div><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:plain;">const leakyBucketLimiterTryAcquireRedisScript = `
-- ARGV[1]: 最高水位
-- ARGV[2]: 水流速度/秒
-- ARGV[3]: 当前时间(秒)
local peakLevel = tonumber(ARGV[1])
local currentVelocity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local lastTime = tonumber(redis.call("hget", KEYS[1], "lastTime"))
local currentLevel = tonumber(redis.call("hget", KEYS[1], "currentLevel"))
-- 初始化
if lastTime == nil then
lastTime = now
currentLevel = 0
redis.call("hmset", KEYS[1], "currentLevel", currentLevel, "lastTime", lastTime)
end
-- 尝试放水
-- 距离上次放水的时间
local interval = now - lastTime
if interval > 0 then
-- 当前水位-距离上次放水的时间(秒)*水流速度
local newLevel = currentLevel - interval * currentVelocity
if newLevel < 0 then
newLevel = 0
end
currentLevel = newLevel
redis.call("hmset", KEYS[1], "currentLevel", newLevel, "lastTime", now)
end
-- 若到达最高水位,请求失败
if currentLevel >= peakLevel then
return 0
end
-- 若没有到达最高水位,当前水位+1,请求成功
redis.call("hincrby", KEYS[1], "currentLevel", 1)
redis.call("expire", KEYS[1], peakLevel / currentVelocity)
return 1
`</pre><div class="contentsignin">ログイン後にコピー</div></div>
トークンバケット
package redis import ( "context" "github.com/go-redis/redis/v8" "time" ) // LeakyBucketLimiter 漏桶限流器 type LeakyBucketLimiter struct { peakLevel int // 最高水位 currentVelocity int // 水流速度/秒 client *redis.Client // Redis客户端 script *redis.Script // TryAcquire脚本 } func NewLeakyBucketLimiter(client *redis.Client, peakLevel, currentVelocity int) *LeakyBucketLimiter { return &LeakyBucketLimiter{ peakLevel: peakLevel, currentVelocity: currentVelocity, client: client, script: redis.NewScript(leakyBucketLimiterTryAcquireRedisScript), } } func (l *LeakyBucketLimiter) TryAcquire(ctx context.Context, resource string) error { // 当前时间 now := time.Now().Unix() success, err := l.script.Run(ctx, l.client, []string{resource}, l.peakLevel, l.currentVelocity, now).Bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return ErrAcquireFailed } return nil }
const tokenBucketLimiterTryAcquireRedisScript = ` -- ARGV[1]: 容量 -- ARGV[2]: 发放令牌速率/秒 -- ARGV[3]: 当前时间(秒) local capacity = tonumber(ARGV[1]) local rate = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local lastTime = tonumber(redis.call("hget", KEYS[1], "lastTime")) local currentTokens = tonumber(redis.call("hget", KEYS[1], "currentTokens")) -- 初始化 if lastTime == nil then lastTime = now currentTokens = capacity redis.call("hmset", KEYS[1], "currentTokens", currentTokens, "lastTime", lastTime) end -- 尝试发放令牌 -- 距离上次发放令牌的时间 local interval = now - lastTime if interval > 0 then -- 当前令牌数量+距离上次发放令牌的时间(秒)*发放令牌速率 local newTokens = currentTokens + interval * rate if newTokens > capacity then newTokens = capacity end currentTokens = newTokens redis.call("hmset", KEYS[1], "currentTokens", newTokens, "lastTime", now) end -- 如果没有令牌,请求失败 if currentTokens == 0 then return 0 end -- 果有令牌,当前令牌-1,请求成功 redis.call("hincrby", KEYS[1], "currentTokens", -1) redis.call("expire", KEYS[1], capacity / rate) return 1 `
スライディング ログ
以上がGo+Redis を使用して一般的な電流制限アルゴリズムを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。