如果阅读完文档后,还有任何疑问,请mail to tengkai.yuan@dianping.com
tiger是一种分布式任务异步调度框架,偏重于执行层面,同一种任务可以由多台机器同时执行,并能保证一条任务不被重复执行。
tiger主要有以下三块组成:
-
zk集群管理:用于管理应用机器的在线情况,进而对机器可执行的任务节点进行自适应分配,保证一个任务同一时间只会被一台机器消费;
-
事件调度管理:用于每隔一定时间触发一次任务执行,并监听任务执行器的配置情况,一旦发生变化,即停止任务执行,重新设置后再触发任务执行;
-
任务执行管理:用于管理本机所分配到的执行器节点,进而进行任务节点捞取、任务过滤等,并对任务的执行结果进行处理;
http://www.12306.cn 上购买火车票的例子:
用户a在12306上提交订单后,会提示请在45分钟内支付,不然就会取消订单。
这样的情形很适合tiger来解决,步骤:
-
插入一条[订单取消任务],并设置执行时间45分钟后,addDispatchTask(arg0)
-
实现任务分发接口DispatchHandler,实现订单取消的业务逻辑(做订单是否已支付的判断)
45分钟后,tiger会自动触发[订单取消任务]。
业务代码逻辑判断:如果此时订单已支付,那么返回任务丢弃;如果订单没支付,那么执行订单取消逻辑,成功后返回。
<groupId>com.dianping</groupId>
<artifactId>wed-tiger</artifactId>
<version>1.2.4</version>
策略Multi: 各个执行器捞取各自的任务
策略Single: 统一捞取任务
策略Multi情况下实现接口:
配置:
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.taskStrategy.name(),ScheduleConstants.TaskFetchStrategy.Multi.getValue() + "");
则实现各自捞取任务的操作接口
com.dianping.wed.tiger.dispatch.DispatchMultiService
策略Single情况下实现接口:
配置:
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.taskStrategy.name(),ScheduleConstants.TaskFetchStrategy.Single.getValue() + "");
则实现统一捞取任务的操作接口
com.dianping.tiger.dispatch.DispatchSingleService
定义spring bean
<bean id="dispatchTaskService" class="你的实现类"/>
public long addDispatchTask(DispatchTaskEntity taskEntity);
策略Multi情况下实现:
public List<DispatchTaskEntity> findDispatchTasksWithLimit(String handler,List<Integer> nodeList, int limit);
策略Single情况下实现:
public List<DispatchTaskEntity> findDispatchTasksWithLimit(List<Integer> nodeList, int limit);
public boolean updateTaskStatus(long taskId,int status,String hostName);
public boolean addRetryTimesAndExecuteTime(long taskId,Date nextExecuteTime,String hostName);
ScheduleManagerFactory.setBackFetchFlag(true)
策略Multi情况下实现:
public List<DispatchTaskEntity> findDispatchTasksWithLimitByBackFetch(String handler, List<Integer> nodeList, int limit,long taskId);
策略Single情况下实现:
public List<DispatchTaskEntity> findDispatchTasksWithLimitByBackFetch(List<Integer> nodeList, int limit, long taskId);
com.dianping.wed.tiger.dispatch.DispatchHandler
这里用于实现 业务逻辑;
任务分发支持并行、串行两种执行策略。 默认是并行执行策略,如果需要串行执行策略(同一个任务有先后执行顺序的情况下),在实现的类里增加一个注解,如:
@ExecuteType(AnnotationConstants.Executor.CHAIN)
public class ChainTestHandler implements DispatchHandler {
@Override
public DispatchResult invoke(DispatchParam param) throws Exception {
Long taskId = param.getTaskId();
String jsonStr = param.getBizParameter();
Map<String, String> paramMap = (Map<String, String>) JSON.parse(jsonStr);
...
}
}
com.dianping.wed.tiger.ScheduleManagerFactory
example:
===========声明 ScheduleManagerFactory=======
设置30s轮询一次任务
ScheduleManagerFactory smf = new ScheduleManagerFactory(30*1000);
smf.setAppCtx(applicationcontext);
===========初始化配置==============
Properties configp = new Properties();
zk地址,必须
configp.setProperty(ScheduleManagerFactory.ZookeeperKeys.zkConnectAddress.name(),"127.0.0.1:2181,127.0.1.1:2181");
执行器名称,必须
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.handlers.name(),"handler1,hander2,hangdler3");
zk节点rootpath,必须
configp.setProperty(ScheduleManagerFactory.ZookeeperKeys.rootPath.name(),"/XXXX");
虚拟节点数,最好大于20,默认100,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.virtualNodeNum.name(),"20");
zk虚拟节点分配策略,0-散列模式,1-分块模式,默认分块模式,建议用1,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.divideType.name(),ScheduleConstants.NodeDivideMode.DIVIDE_RANGE_MODE.getValue()+"");
执行器策略,可选,默认为策略Multi(多执行器各自捞取任务策略)
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.taskStrategy.name(),ScheduleConstants.TaskFetchStrategy.Multi.getValue()+"");
总调度开关,默认true,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.scheduleFlag.name(),"true");
启用巡航模式,默认true,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.enableNavigate.name(),"true");
启用反压模式,默认false,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.enableBackFetch.name(),"false");
===========初始化启用==========
smf.initSchedule(configp);
完成以上4步,启动你的应用就可以使用了.(应用启动前要部署启动zookeeper服务)
配置tiger日志:
<appender name="TIGER" class="org.apache.log4j.DailyRollingFileAppender">
<param name="file" value="/data/applogs/tiger-demo/logs/tiger.log"/>
<param name="append" value="true"/>
<param name="encoding" value="UTF-8"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%-d{yyyy-MM-dd HH:mm:ss} [%c]-[%t]-[%M]-[%L]-[%p] %m%n"/>
</layout>
</appender>
<logger name="com.dianping.wed.tiger" additivity="false">
<level value="INFO"/>
<appender-ref ref="TIGER"/>
</logger>
应用启动后,查看tiger启动日志,看到红线标注部分(start success),代表启动成功,如图:
注意点:
-
ScheduleManagerFactory.keys.handlers.name()的名字需要和DispatchHandler接口实现类的 bean名字一样,执行器handler之间用,分隔;
-
DispatchHandler接口实现类的spring bean配置默认是 单例,所以在实现类里最好 不用成员变量,而要用局部变量, 成员变量是有状态的,会有线程安全问题;
初始化需要的配置外,tiger支持运行中的 配置改变,目前支持以下几种:
- 运行过程中执行器配置改变
ScheduleManagerFactory.reSchedule(List<String> handlers);
- 运行中调度总开关控制
ScheduleManagerFactory.setScheduleFlag(boolean flag);
- 运行中巡航开关控制
ScheduleManagerFactory.setNavigateFlag(boolean flag);
- 运行中反压措施开关控制
ScheduleManagerFactory.setBackFetchFlag(boolean flag);
自 1.2.0 版本起,tiger支持任务代码的动态修改,通过groovy来实现.
1 配置启用groovy动态加载开关:
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.enableGroovyCode.name(),"true");
2 实现groovy code操作接口
com.dianping.wed.tiger.groovy.IGroovyCodeRepo
3 接下来实现任务分发接口(同quick start step3)
groovy特别说明
- groovy代码中service注入方式
import com.dianping.wed.tiger.annotation.TService;
@TService
private WedSmsSendService wpsWedSmsSendService;
- groovy代码支持单例和多例两种方式,默认为多例,若需要使用单例,则采用如下注解的形式
import com.dianping.wed.tiger.annotation.AnnotationConstants;
import com.dianping.wed.tiger.annotation.GroovyBeanType;
@GroovyBeanType(AnnotationConstants.BeanType.SINGLE)
class GroovyTest implements DispatchHandler {
}
tiger应用运行期间,支持任务监控,部署tiger-monitor 并且在tiger应用中增加如下配置:
监控服务地址,必须
configp.setProperty(ScheduleManagerFactory.MonitorKeys.monitorIP.name(),"http://127.0.0.1:8080");
监控开关,默认关闭,必须
configp.setProperty(ScheduleManagerFactory.MonitorKeys.enableMonitor.name(),"true");
同时支持监控运行中开关控制:
scheduleManagerFactory.setMonitorFlag(boolean flag);
注意点: tiger监控用的是文件存储方式,需要对/data/appdatas/tiger/目录有读写权限
tiger监控截图:
Thanks