SynchroTask is a lightweight library which helps to synchronize Java routines in distributed environments.
Synchronization is one of the most important parts in software development. Programming languages offer a wide range of options to work with locks and concurrency. In Java, developers can choose between low-level features, such as synchronized or methods like Object.wait(), and high-level abstractions from java.util.concurrent.locks. However, when it comes to multi-instance environments, these mechanisms are not sufficient.
Sharing locks between different Java virtual machines requires an external agent, and SynchroTask library is one of those agents.
The library does not implement any lock primitives on its own. Instead, it relies on external lock providers and helps to seamlessly integrate them into your code.
There are two key abstractions:
- SynchroTask - represents a unit of work which requires synchronization while executing
- SynchroTaskService -
a service for running
SynchroTask
instances
A SynchroTask is uniquely identified by taskId.
Once initiated and until completed, a SynchroTask instance prevents other instances with the same taskId from being launched in parallel. An attempt to start such an instance will be blocked according to the lock timeout settings. In other words, SynchroTask instance acquires/releases a lock upon start/completion respectively.
The workload is provided in the form of Supplier
, or Runnable
for the tasks with no return value.
Use builder methods to construct SynchroTask
objects:
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> synchroTaskFromSupplier = SynchroTask
.from(() -> "foo")
.withId("bar")
.build();
SynchroTask<Void> synchroTaskFromRunnable = SynchroTask
.from(() -> System.out.println("foo"))
.withId("bar")
.build();
SynchroTaskService
executes SynchroTask
s. run(SynchroTask)
method takes a SynchroTask
and does all the
work required to run it and ensure synchronization according to the task lock timeout settings.
Check Providers section for the available SynchroTaskService
implementations.
A SynchroTask
can be configured with lockTimeout property which indicates the amount of time a SynchroTaskService
should wait if the given task can't be started because of the blocking instance running in parallel:
The simplest way is to specify a timeout as a number of milliseconds:
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> synchroTaskWithLockTimeout = SynchroTask
.from(() -> "foo")
.withId("bar")
.withLockTimeout(60000)
.build();
For zero timeout, you can either provide 0 or use withZeroLockTimeout()
initializer:
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> synchroTaskWithZeroLockTimeout = SynchroTask
.from(() -> "foo")
.withId("bar")
.withZeroLockTimeout()
.build();
When it is not possible to guess the right timeout, you may use withMaxSupportedLockTimeout()
initializer. When such
a task is blocked, the SynchroTaskService
will wait for it to unblock as long as it is supported by the underlying
lock provider:
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> synchroTaskWithMaxSupportedLockTimeout = SynchroTask
.from(() -> "foo")
.withId("bar")
.withMaxSupportedLockTimeout()
.build();
See Lock timeout defaults for the supported lock providers max timeouts.
If the timeout is not specified, the lock provider will use its own default value.
Same behaviour can be achieved by using withDefaultLockTimeout()
initializer, so that both SynchroTask
s below are
equivalent:
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> synchroTaskWithDefaultTimeout1 = SynchroTask
.from(() -> "foo")
.withId("bar")
.withDefaultLockTimeout()
.build();
SynchroTask<String> synchroTaskWithDefaultTimeout2 = SynchroTask
.from(() -> "foo")
.withId("bar")
.build();
See Lock timeout defaults for the supported lock providers default timeouts.
By default, SynchroTaskService.run(SynchroTask)
throws SynchroTaskCollisionException
if the given SynchroTask
is blocked and the waiting time exceeds LockTimeout
.
This behavior can be changed by providing false to SynchroTask.throwExceptionAfterTimeout(boolean)
initializer.
In this case the task will exit straight after lock timeout and return null. Runnable
-based tasks will exit
with no return value.
Example:
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> synchroTaskWithDefaultTimeout1 = SynchroTask
.from(() -> "foo")
.withId("bar")
.withLockTimeout(10000)
.throwExceptionAfterTimeout(false)
.build();
If the task above is blocked, SynchroTaskService.run(SynchroTask)
will wait for 10 seconds before returning null.
Previous versions of SynchroTask
supported CollisionStrategy
parameter.
Starting from version 0.3.0 it is deprecated and might be removed in the following releases in the favor of
lock timeout.
The snippets below show equivalent LockTimeout
configurations for each CollisionSrategy
value:
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> throwingSynchroTask1 = SynchroTask
.from(() -> "foo")
.withId("bar")
.onLock(CollisionStrategy.THROW) //deprecated
.build();
SynchroTask<String> throwingSynchroTask2 = SynchroTask
.from(() -> "foo")
.withId("bar")
.withZeroLockTimeout() // preferred
.build();
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> returningSynchroTask1 = SynchroTask
.from(() -> "foo")
.withId("bar")
.onLock(CollisionStrategy.RETURN) //deprecated
.build();
SynchroTask<String> returningSynchroTask2 = SynchroTask
.from(() -> "foo")
.withId("bar")
.withZeroLockTimeout() // preferred
.throwExceptionAfterTimeout(false) // preferred
.build();
import org.vg2902.synchrotask.core.SynchroTask;
...
SynchroTask<String> returningSynchroTask1 = SynchroTask
.from(() -> "foo")
.withId("bar")
.onLock(CollisionStrategy.WAIT) //deprecated
.build();
SynchroTask<String> returningSynchroTask2 = SynchroTask
.from(() -> "foo")
.withId("bar")
.withDefaultLockTimeout() // preferred
.build();
SynchroTaskJdbcService
uses a special registry table in a database to keep track of running tasks
and ensure synchronization.
You can use the SQL below to create it:
CREATE TABLE synchro_task(
task_id VARCHAR2(100 CHAR) NOT NULL
, creation_time TIMESTAMP(9)
, CONSTRAINT synchro_task_pk PRIMARY KEY (task_id));
CREATE TABLE synchro_task(
task_id VARCHAR(100) NOT NULL
, creation_time TIMESTAMP(6)
, CONSTRAINT synchro_task_pk PRIMARY KEY (task_id));
CREATE TABLE synchro_task(
task_id VARCHAR2(100 CHAR) NOT NULL
, creation_time TIMESTAMP(9)
, CONSTRAINT synchro_task_pk PRIMARY KEY (task_id));
CREATE TABLE synchro_task(
task_id VARCHAR(100) NOT NULL
, creation_time TIMESTAMP(6)
, CONSTRAINT synchro_task_pk PRIMARY KEY (task_id));
The table must have:
- both columns with exactly the same names and data types as they are defined above
- a primary key on task_id column
Column size, however, is not limited and can vary. Just make sure that task_id column is wide enough to fit anticipated values. By default, the service expects the table to be named SYNCHRO_TASK, but this can be overridden.
SynchroTaskJdbcService
requires javax.sql.DataSource
for initialization.
Every invocation of run(SynchroTask)
method first obtains a new Connection
from this DataSource
.
This connection is then used to create and immediately lock the control row with the given task_id in
the registry table.
If the row already exists and is unlocked, the service will try to reuse it.
If the row already exists and is locked by another database session, the given SynchroTask
is assumed
as being currently executed, and the operation outcome will depend on the task lock timeout
settings.
After successful SynchroTask
completion, the control row will be removed from the table, and the task result will
be returned.
The Connection
will always be closed before returning from the method.
<dependency>
<groupId>org.vg2902</groupId>
<artifactId>synchro-task-jdbc</artifactId>
<version>0.4.0</version>
</dependency>
import org.vg2902.synchrotask.core.api.SynchroTask;
import org.vg2902.synchrotask.core.api.SynchroTaskService;
import org.vg2902.synchrotask.jdbc.SynchroTaskJdbcService;
import javax.sql.DataSource;
...
DataSource ds = getDataSource();
SynchroTaskService service = SynchroTaskJdbcService
.from(ds)
.build;
SynchroTask<Void> noop = SynchroTask
.from(() -> {})
.withId("foo")
.build();
service.run(noop);
When creating a SynchroTaskJdbcService
, you can provide two optional parameters:
- registry table name, defaults to SYNCHRO_TASK if not provided
- interceptor, which gets triggered as part of
SynchroTaskJdbcService.run(SynchroTask)
invocations, capturing theSynchroTask
passed in, andjava.sql.Connection
object allocated for it. Interceptor is helpful mostly for testing and debugging purposes.
import org.vg2902.synchrotask.core.api.SynchroTask;
import org.vg2902.synchrotask.core.api.SynchroTaskService;
import org.vg2902.synchrotask.jdbc.SynchroTaskJdbcService;
import javax.sql.DataSource;
import java.sql.Connection;
...
public void test(DataSource ds) {
SynchroTaskService service = SynchroTaskJdbcService
.from(ds)
.withTableName("CUSTOM_SYNCHRO_TASK")
.withInterceptor(this::intercept)
.build();
SynchroTask<Void> noop = SynchroTask
.from(() -> {})
.withId("foo")
.build();
service.run(noop);
}
public void intercept(SynchroTask<?> task, Connection connection) {
// do something
}
The following databases are currently supported:
- H2
- MySQL
- Oracle
- PostgreSQL
Redis support is implemented by SynchroTaskRedisService
, which relies on Redisson client.
<dependency>
<groupId>org.vg2902</groupId>
<artifactId>synchro-task-redis</artifactId>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.19.0</version>
</dependency>
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.vg2902.synchrotask.core.api.SynchroTask;
import org.vg2902.synchrotask.core.api.SynchroTaskService;
import org.vg2902.synchrotask.jdbc.SynchroTaskRedisService;
...
public void test() {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
config.setLockWatchdogTimeout(30000);
RedissonClient redissonClient = Redisson.create(config);
SynchroTaskService service = SynchroTaskRedisService
.from(redissonClient)
.withInterceptor(this::intercept)
.build();
SynchroTask<Void> noop = SynchroTask
.from(() -> {})
.withId("foo")
.build();
service.run(noop);
}
public void intercept(SynchroTask<?> task, RLock rLock) {
// do something
}
The snippet above shows how to initialize a SynchroTaskRedisService
which uses a single-server Redis instance running on
localhost at port 6379.
Note how Watchdog timeout is specified in order to clean up the locks which are not released gracefully.
Lock Provider type | Lock Provider name | Default lock timeout | Max supported lock timeout | Time unit |
---|---|---|---|---|
JDBC | H2 | 1000 | 4294967295 | milliseconds |
JDBC | MySQL | 50 | 1073741824 | seconds |
JDBC | Oracle | 0 | N/A | N/A |
JDBC | PostgreSQL | 0 | 2147483647 | milliseconds |
Redis | Redis | 0 | 9223372036854775807 | seconds |
With SynchroTask Spring extension, you don't need to construct your tasks manually - just tell Spring which methods should be running as SynchroTasks, and the framework will do it for you.
<dependency>
<groupId>org.vg2902</groupId>
<artifactId>synchro-task-spring</artifactId>
<version>0.4.0</version>
</dependency>
Spring setup requires two steps:
- enable SynchroTask support at the application level using
@EnableSynchroTask
annotation - define at least one
SynchroTaskService
bean
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.vg2902.synchrotask.core.api.SynchroTaskService;
import org.vg2902.synchrotask.jdbc.SynchroTaskJdbcService;
import org.vg2902.synchrotask.spring.EnableSynchroTask;
import javax.sql.DataSource;
import java.sql.Connection;
@Configuration
@EnableSynchroTask
public class SynchroTaskConfig {
@Bean
public SynchroTaskService jdbcSynchroTaskService(DataSource ds) {
return SynchroTaskJdbcService.from(ds).build();
}
}
Then use @SynchroTask
annotation to indicate a method that you want to run as a SynchroTask
.
The framework will wrap the method invocations in individual SynchroTask
instances
and execute them with an eligible SynchroTaskService
bean.
The method must have a @TaskId
-annotated parameter to designate taskId of the resulting SynchroTask
objects.
Note, that the object containing annotated methods has to be a Spring bean.
import org.vg2902.synchrotask.spring.SynchroTask;
import org.vg2902.synchrotask.spring.TaskId;
@Component
public class SynchroTaskRunner {
@SynchroTask
public Integer defaultTask(@TaskId long taskId) {
return 42;
}
}
@SynchroTask
without parameters will create SynchroTask
instances with default lock timeout
and run them using a primary SynchroTaskService
bean.
These can be overridden with serviceName
, lockTimeout
and throwExceptionAfterTimeout
parameters:
import org.vg2902.synchrotask.spring.SynchroTask;
import org.vg2902.synchrotask.spring.TaskId;
@Component
public class SynchroTaskRunner {
@SynchroTask(lockTimeout = 10000, throwExceptionAfterTimeout = false, serviceName = "jdbcSynchroTaskService")
public Integer throwingTask(@TaskId long taskId) {
return 42;
}
}
Spring versions from 3.0.0 and above are supported
SynchroTask uses SLF4J logging facade.
You can get the most detailed logs by setting org.vg2902.synchrotask
log level to DEBUG
.
logback.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %-5level [%t] %C{1.}: %msg%n%throwable</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
<!-- Setting SynchroTask log level to DEBUG -->
<logger name="org.vg2902.synchrotask" level="debug" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
</configuration>
log4j2.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %-5level [%t] %C{1.}: %msg%n%throwable"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
<!-- Setting SynchroTask log level to DEBUG -->
<Logger name="org.vg2902.synchrotask" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
</Configuration>
There is an open issue which causes H2 to wait for a specified timeout twice.
As a workaround, provide a halved lock timeout value
- Adding support for Redis
- Discontinuing taskName attribute
- Addressing vulnerabilities
- Addressing vulnerabilities
- Addressing vulnerabilities
- JDBC provider now supports MySQL database
- Lock timeout support added
- CollisionStrategy property is deprecated
- JDBC provider now supports PostgreSQL database
- Initial release
- JDBC provider with support of Oracle and H2 databases
- Extension for Spring Framework