Chaconne is a lightweight distributed task scheduling framework written in Java and built on the Spring Boot framework. Adding the chaconne components to your system helps you build a distributed task cluster very quickly.
- **Decentralized deployment mode**: there is no fixed scheduling-center node; any node in the cluster can participate in scheduling tasks
- **Centralized deployment mode**: split into a scheduling center and task execution nodes, both of which support clustering
- **chaconne-spring-boot-starter**: the core jar package, containing all of chaconne's core functionality
- **chaconne-console**: chaconne's web management interface, for managing tasks and viewing their running status
Chaconne relies on the tridenter framework to form task clusters. It uses a message unicast mechanism to implement task distribution, load balancing, sharding, and other advanced features. It retains tridenter's definition of a cluster and supports task invocation across clusters.
- Full support for the Spring Boot framework (2.2.0+)
- Supports `cron`-expression timed tasks, parameterized timed tasks, and delayed tasks
- Supports dynamically saving and deleting tasks
- Supports saving tasks via annotations
- Built-in load balancing algorithms, with support for custom load balancing algorithms
- Supports retry on failure and failover
- Supports log tracing
- Supports task sharding
- Supports task dependencies and a simple DAG
- Supports custom task termination strategies
- Supports task timeout cooldown and reset
- Supports email alerts
```xml
<dependency>
    <groupId>indi.atlantis.framework</groupId>
    <artifactId>chaconne-spring-boot-starter</artifactId>
    <version>1.0-RC1</version>
</dependency>
```
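For Gradle builds, the equivalent coordinate would be the following (assuming the artifact is resolvable from a repository your build is configured to use):

```groovy
implementation 'indi.atlantis.framework:chaconne-spring-boot-starter:1.0-RC1'
```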
- JDK 1.8 (or later)
- Spring Boot 2.2.x (or later)
- Redis 4.x (or later)
- MySQL 5.x (or later)
- Use the `@ChacJob` annotation
- Inherit `ManagedJob` or implement the `Job` interface
- Implement the `NotManagedJob` interface
By using the `@ChacJob` annotation:
```java
@ChacJob
@ChacTrigger(cron = "*/5 * * * * ?")
public class DemoCronJob {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoCronJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoCronJob's return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoCronJob failed, cause: {}", e.getMessage(), e);
    }
}
```
By inheriting `ManagedJob`:
```java
@Component
public class HealthCheckJob extends ManagedJob {

    @Override
    public Object execute(JobKey jobKey, Object arg, Logger log) {
        log.info(info());
        return UUID.randomUUID().toString();
    }

    @Override
    public Trigger getTrigger() {
        return GenericTrigger.Builder.newTrigger("*/5 * * * * ?")
                .setStartDate(DateUtils.addSeconds(new Date(), 30))
                .build();
    }

    private String info() {
        long totalMemory = Runtime.getRuntime().totalMemory();
        long usedMemory = totalMemory - Runtime.getRuntime().freeMemory();
        return FileUtils.formatSize(usedMemory) + "/" + FileUtils.formatSize(totalMemory);
    }

    @Override
    public long getTimeout() {
        return 60L * 1000;
    }
}
```
By implementing the `NotManagedJob` interface:
```java
public class EtlJob implements NotManagedJob {

    @Override
    public Object execute(JobKey jobKey, Object attachment, Logger log) {
        log.info("JobKey: {}, Parameter: {}", jobKey, attachment);
        return null;
    }
}
```
Task dependency is one of chaconne's key features. Dependencies come in two patterns, serial and parallel. A serial dependency means Task B runs only after Task A finishes, i.e. Task B depends on Task A. A parallel dependency involves, say, three tasks A, B, and C, where Task C runs only after both Task A and Task B have finished (similar to a countersign business scenario). By combining serial and parallel dependencies, chaconne provides a simple DAG capability to model workflow-like business scenarios, which broadens the use cases for task dependency.
```java
@ChacJob
@ChacTrigger(triggerType = TriggerType.DEPENDENT)
@ChacDependency({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoSchedJob", name = "demoSchedJob") })
public class DemoDependentJob {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoDependentJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoDependentJob's return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoDependentJob failed, cause: {}", e.getMessage(), e);
    }
}
```
Suppose there are three tasks: DemoTask, DemoTaskOne, and DemoTaskTwo. DemoTaskOne and DemoTaskTwo must finish before DemoTask executes, and DemoTask can obtain the return values of DemoTaskOne and DemoTaskTwo after they complete.
DemoTaskOne
```java
@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskOne {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoTaskOne is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoTaskOne's return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoTaskOne failed, cause: {}", e.getMessage(), e);
    }
}
```
DemoTaskTwo
```java
@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskTwo {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoTaskTwo is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoTaskTwo's return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoTaskTwo failed, cause: {}", e.getMessage(), e);
    }
}
```
DemoTask
```java
@ChacJob
@ChacTrigger(cron = "0 0/1 * * * ?", triggerType = TriggerType.CRON)
@ChacFork({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskOne", name = "demoTaskOne"),
        @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskTwo", name = "demoTaskTwo") })
public class DemoTask {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoTask is running at: {}", DateUtils.format(System.currentTimeMillis()));
        // The attachment holds the joined results of the forked tasks
        TaskJoinResult joinResult = (TaskJoinResult) attachment;
        TaskForkResult[] forkResults = joinResult.getTaskForkResults();
        long max = 0;
        for (TaskForkResult forkResult : forkResults) {
            max = Long.max(max, (Long) forkResult.getResult());
        }
        return max;
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoTask's max return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoTask failed, cause: {}", e.getMessage(), e);
    }
}
```
DAG tasks can currently only be created through the API:
```java
@RequestMapping("/dag")
@RestController
public class DagJobController {

    @Value("${spring.application.cluster.name}")
    private String clusterName;

    @Value("${spring.application.name}")
    private String applicationName;

    @Autowired
    private JobManager jobManager;

    @GetMapping("/create")
    public Map<String, Object> createTask() throws Exception {
        Dag dag = new Dag(clusterName, applicationName, "testDag");
        dag.setTrigger(new CronTrigger("0 0/1 * * * ?"));
        dag.setDescription("This is only a demo of dag job");
        DagFlow first = dag.startWith(clusterName, applicationName, "demoDagStart", DemoDagStart.class.getName());
        DagFlow second = first.flow(clusterName, applicationName, "demoDag", DemoDag.class.getName());
        second.fork(clusterName, applicationName, "demoDagOne", DemoDagOne.class.getName());
        second.fork(clusterName, applicationName, "demoDagTwo", DemoDagTwo.class.getName());
        jobManager.persistJob(dag, "123");
        return Collections.singletonMap("ok", 1);
    }
}
```
In addition to depending on the Spring Boot framework, chaconne uses MySQL to store task information by default (currently only MySQL is supported) and Redis to save cluster metadata and broadcast messages. So regardless of which deployment mode you use, you need to configure a `DataSource` and a `RedisConnectionFactory` in your application.
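A minimal configuration sketch for both, using standard Spring Boot property keys; the hosts, credentials, and database name below are placeholders you must replace with your own values:

```properties
# MySQL DataSource (standard Spring Boot keys; values are placeholders)
spring.datasource.url=jdbc:mysql://localhost:3306/chaconne?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=secret
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Redis connection (standard Spring Boot 2.x keys)
spring.redis.host=localhost
spring.redis.port=6379
```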
Add the `@EnableChaconneEmbeddedMode` annotation to the main class of your Spring application and start it.
Example:
```java
@EnableChaconneEmbeddedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {

    public static void main(String[] args) {
        final int port = 8088;
        System.setProperty("server.port", String.valueOf(port));
        SpringApplication.run(YourApplicationMain.class, args);
    }
}
```
2.1 Start the dispatch center. This requires creating a new Spring Boot project, adding the `@EnableChaconneDetachedMode` annotation to the main class, and specifying it as the producer side.
Example:
```java
@EnableChaconneDetachedMode(DetachedMode.PRODUCER)
@SpringBootApplication
public class ChaconneManagementMain {

    public static void main(String[] args) {
        SpringApplication.run(ChaconneManagementMain.class, args);
    }
}
```
Don't forget to configure the `DataSource` and `RedisConnectionFactory` here as well.
2.2 Add the `@EnableChaconneDetachedMode` annotation to the main class of your Spring application (the default is the consumer side), then start it:
```java
@EnableChaconneDetachedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {

    public static void main(String[] args) {
        final int port = 8088;
        System.setProperty("server.port", String.valueOf(port));
        SpringApplication.run(YourApplicationMain.class, args);
    }
}
```