This repo contains an implementation of something similar to the BulkProcessor
included in Elasticsearch 2.x. The intent
is to make it easier to carry out bulk actions against Elasticsearch using just the REST client which doesn't yet include
an easy way to carry out _bulk
requests.
This implementation has been in use in production at scale (roughly 2000 documents per second) for approximately 6 months without issue at the time of writing (September 2017). If you do find any issues, please file an issue (or a PR!) and I'll try to fix it up as soon as possible.
Just add the dependency as usual (you might have to check for the latest version, rather than what's shown below):
<dependency>
<groupId>io.whitfin</groupId>
<artifactId>elasticsearch-bulk-operator</artifactId>
<version>1.0.0</version>
</dependency>
The interface is deliberately small; you will only interact with a couple of classes. A BulkOperator
will carry out
requests periodically, based on rules you provide during construction (see the docs for info on what you can set). An
operator will queue many BulkAction
instances into a single bulk request to Elasticsearch.
import io.whitfin.elasticsearch.bulk.BulkAction;
import io.whitfin.elasticsearch.bulk.BulkOperation;
import io.whitfin.elasticsearch.bulk.BulkOperator;
import io.whitfin.elasticsearch.bulk.lifecycle.RequeueLifecycle;
import org.elasticsearch.client.RestClient;
public class BulkExample {
public static void main(String[] args) {
// create your ES REST client somehow
RestClient restClient = createRestClient();
// create an operator to flush each minute
BulkOperator operator = BulkOperator
.builder(restClient)
.concurrency(3)
.interval(60_000)
.lifecycle(new RequeueLifecycle())
.maxActions(10_000)
.build();
// create an index action (or whatever)
BulkAction action = BulkAction
.builder()
.operation("index")
.index("my_test_index")
.type("my_test_type")
.source("{\"test\":true}")
.build();
// queue it up!
operator.add(action);
}
}
For any other functionality, please see the documentation or the code itself.
There are several options which can be applied to an operator to control how flushing occurs;
- You can define
maxActions
on an operator to provide a limit on the buffer stored internally before flushing. - You can define
interval
on an operator to provide a schedule (in millis) on which to flush. - You can opt (default) to manually flush by calling
flush()
on an operator. - You can do any of the above in any combination to work with multiple flush triggers.
You can attach BulkLifecycle
instances to your operator to hook into various stages of the operator lifecycle.
There are currently only two implementations shipped; NoopImplementation
(which does nothing) and RequeueLifecycle
which will add failed requests back to the operator (for version conflicts, etc). You can easily create your own by using
the interface class and registering it on your operator:
public class CustomLifecycle implements BulkLifecycle {
@Override
public void beforeBulk(long executionId, BulkOperator operator, BulkOperation bulkOperation) {
// executed before the batch is sent
}
@Override
public void afterBulk(long executionId, BulkOperator operator, BulkOperation bulkOperation, Response response) {
// executed after a success batch request
}
@Override
public void afterBulk(long executionId, BulkOperator operator, BulkOperation bulkOperation, Throwable failure) {
// executed after a failed batch request
}
}