成人怡红院-成人怡红院视频在线观看-成人影视大全-成人影院203nnxyz-美女毛片在线看-美女免费黄

站長資訊網
最全最豐富的資訊網站

一文聊聊go語言中的限流漏桶和令牌桶庫

本篇文章帶大家聊聊go語言中的限流漏桶和令牌桶庫,介紹令牌桶和漏桶的實現原理以及在實際項目中簡單應用。

一文聊聊go語言中的限流漏桶和令牌桶庫

為什么需要限流中間件?

在大數據量高并發訪問時,經常會出現服務或接口面對大量的請求而導致數據庫崩潰的情況,甚至引發連鎖反映導致整個系統崩潰。或者有人惡意攻擊網站,大量的無用請求出現會導致緩存穿透的情況出現。使用限流中間件可以在短時間內對請求進行限制數量,起到降級的作用,從而保障了網站的安全性。

應對大量并發請求的策略?

  • 使用消息中間件進行統一限制(降速)

  • 使用限流方案將多余請求返回(限流)

  • 升級服務器

  • 緩存(但仍然有緩存穿透等危險)

  • 等等

可以看出在代碼已經無法提升的情況下,只能去提升硬件水平。或者改動架構再加一層!也可以使用消息中間件統一處理。而結合看來,限流方案是一種既不需要大幅改動也不需要高額開銷的策略。

常見的限流方案

  • 令牌桶算法

  • 漏桶算法

  • 滑動窗口算法

  • 等等

漏桶

引入ratelimit庫

go get -u go.uber.org/ratelimit

庫函數源代碼

 // New returns a Limiter that will limit to the given RPS.  func New(rate int, opts ...Option) Limiter {      return newAtomicBased(rate, opts...)  }    // newAtomicBased returns a new atomic based limiter.  func newAtomicBased(rate int, opts ...Option) *atomicLimiter {      // TODO consider moving config building to the implementation      // independent code.      config := buildConfig(opts)      perRequest := config.per / time.Duration(rate)      l := &atomicLimiter{          perRequest: perRequest,          maxSlack:   -1 * time.Duration(config.slack) * perRequest,          clock:      config.clock,      }        initialState := state{          last:     time.Time{},          sleepFor: 0,      }      atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))      return l  }
登錄后復制

該函數使用了函數選項模式多個結構體對象進行初始化

根據傳入的值來初始化一個桶結構體 rateint 傳參 。

初始化過程中包括了

  • 每一滴水需要的時間 perquest = config.per / time.Duration(rate)
  • maxSlack 寬松度(寬松度為負值)-1 * time.Duration(config.slack) * perRequest 松緊度是用來規范等待時間的

 // Clock is the minimum necessary interface to instantiate a rate limiter with  // a clock or mock clock, compatible with clocks created using  // github.com/andres-erbsen/clock.  type Clock interface {     Now() time.Time     Sleep(time.Duration)  }
登錄后復制

同時還需要結構體Clock來記錄當前請求的時間now和此刻的請求所需要花費等待的時間sleep

 type state struct {     last     time.Time     sleepFor time.Duration  }
登錄后復制

state 主要用來記錄上次執行的時間以及當前執行請求需要花費等待的時間(作為中間狀態記錄)

最重要的Take邏輯

 func (t *atomicLimiter) Take() time.Time {     var (        newState state        taken    bool        interval time.Duration     )     for !taken {        now := t.clock.Now()          previousStatePointer := atomic.LoadPointer(&t.state)        oldState := (*state)(previousStatePointer)          newState = state{           last:     now,           sleepFor: oldState.sleepFor,        }         if oldState.last.IsZero() {           taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))           continue        }        // 計算是否需要進行等待取水操作        newState.sleepFor += t.perRequest(每兩滴水之間的間隔時間) - now.Sub(oldState.last)(當前時間與上次取水時間的間隔)                  // 如果等待取水時間特別小,就需要松緊度進行維護        if newState.sleepFor < t.maxSlack {           newState.sleepFor = t.maxSlack        }         // 如果等待時間大于0,就進行更新        if newState.sleepFor > 0 {           newState.last = newState.last.Add(newState.sleepFor)           interval, newState.sleepFor = newState.sleepFor, 0        }        taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))     }     t.clock.Sleep(interval)     // 最后返回需要等待的時間      return newState.last  }
登錄后復制

