Update 2022: Elasticsearch has once again replaced its core library, this time with a new Java API.
Similar to the last migration, the new API omits the BulkProcessor utility, so I will be updating this library in
the meantime. You can track the official addition here. This
time around most of the core ES classes are reusable, which makes the migration a much smaller effort. If you need the 7.x version
of this library, please use the 1.x releases.
Update 2021: The most recent Elasticsearch releases once again ship an official implementation
of BulkProcessor. I'd encourage users to update and migrate over.
This repo contains an implementation of something similar to the BulkProcessor included in Elasticsearch 2.x. The intent
is to make it easier to carry out bulk actions against Elasticsearch using just the REST client, which doesn't yet include
an easy way to carry out _bulk requests.
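For context, the _bulk endpoint expects a newline-delimited JSON (NDJSON) body: each operation is an action/metadata line, followed by a source line for index actions, and the whole body must end with a newline. The sketch below assembles such a body by hand, which is the kind of plumbing that is tedious to write yourself. BulkBodySketch, IndexAction, and toBulkBody are hypothetical names, and the sketch assumes index names need no JSON escaping and that each document is already serialized as single-line JSON.

```java
import java.util.List;

// Hypothetical helper: builds a _bulk request body (NDJSON) for index actions only.
public class BulkBodySketch {

    // One index action: the target index plus a pre-serialized, single-line JSON document.
    record IndexAction(String index, String sourceJson) {}

    static String toBulkBody(List<IndexAction> actions) {
        StringBuilder body = new StringBuilder();
        for (IndexAction action : actions) {
            // action/metadata line (assumes the index name needs no JSON escaping)
            body.append("{\"index\":{\"_index\":\"").append(action.index()).append("\"}}\n");
            // source line; it must not contain raw newlines
            body.append(action.sourceJson()).append('\n');
        }
        // every line is newline-terminated, so the body ends with '\n' as _bulk requires
        return body.toString();
    }

    public static void main(String[] args) {
        String body = toBulkBody(List.of(
                new IndexAction("my_test_index", "{\"test\":true}"),
                new IndexAction("my_test_index", "{\"test\":false}")));
        System.out.print(body);
    }
}
```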
This implementation has been in use in production at scale (roughly 2000 documents per second) for approximately 6 months without issue at the time of writing (September 2017). If you do find any issues, please file an issue (or a PR!) and I'll try to fix it up as soon as possible.
Just add the dependency as usual (you might have to check for the latest version, rather than what's shown below):
<dependency>
    <groupId>io.whitfin</groupId>
    <artifactId>elasticsearch-bulk-operator</artifactId>
    <version>2.0.0</version>
</dependency>
The interface is deliberately small; you will only interact with a couple of classes. A BulkOperator will carry out
requests periodically, based on rules you provide during construction (see the docs for info on what you can set). An
operator will queue many BulkOperation instances into a single bulk request to Elasticsearch.
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import io.whitfin.elasticsearch.bulk.BulkOperator;
import io.whitfin.elasticsearch.bulk.lifecycle.RequeueLifecycle;

import java.util.HashMap;
import java.util.Map;

public class BulkExample {

    public static void main(String[] args) {
        // create your ES Java client somehow (createClient() stands in for your own setup code)
        ElasticsearchClient client = createClient();

        // create an operator that flushes every minute, or once 10,000 actions are queued
        BulkOperator operator = BulkOperator
            .builder(client)
            .concurrency(3)
            .interval(60_000)
            .lifecycle(new RequeueLifecycle())
            .maxActions(10_000)
            .build();

        // build a simple document to index
        Map<String, Object> document = new HashMap<>();
        document.put("test", true);

        // create an index action (or whatever else you want)
        IndexOperation<Map<String, Object>> index = new IndexOperation
            .Builder<Map<String, Object>>()
            .index("my_test_index")
            .document(document)
            .build();

        // create a bulk operation from the action we have
        BulkOperation action = new BulkOperation.Builder()
            .index(index)
            .build();

        // queue it up!
        operator.add(action);
    }
}
For any other functionality, please see the documentation or the code itself.
There are several options which can be applied to an operator to control how flushing occurs:
- You can define maxActions on an operator to limit the number of actions buffered internally before a flush.
- You can define interval on an operator to flush on a schedule (in milliseconds).
- You can flush manually by calling flush() on an operator (this is the default).
- You can use any combination of the above to work with multiple flush triggers.
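To make the interplay of these triggers concrete, here is a minimal, self-contained sketch of a buffer that flushes on size, on a timer, or on demand. FlushingBuffer and its constructor parameters are hypothetical stand-ins chosen to mirror maxActions, interval, and flush(); this is not the library's implementation, and it hands each batch to a Consumer synchronously rather than sending it to Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical buffer illustrating the three flush triggers; not the library's code.
public class FlushingBuffer<T> implements AutoCloseable {

    private final List<T> buffer = new ArrayList<>();
    private final int maxActions;              // <= 0 disables the size trigger
    private final Consumer<List<T>> sink;      // receives each flushed batch
    private final ScheduledExecutorService scheduler;

    public FlushingBuffer(int maxActions, long intervalMillis, Consumer<List<T>> sink) {
        this.maxActions = maxActions;
        this.sink = sink;
        if (intervalMillis > 0) {
            // interval trigger: flush whatever is buffered on a fixed schedule
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
            this.scheduler.scheduleAtFixedRate(this::flush, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        } else {
            this.scheduler = null;
        }
    }

    public synchronized void add(T action) {
        buffer.add(action);
        // size trigger: flush as soon as the buffer reaches maxActions
        if (maxActions > 0 && buffer.size() >= maxActions) {
            flush();
        }
    }

    // manual trigger: always available, and the only trigger when the other two are disabled
    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        sink.accept(batch);
    }

    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdown();
        }
        flush(); // send anything left over
    }
}
```

For example, with maxActions of 2 and the interval disabled (0), adding three actions delivers one batch of two; the third action stays buffered until flush() or close() is called. Holding the lock while invoking the consumer keeps the sketch simple; a production operator would more likely hand batches off to worker threads.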
You can attach BulkLifecycle instances to your operator to hook into various stages of the operator lifecycle.
There are currently only two implementations shipped: NoopImplementation (which does nothing) and RequeueLifecycle,
which adds failed requests back to the operator (for version conflicts, etc.). You can easily create your own by
implementing the interface and registering it on your operator:
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import io.whitfin.elasticsearch.bulk.BulkOperator;
import io.whitfin.elasticsearch.bulk.lifecycle.BulkLifecycle;

public class CustomLifecycle implements BulkLifecycle {

    @Override
    public void beforeBulk(long executionId, BulkOperator operator, BulkRequest request) {
        // executed before the batch is sent
    }

    @Override
    public void afterBulk(long executionId, BulkOperator operator, BulkRequest request, BulkResponse response) {
        // executed after a successful batch request
    }

    @Override
    public void afterBulk(long executionId, BulkOperator operator, BulkRequest request, Throwable failure) {
        // executed after a failed batch request
    }
}
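RequeueLifecycle adds failed requests back to the operator. As a rough, self-contained illustration of that idea, and of how an execution id ties a beforeBulk call to its matching afterBulk call, the sketch below models lifecycle hooks around a batch using stand-in types. Hooks, RequeueHooks, and runBatch are hypothetical; per-item results are plain HTTP status codes, and treating 409 and 429 as retryable is an assumption, not necessarily what RequeueLifecycle does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical, self-contained model of lifecycle hooks; the types are stand-ins, not the library's.
public class LifecycleSketch {

    // Stand-in for BulkLifecycle: hooks around one batch, keyed by an execution id.
    interface Hooks<T> {
        void beforeBulk(long executionId, List<T> batch);
        void afterBulk(long executionId, List<T> batch, List<Integer> statuses);
    }

    // Re-queues items whose per-item status looks retryable (assumed here: 409 and 429).
    static class RequeueHooks<T> implements Hooks<T> {
        private static final Set<Integer> RETRYABLE = Set.of(409, 429);
        private final Deque<T> queue;

        RequeueHooks(Deque<T> queue) {
            this.queue = queue;
        }

        @Override
        public void beforeBulk(long executionId, List<T> batch) {
            // nothing to do before sending in this sketch
        }

        @Override
        public void afterBulk(long executionId, List<T> batch, List<Integer> statuses) {
            for (int i = 0; i < batch.size(); i++) {
                if (RETRYABLE.contains(statuses.get(i))) {
                    queue.addLast(batch.get(i)); // put the failed item back for the next batch
                }
            }
        }
    }

    // Drains the queue into one batch, "sends" it, and fires the hooks around the send.
    static <T> long runBatch(Deque<T> queue, Hooks<T> hooks,
                             Function<List<T>, List<Integer>> send, AtomicLong ids) {
        List<T> batch = new ArrayList<>(queue);
        queue.clear();
        long executionId = ids.incrementAndGet();
        hooks.beforeBulk(executionId, batch);
        List<Integer> statuses = send.apply(batch); // one status per item, in order
        hooks.afterBulk(executionId, batch, statuses);
        return executionId;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("doc-1", "doc-2", "doc-3"));
        AtomicLong ids = new AtomicLong();
        Hooks<String> hooks = new RequeueHooks<>(queue);
        // pretend doc-2 hits a version conflict on the first attempt only
        Function<List<String>, List<Integer>> send = batch -> {
            List<Integer> statuses = new ArrayList<>();
            for (String doc : batch) {
                statuses.add(doc.equals("doc-2") && ids.get() == 1 ? 409 : 201);
            }
            return statuses;
        };
        runBatch(queue, hooks, send, ids);
        System.out.println("after batch 1: " + queue); // prints: after batch 1: [doc-2]
        runBatch(queue, hooks, send, ids);
        System.out.println("after batch 2: " + queue); // prints: after batch 2: []
    }
}
```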