# go-component **Repository Path**: biturd/go-component ## Basic Information - **Project Name**: go-component - **Description**: go造轮子 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-07-04 - **Last Updated**: 2021-07-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README 实现熔断器的基本思路 - 划分时间窗口,设置判断条件 - 请求进来判断是否满足熔断条件【并且拒绝or处理】 - 请求处理完后,统计时间窗口内请求失败率、延迟不达标率、请求数等指标 熔断器中最关键的部分是计数器和判断条件,这是因为它们为熔断器提供了判断依据。所以,为了实现计数器,我基于 int64 定义了一个 Counter 类型,并用原子操作为它实现了用于自增的 Add 方法、用于获取当前计数的 Load 方法、用于重置的 Reset 方法。具体代码如下: ```go type Counter int64 func (c *Counter) Add() int64 { return atomic.AddInt64((*int64)(c), 1) } func (c *Counter) Load() int64 { return atomic.LoadInt64((*int64)(c)) } func (c *Counter) Reset() { atomic.StoreInt64((*int64)(c), 0) } ``` 并且我还定义了一个 CircuitBreaker 结构体用于实现熔断器,它主要有这些字段:请求计数器 totalCounter、失败请求计数器 failsCounter、时间窗口 duration、最大延迟限制 latencyLimit、最大请求数限制 totalLimit、最大失败率限制 failsRateLimit、恢复请求的最低失败率 recoverFailsRate、时间窗口开始时间 lastTime、当前是否允许请求执行 allow。代码如下: ```go type CircuitBreaker struct { totalCounter Counter failsCounter Counter duration int64 latencyLimit int64 totalLimit int64 failsRateLimit int64 recoverFailsRate int64 lastTime int64 allow int64 } ``` 然后我实现了一个函数 NewCircuitBreaker 用来根据参数创建一个熔断器,并且定义了一个类型 CBOption 来实现可扩展的变参。具体代码如下 ```go type CBOption func(cb *CircuitBreaker) const ( minDuration = 100 minTotal = 1000 minFailsRate = 2 ) func WithDuration(duration int64) CBOption { return func(cb *CircuitBreaker) { cb.duration = duration } } func WithLatencyLimit(latencyLimit int64) CBOption { return func(cb *CircuitBreaker) { cb.latencyLimit = latencyLimit } } func WithFailsLimit(failsRateLimit int64) CBOption { return func(cb *CircuitBreaker) { cb.failsRateLimit = failsRateLimit } } func WithTotalLimit(totalLimit int64) CBOption { return func(cb *CircuitBreaker) { cb.totalLimit = totalLimit } } func NewCircuitBreaker(opts ...CBOption) *CircuitBreaker { cb := &CircuitBreaker{ totalCounter: 0, failsCounter: 0, duration: 0, lastTime: 0, failsRateLimit: 0, latencyLimit: 0, totalLimit: 0, allow: 1, } for _, opt := range opts { opt(cb) } if cb.duration < minDuration { cb.duration = minDuration } if cb.totalLimit < minTotal { cb.totalLimit = minTotal } if cb.failsRateLimit < minFailsRate { cb.failsRateLimit = minFailsRate } cb.recoverFailsRate = cb.failsRateLimit / 2 return cb } ``` **熔断器最核心的是 Allow 方法,它支持传入一个类型为函数的参数 f ,返回该请求是否允许执行。** Allow 方法先更新请求计数器和时间窗口,并判断当前是否满足执行条件。如果不满足条件则返回 false;如果满足条件,则更新当前执行状态,并根据参数 f 的执行结果来更新失败计数器。代码如下: ```go func (cb *CircuitBreaker) Allow(f func() bool) bool { fails := cb.failsCounter.Load() total := cb.totalCounter.Load() start := time.Now().UnixNano() / int64(time.Millisecond) if start > cb.lastTime+cb.duration { atomic.StoreInt64(&cb.lastTime, start) cb.failsCounter.Reset() cb.totalCounter.Reset() atomic.StoreInt64(&cb.allow, 1) } cb.totalCounter.Add() allow := !(total > 0 && fails*100/cb.failsRateLimit >= total || total >= cb.totalLimit) if atomic.LoadInt64(&cb.allow) == 0 { if fails*100/cb.recoverFailsRate > total { allow = false } else if allow { atomic.StoreInt64(&cb.allow, 1) } } else if !allow { atomic.StoreInt64(&cb.allow, 0) } if !allow { logrus.Error("not allowed") return false } ok := f() end := time.Now().UnixNano() / int64(time.Millisecond) if (cb.latencyLimit > 0 && end-start >= cb.latencyLimit) || !ok { cb.failsCounter.Add() } return true } ``` 接下来,我在 interfaces/api/middlewares/circuit_break.go 中实现熔断器中间件生成函数 NewCircuitBreakMiddleware,它主要是根据传入的熔断器返回一个熔断器中间件,在该中间件中调用熔断器的 Allow 方法,以此来控制当前请求是否需要熔断。如果发生熔断,则将请求的返回状态设置为 http.StatusServiceUnavailable,立刻中断请求并返回结果。代码如下: ```go func NewCircuitBreakMiddleware(cb *utils.CircuitBreaker) gin.HandlerFunc { return func(c *gin.Context) { ok := cb.Allow(func() bool { c.Next() if c.Writer.Status() >= http.StatusInternalServerError { return false } return true }) if !ok { c.AbortWithStatus(http.StatusServiceUnavailable) } } } ```