Enterprise Integration Patterns (EIP) provide reusable architecture patterns for constructing complex systems from simple components. They have been a great success in the enterprise world, and although EIP itself is language-agnostic, many systems following or extending EIP have been written in Java.
However, in recent years there has been a trend toward rewriting systems in a more stream-based fashion, given the massive scale of data enterprise systems need to process every day and the increasing business demand for real-time analysis.
Alpakka is a great fit in this area - it is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka, which lets you easily connect your systems with other external systems and services and perform stream-based operations.
In this article, we introduce Alpakka's Cassandra connector as an example, and see what Alpakka gives you over Cassandra's plain Java driver.
Alpakka is a community-based effort collaborating with the Akka maintainers at Lightbend, and it provides a large and ever-increasing number of connectors for files, queues (including AMQP and Kafka), AWS/GCP/Azure services, and more.
You can see how Alpakka has grown over time in the Akka team's blog post, and the increasing number of connectors in the screenshot below.
Since Alpakka provides its connectors as Akka Streams operators, it is not only easy to connect your application to these other systems and services; you also benefit from Akka Streams's back-pressure support and fine-grained control over the stream at any level you want. Akka Streams's flexible DSL makes it easy to combine different operators to perform buffering, throttling, branching, pub/sub, and so on, as in the small sketch below. What's more, you can even create your own operators.
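For instance, here is a minimal, self-contained sketch (not taken from this article's example repository) of composing a few of these operators with the Java DSL:

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;

public class ComposeOperatorsSketch {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);

    Source.range(1, 100)
        .buffer(10, OverflowStrategy.backpressure()) // buffer up to 10 elements, back-pressuring upstream when full
        .throttle(5, Duration.ofSeconds(1))          // cap the throughput at 5 elements per second
        .map(i -> i * 2)
        .runWith(Sink.foreach(System.out::println), materializer)
        .thenRun(system::terminate);                 // shut the ActorSystem down once the stream completes
  }
}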
Cassandra is a database product originally created by Facebook, and known for its great write performance and distributed nature by design.
Although Cassandra is different from relational databases in many aspects, its query language CQL has some similarities to SQL, and Cassandra indeed has a concept of tables.
If you already have data stored in Cassandra and want to introduce stream-based operations into your system, or if you have Akka-based or Akka Streams-based systems and are looking for a database with great scalability and fault tolerance, this blog post should be useful for you.
To follow the examples in this article, you first need to bring up Cassandra. If you already have Cassandra up and running, you can skip this section.
If you already have Docker installed, the easiest way to bring up Cassandra is to run it via Docker. Make sure you expose port 9042 so that the example code can connect to Cassandra on that port.
docker pull cassandra
docker run -p 127.0.0.1:9042:9042 -d cassandra
If you are not familiar with Docker, download Cassandra, unarchive it, add the Cassandra bin directory to your PATH, and start the server with the cassandra command.
The example source code repository includes pom.xml, so that you can use Maven to create a runnable jar file. If you don't have Maven set up, go to the Maven main page, download and install it.
Then, git clone the example code repository, and invoke the mvn package command as follows:
> git clone https://github.com/richardimaoka/alpakka-blog-post.git
> cd alpakka-blog-post
> mvn package
You will see that under the target directory, Maven created alpakka-blog-post-1.jar and the lib directory which contains all the jar dependencies.
To run one of the examples, you can do this (replace CassandraSourceExample with CassandraSinkExample or CassandraFlowExample if you want to run the other examples):
> java -cp "target/alpakka-blog-post-1.jar;target/lib/*" com.example.CassandraSourceExample
(On Linux or macOS, use : instead of ; as the classpath separator.)
As long as you have Cassandra up and running, this should work.
Alpakka Cassandra has three different connectors: CassandraSource, CassandraSink and CassandraFlow.
The first example we talk about is CassandraSource, which could be useful when you perform a batch-like operation against a very large data set.
As you see in the animation, CassandraSource lets you run a CQL query, which fetches a ResultSet from Cassandra and passes each Row from the ResultSet as an element going through Akka Streams. Note that the CQL query is only run once; it is not something that keeps polling Cassandra given some filtering criteria, and that's why it is suitable for batch-like operations.
In a nutshell, you can create and run a stream with the Alpakka Cassandra connector like below:
final Statement stmt =
new SimpleStatement("SELECT * FROM akka_stream_java_test.users").setFetchSize(100);
final RunnableGraph<NotUsed> runnableGraph =
CassandraSource.create(stmt, session)
.to(Sink.foreach(row -> System.out.println(row)));
runnableGraph.run(materializer);
and we'll see how it works in more detail as follows.
The full code example can be found here.
To go through the example code, you first need to add the following import statements:
// Alpakka Cassandra connector
import akka.stream.alpakka.cassandra.javadsl.CassandraSource;
// For Akka and Akka Streams
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
// For Java Cassandra driver
import com.datastax.driver.core.*;
and you need to initialize the following before running CassandraSource to connect to Cassandra:
// Make sure you already brought up Cassandra, which is accessible via the host and port below.
// The host and port would be driven from a config in a production environment
// but hardcoding them here for simplicity.
final Session session = Cluster.builder()
.addContactPoint("127.0.0.1").withPort(9042)
.build().connect();
// ActorSystem and Materializer are necessary as the underlying infrastructure to run Akka Streams
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
If you are not familiar with ActorSystem and Materializer, you can think of them as the underlying infrastructure for running Akka Streams. Typically there is only one instance of ActorSystem and one instance of Materializer in your application - more precisely, in your (OS) process.
In a production environment you would already have a data set in Cassandra, but in this example we prepare the data set ourselves before running Akka Streams with CassandraSource.
So let's create a keyspace and a table in Cassandra as follows:
final Statement createKeyspace = new SimpleStatement(
"CREATE KEYSPACE IF NOT EXISTS akka_stream_java_test WITH REPLICATION = "
+ "{ 'class' : 'SimpleStrategy', 'replication_factor': 1 };"
);
session.execute(createKeyspace);
final Statement createTable = new SimpleStatement(
"CREATE TABLE akka_stream_java_test.users (" +
"id int, " +
"name text, " +
"age int, " +
"PRIMARY KEY (id)" +
");"
);
session.execute(createTable);
In the above example code, we use the Cassandra Java driver to execute these statements, so you don't need to install a CQL client yourself to connect to Cassandra. A keyspace is what contains Cassandra tables, and you need to declare a replication strategy when you create one. After creating the keyspace, you can create a table under it.
Now you can insert data into the table:
// The prepared statement referenced as `prepared` inside the loop below
final PreparedStatement prepared = session.prepare(
  "INSERT INTO akka_stream_java_test.users (id, name, age) VALUES (?, ?, ?)"
);

IntStream.range(1, 1000).forEach(i -> {
  // For simplicity we use the same name and age in this example
  String name = "John";
  int age = 35;

  // A prepared statement is typical for parameterized queries in CQL (Cassandra Query Language).
  // In production systems, it also guards the statement against injection attacks, similar to a SQL prepared statement.
  BoundStatement bound = prepared.bind(i, name, age);
  session.execute(bound);
});
Here, if you execute the following CQL query,
SELECT * FROM akka_stream_java_test.users;
you will get the result set like below,
id | age | name
-----+-----+------
769 | 35 | John
23 | 35 | John
114 | 35 | John
660 | 35 | John
893 | 35 | John
53 | 35 | John
987 | 35 | John
878 | 35 | John
110 | 35 | John
...
...
but we will execute this query using CassandraSource instead.
To supply the query to CassandraSource, you should create a Statement beforehand, using setFetchSize to set the paging size.
//https://docs.datastax.com/en/developer/java-driver/3.2/manual/paging/
final Statement stmt =
new SimpleStatement("SELECT * FROM akka_stream_java_test.users").setFetchSize(100);
The Cassandra Java driver already has a paging feature (the picture below is cited from the referenced Cassandra article), so you don't need to worry about your Cassandra client running out of memory by fetching a huge data set in one go. Cassandra's paging works nicely with Akka Streams, and on top of that, Akka Streams allows fully non-blocking execution without resorting to the Cassandra driver's imperative async-paging interface.
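For comparison, here is a rough sketch of the manual pre-fetching pattern from the driver's paging documentation; the details may vary by driver version, but the point is that with the plain driver you manage paging yourself, whereas CassandraSource simply emits each Row downstream:

ResultSet rs = session.execute(stmt);
for (Row row : rs) {
  // When only the current page's worth of rows remains in memory, pre-fetch the next page asynchronously
  if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched()) {
    rs.fetchMoreResults();
  }
  System.out.println(row); // process the row (may still block if the next page has not arrived yet)
}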
Finally, you can run the stream like below:
final RunnableGraph<NotUsed> runnableGraph =
CassandraSource.create(stmt, session)
.to(Sink.foreach(row -> System.out.println(row)));
runnableGraph.run(materializer);
and get the following output.
...
Row[829, 35, John]
Row[700, 35, John]
Row[931, 35, John]
Row[884, 35, John]
Row[760, 35, John]
Row[628, 35, John]
Row[498, 35, John]
Row[536, 35, John]
...
Sometimes you may want to filter the elements coming from CassandraSource based on certain rules, but the filtering rules cannot simply be expressed as a CQL WHERE clause - for example, when the filtering rule depends on the result of an external service call.
You can achieve this with the mapAsync operator, which is meant for asynchronous external service calls, combined with the filter operator to filter elements.
CassandraSource
.create(stmt, session)
.map(row -> new User(
row.getInt("id"),
row.getString("name"),
row.getInt("age")
))
.mapAsync(1, user -> {
... //make an external service call
})
.filter(serviceResult -> {
... //perform complicated filtering
})
.to(Sink.foreach(row -> System.out.println(row)));
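To make that concrete, here is a small self-contained sketch, under the assumption that isActiveUser is a hypothetical asynchronous service call (it is not part of the article's example repository) and User is the model class used above:

import akka.japi.Pair;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// A hypothetical helper standing in for a real asynchronous service client
static CompletionStage<Boolean> isActiveUser(User user) {
  return CompletableFuture.supplyAsync(() -> user.age >= 18);
}

CassandraSource
    .create(stmt, session)
    .map(row -> new User(row.getInt("id"), row.getString("name"), row.getInt("age")))
    .mapAsync(4, user -> isActiveUser(user).thenApply(active -> Pair.create(user, active)))
    .filter(pair -> pair.second())   // keep only the users the service marked as active
    .map(pair -> pair.first())
    .to(Sink.foreach(user -> System.out.println(user)))
    .run(materializer);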
Another example which could be useful is performing aggregation. Unlike SQL, which has GROUP BY and other mechanisms to support aggregation, Cassandra's CQL offers only limited native support for it.
Using Akka Streams's groupBy operator, we can perform aggregation like below:
CassandraSource
.create(stmt, session)
.map(row -> new User(
row.getInt("id"),
row.getString("name"),
row.getInt("age")
))
.groupBy(200, user -> user.age) //group by user's age
.fold(
akka.japi.Pair.create(0 /*age*/, 0 /*count*/), //the initial value
(accumulated, user) -> akka.japi.Pair.create(user.age, accumulated.second() + 1) //accumulation operation
)
.to(Sink.foreach(accumulated ->
System.out.println("age: " + accumulated.first() + " count: " + accumulated.second()
)));
In the example, the groupBy(200, user -> user.age) operator creates substreams, where each substream only carries user elements with the same user.age. For example, if we had the following elements coming from upstream:
new User(1, "John", 35),
new User(2, "Chris", 21),
new User(3, "Marie", 13),
new User(4, "Carol", 35),
new User(5, "Ben" , 35),
new User(6, "Julia", 13),
three substreams would be created, and the corresponding elements would be distributed as below:
// These go through the substream with age = 35
new User(1, "John", 35),
new User(4, "Carol", 35),
new User(5, "Ben" , 35),
// This goes through the substream with age = 21
new User(2, "Chris", 21),
// These go through the substream with age = 13
new User(3, "Marie", 13),
new User(6, "Julia", 13),
The next fold operator performs the aggregating calculation, but we will come back to it after looking at the Sink. In this stream, the Sink prints out the age for the substream (accumulated.first()) and the number of elements that went through the substream (accumulated.second()):
System.out.println("age: " + accumulated.first() + " count: " + accumulated.second()
So, the earlier fold operator needs to count up the number of elements in each substream, which is stored in the second() element of the akka.japi.Pair. Note that the accumulation operation defined in the fold operator is applied to each substream in the same manner. That means the aggregation is done individually at the substream level, not for the stream as a whole.
.fold(
akka.japi.Pair.create(0 /*age*/, 0 /*count*/), //the initial value
(accumulated, user) -> akka.japi.Pair.create(user.age, accumulated.second() + 1) //accumulation operation
)
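To make the fold concrete: if the upstream consisted of only the six sample users above and then completed, each substream's fold would emit a single pair and the Sink would print something like the following (the order of the substreams is not guaranteed):
age: 35 count: 3
age: 21 count: 1
age: 13 count: 2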
The last example using CassandraSource is about throttling. Assume you have an externalSink which calls a non-streaming external service for each element, and that the external service is slow at processing elements; it could be overwhelmed if you send elements too fast. To avoid that, you can put a throttle operator in the middle to control the throughput. This is a useful technique if you know the safe throughput level in advance:
CassandraSource
.create(stmt, session)
.map(row -> new User(
row.getInt("id"),
row.getString("name"),
row.getInt("age")
))
.throttle(10, java.time.Duration.ofSeconds(1))
.to(externalSink); //Sink representing external system, like RDB, ElasticSearch, HTTP API, etc
The above throttle(10, java.time.Duration.ofSeconds(1)) means that the throughput is capped at 10 elements per second. There are other overloaded throttle methods available that let you specify the throughput limit in various ways; they are explained in the javadoc of Flow and that of Source, and one of them is sketched below.
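For instance, assuming the four-argument overload described in those javadocs, the throttle line above could be written as:

.throttle(10, java.time.Duration.ofSeconds(1), 20, akka.stream.ThrottleMode.shaping())

which keeps the average rate at 10 elements per second but tolerates short bursts of up to 20 elements, and "shapes" (slows down) the stream instead of failing it when the limit is exceeded.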
The full code example can be found here.
The next example we look at is CassandraSink, which lets you insert Rows into Cassandra as the Sink of a stream. In contrast to CassandraSource, this is useful for more of a real-time system that keeps running, where your data comes from another data source and is eventually written into Cassandra.
To run CassandraSink, the code would look like below:
final PreparedStatement insertTemplate = session.prepare(
"INSERT INTO akka_stream_java_test.user_comments (id, user_id, comment) VALUES (uuid(), ?, ?)"
);
BiFunction<UserComment, PreparedStatement, BoundStatement> statementBinder =
(userData, preparedStatement) -> preparedStatement.bind(userData.userId, userData.comment);
final Sink<UserComment, CompletionStage<Done>> cassandraSink =
CassandraSink.create(2, insertTemplate, statementBinder, session);
source.to(cassandraSink).run(materializer);
In this example, we use a different table from what we used in the CassandraSource example.
final Statement createTable = new SimpleStatement(
"CREATE TABLE akka_stream_java_test.user_comments (" +
"id uuid, " +
"user_id int, " +
"comment text, " +
"PRIMARY KEY (id)" +
");"
);
session.execute(createTable);
This table is associated with the previous users table, where user_comments.user_id is a reference to users.id. However, there is no concept of foreign keys in Cassandra, so your application code needs to make sure the association is kept consistent (i.e. every user_id value in the user_comments table must also exist in the users table's id column). That is beyond the scope of this article, though, so let's come back to CassandraSink.
Now that you have the table in Cassandra, you can define an associated model class in Java.
public static class UserComment {
int userId;
String comment;
UserComment(int userId, String comment) {
this.userId = userId;
this.comment = comment;
}
}
You need to create a prepared statement to insert parameterized rows into Cassandra. Prepared statements in Cassandra are similar to those in SQL for relational databases, and they help guard against injection attacks.
final PreparedStatement insertTemplate = session.prepare(
"INSERT INTO akka_stream_java_test.user_comments (id, user_id, comment) VALUES (uuid(), ?, ?)"
);
Next, you need this (probably) unfamiliar-looking BiFunction.
BiFunction<UserComment, PreparedStatement, BoundStatement> statementBinder =
    (userComment, preparedStatement) -> preparedStatement.bind(userComment.userId, userComment.comment);
The signature of this BiFunction is a bit complicated, but it means:
- take UserComment as input,
- "bind" it to the PreparedStatement,
- so that the bound CQL statement can be executed.
Using statementBinder, you can now create the CassandraSink.
final Sink<UserComment, CompletionStage<Done>> cassandraSink =
CassandraSink.create(2, insertTemplate, statementBinder, session);
The parameter 2 in CassandraSink.create(2, ...) specifies the parallelism of writes into Cassandra. We'll discuss the parallelism a bit later in this article.
For simplicity, we can provide a data source as simple as the one below and run the stream:
Source<UserComment, NotUsed> source =
    Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .map(i -> new UserComment(i, "some comment"));

source.to(cassandraSink).run(materializer);
However, let's do something smarter and more useful here.
You can use Source.actorRef to connect a Sink to an Actor,
final Source<UserComment, ActorRef> source = Source.actorRef(4, OverflowStrategy.fail());
// Stream 1
final ActorRef actorRef =
source
.to(cassandraSink) //to() takes the left materialized value, (i.e.) source's ActorRef
.run(materializer);
and pass this ActorRef around to provide input from whatever data source you like.
// Stream 2
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// throttling the stream so that the Source.actorRef() does not overflow
.throttle(1, Duration.of(50, ChronoUnit.MILLIS))
.map(i -> new UserComment(i, "some comment"))
//actorRef below is connected to CassandraSink
.to(Sink.actorRef(actorRef, "stream completed"))
.run(materializer);
For example, instead of sending elements from such a simple int list, your data source could be HTTP requests handled by an (Akka) HTTP server, and you could pass the requests to this ActorRef after transforming them into UserComment instances (a minimal sketch follows below). Or the data source could be a task queue like RabbitMQ or Kafka, and you could perform the necessary operations on items from the queue and persist them to Cassandra.
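As a rough, hypothetical sketch (not from the article's repository), an Akka HTTP route could unmarshal each POSTed JSON body into a UserComment and hand it to the actorRef materialized by Stream 1. This assumes UserComment is Jackson-friendly (a no-argument constructor and accessible fields) and that the akka-http-jackson module is on the classpath:

import static akka.http.javadsl.server.Directives.*;

import akka.actor.ActorRef;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.server.Route;

// Hypothetical route: a POST to /comments with a JSON body is forwarded to the stream via the actor
Route commentRoute(ActorRef actorRef) {
  return path("comments", () ->
      post(() ->
          entity(Jackson.unmarshaller(UserComment.class), userComment -> {
            actorRef.tell(userComment, ActorRef.noSender()); // feed Stream 1, which writes to Cassandra
            return complete(StatusCodes.ACCEPTED);
          })));
}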
One thing to note about this example is that you can use the parallelism parameter of CassandraSink to improve the throughput of the stream. As discussed previously, Cassandra is known for its great write performance and is distributed by nature, so your writes are balanced across different nodes in the Cassandra cluster rather than hammering a single node, as long as your table defines an appropriate partition key. So chances are that you can insert into Cassandra in parallel and make CassandraSink faster than your data source, which is a good thing and contributes to the stability of your entire stream.
It is often the case that you want to persist or send elements to multiple different destinations.
source
.alsoTo(elasticSearchSink)
.alsoTo(jdbcSink)
.to(cassandraSink)
.run(materializer);
Be careful when using alsoTo though, because:
- a failure in a single destination Sink can make the entire stream fail,
- and if a single destination gets stuck and does not signal demand, all the other destinations get stuck too because of back-pressure.
So you will probably need to handle failures for each destination individually; one possible approach is sketched below.
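One possible approach, shown here only as a sketch, is to call a destination inside mapAsync and recover its failures there, instead of attaching it directly with alsoTo; writeToElasticSearch below is a hypothetical asynchronous call returning CompletionStage<Done>:

final Flow<UserComment, UserComment, NotUsed> safeElasticSearchWrite =
    Flow.<UserComment>create()
        .mapAsync(1, comment ->
            writeToElasticSearch(comment)                        // hypothetical async call
                .exceptionally(throwable -> Done.getInstance())  // swallow the failure so the stream keeps running
                .thenApply(done -> comment));                    // pass the original element through

source
    .via(safeElasticSearchWrite)
    .to(cassandraSink)
    .run(materializer);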
Another use of CassandraSink is inserting the test data for the earlier CassandraSource example. Instead of executing the CQL INSERT statement directly within the IntStream.range(1, 1000).forEach(i -> {...}) loop, you can write the setup as a stream:
import akka.Done;
import akka.event.Logging;
import akka.stream.Attributes;
import akka.stream.alpakka.cassandra.javadsl.CassandraSink;
import akka.stream.javadsl.StreamConverters;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
// Setup step 1: Firstly make sure the keyspace exists
// Cassandra keyspace is something that holds tables inside, and defines replication strategies
final Statement createKeyspace = new SimpleStatement(
"CREATE KEYSPACE IF NOT EXISTS akka_stream_java_test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': 1 };"
);
session.execute(createKeyspace);
// Step 2: Make sure the target table exists, and empty before the step 3
// Dropping and creating the table is the easiest way to make sure the table is empty
final Statement dropTable = new SimpleStatement(
"DROP TABLE IF EXISTS akka_stream_java_test.users;"
);
final Statement createTable = new SimpleStatement(
"CREATE TABLE akka_stream_java_test.users (" +
"id int, " + // Typically in Cassandra, UUID type is used for id, but we use int for simplicity
"name text, " +
"age int, " +
"PRIMARY KEY (id)" +
");"
);
session.execute(dropTable);
session.execute(createTable);
final PreparedStatement insertTemplate = session.prepare(
"insert into akka_stream_java_test.users( id, name, age ) values ( ?, ?, ? )"
);
// A function to create a BoundStatement, from:
// - UserData, the input data
// - PreparedStatement, the template used to generate the BoundStatement by binding the UserData fields
BiFunction<UserData, PreparedStatement, BoundStatement> statementBinder =
    (userData, preparedStatement) -> preparedStatement.bind(userData.id, userData.name, userData.age);
final Sink<UserData, CompletionStage<Done>> cassandraSink =
CassandraSink.create(2, insertTemplate, statementBinder, session);
// Step 3: Insert the data, 1000 rows into the table
StreamConverters
.fromJavaStream(() -> IntStream.range(1, 1000))
.map(i -> new UserData(i, "John", 35))
.to(cassandraSink)
.run(materializer);
The full code example can be found here.
The last example we look at is CassandraFlow. CassandraFlow lets you persist each element going through the CassandraFlow operator, similar to CassandraSink, but the difference is that it also emits the element downstream after the CQL insert statement has finished. In short, you can run CassandraFlow like below.
final PreparedStatement insertTemplate = session.prepare(
"INSERT INTO akka_stream_java_test.user_comments (id, user_id, comment) VALUES (uuid(), ?, ?)"
);
BiFunction<UserComment, PreparedStatement, BoundStatement> statementBinder =
(userData, preparedStatement) -> preparedStatement.bind(userData.userId, userData.comment);
final Flow<UserComment, UserComment, NotUsed> cassandraFlow =
CassandraFlow.createWithPassThrough(2, insertTemplate, statementBinder, session, system.dispatcher());
source.via(cassandraFlow).to(sink).run(materializer);
The above example is similar to the CassandraSink one, so we won't go into too much detail again. Also, what we discussed about CassandraSink's parallelism applies to CassandraFlow too.
One good use case for CassandraFlow is as a replacement for DB polling.
It is a common requirement to perform a certain operation whenever a new row is inserted into a database. A traditional way to achieve this is periodic DB polling: query the database (e.g.) every X minutes, and if you find newly inserted rows, perform the operation on them. To know whether there are new rows, the polling client remembers the timestamp of the last row it processed and only fetches rows newer than that.
Using CassandraFlow, you can achieve such an operation "triggered by new insertion" in a more straightforward manner: whenever an insertion into Cassandra succeeds, you can perform the operation on the emitted element.
source
.via(cassandraFlow)
.via(someOperation)
.to(sink)
.run(materializer);
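For example, someOperation above could be a small Flow that reacts to every successfully inserted row; notifyNewComment here is an assumed asynchronous call returning CompletionStage<Done>, named only for illustration:

// Because CassandraFlow emits each element only after its INSERT has completed,
// this downstream operator effectively runs once per newly inserted row.
final Flow<UserComment, Done, NotUsed> someOperation =
    Flow.<UserComment>create()
        .mapAsync(2, userComment -> notifyNewComment(userComment)); // hypothetical async notification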
Hopefully the examples shown in this article are interesting to you. There are a lot more capabilities in Akka Streams, and also Alpakka is not limited to Cassandra connectors. Please visit the Akka Streams documentation and the Alpakka project page if you want to explore more.