为什么要重试?
重试是在工程实践中用来提高系统容错能力的一种手段,在微服务的部署架构中,一个请求链路往往需要经过多个服务进行处理返回,而每个服务之间的信息交换依赖于网络通信,网络大部分情况下是稳定的,请求通常能够正常达到和返回到每个服务,但实际情况下可能因为各种原因网络状态会出现波动,使请求可能无法正常达到服务端,或是服务端没有及时返回给客户端使请求链路发生超时,最终导致请求失败,当请求链路中的服务数量越来越多时,意味着请求更容易受到这种影响而失败,而这种情况下导致的失败往往可以通过重试的方式解决,因此实现服务间的自动重试某种意义上提高了整个系统故障恢复能力。
典型的重试场景如下:
首先,是不是所有请求都可以重试呢?显然不是,通常重试依赖于服务端接口的幂等性,假设对于一个接口,多次进行调用会违反了数据约束,那么自动重试则可能会使结果在我们的预期之外。那么,对于幂等的接口,是不是所有请求都可以立即发起重试呢?对于第一种单纯因为网络抖动引起的失败我们通常能简单地发起重试,但是对于其他两种场景,似乎不太适合直接发起重试了,对于系统负载高导致的超时,此时进行重试往往会进一步提高系统的负载,进而导致后续请求发生雪崩,降低了系统的可用性,此时重试便失去了其意义;对于系统当前发生故障的情况下,如果系统没能及时恢复对外提供服务能力,那么失败后的立即重试就变成了“无用功”,如果当前的请求需要保证成功时,此时重复的重试就变成了忙等待,占用着服务端资源。由此可见,不是所有请求都适合立即处理重试。
因此,对于是否可以重试上可以划分出:「可重试」与「不可重试」,在是否可以立即重试上可以划分出:「同步重试」和「异步重试」,最后对重试的等待间隔还需要制定一个「重试策略」。
「可重试」与「不可重试」
对于是否可以重试,常用的方式是通过错误码来区分哪些调用可以重试,哪些不可以重试,例如在配置重试规则时,可以指定需要重试的错误码,重试接口依据错误码来决定是否进行重试:
或者依据重试命令(RPC Command)来决定是否进行重试:
当上游服务过载时,处理速度往往会变慢,此时重试会加大服务的压力,因此需要采集历史的耗时和成功率数据,定义发送重试的阈值,避免对被调用服务的冲击,根据组件采集到的历史数据(平均耗时和成功率)进行计算,配置进行重试的数据阈值:
重试策略
对于重试间隔等待的策略,可以简单地划分为以下几种:
「同步重试」与「异步重试」
对于最普遍的重试方式,也就是同步重试,很好理解,其过程如下所示:
在这种方式下,重试流程会贯穿在请求的整个生命周期中,因此重试过程会随着请求终止而随之终止。那么现在假如整个请求链路中的每一层都实现了这种重试机制,会发生什么呢?
假设当前有 S1->S2->S3 三个服务,最大重试次数为3次,当 S2 调用 S3 失败时,会重试调用 S3 3次,如果都失败的话,那么 S2 会给 S1 返回失败,这时候 S1 又会重试调用 S2,假如后续请求都失败的话,那么这个链路中会发起3x3=9次请求,足足放大了9倍,假如服务数量或者最大重试次数更多之后,这种现象就会更加明显,甚至影响了整个系统。对于重试风暴,需要以一种合适的方式及时中断重试,防止重试次数以线性方式增长,比如用指数退避的方式进行重试,同时需要为整个链路的重试次数设置一个硬上限,避免重试次数过多,另外当上层服务已经发生超时返回错误的话,底层服务因为无法感知到请求已经失败而继续进行重试,如果这时候重试成功则会出现不一致的情况,因此底层服务应该及时中断重试,避免更多无意义调用。
而对于那些不需要立即返回状态的调用,或者短时间内不能立即成功的重试,将重试持久化起来,异步进行重试是一个不错的选择,在这种方案下,可以引入一个独立的重试服务,接受重试消息将其重试持久化下来,并且及时通知使用重试的使用方进行重试,如下所示:
实现一个异步重试服务
有了以上基础之后,接下来就可以根据这些理论来编写一个简单的异步重试服务,目标如下:
主要实现如下:
1.接收重试请求:
接收一个重试请求后,首先根据请求的重试策略获得重试的下一次调度时间以及是否应该重试,如果不需要重试则直接返回,否则将重试持久化,并写入到延迟队列,到期设置为下一次调度时间。
func Retry(req *retrypb.RetryRequest) error {
if err := validateRetry(req); err != nil {
return err
}
retryTime, shouldRetry := getReqNextRetryTime(req)
if !shouldRetry {
return errors.New("should not retry")
}
queueName := formatQueueName(req)
startQueueWorkerChan <- queueName
if err := store.StoreRetry(req.GetRetryId(), req); err != nil {
return err
}
if err := store.ScheduleRetry(queueName, req.GetRetryId(), retryTime); err != nil {
return err
}
return nil
}
2.处理重试:
处理重试时,可以批量从延迟队列中获取到期的重试请求,取的一个重试请求后,判断重试次数是否超过最大上限,如果超过则从调度表中删除该重试并返回,否则将重试次数加1,更新到数据库中,接下来把重试通知写入到消息队列,回调给使用方,若发送失败,则直接跳过该请求,下一次处理时继续进行重试,发送成功后,继续获取重试下一个周期的调度时间,更新到延迟队列。
func handleExpiredRetries(queueName string) error {
retryIDs, err := store.GetExpiredRetries(queueName, fetchRetriesSize)
if err != nil {
return err
}
for _, retryID := range retryIDs {
req, err := store.GetRetry(retryID)
if err != nil {
continue
}
msg := req.GetMsg()
retryCnt := msg.GetRetryCnt()
if retryCnt > maxRetryCnt {
store.RemoveRetrySchedule(queueName, retryID)
continue
}
msg.RetryCnt = retryCnt + 1
store.StoreRetry(retryID, req)
if err := mq.SendMessage(req.Marshal()); err != nil {
continue
}
retryTime, shouldRetry := getReqNextRetryTime(req)
if !shouldRetry {
store.RemoveRetrySchedule(queueName, retryID)
continue
}
store.ScheduleRetry(queueName, retryID, retryTime)
}
return nil
}
3.删除重试:
当接收到使用方的重试回应时,应删除延迟队列中的重试请求。
func Remove(req *retrypb.RetryRequest) error {
if err := validateRetry(req); err != nil {
return err
}
queueName := formatQueueName(req)
return store.RemoveRetrySchedule(queueName, req.GetRetryId())
}
4.重试策略
简单重试:重试间隔应该永远返回0(代表立即重试)
func (p *simplePeriodPolicy) NextPeriod() (time.Duration, bool) {
if p.retryCnt >= p.policy.GetMaxRetryCnt() {
return 0, false
}
return 0, true
}
线性退避:
func (p *linearPeriodPolicy) NextPeriod() (time.Duration, bool) {
return time.Duration(p.policy.GetPeriod()) * time.Millisecond, true
}
随机退避:
func (p *randomPeriodPolicy) NextPeriod() (time.Duration, bool) {
if p.policy.GetMinPeriod() >= p.policy.GetMaxPeriod() {
return 0, false
}
return time.Duration(rand.Intn(int(p.policy.GetMaxPeriod()-p.policy.GetMinPeriod()))+int(p.policy.GetMinPeriod())) * time.Millisecond, true
}
指数退避:
func (p *exponentialBackOffPeriodPolicy) NextPeriod() (time.Duration, bool) {
attempts := p.retryCnt
exp := math.Pow(2, float64(attempts))
result := p.policy.GetWaitExponentialMultiplier() * uint32(exp)
if result > p.policy.GetWaitExponentialMax() {
return 0, false
}
return time.Duration(result) * time.Millisecond, true
}
自定义间隔:
func (p *customPolicy) NextPeriod() (time.Duration, bool) {
attempts := p.retryCnt
retryPeriods := p.policy.GetPeriods()
if attempts >= uint32(len(retryPeriods)) {
return 0, false
}
retryPeriod := retryPeriods[attempts]
return time.Duration(retryPeriod) * time.Millisecond, true
}
总结
要实现正确的重试并不是易事,落实到操作上还有许多细节要考虑,本文主要介绍了重试这一提高容错能力的手段,以及利用其中一种重试方式去编写一个可用的服务,仅供参考和拓展思路。