一周一package(2024版本)
Opened this issue · 3 comments
001 failsafe-go(2024W30)
https://github.com/failsafe-go/failsafe-go
熔断, 限流,降级 可谓是后端高可用的三件套, 故障复盘基本都会归结到这三点+告警缺失. failsafe-go 是个把相关组件封装的比较好的packege, 主要提供了以下几个package.
- Failure handling: Retry, Fallback
- Load limiting: Circuit Breaker, Bulkhead, Rate Limiter, Cache
- Time limiting: Timeout, Hedge
推荐的执行顺序是:
其也符合高可用的目标: 熔断—>重试 —> 降级. 从上面的结构可以看出这里的关键是最内层的func执行完之后基于一定的policy去做上层的执行, 具体 Policy 的接口如下:
type FailurePolicyBuilder[S any, R any] interface {
// HandleErrors specifies the errors to handle as failures. Any errors that evaluate to true for errors.Is and the
// execution error will be handled.
HandleErrors(errors ...error) S
// HandleResult specifies the results to handle as failures. Any result that evaluates to true for reflect.DeepEqual and
// the execution result will be handled. This method is only considered when a result is returned from an execution, not
// when an error is returned.
HandleResult(result R) S
// HandleIf specifies that a failure has occurred if the predicate matches the execution result or error.
HandleIf(predicate func(R, error) bool) S
// OnSuccess registers the listener to be called when the policy determines an execution attempt was a success.
OnSuccess(listener func(ExecutionEvent[R])) S
// OnFailure registers the listener to be called when the policy determines an execution attempt was a failure, and may
// be handled.
OnFailure(listener func(ExecutionEvent[R])) S
}
通过定制特定的policy 可以实现很灵活的定制. 下面就对每种模式做一个简单的整理.
Retry
重试这里最关键的一点是避免无脑的定时重试, 这种很容易演变成ddos 攻击. 在retry policy 可以通过
WithMaxRetries, WithBackoff
来指定最大重试次数和重试策略(这里比较推荐的是backoff算法).
// Retry on ErrConnecting up to 3 times with a 1 second delay between attempts
retryPolicy := retrypolicy.Builder[Connection]().
HandleErrors(ErrConnecting).
WithDelay(time.Second).
WithMaxRetries(3).
Build()
// Get with retries
connection, err := failsafe.Get(Connect, retryPolicy)
Circuit Breaker
熔断的目的就是在系统超过负载(已经返回了错误)的时候在请求侧做一层保护,避免系统进一步的恶化, failsafe-go 提供了两种模式的熔断 time based 和counter based. 当最近请求失败超过阈值进入熔断器 Opened 的状态, 再过一段时间之后进入half-open 状态, 这种状态下会处理一部分请求, 如果继续成功就进入closed 状态(全量放行), 如果在half-open 状态检测到失败就继续回到open 状态.
Closed——> Opened —delay—> Half-opened
// Opens after 5 failures, half-opens after 1 minute, closes after 2 successes
breaker := circuitbreaker.Builder[any]().
HandleErrors(ErrSending).
WithFailureThreshold(5).
WithDelay(time.Minute).
WithSuccessThreshold(2).
Build()
// Run with circuit breaking
err := failsafe.Run(SendMessage, breaker)
failsafe-go 也是提供了比较灵活的配置:
// 最近5个请求,3个失败进入open 状态
builder.WithFailureThresholdRatio(3, 5)
// 一分钟内有三个失败进入open 状态
builder.WithFailureThresholdPeriod(3, time.Minute)
// 1分钟内 20个请求, 其中5个失败, 进入open 状态
builder.WithFailureRateThreshold(20, 5, time.Minute)
// Half-open 状态, 持续30s
builder.WithDelay(30*time.Second)
//在half-open 状态 进入close 状态的配置, 否则再次进入open
builder.WithSuccessThreshold(5)
builder.WithSuccessThresholdRatio(3, 5)
Rate Limiter
熔断是检测到异常之后出于保护系统的目的俩降低请求数, 限流的目的则是避免系统进入overload 状态. failsafe-go 提供了两种模式的ratelimit, smooth 模式和bursty 模式.
// 1s 100个请求, 也就是10ms 1个请求, 保证100个请求在1s内相对平滑, 10ms 之内超过1个则进行限制
limiter := ratelimiter.Smooth(100, time.Second)
// 1s 最多处理10个请求(可以是瞬时的), 超过10个则限制
limiter := ratelimiter.Bursty(10, time.Second)
默认情况下达到ratelimit 之后会直接返回 ratelimiter.ErrExceeded, 不过你也可以配置一个最大wait间隔, 使其后面有机会执行:
// Wait up to 1 second for execution permission
builder.WithMaxWaitTime(time.Second)
TimeOut
timeout 机制基本就是context 的Timeout机制差不多,不同点在用 failsafe里你可以和其他机制打配合.
// Timeout after 10 seconds
timeout := timeout.With[any](10*time.Second)
err := failsafe.Run(Connect, timeout)
Fallback
在有主备的地方使用Fallback 就比较合适, 主挂了, 使用备
connection, err := failsafe.Get(Connect, fallback)
Hedge
类似赛马机制, 同时放n个请求到后端, 以第一个返回左右结果, 其他的request 直接cancel掉, 这种模式虽然会提高相应速度但是会增加后端整体的负载, 在使用时还需谨慎.
// Hedge up to 2 times with a 1 second delay between attempts
hedgePolicy := hedgepolicy.BuilderWithDelay[any](time.Second).
WithMaxHedges(2).
Build()
// Run with hedges
err := failsafe.Run(SendRequest, hedgePolicy)
Bulkhead
并发控制, 尤其是go func() xxx 只需要控制在10个 goroutine的场景
// Permit 10 concurrent executions
bulkhead := bulkhead.With[any](10)
err := failsafe.Run(SendRequest, bulkhead)
// 默认超过10个之后会立马返回错误(ErrFull), 当然这里可以设置wait time
bulkhead := bulkhead.Builder[any](10).
WithMaxWaitTime(time.Second).
Build()
这里maxWaitTimeout 也是很容易遇到坑, for loop 限制 go routine 并发的数量多场景还是推荐其他package.
Cache
不是特别推荐, 有更好的package.
总结
整理来看除 Cache, Bulkhead 之外的几个机制都可以尝试. 整体代码使用范型, 通用性也比较强.
002 conc(2024W32)
https://github.com/sourcegraph/conc
golang 确实是入门很快, 精通很难的语言. 尤其是在并发场景下很容易写出go routine泄漏的代码, souregraph 的这个库的目标也是和这个相关:
- Not cleaning up resources correctly.
- Causing deadlocks.
- Crashing the entire program because of a panic in a single goroutine.
这三点老手写代码的时候也是很容易犯错, 通过一个 package 来规范这些也是日常所需.
conc在处理这块儿代码确实很简洁(少即是多, 减少犯错的空间):
通过pool来控制并发就更方便了:
func fetchLastNames_pool(ctx context.Context, firstNames []string) ([]string, error) {
p := pool.NewWithResults[string]().WithContext(ctx)
for _, firstName := range firstNames {
firstName := firstName
p.Go(func(ctx context.Context) (string, error) {
return fetchLastName(ctx, firstName)
})
}
return p.Wait()
}
使用Iter 就更丝滑:
func fetchLastNames2(ctx context.Context, firstNames []string) ([]string, error) {
return iter.MapErr(firstNames, func(firstName *string) (string, error) {
return fetchLastName(ctx, *firstName)
})
}
不用太多冗余的代码(控制写入的并发控制), 也可以通过自定义MapIter的方式控制并发数量:
input := []int{1, 2, 3, 4}
mapper := Mapper[int, bool]{
MaxGoroutines: len(input) / 2,
}
results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
fmt.Println(results)
另外也提供了通过stream模式来请求数据(异步网络请求+尽快将结果显示给用户的场景):
func streamFileContents(ctx context.Context, fileNames <-chan string, fileContents chan<- string) {
s := stream.New()
for fileName := range fileNames {
fileName := fileName
s.Go(func() stream.Callback {
contents := fetchFileContents(ctx, fileName)
return func() { fileContents <- contents }
})
}
s.Wait()
}
总结:
pool, iter, stream 基本能满足go routine的并发控制
003 otter(2024W33)
https://github.com/maypok86/otter
缓存作为一种提高性能的利器, 在各种性能优化场合都是广泛提及. 也是有各种实现:
缓存淘汰方面基本上也是依赖LRU做缓存淘汰策略. 而LRU基本都是通过一个双链表+map 实现, 其在多读的情况性能相对比较差(缓存命时移动到表头, 也伴随着锁):
于是有了基于FIFO的各种改进(ARC,2Q,LIRS, TinyLFU), s3-fifo 基于观测数据(大部份缓存数据实际上只访问了一次)重新设计的一种缓存淘汰策略. 将这部分key快速从缓存中淘汰掉.
通过FIFO 避免了链表的复杂操作, 元素在插入时先放small FIFO这样可以快速淘汰(基于之前的统计72%的元素只访问一次).
otter 就是其中一个实现, 其在75% read, 25% writer 的场景下性能很很多:
在接口层面使用也是比较简单(构建/读写):
package main
import (
"fmt"
"time"
"github.com/maypok86/otter"
)
func main() {
// create a cache with capacity equal to 10000 elements
cache, err := otter.MustBuilder[string, string](10_000).
CollectStats().
Cost(func(key string, value string) uint32 {
return 1
}).
WithTTL(time.Hour).
Build()
if err != nil {
panic(err)
}
// set item with ttl (1 hour)
cache.Set("key", "value")
// get value from cache
value, ok := cache.Get("key")
if !ok {
panic("not found key")
}
fmt.Println(value)
// delete item from cache
cache.Delete("key")
// delete data and stop goroutines
cache.Close()
}