首页 文章详情


Go语言精选 | 393 2021-10-08 21:24 0 0 0
UniSMS (合一短信)

这是golang 源码中实现的限流器,是基于令牌桶算法的:

官方地址: golang.org/x/time/rate


   r := rate.Every(100 * time.Millisecond)   limit := rate.NewLimiter(r, 20)   for {       if limit.AllowN(time.Now(), 8) {           log.Info("log:event happen")       } else {           log.Info("log:event not allow")       }

rate.Every(100 * time.Millisecond) 表示每 100 毫秒产生 1 个令牌,即一秒内产生 10 个令牌;桶的容量是 20,每次调用 AllowN 时在当前时刻取 8 个 token




// NewLimiter returns a new Limiter that allows events up to rate r and permits// bursts of at most b tokens.func NewLimiter(r Limit, b int) *Limiter {  return &Limiter{    limit: r,    burst: b,  }}


type Limiter struct {  mu     sync.Mutex  limit  Limit  burst  int  tokens float64  // last is the last time the limiter's tokens field was updated  last time.Time  // lastEvent is the latest time of a rate-limited event (past or future)  lastEvent time.Time}


func Every(interval time.Duration) Limit {  if interval <= 0 {    return Inf  }  return 1 / Limit(interval.Seconds())}



// Allow is shorthand for AllowN(time.Now(), 1).func (lim *Limiter) Allow() bool {  return lim.AllowN(time.Now(), 1)}
// AllowN reports whether n events may happen at time now.// Use this method if you intend to drop / skip events that exceed the rate limit.// Otherwise use Reserve or Wait.func (lim *Limiter) AllowN(now time.Time, n int) bool { return lim.reserveN(now, n, 0).ok}


// reserveN is a helper method for AllowN, ReserveN, and WaitN.// maxFutureReserve specifies the maximum reservation wait duration allowed.// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {  lim.mu.Lock()
if lim.limit == Inf { lim.mu.Unlock() return Reservation{ ok: true, lim: lim, tokens: n, timeToAct: now, } }
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request. tokens -= float64(n)
// Calculate the wait duration var waitDuration time.Duration if tokens < 0 { waitDuration = lim.limit.durationFromTokens(-tokens) }
// Decide result ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation r := Reservation{ ok: ok, lim: lim, limit: lim.limit, } if ok { r.tokens = n r.timeToAct = now.Add(waitDuration) }
// Update state if ok { lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { lim.last = last }
lim.mu.Unlock() return r}

1,如果lim.limit == Inf,返回Reservation对象

// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.// A Reservation may be canceled, which may enable the Limiter to permit additional events.type Reservation struct {  ok        bool  lim       *Limiter  tokens    int  timeToAct time.Time  // This is the Limit at reservation time, it can change later.  limit Limit}


// advance calculates and returns an updated state for lim resulting from the passage of time.// lim is not changed.// advance requires that lim.mu is held.func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {  last := lim.last  if now.Before(last) {    last = now  }
// Calculate the new number of tokens, due to time that passed. elapsed := now.Sub(last) delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens + delta if burst := float64(lim.burst); tokens > burst { tokens = burst } return now, last, tokens}



 delta := lim.limit.tokensFromDuration(elapsed)
// Limit is a rate of events; tokensFromDuration interprets it as tokens per second.
type Limit float64

// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	return d.Seconds() * float64(limit)
}







ok := n <= lim.burst && waitDuration <= maxFutureReserve







1,AllowN 方法表示,截止到某一时刻,桶中的 token 数目是否至少为 n 个:满足则返回 true,同时从桶中消费 n 个 token;反之不消费 token,返回 false。也就是前面介绍的方法。

func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool

2,当使用 Wait 方法消费 token 时,如果此时桶内 token 数目不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 token 满足条件。如果充足则直接返回。

func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
// WaitN blocks until lim permits n events to happen.// It returns an error if n exceeds the Limiter's burst size, the Context is// canceled, or the expected wait time exceeds the Context's Deadline.// The burst limit is ignored if the rate limit is Inf.func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {  lim.mu.Lock()  burst := lim.burst  limit := lim.limit  lim.mu.Unlock()
if n > burst && limit != Inf { return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) } // Check if ctx is already cancelled select { case <-ctx.Done(): return ctx.Err() default: } // Determine wait limit now := time.Now() waitLimit := InfDuration if deadline, ok := ctx.Deadline(); ok { waitLimit = deadline.Sub(now) } // Reserve r := lim.reserveN(now, n, waitLimit) if !r.ok { return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) } // Wait if necessary delay := r.DelayFrom(now) if delay == 0 { return nil } t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: // We can proceed. return nil case <-ctx.Done(): // Context was canceled before we could proceed. Cancel the // reservation, which may permit other events to proceed sooner. r.Cancel() return ctx.Err() }}



C,调用 r := lim.reserveN(now, n, waitLimit) 获取 Reservation 对象



// DelayFrom returns the duration for which the reservation holder must wait// before taking the reserved action.  Zero duration means act immediately.// InfDuration means the limiter cannot grant the tokens requested in this// Reservation within the maximum wait time.func (r *Reservation) DelayFrom(now time.Time) time.Duration {  if !r.ok {    return InfDuration  }  delay := r.timeToAct.Sub(now)  if delay < 0 {    return 0  }  return delay}


3,ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 token 是否充足,都会返回一个 *Reservation 对象。

你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间。如果等待时间为 0,则说明不用等待。


或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 token 归还。

func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation


func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {  r := lim.reserveN(now, n, InfDuration)  return &r}



我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。

good-icon 0
favorite-icon 0
回复数量: 0