實現一個Take方法

  • 該Take方法會進行原子性操作(可以理解為加鎖和解鎖),在大量并發請求下仍可以保證正常使用。

  • 記錄下當前的時間 now := t.clock.Now()

  • oldState.last.IsZero()判斷是不是第一次取水,如果是就直接將state結構體中的值進行返回。而這個結構體中初始化了上次執行時間,如果是第一次取水就作為當前時間直接傳參。

  • 如果 newState.sleepFor 非常小,就會出現問題,因此需要借助寬松度,一旦這個最小值比寬松度小,就用寬松度對取水時間進行維護。

  • 如果newState.sleepFor > 0 就直接更新結構體中上次執行時間newState.last = newState.last.Add(newState.sleepFor)并記錄需要等待的時間interval, newState.sleepFor = newState.sleepFor, 0

  • 如果允許取水和等待操作,那就說明沒有發生并發競爭的情況,就模擬睡眠時間t.clock.Sleep(interval)。然后將取水的目標時間進行返回,由服務端代碼來判斷是否打回響應或者等待該時間后繼續響應。

t.clock.Sleep(interval)

 func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
登錄后復制

實際上在一個請求來的時候,限流器就會進行睡眠對應的時間,并在睡眠后將最新取水時間返回。

實際應用(使用Gin框架)

 func ratelimit1() func(ctx *gin.Context) {      r1 := rate1.New(100)      return func(ctx *gin.Context) {          now := time.Now()          //  Take 返回的是一個 time.Duration的時間          if r1.Take().Sub(now) > 0 {              // 返回的時間比當前的時間還大,說明需要進行等待              // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行              // 如果不需要等待請求時間,就直接進行Abort 然后返回              response(ctx, http.StatusRequestTimeout, "rate1 limit...")              fmt.Println("rate1 limit...")              ctx.Abort()              return          }          // 放行          ctx.Next()      }  }
登錄后復制

這里你可以進行選擇是否返回。因為Take一定會執行sleep函數,所以當執行take結束后表示當前請求已經接到了水。當前演示使用第一種情況。

  • 如果你的業務要求響應不允許進行等待。那么可以在該請求接完水之后然后,如上例。

  • 如果你的業務允許響應等待,那么該請求等待對應的接水時間后進行下一步。具體代碼就是將if中的內容直接忽略。(建議使用)

測試代碼

這里定義了一個響應函數和一個handler函數方便測試

 func response(c *gin.Context, code int, info any) {     c.JSON(code, info)  }    func pingHandler(c *gin.Context) {     response(c, 200, "ping ok~")  }
登錄后復制

執行go test -run=Run -v先開啟一個web服務

 func TestRun(t *testing.T) {     r := gin.Default()       r.GET("/ping1", ratelimit1(), pingHandler)     r.GET("/ping2", ratelimit2(), helloHandler)       _ = r.Run(":4399")  }
登錄后復制

使用接口壓力測試工具go-wrk進行測試->tsliwowicz/go-wrk: go-wrk)

在golang引入install版本可以直接通過go install github.com/tsliwowicz/go-wrk@latest下載

使用幫助

    Usage: go-wrk <options> <url>     Options:      -H       Header to add to each request (you can define multiple -H flags) (Default )      -M       HTTP method (Default GET)      -T       Socket/request timeout in ms (Default 1000)      -body    request body string or @filename (Default )      -c       Number of goroutines to use (concurrent connections) (Default 10)      -ca      CA file to verify peer against (SSL/TLS) (Default )      -cert    CA certificate file to verify peer against (SSL/TLS) (Default )      -d       Duration of test in seconds (Default 10)      -f       Playback file name (Default <empty>)      -help    Print help (Default false)      -host    Host Header (Default )      -http    Use HTTP/2 (Default true)      -key     Private key file name (SSL/TLS (Default )      -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)      -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)      -no-vr   Skip verifying SSL certificate of the server (Default false)      -redir   Allow Redirects (Default false)      -v       Print version details (Default false)
登錄后復制

-t 8個線程 -c 400個連接 -n 模擬100次請求 -d 替換-n 表示連接時間

輸入go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1

可以稍微等待一下水流積攢(壓測速度過快)。

一文聊聊go語言中的限流漏桶和令牌桶庫可以看出,89個請求全部返回。也就是說在一段請求高峰期,不會有請求進行響應。因此我認為既然內部已經睡眠,那么就也就應該對請求放行處理。

令牌桶

引入ratelimit

go get -u github.com/juju/ratelimit

初始化

 // NewBucket returns a new token bucket that fills at the  // rate of one token every fillInterval, up to the given  // maximum capacity. Both arguments must be  // positive. The bucket is initially full.  func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {     return NewBucketWithClock(fillInterval, capacity, nil)  }    // NewBucketWithClock is identical to NewBucket but injects a testable clock  // interface.  func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {     return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)  }
