基于resilience4j实现的代码级别retry,以及基于db存储+定时任务的retry补偿,可用于多种场景下的重试实现,并可自定义重试次数、衰减时间、重试调用的方法等
整体思路如下:
common-retry组件并未上传到公开mvn仓库,可下载后通过 mvn clean deploy 命令上传到本地私服
fund_retry_info
CREATE TABLE "retry_info" (
"id" int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
"biz_id" varchar(45) NOT NULL COMMENT '业务ID,对应的业务ID',
"bean_class_name" varchar(128) NOT NULL,
"bean_name" varchar(45) NOT NULL COMMENT '重试需要调用的方法所在的bean',
"method_name" varchar(45) NOT NULL COMMENT '重试需要调用的方法',
"retry_count" int(11) NOT NULL DEFAULT '0' COMMENT '已重试次数',
"max_retry_count" int(11) NOT NULL COMMENT '最大重试次数',
"status" varchar(20) NOT NULL COMMENT '当前状态\nRETRYING - 重试中\nRETRY_FAIL - 超出最大重试次数,不再重试\nFINISH - 重试成功,结束',
"priority" int(11) DEFAULT '1' COMMENT '重试优先级,重试时会优先处理,目前暂时 3-最高,2 -中等 1 - 普通 这三个级别',
"req_params" varchar(1024) NOT NULL COMMENT '重试时的请求参数',
"req_params_class_name" varchar(128) NOT NULL,
"decay_type" varchar(15) DEFAULT 'no' COMMENT '衰减类型 no - 不衰减 increase -- 递增衰减 average -- 间隔恒定',
"decay_interval" int(11) DEFAULT '60' COMMENT '衰减间隔,单位为秒,在递增衰减和间隔恒定衰减时均有用,默认为60秒',
"retry_time" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '重试开始的时间,比如5分钟后',
"create_time" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
"update_time" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '修改时间',
PRIMARY KEY ("id"),
KEY "biz_idx" ("biz_id") USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='fund层重试信息保存,在retryable失败之后落库,然后通过定时任务继续发起重试';
重试记录表
CREATE TABLE "retry_record" (
"id" int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
"biz_id" varchar(45) NOT NULL,
"retry_info_id" int(11) NOT NULL COMMENT '关联的retry_info信息',
"operate_type" varchar(45) NOT NULL,
"reason" varchar(256) DEFAULT NULL,
"create_time" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY ("id"),
KEY "retry_info_idx" ("retry_info_id","biz_id")
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=utf8 COMMENT='重试记录';
dependencies
# libraries.gradle:
gradle: common_retry : "com.retry.common:common-retry:0.0.1-SNAPSHOT"
# build.gradle 中需要引入common-retry依赖
compile(libraries.common_retry){
exclude group: 'com.google.guava'
}
# 注意: 如果原系统中引入了guava,且版本低于26.0-jre,则需要在引入common-retry时排除guava
# 如果是maven,则引入(guava的排除一样要做):
<dependency>
<groupId>com.retry.common</groupId>
<artifactId>common-retry</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
# 加入import
<import resource="classpath*:applicationContext-common-retry.xml"/>
# 增加repository.java文件的配置,在basePackage中加入retry相关路径,用逗号隔开
<property name="basePackage" value="XXX,com.retry.common.repository" />
common-retry组件中,所有对外暴露的服务都在RetryFacade接口中定义。详情如下:
/**
* 用于retry时做各种重试实现的分发
*/
public interface RetryFacade {
/**
* 重试分发,会扫描数据,按照优先级及时间处理
* @return
*/
String retryDispatch();
/**
* 暂停指定的重试记录,如果已经是暂停状态,幂等
* 如果已经是成功状态,不允许重试
* @param pauseRetryModel
* @return
*/
RetryResult pauseRetry(PauseRetryModel pauseRetryModel);
/**
* 恢复指定的重试记录,如果不是暂停状态,则不允许resume
* @param resumeRetryModel
* @return
*/
RetryResult resumeRetry(PauseRetryModel resumeRetryModel);
/**
* 根据id bizId status查询重试列表
* @param queryRetryInfoModel
* @return
*/
RetryResult queryRetryListByParams(QueryRetryModel queryRetryInfoModel);
/**
* 根据id bizId 查询重试记录列表
* @param queryRetryInfoModel
* @return
*/
RetryResult queryRetryRecordByParams(QueryRetryModel queryRetryInfoModel);
/**
* 保存重试数据
* @param retryInfoModel
* @return
*/
RetryResult saveRetryInfo(RetryInfoModel retryInfoModel);
}
一些常量都定义在了common-retry中的enum类,比如是否需要衰减:
@Getter
@AllArgsConstructor
@NoArgsConstructor
public enum DecayTypeEnum {
NO("no"), // 不需要衰减
INCREASE("increase"), // 递增衰减
AVERAGE("average") // 间隔恒定
;
private String type;
}
这里详细说明下衰减的时间计算规则,比如初始的重试开始时间为 2018-12-03 21:00:00
如果不需要衰减,那么此重试记录的执行时间为 2018-12-03 21:00:00 重试间隔根据对应的job定时任务间隔时间来确定。
如果是需要间隔恒定,这里允许传入每次间隔的时间,单位为秒,比如为60秒, 也就是每次重试失败后,下一次的重试时间需要加一分钟,假设2018-12-03 21:00:00马上执行且失败,则最新的重试开始时间为:2018-12-03 21:01:00。
如果是需要递增衰减,同样的允许传入每次间隔的时间间隔区间,单位为秒,比如为60秒,那么在重试失败一次之后,其最新的重试时间为:2018-12-03 21:01:00 。如果再执行一次失败,那么重试时间为: 2018-12-03 21:01:00 + 当前失败次数(2) * 时间间隔区间(60秒) = 2018-12-03 21:03:00
在使用定时任务重试之前,一般会先在代码中重试几次,毕竟定时任务还是有一定的延迟。
common-retry中基于resilience4j-retry封装了一个简单的代码重试方法,而不是简单粗暴的for循环,sleep重试实现方式,另外在重试失败之后,会自动将数据插入到retry_info表中,以备后续定时任务扫描处理。下面简单介绍下。
common-retry中提供了一个抽象类 AbstractRetryComponent<T,R>,其中T代表请求,R代表response响应,代码级别的重试需要继承这个抽象类,并实现其中的抽象方法。
@Component
@Data
@Slf4j
@NoArgsConstructor
public abstract class AbstractRetryComponent<T, R> {
@Autowired
private RetryFacade retryFacade;
/**
* 模板类,用于实现retry及最终失败后的重试数据入库
* @param t 请求
* @return 处理结果
*/
public R retryTemplate(T t) {
log.info(" request:{}",
JSONObject.toJSONString(t));
RetryComponentConfig retryComponentConfig = initRetryConfig();
RetryConfig retryConfig = RetryConfig.custom().maxAttempts(retryComponentConfig.getRetryTimes())
.waitDuration(Duration.of(retryComponentConfig.getDuration(), retryComponentConfig.getDurationUnit()))
.retryExceptions(CommonRetryException.class)
.build();
RetryRegistry registry = RetryRegistry.of(retryConfig);
Retry retry = registry.retry("retry-template" + ThreadLocalRandom.current().nextLong());
CheckedFunction1<T, R> decorated =
Retry.decorateCheckedFunction(retry, t1 -> handleBiz(t1));
R response = null;
try {
response = decorated.apply(t);
} catch (CommonRetryException throwable) {
log.info("after retry failed, go go go save retry info......");
log.error("error:", throwable.getMessage());
RetryInfoModel retryInfoModel = wrapRetryInfoInDb(t);
retryInfoModel.check();
retryFacade.saveRetryInfo(retryInfoModel);
} catch (Throwable throwable) {
log.error("error:", throwable.getMessage());
}
return response;
}
/**
* 处理实际的retry操作,这里只需要调用super.retryTemplate(t) 即可。外层业务调用也直接调用当前bean的这个方法
*
* 主要目的是指定T R 泛型对应的具体请求及响应类
* @param t
* @return
*/
public abstract R handleBizWithRetry(T t);
/**
* 初始化retry相关配置,需要传入对应的重试次数、间隔时间
*
* @return
*/
public abstract RetryComponentConfig initRetryConfig();
/**
* 具体的retry业务逻辑,各个业务方自己实现,定时任务job也要调用这个方法
* 注意,如果有异常,一定是要封装成CommonRetryException抛出来,因为重试框架指定处理了这个异常。
* @param t
* @return
* @throws CommonRetryException
*/
public abstract R handleBiz(T t) throws CommonRetryException;
/**
* retry N次失败后,需要将对应数据存入db 用于后续的批处理重试,这里构造对应的数据
*
* @param t
* @return
*/
public abstract RetryInfoModel wrapRetryInfoInDb(T t);
}
可以看到,需要实现其中的四个抽象方法,具体的使用方法见上面注释。
看个实现类demo:
@Slf4j
@Service(value = "creditQueryService")
public class CreditQueryServiceImpl extends AbstractRetryComponent<CreditResultQueryRequest, CreditResultQueryResponse> {
@Override
public CreditResultQueryResponse handleBizWithRetry(CreditResultQueryRequest creditResultQueryRequest) {
return super.retryTemplate(creditResultQueryRequest);
}
@Override
public RetryComponentConfig initRetryConfig() {
return new RetryComponentConfig(3, 5, ChronoUnit.SECONDS);
}
// 处理业务逻辑
@Override
public CreditResultQueryResponse handleBiz(CreditResultQueryRequest request) throws CommonRetryException {
// 触发重试
if ("983345059".equals(request.getUserId())) {
System.out.println("retry..........");
throw new CommonRetryException("UserId Error");
}
System.out.println("invoke success");
return new CreditResultQueryResponse();
}
@Override
public RetryInfoModel wrapRetryInfoInDb(CreditResultQueryRequest creditResultQueryRequest) {
RetryInfoModel retryInfoModel = new RetryInfoModel();
retryInfoModel.setBeanClassName(this.getClass());
retryInfoModel.setBeanName("hxbCreditServiceRetryService");
retryInfoModel.setBizId(creditResultQueryRequest.getRequestNo());
retryInfoModel.setDecayType(FalloffTypeEnum.NO.getType());
retryInfoModel.setMaxRetryCount(99);
retryInfoModel.setDecayInterval(30);
retryInfoModel.setPriority(RetryPriorityEnum.LOW.getPriority());
retryInfoModel.setMethodName("handleBiz");
retryInfoModel.setReqParams(JSON.toJSONString(creditResultQueryRequest));
retryInfoModel.setReqParamsClassName(CreditResultQueryRequest.class);
return retryInfoModel;
}
}
外面业务层调用,就直接使用 creditQueryService.handleBizWithRetry(T t) 即可。
在4.1重试多次依然失败时,定时任务会作为补充继续重试下去,上面实现的wrapRetryInfoInDb方法就构造了一个重试需要设置的内容。
注意其中的请求数据:
private String bizId; // 业务ID,必传
private Class beanClassName; // bean对应的class
private String beanName; // 重试时需要调用的beanName
private String methodName; // 重试时需要调用的methodName,如果是实现了上面的抽象类,则这里一定要传handleBiz,而不是handleBizWithRetry,避免有重复的重试数据入库。
private Integer priority; // 当前重试的优先级 不传的话默认是3 --普通
private String reqParams; // 请求参数,json格式
private Class reqParamsClassName; // 请求参数对应的class
private String decayType; // 超时时间类型,详见FalloffTypeEnum
private Integer decayInterval; // 延时时间,默认为60秒,单位为秒
private Integer maxRetryCount; // 最大重试次数,不传默认为99
private int retryTime; // 多长时间后重试,单位为秒,不传的话默认为30,即30秒之后重试
这里后续的重试发起,是采用java反射获取到对应的类、方法,并传入参数,所以对应的重试代码需要自己控制幂等、数据落地。
在单元测试的情况下,可能出现通过线程池发起的多线程代码未被调用的情况,可以参考这里: https://blog.csdn.net/evan_leung/article/details/51835085
简单来说,解决方案就是在test方法最后面,加入 System.in.read() 或者 Thread.sleep(5000) 来防止test 主线程退出过早,导致线程池还未被正常初始化。其中Thread.sleep()中指定的时间最好根据不同业务做相应评估