A framework for building fault-tolerant distributed systems for the JVM
Atomix 2.1 documentation is currently under development. Check the website for updates!
- Overview
- Background
- Architecture
- Java API
- Cluster setup
- Cluster management
- Cluster communication
- Coordination primitives
DistributedLock
LeaderElection
LeaderElector
WorkQueue
- Data primitives
AtomicCounter
AtomicCounterMap
AtomicValue
ConsistentMap
ConsistentTreeMap
ConsistentMultimap
DistributedSet
DocumentTree
- Transactions
- Serialization
- REST API
- Docker container
Atomix 2.1 is a fully featured framework for building fault-tolerant distributed systems. Combining ZooKeeper's consistency with Hazelcast's usability and performance, Atomix uses a set of custom communication APIs, a sharded Raft cluster, and a multi-primary protocol to provide a series of high-level primitives for building distributed systems. These primitives include:
- Cluster management and failure detection
- Cluster communication (direct and pub-sub) via Netty
- Strongly consistent reactive distributed coordination primitives (locks, leader elections, etc)
- Efficient partitioned distributed data structures (maps, sets, trees, etc)
- REST API
- Interactive CLI
Atomix was originally conceived in 2014 along with its sister project Copycat (deprecated) as a hobby project. Over time, Copycat grew into a mature implementation of the Raft consensus protocol, and both Copycat and Atomix were put into use in various projects. In 2017, development of a new version was begun, and Copycat and Atomix were combined in Atomix 2.x. Additionally, significant extensions to the projects originally developed for use in ONOS were migrated into Atomix 2.x. Atomix is now maintained as a core component of ONOS by the Open Networking Foundation.
At its core, Atomix is built on a series of replicated state machines backed by consensus and primary-backup protocols. The most critical component of Atomix is one of the most complete implementations of the Raft consensus protocol. A single Raft cluster is used for internal coordination, and multiple additional Raft clusters are used to store state for consistent primitives. The primary-backup protocol uses the core Raft cluster to elect and balance primaries and backups for all data partitions. Primitives can be configured for consistency, persistence, and replication, modifying the underlying protocol according to desired semantics.
Atomix clusters consist of two types of nodes:
DATA
nodes store persistent and ephemeral primitive stateCLIENT
nodes do not store any state but must connect toDATA
nodes to store state remotely
Primitive partitions (both Raft and primary-backup) are evenly distributed among the DATA
nodes
in a cluster. Initially, an Atomix cluster is formed by bootstrapping a set of DATA
nodes. Thereafter,
additional DATA
or CLIENT
nodes may join and leave the cluster at will by simply starting and
stopping Atomix
instances. Atomix provides a ClusterService
that can be used to learn about new
CLIENT
and DATA
nodes joining and leaving the cluster.
Atomix relies heavily upon builder APIs to build high-level objects used to communicate and coordinate distributed systems.
To create a new Atomix instance, create an Atomix builder:
Atomix.Builder builder = Atomix.builder();
The builder should be configured with the local node configuration:
builder.withLocalNode(Node.builder("server1")
.withType(Node.Type.DATA)
.withAddress("localhost:5000")
.build());
In addition to configuring the local node information, each instance must be configured with a
set of bootstrap nodes from which to form a cluster. When first starting a cluster, all instances
should provide the same set of bootstrap nodes. Bootstrap nodes must be DATA
nodes:
builder.withBootstrapNodes(
Node.builder("server1")
.withType(Node.Type.DATA)
.withAddress("localhost:5000")
.build(),
Node.builder("server2")
.withType(Node.Type.DATA)
.withAddress("localhost:5001")
.build(),
Node.builder("server3")
.withType(Node.Type.DATA)
.withAddress("localhost:5002")
.build());
Additionally, the Atomix instance can be configured with the data directory, the number of Raft
partitions, the number of primary-backup partitions, and other options. Once the instance has
been configured, build the instance by calling build()
:
Atomix atomix = builder.build();
Finally, call start()
on the instance to start the node:
atomix.start().join();
Note that in order to form a cluster, a majority of instances must be started concurrently
to allow Raft partitions to form a quorum. The future returned by the start()
method will
not be completed until all partitions are able to form. If your Atomix
instance is blocking
indefinitely at startup, ensure you enable DEBUG
logging to debug the issue.
Atomix also supports client nodes which can connect to an Atomix cluster and operate on distributed
primitives but does not itself store primitive state. To create a client node, simply indicate in the
node builder that the node is a CLIENT
:
Atomix atomix = Atomix.builder()
.withLocalNode(Node.builder("client")
.withType(Node.Type.CLIENT)
.withAddress("localhost:5003")
.build())
.withBootstrapNodes(
Node.builder("server1")
.withType(Node.Type.DATA)
.withAddress("localhost:5000")
.build(),
Node.builder("server2")
.withType(Node.Type.DATA)
.withAddress("localhost:5001")
.build(),
Node.builder("server3")
.withType(Node.Type.DATA)
.withAddress("localhost:5002")
.build())
.build();
atomix.start().join();
This example connects a client node aptly named client
to a set of data nodes. Once the instance
is started, the client node will be visible to all data nodes and vice versa, and primitives created
by the client node will be managed by the data nodes.
Atomix provides a set of APIs for discovering the nodes in the cluster. The ClusterMetadataService
provides information about the set of data nodes in the cluster. The ClusterService
provides information
about all nodes in the cluster, including the local node, data nodes, and client nodes.
To get the set of nodes in the cluster, use the ClusterService
:
Set<Node> nodes = atomix.clusterService().getNodes();
The Node
objects provided by the ClusterService
provide a Node.State
that can indicates the
liveliness of the given node. The cluster service uses a phi accrual failure detection algorithm
internally to detect failures, and nodes' states are updated as failures are detected:
Node.State state = atomix.clusterService().getNode(NodeId.from("foo")).state();
Additionally, listeners can be added to the ClusterService
to react to changes to both the set
of nodes in the cluster and the states of individual nodes:
atomix.clusterService().addListener(event -> {
if (event.type() == ClusterEvent.Type.NODE_ACTIVATED) {
// A node's state was changed to ACTIVE
} else if (event.type() == ClusterEvent.Type.NODE_DEACTIVATED) {
// A node's state was changed to INACTIVE
}
});
Atomix provides three services that can be used for general cluster communication:
MessagingService
is a low-level IP address based communication APIClusterCommunicationService
is a high-level point-to-point/unicast/multicast/broadcast messaging APIClusterEventService
is a high-level messaging API modelled on the Vert.x event bus that abstracts producers from consumers
The default implementation of communication abstractions uses Netty for all inter-node communication.
Atomix provides the ClusterCommunicationService
for point-to-point messaging between Atomix
nodes. It provides support for unicast, multicast, broadcast, and request-reply messaging patterns.
To register a message subscriber, use the subscribe
methods:
atomix.messagingService().subscribe("test", message -> {
return CompletableFuture.completedFuture(message);
});
Three types of subscribers can be registered:
- A synchronous subscriber that returns a result and must provide an
Executor
on which to consume messages - An asynchronous subscriber that must return
CompletableFuture
- A consumer that must provide an
Executor
on which to consume messages
As noted above, messages can be sent using a variety of different communication patterns:
unicast
sends a message to a single node without awaiting a responsemulticast
sends a message to a set of nodes without awaiting any responsesbroadcast
sends a message to all nodes known to the localClusterService
without awaiting any responsessendAndReceive
sends a message to a single node and awaits a response
// Send a request-reply message to node "foo"
atomix.messagingService().send("test", "Hello world!", NodeId.from("foo")).thenAccept(response -> {
System.out.println("Received " + response);
});
The ClusterCommunicationService
uses a default serializer to serialize a variety of core data
structures, but often custom objects need to be communicated across the wire. The ClusterCommunicationService
provides overloaded methods for providing arbitrary message encoders/decoders for requests/replies:
Serializer serializer = Serializer.using(KryoNamespace.builder()
.register(KryoNamespaces.BASIC)
.register(NodeId.class)
.register(ClusterHeartbeat.class)
.build());
ClusterHeartbeat heartbeat = new ClusterHeartbeat(atomix.clusterService().getLocalNode().id());
atomix.messagingService().broadcast("test", heartbeat, serializer::encode);
Publish-subscribe messaging is done using the ClusterEventService
API, which is closely modelled
on the ClusterCommunicationService
API. Indeed, while the two appear to be almost the exact same,
their semantics differ significantly. Rather than sending messages to specific nodes using NodeId
s,
the ClusterEventService
actually replicates subscriber information and routes messages internally.
Point-to-point messages sent via the ClusterEventService
are delivered in a round-robin fashion,
and multicast messages do not require any specific node information. This separates specific nodes from
senders.
// Add an event service subscriber
atomix.eventingService().subscribe("test", message -> {
return CompletableFuture.completedFuture(message);
});
// Send a request-reply message via the event service
atomix.eventingService().send("test", "Hello world!").thenAccept(response -> {
System.out.println("Received " + response);
});
// Broadcast a message to all event subscribers
atomix.eventingService().broadcast("test", "Hello world!");
Coordination primitives can be used to elect leaders and synchronize access to shared resources in the cluster. These primitives typically require the use of consensus but allow certain consistency models to be relaxed, e.g. on reads. Coordination primitives include:
DistributedLock
- coarse grained fair distributed lockLeaderElection
- single leader electionLeaderElector
- multiple leader election with automatic leader balancingWorkQueue
- distributed persistent work queue with ack/fail mechanisms
As with many other objects in Atomix, primitives are constructed using a builder pattern. All
core primitive builders are exposed via the Atomix
API:
DistributedLock lock = atomix.lockBuilder("test-lock")
.withLockTimeout(Duration.ofSeconds(2))
.build();
The lock timeout is the approximate time it takes to determine whether a lock has failed. If a lock's underlying Raft session is expired, the lock will be released and granted to the next waiting process.
lock.lock();
try {
// Do something
} finally {
lock.unlock();
}
As with all Atomix primitives, a fully asynchronous version of the lock can be constructed by
simply calling the async()
method on the lock:
AsyncDistributedLock asyncLock = lock.async();
lock.lock().thenRun(() -> {
// Do something
lock.unlock().thenRun(() -> {
// Unlocked
});
});
The LeaderElector
primitive is used to elect multiple leaders within a cluster, automatically
balancing leaders among all instances of the primitive across the cluster.
LeaderElector<NodeId> elector = atomix.leaderElectorBuilder("test-elector")
.withElectionTimeout(Duration.ofSeconds(2))
.build();
As with DistributedLock
, the LeaderElector
(and LeaderElection
) primitive supports a timeout
after which inactive leaders will be demoted and a new leader will be elected, e.g. in the case of
a crash or network partition. Similarly, as with all primitives, an AsyncLeaderElector
can be
created via the async()
getter:
AsyncLeaderElector<NodeId> asyncElector = elector.async();
To enter into an election, use the run
method:
asyncElector.run("foo", atomix.clusterService().getLocalNode().id()).thenAccept(leadership -> {
if (leadership.leader().id().equals(atomix.clusterService().getLocalNode().id())) {
// Local node elected leader!
}
});
The topic passed as the first argument to the run
method is an identifier for a named election in
which to participate. Multiple elections can be managed by a single leader elector, and leaders'
locations will be automatically balanced among all elections.
To get the current leader for a topic, use getLeadership
:
Leadership<NodeId> leadership = elector.getLeadership("foo");
The Leadership
provided by an elector contains both a Leader
and a list of candidates in the
election. For each unique Leader
for a topic, a monotonically increasing unique term and wall clock
time will be provided.
Finally, to listen for changes to leaderships managed by a LeaderElector
, register a LeadershipEventListener
:
elector.addListener(event -> {
// Either a leader or candidates changed
Leadership leadership = event.newLeadership();
});
Data primitives store state and can be replicated either using the partitioned Raft cluster or a multi-primary replication protocol. These primitives are typically data structures, including:
AtomicCounter
- simple distributed atomic counterAtomicValue
- simple distributed atomic valueConsistentMap
- partitionedMap
with change events and support for optimistic lockingConsistentTreeMap
- distributedTreeMap
with change events and support for optimistic lockingConsistentMultimap
- partitioned multimap with change eventsDistributedSet
- partitionedSet
with change eventsDocumentTree
- partitioned tree-like structure with change events and support for optimistic locking
The ConsistentMap
primitive is modelled on Java's Map
and provides support for listening for
changes to the map and optimistic locking. To create a map, use the ConsistentMapBuilder
:
ConsistentMap<String, String> map = atomix.consistentMapBuilder("test-map")
.withPersistence(Persistence.EPHEMERAL)
.withBackups(2)
.build();
map.put("foo", "Hello world!");
Map values returned by the ConsistentMap
are wrapped in a Versioned
object which can be
used for optimistic locking:
Versioned<String> value = map.get("foo");
map.replace("foo", "Hello world again!", value.version());
As with all other primitives, an AsyncConsistentMap
can be created via the async()
method:
AsyncConsistentMap<String, String> asyncMap = map.async();
asyncMap.put("bar", "baz").thenRun(() -> {
// put complete
});
To listen for changes to the map, add a MapEventListener
to the map:
map.addListener(event -> {
switch (event.type()) {
case INSERT:
...
case UPDATE:
...
case REMOVE:
...
}
});
Atomix supports transactional operations over multiple primitives. Transactions are committed
using a two-phase commit protocol. To create a transaction, use the TransactionBuilder
:
Transaction transaction = atomix.transactionBuilder()
.withIsolation(Isolation.REPEATABLE_READS)
.build();
To begin the transaction, call begin
:
transaction.begin();
Once a transaction has been started, primitives can be created via the Transaction
primitive
builders. Once all operations have been performed, call commit()
to commit the transaction:
CommitStatus status = transaction.commit();
The CommitStatus
returned by the commit()
call indicates whether the transaction was committed
successfully. Transactions may not commit successfully if a concurrent transaction is already modifying
the referenced resources.
To abort a transaction, call abort()
:
transaction.abort();
To create a transactional map, use the mapBuilder()
method:
Transaction transaction = atomix.transactionBuilder()
.withIsolation(Isolation.REPEATABLE_READS)
.build();
transaction.begin();
TransactionalMap<String, String> transactionalMap = transaction.mapBuilder("my-map")
.withSerializer(Serializers.using(KryoNamespaces.BASIC))
.build();
transactionalMap.put("foo", "bar");
transactionalMap.put("bar", "baz");
if (transaction.commit() == CommitStatus.SUCCESS) {
...
}
The map will inherit the isolation level configured for the transaction. Changes to the named map will be reflected in non-transactional maps of the same name upon commit.
To create a transactional set, use the setBuilder()
method:
Transaction transaction = atomix.transactionBuilder()
.withIsolation(Isolation.REPEATABLE_READS)
.build();
transaction.begin();
TransactionalSet<String> transactionalSet = transaction.setBuilder("my-set")
.withSerializer(Serializers.using(KryoNamespaces.BASIC))
.build();
transactionalSet.add("foo");
transactionalSet.add("bar");
if (transaction.commit() == CommitStatus.SUCCESS) {
...
}
The set will inherit the isolation level configured for the transaction. Changes to the named set will be reflected in non-transactional set of the same name upon commit.
All data primitives support custom serialization via a Serializer
provided to the builder:
Serializer dataSerializer = new Serializer() {
@Override
public <T> byte[] encode(T object) {
return ...;
}
@Override
public <T> T decode(byte[] bytes) {
return ...;
}
};
ConsistentMap<String, Data> map = atomix.consistentMapBuilder("data")
.withSerializer(dataSerializer)
.build();
Atomix also provides a core Kryo based serialization abstraction:
Serializer dataSerializer = Serializer.using(KryoNamespace.builder()
.register(KryoNamespaces.BASIC)
.register(Data.class)
.build());
ConsistentMap<String, Data> map = atomix.consistentMapBuilder("data")
.withSerializer(dataSerializer)
.build();
By default, the KryoNamespace
uses Kryo's FieldSerializer
, but custom Kryo serializers may
be provided via the builder. Additionally, a set of default serializers can be registered using
the KryoNamespaces.BASIC
constant.
The Atomix agent is a standalone server or client node with a REST API through which Java or HTTP clients can manage Atomix primitives remotely.
The atomix
script is used to run the Atomix agent and CLI:
mvn clean package
bin/atomix agent
To start a cluster, the atomix agent
command must be provided with a local node name, host, and port and
a set of nodes with which to bootstrap the cluster.
The format of addresses passed to the agent command is:
{name}:{host}:{port}
Node names must be unique among all nodes in the cluster and will default to the local host name if not explicitly specified.
bin/atomix agent a:localhost:5000 --bootstrap a:localhost:5000 b:localhost:5001 c:localhost:5002 --http-port 6000 --data-dir data/a
bin/atomix agent b:localhost:5001 --bootstrap a:localhost:5000 b:localhost:5001 c:localhost:5002 --http-port 6001 --data-dir data/b
bin/atomix agent c:localhost:5002 --bootstrap a:localhost:5000 b:localhost:5001 c:localhost:5002 --http-port 6002 --data-dir data/c
Client nodes provide the same HTTP API as server nodes but do not themselves store state. Client nodes can be useful for deploying an Atomix node with which to communicate locally and which can use more efficient binary communication when transporting requests across the network.
bin/atomix agent --client --bootstrap a:localhost:5000 b:localhost:5001 c:localhost:5002
This section provides a series of example usages of primitives via the HTTP API:
curl -XPOST http://localhost:5678/v1/primitives/locks/my-lock
curl -XDELETE http://localhost:5678/v1/primitives/locks/my-lock
curl -XPUT http://localhost:5678/v1/primitives/maps/my-map/foo -d value="Hello world!" -H "Content-Type: text/plain"
curl -XGET http://localhost:5678/v1/primitives/maps/my-map/foo
curl -XPOST http://localhost:5678/v1/events/something-happened -d "Something happened!" -H "Content-Type: text/plain"
curl -XGET http://localhost:5678/v1/events/something-happened
mvn clean package
docker build -t atomix .
docker run -p 5678:5678 -p 5679:5679 atomix docker:0.0.0.0
Thank you to the Open Networking Foundation and ONOS for continued support of Atomix!
YourKit supports open source projects with its full-featured Java Profiler. YourKit, LLC is the creator of YourKit Java Profiler and YourKit .NET Profiler, innovative and intelligent tools for profiling Java and .NET applications.