不依賴外部庫的情況下,限流算法有什么實現的思路?本文介紹了3種實現限流的方式。
一、漏桶算法
- 算法思想 與令牌桶是“反向”的算法,當有請求到來時先放到木桶中,worker以固定的速度從木桶中取出請求進行相應。如果木桶已經滿了,直接返回請求頻率超限的錯誤碼或者頁面
-
適用場景
流量最均勻的限流方式,一般用于流量“整形”,例如保護數據庫的限流。先把對數據庫的訪問加入到木桶中,worker再以db能夠承受的qps從木桶中取出請求,去訪問數據庫。不太適合電商搶購和微博出現熱點事件等場景的限流,一是應對突發流量不是很靈活,二是為每個user_id/ip維護一個隊列(木桶),workder從這些隊列中拉取任務,資源的消耗會比較大。
-
go語言實現
通常使用隊列來實現,在go語言中可以通過buffered channel來快速實現,任務加入channel,開啟一定數量的worker從channel中獲取任務執行。
package main
import (
"fmt"
"sync"
"time"
)
// 每個請求來了,把需要執行的業務邏輯封裝成Task,放入木桶,等待worker取出執行
type Task struct {
handler func() Result // worker從木桶中取出請求對象后要執行的業務邏輯函數
resChan chan Result // 等待worker執行并返回結果的channel
taskID int
}
// 封裝業務邏輯的執行結果
type Result struct {
}
// 模擬業務邏輯的函數
func handler() Result {
time.Sleep(300 * time.Millisecond)
return Result{}
}
func NewTask(id int) Task {
return Task{
handler: handler,
resChan: make(chan Result),
taskID: id,
}
}
// 漏桶
type LeakyBucket struct {
BucketSize int // 木桶的大小
NumWorker int // 同時從木桶中獲取任務執行的worker數量
bucket chan Task // 存方任務的木桶
}
func NewLeakyBucket(bucketSize int, numWorker int) *LeakyBucket {
return &LeakyBucket{
BucketSize: bucketSize,
NumWorker: numWorker,
bucket: make(chan Task, bucketSize),
}
}
func (b *LeakyBucket) validate(task Task) bool {
// 如果木桶已經滿了,返回false
select {
case b.bucket <- task:
default:
fmt.Printf("request[id=%d] is refused ", task.taskID)
return false
}
// 等待worker執行
<-task.resChan
fmt.Printf("request[id=%d] is run ", task.taskID)
return true
}
func (b *LeakyBucket) Start() {
// 開啟worker從木桶拉取任務執行
go func() {
for i := 0; i < b.NumWorker; i++ {
go func() {
for {
task := <-b.bucket
result := task.handler()
task.resChan <- result
}
}()
}
}()
}
func main() {
bucket := NewLeakyBucket(10, 4)
bucket.Start()
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task := NewTask(id)
bucket.validate(task)
}(i)
}
wg.Wait()
}
二、令牌桶算法
-
算法思想
想象有一個木桶,以固定的速度往木桶里加入令牌,木桶滿了則不再加入令牌。服務收到請求時嘗試從木桶中取出一個令牌,如果能夠得到令牌則繼續執行后續的業務邏輯;如果沒有得到令牌,直接返回反問頻率超限的錯誤碼或頁面等,不繼續執行后續的業務邏輯
- 特點:由于木桶內只要有令牌,請求就可以被處理,所以令牌桶算法可以支持突發流量。同時由于往木桶添加令牌的速度是固定的,且木桶的容量有上限,所以單位時間內處理的請求書也能夠得到控制,起到限流的目的。假設加入令牌的速度為 1token/10ms,桶的容量為500,在請求比較的少的時候(小于每10毫秒1個請求)時,木桶可以先"攢"一些令牌(最多500個)。當有突發流量時,一下把木桶內的令牌取空,也就是有500個在并發執行的業務邏輯,之后要等每10ms補充一個新的令牌才能接收一個新的請求。
- 參數設置:木桶的容量 - 考慮業務邏輯的資源消耗和機器能承載并發處理多少業務邏輯。生成令牌的速度 - 太慢的話起不到“攢”令牌應對突發流量的效果。
-
適用場景:
適合電商搶購或者微博出現熱點事件這種場景,因為在限流的同時可以應對一定的突發流量。如果采用均勻速度處理請求的算法,在發生熱點時間的時候,會造成大量的用戶無法訪問,對用戶體驗的損害比較大。
-
go語言實現:
假設每100ms生產一個令牌,按user_id/IP記錄訪問最近一次訪問的時間戳 t_last 和令牌數,每次請求時如果 now - last > 100ms, 增加 (now - last) / 100ms個令牌。然后,如果令牌數 > 0,令牌數 -1 繼續執行后續的業務邏輯,否則返回請求頻率超限的錯誤碼或頁面。
package main
import (
"fmt"
"sync"
"time"
)
// 并發訪問同一個user_id/ip的記錄需要上鎖
var recordMu map[string]*sync.RWMutex
func init() {
recordMu = make(map[string]*sync.RWMutex)
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
type TokenBucket struct {
BucketSize int // 木桶內的容量:最多可以存放多少個令牌
TokenRate time.Duration // 多長時間生成一個令牌
records map[string]*record // 報錯user_id/ip的訪問記錄
}
// 上次訪問時的時間戳和令牌數
type record struct {
last time.Time
token int
}
func NewTokenBucket(bucketSize int, tokenRate time.Duration) *TokenBucket {
return &TokenBucket{
BucketSize: bucketSize,
TokenRate: tokenRate,
records: make(map[string]*record),
}
}
func (t *TokenBucket) getUidOrIp() string {
// 獲取請求用戶的user_id或者ip地址
return "127.0.0.1"
}
// 獲取這個user_id/ip上次訪問時的時間戳和令牌數
func (t *TokenBucket) getRecord(uidOrIp string) *record {
if r, ok := t.records[uidOrIp]; ok {
return r
}
return &record{}
}
// 保存user_id/ip最近一次請求時的時間戳和令牌數量
func (t *TokenBucket) storeRecord(uidOrIp string, r *record) {
t.records[uidOrIp] = r
}
// 驗證是否能獲取一個令牌
func (t *TokenBucket) validate(uidOrIp string) bool {
// 并發修改同一個用戶的記錄上寫鎖
rl, ok := recordMu[uidOrIp]
if !ok {
var mu sync.RWMutex
rl = &mu
recordMu[uidOrIp] = rl
}
rl.Lock()
defer rl.Unlock()
r := t.getRecord(uidOrIp)
now := time.Now()
if r.last.IsZero() {
// 第一次訪問初始化為最大令牌數
r.last, r.token = now, t.BucketSize
} else {
if r.last.Add(t.TokenRate).Before(now) {
// 如果與上次請求的間隔超過了token rate
// 則增加令牌,更新last
r.token += max(int(now.Sub(r.last) / t.TokenRate), t.BucketSize)
r.last = now
}
}
var result bool
if r.token > 0 {
// 如果令牌數大于1,取走一個令牌,validate結果為true
r.token--
result = true
}
// 保存最新的record
t.storeRecord(uidOrIp, r)
return result
}
// 返回是否被限流
func (t *TokenBucket) IsLimited() bool {
return !t.validate(t.getUidOrIp())
}
func main() {
tokenBucket := NewTokenBucket(5, 100*time.Millisecond)
for i := 0; i< 6; i++ {
fmt.Println(tokenBucket.IsLimited())
}
time.Sleep(100 * time.Millisecond)
fmt.Println(tokenBucket.IsLimited())
}
三、滑動時間窗口算法
-
算法思想
滑動時間窗口算法,是從對普通時間窗口計數的優化。
使用普通時間窗口時,我們會為每個user_id/ip維護一個KV: uidOrIp: timestamp_requestCount。假設限制1秒1000個請求,那么第100ms有一個請求,這個KV變成 uidOrIp: timestamp_1,遞200ms有1個請求,我們先比較距離記錄的timestamp有沒有超過1s,如果沒有只更新count,此時KV變成 uidOrIp: timestamp_2。當第1100ms來一個請求時,更新記錄中的timestamp并重置計數,KV變成 uidOrIp: newtimestamp_1
普通時間窗口有一個問題,假設有500個請求集中在前1s的后100ms,500個請求集中在后1s的前100ms,其實在這200ms沒就已經請求超限了,但是由于時間窗每經過1s就會重置計數,就無法識別到此時的請求超限。
對于滑動時間窗口,我們可以把1ms的時間窗口劃分成10個time slot, 每個time slot統計某個100ms的請求數量。每經過100ms,有一個新的time slot加入窗口,早于當前時間100ms的time slot出窗口。窗口內最多維護10個time slot,儲存空間的消耗同樣是比較低的。
-
適用場景
與令牌桶一樣,有應對突發流量的能力
-
go語言實現
主要就是實現sliding window算法。可以參考Bilibili開源的kratos框架里circuit breaker用循環列表保存time slot對象的實現,他們這個實現的好處是不用頻繁的創建和銷毀time slot對象。下面給出一個簡單的基本實現:
package main
import (
"fmt"
"sync"
"time"
)
var winMu map[string]*sync.RWMutex
func init() {
winMu = make(map[string]*sync.RWMutex)
}
type timeSlot struct {
timestamp time.Time // 這個timeSlot的時間起點
count int // 落在這個timeSlot內的請求數
}
func countReq(win []*timeSlot) int {
var count int
for _, ts := range win {
count += ts.count
}
return count
}
type SlidingWindowLimiter struct {
SlotDuration time.Duration // time slot的長度
WinDuration time.Duration // sliding window的長度
numSlots int // window內最多有多少個slot
windows map[string][]*timeSlot
maxReq int // win duration內允許的最大請求數
}
func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
SlotDuration: slotDuration,
WinDuration: winDuration,
numSlots: int(winDuration / slotDuration),
windows: make(map[string][]*timeSlot),
maxReq: maxReq,
}
}
// 獲取user_id/ip的時間窗口
func (l *SlidingWindowLimiter) getWindow(uidOrIp string) []*timeSlot {
win, ok := l.windows[uidOrIp]
if !ok {
win = make([]*timeSlot, 0, l.numSlots)
}
return win
}
func (l *SlidingWindowLimiter) storeWindow(uidOrIp string, win []*timeSlot) {
l.windows[uidOrIp] = win
}
func (l *SlidingWindowLimiter) validate(uidOrIp string) bool {
// 同一user_id/ip并發安全
mu, ok := winMu[uidOrIp]
if !ok {
var m sync.RWMutex
mu = &m
winMu[uidOrIp] = mu
}
mu.Lock()
defer mu.Unlock()
win := l.getWindow(uidOrIp)
now := time.Now()
// 已經過期的time slot移出時間窗
timeoutOffset := -1
for i, ts := range win {
if ts.timestamp.Add(l.WinDuration).After(now) {
break
}
timeoutOffset = i
}
if timeoutOffset > -1 {
win = win[timeoutOffset+1:]
}
// 判斷請求是否超限
var result bool
if countReq(win) < l.maxReq {
result = true
}
// 記錄這次的請求數
var lastSlot *timeSlot
if len(win) > 0 {
lastSlot = win[len(win)-1]
if lastSlot.timestamp.Add(l.SlotDuration).Before(now) {
lastSlot = &timeSlot{timestamp: now, count: 1}
win = append(win, lastSlot)
} else {
lastSlot.count++
}
} else {
lastSlot = &timeSlot{timestamp: now, count: 1}
win = append(win, lastSlot)
}
l.storeWindow(uidOrIp, win)
return result
}
func (l *SlidingWindowLimiter) getUidOrIp() string {
return "127.0.0.1"
}
func (l *SlidingWindowLimiter) IsLimited() bool {
return !l.validate(l.getUidOrIp())
}
func main() {
limiter := NewSliding(100*time.Millisecond, time.Second, 10)
for i := 0; i < 5; i++ {
fmt.Println(limiter.IsLimited())
}
time.Sleep(100 * time.Millisecond)
for i := 0; i < 5; i++ {
fmt.Println(limiter.IsLimited())
}
fmt.Println(limiter.IsLimited())
for _, v := range limiter.windows[limiter.getUidOrIp()] {
fmt.Println(v.timestamp, v.count)
}
fmt.Println("a thousand years later...")
time.Sleep(time.Second)
for i := 0; i < 7; i++ {
fmt.Println(limiter.IsLimited())
}
for _, v := range limiter.windows[limiter.getUidOrIp()] {
fmt.Println(v.timestamp, v.count)
}
}
原文標題:幾種限流算法的go語言實現
文章出處:【微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。
審核編輯:湯梓紅
聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。
舉報投訴
-
算法
+關注
關注
23文章
4612瀏覽量
92901 -
go語言
+關注
關注
1文章
158瀏覽量
9049
原文標題:幾種限流算法的go語言實現
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
常用限流方式分析 怎么設計出高并發限流方案
,而對于超過限制的流量,則通過拒絕服務的方式保證整體系統的可用性。 根據限流作用范圍,可以分為 單機限流和分布式限流 ;根據限流
限流方案常用算法 常用的限流方案
需要注意的是借助Redis實現的限流方案可用于分布式系統,而guava實現的限流只能應用于單機環境。如果你覺得服務器端限流麻煩,可以在不改任
發表于 04-08 10:50
?423次閱讀
評論