登錄后復制

進行Bucket桶的初始化。

 func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {     if clock == nil {        clock = realClock{}     }      // 填充速率     if fillInterval <= 0 {        panic("token bucket fill interval is not > 0")     }      // 最大令牌容量     if capacity <= 0 {        panic("token bucket capacity is not > 0")     }      // 單次令牌生成量     if quantum <= 0 {        panic("token bucket quantum is not > 0")     }     return &Bucket{        clock:           clock,        startTime:       clock.Now(),        latestTick:      0,        fillInterval:    fillInterval,        capacity:        capacity,        quantum:         quantum,        availableTokens: capacity,     }  }
登錄后復制

令牌桶初始化過程,初始化結構體 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

如果三個變量有一個小于或者等于0的話直接進行報錯返回。在最開始就將當前令牌數初始化為最大容量

調用

 // TakeAvailable takes up to count immediately available tokens from the  // bucket. It returns the number of tokens removed, or zero if there are  // no available tokens. It does not block.  func (tb *Bucket) TakeAvailable(count int64) int64 {     tb.mu.Lock()     defer tb.mu.Unlock()     return tb.takeAvailable(tb.clock.Now(), count)  }
登錄后復制

調用TakeAvailable函數,傳入參數為需要取出的令牌數量,返回參數是實際能夠取出的令牌數量。

內部實現

 func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {     // 如果需要取出的令牌數小于等于零,那么就返回0個令牌      if count <= 0 {        return 0     }      // 根據時間對當前桶中令牌數進行計算     tb.adjustavailableTokens(tb.currentTick(now))      // 計算之后的令牌總數小于等于0,說明當前令牌不足取出,那么就直接返回0個令牌     if tb.availableTokens <= 0 {        return 0     }      // 如果當前存儲的令牌數量多于請求數量,那么就返回取出令牌數     if count > tb.availableTokens {        count = tb.availableTokens     }      // 調整令牌數     tb.availableTokens -= count     return count  }
登錄后復制

  • 如果需要取出的令牌數小于等于零,那么就返回0個令牌

  • 根據時間對當前桶中令牌數進行計算

  • 計算之后的令牌總數小于等于0,說明當前令牌不足取出,那么就直接返回0個令牌

  • 如果當前存儲的令牌數量多于請求數量,那么就返回取出令牌數

  • 調整令牌數

調整令牌

 func (tb *Bucket) adjustavailableTokens(tick int64) {     lastTick := tb.latestTick     tb.latestTick = tick      // 如果當前令牌數大于最大等于容量,直接返回最大容量     if tb.availableTokens >= tb.capacity {        return     }      // 當前令牌數 += (當前時間 - 上次取出令牌數的時間) * quannum(每次生成令牌量)     tb.availableTokens += (tick - lastTick) * tb.quantum      // 如果當前令牌數大于最大等于容量, 將當前令牌數 = 最大容量 然后返回 當前令牌數     if tb.availableTokens > tb.capacity {        tb.availableTokens = tb.capacity     }     return  }
登錄后復制

  • 如果當前令牌數大于最大等于容量,直接返回最大容量

  • 當前令牌數 += (當前時間 – 上次取出令牌數的時間) * quannum(每次生成令牌量)

  • 如果當前令牌數大于最大等于容量, 將當前令牌數 = 最大容量 然后返回 當前令牌數

實現原理

  • 加鎖 defer 解鎖

  • 判斷count(想要取出的令牌數) 是否小于等于 0,如果是直接返回 0

  • 調用函數adjustTokens 獲取可用的令牌數量

  • 如果當前可以取出的令牌數小于等于0 直接返回 0

  • 如果當前可以取出的令牌數小于當前想要取出的令牌數(count) count = 當前可以取出的令牌數

  • 當前的令牌數 -= 取出的令牌數 (count)

  • 返回 count(可以取出的令牌數)

額外介紹

take函數,能夠返回等待時間和布爾值,允許欠賬,沒有令牌也可以取出。

func (tb *Bucket) Take(count int64) time.Duration

takeMaxDuration函數,可以根據最大等待時間來進行判斷。

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

因為他們內部的實現都基于令牌調整,我這里不做過多介紹,如果感興趣可以自行研究一下。

測試

 func ratelimit2() func(ctx *gin.Context) {      // 生成速率 最大容量      r2 := rate2.NewBucket(time.Second, 200)      return func(ctx *gin.Context) {          //r2.Take() // 允許欠賬,令牌不夠也可以接收請求          if r2.TakeAvailable(1) == 1 {              // 如果想要取出1個令牌并且能夠取出,就放行              ctx.Next()              return          }          response(ctx, http.StatusRequestTimeout, "rate2 limit...")          ctx.Abort()          return      }  }
登錄后復制

一文聊聊go語言中的限流漏桶和令牌桶庫壓測速度過于快速,在實際過程中可以根據調整令牌生成速率來進行具體限流!

小結

令牌桶可以允許自己判斷請求是否繼續,內部不會進行睡眠操作。而漏桶需要進行睡眠,并沒有提供方法讓程序員進行判斷是否放行。

贊(0)
分享到: 更多 (0)
?
網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號
国内精品久久人妻无码不卡| 久久精品国产亚洲AV忘忧草18| 极品少妇被猛得白浆直流草莓视频 | 国产亚洲精久久久久久无码7| WWW久久无码天堂MV| 嗯啊开小嫩苞HHH好深男男| 国产精品伦一区二区三级视频| 好硬好大好爽18禁免费看男男 | 亚洲欧美韩国综合色| 中文字幕无码视频手机免费看| 啊~用力CAO我CAO死我视频| 乖我们在办公室试试| 国内精品久久久久久久999| 久久午夜无码鲁丝片午夜精品| 欧美疯狂做受XXXX高潮| 少妇一级无码精品| 亚洲精品成人AV观看| 中文字幕亚洲综合久久综合| 厨房里我扒了岳的内裤| 国产又爽又黄又无遮挡的激情视频| 久久久久亚洲AV成人片乱码| 人妻丰满AV无码久久不卡| 无码喷水一区二区浪潮AV| 色噜噜噜亚洲男人的天堂| 亚洲 欧美 自拍 henhen| 无码色AV一二区在线播放| 亚洲日本高清成人AⅤ片| 99热都是精品久久久久久| 国产成人精品亚洲一区二区三区| 精品久久久久久无码专区不卡| 男女做AJ视频免费的网站| 男人J桶进女人P无遮挡在线观看| 色老99久久九九爱精品| 亚洲精品夜夜夜妓女网| AV网站免费线看| 国产区图片区小说区亚洲区| 老妇饥渴XXHDⅩXXOOO| 色AV综合AV综合无码网站| 亚洲精品无码不卡| 扒开双腿疯狂进出爽爽爽免费 | 国产一区二区波多野结衣| 免费国产黄网站在线观看视频| 熟妇毛耸耸浓密茂盛| 亚洲性色成人AV天堂| 厨房人妻HD中文字幕69XX| 精品色欲少妇一区二区三区 | 久久无码人妻一区二区三区| 老熟妇毛茸茸BBW视频| 色欲久久九色一区二区三区| 我和公GONG在厨房日本电影| 性 偷窥 间谍 tube| 999精产国品一二三产区区| 国产女人被躁到高潮的AV| 男人放进女人里面叫什么| 小东西好几天没弄了还能吃吗| 18禁黄污无遮挡无码网站| 国产精品自产拍在线18禁| 女人被暴躁C到高潮容易怀孕 | 国产人成无码视频在线| 欧美极品少妇×XXXBBB| 亚洲成色WWW久久网站| 波多野结衣在线播放| 国产99久久亚洲综合精品| 另类小说激情婷婷久久| 小雪第一次交换又粗又大老杨| 999久久久国产精品消防器材| 黑人巨大VS苍井空| 日韩精品无码一区二区中文字幕 | 俺去俺来也在线WWW色官网| 精品国产一区二区三区AV性色| 日韩一区二区三区av| 午夜男女爽爽羞羞影院在线观看| 亚洲日韩AV无码一区二区三区人| 丰满年经的继拇6| 女人丝不挂的正面裸体| 亚洲免费福利视频| 国产精华液一线二线三线| 欧美精品亚洲日韩AⅤ| 亚洲色大成网站WWW永久| 国产激情久久久久影院蜜桃AV | 波多野结衣人妻女教师4| 鲁大师在线影院免费观看| 亚洲AV无码一区毛片AV| 在线 亚洲 国产 欧美| 国内国外日产一区二区| 熟女亚洲综合精品伊人久久| EEUSS影院WWW在线观看| 久久亚洲AV成人无码软件| 亚洲AV无码专区成人网址| 顶级欧美RAPPER| 欧美噜噜久久久XXX成人高潮| 野花香高清在线观看视频播放免费| 国产无套码AⅤ在线观看 | 欧美XXXX黑人又粗又大| 亚洲一区二区三区国产精华液| 国产亚洲日韩网曝欧美台湾| 少妇短裙公车被直接进入| 亚洲色精品一区二区三区 | 成年女人18毛片毛片免费不卡| 免费人成视频网站在线18| 亚洲日韩AV无码中文字幕美国| 国产午夜影视大全免费观看 | 日本适合十八岁以上的护肤品一| 亚洲AV中文无码乱人伦在线R | AAA少妇高潮大片免费看088| 老公和小三在车上做我想卖了车| 亚洲精品无码aⅴ中文字幕蜜桃| 国产女人18毛片水真多1| 脱了老师内裤猛烈进入的软件 | 熟妇高潮一区二区麻豆Av渉谷| 被黑人巨茎日出白浆的少妇| 精品久久久中文字幕人妻| 亚洲AV片不卡无码久久| 国产品无码一区二区三区在线蜜桃| 少妇高清精品毛片在线视频| 草莓丝瓜芭乐鸭脖奶茶发型 | 粗壮挺进人妻水蜜桃成熟漫画| 人妻人人澡人人添人人爽人人玩| 2823理论片在线播放| 蜜芽亚洲日韩欧美国产高清ΑV| 一区二区三区AV| 兰姨不敢发出一点声音怕吵醒风| 亚洲色偷偷综合亚洲AV伊人| 久久不见久久见免费影院国语| 亚洲精品无码成人区久久| 娇妻被朋友玩得呻吟在线电影| 亚洲国产成人久久综合| 激情综合婷婷色五月蜜桃| 亚洲女人操BB在线| 久久精品国产精品亚洲| 一本久久伊人热热精品中文字幕| 久久久久久久女国产乱让韩| 婷婷五月综合激情| 国产精品R级最新在线观看| 无遮挡边吃奶边做的视频刺激| 国产老妇伦国产熟女老妇久| 西方137大但人文艺术| 含羞草实验室隐藏路线| 亚洲乱码日产精品BD在线看| 久久精品国产亚洲A∨麻豆| 尤物蜜芽国产成人精品区| 麻花豆传媒剧国产MV| 99精品久久久久中文字幕| 欧美一级一片内射欧美美妇3p| 被黑人猛男强伦姧人妻完整版| 色婷婷AV一区二区三区在线观看| 国产福利一区二区三区在线观看| 性生大片免费观看网站蜜芽| 近親五十路六十被亲子中出| 野花韩国日本高清免费5| 男男GAy作爱免费观看| 爱丫爱丫影院在线观看免费| 熟妇人妻精品一区二区三区颏 | 成人片黄网站色多多WWW| 天天躁日日躁狠狠久久| 成 人 黄 色 网 站 视 频| 爽爽AV浪潮AV一区二区| 国产在线精品国自产拍影院同性| 亚洲日本一线产区二线区| 免费人成视频XVIDEOS| 本免费AV无码专区一区| 无码人妻久久一区二区三区APP| 精品国产AⅤ一区二区三区在线看| 伊人久久大香线蕉AV不卡| 久久ER热在这里只有精品66| 余年周婉小说全文免费阅读完整版| 欧美内射深喉中文字幕| 国产98在线 | 传媒麻豆| 亚洲第一最快AV网站| 男男AV纯肉无码免费播放无码| 东北一家人1一6全文阅读小说| 亚洲AⅤ国产成人AV片妓女| 久久久久久久波多野结衣高潮| CHINESE老熟妇老女人HD| 天堂AV亚洲ITV在线AⅤ| 精品亚洲国产成人蜜臀AV| 亚洲欧美另类久久久精品| 女生让男生随便诵自己的名字| 丰满妇女强高潮18ⅩXXX| 亚洲成AV人片久久| 女被男狂揉吃奶胸60分钟视频 | 啊灬啊灬别停啊灬用力啊免费| 无码AV中文字幕久久AV| 久久婷婷人人澡人人爽人人爱| 波多野AV一区二区无码| 亚洲AV无码片在线播放| 女人爽到高潮的免费视频| 国产精品国产三级国产A| 一本无码人妻在中文字幕免费| 日韩精品久久久免费观看| 成人免费A级毛片无码片在线播放| 人人爽人人澡人人高潮| 国产午睡沙发系列大全| 余年周婉小说全文免费阅读完整版| 日本熟妇人妻XXXXX野外呻| 黄桃AV无码免费一区二区三区 | 99无码精品二区在线视频| 无码高潮少妇毛多水多水免费| 麻花豆传媒色午麻豆| 国产精品免费视频网站|