GoogleCloudPlatform/DataflowJavaSDK

Caused by: java.io.NotSerializableException: com.beam.example.config.ElasticsearchRestClientConfig

nithinsridhar opened this issue · 5 comments

I'm reading data from elasticsearch using rest high-level client using apache beam and I'm getting below error: Caused by: java.io.NotSerializableException: com.beam.example.config.ElasticsearchRestClientConfig

====================Elastic Search Config=====================
@configuration
public class ElasticsearchRestClientConfig {

private String elasticsearchHost;

@Bean(destroyMethod = "close")
public RestHighLevelClient client() {

    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("http://127.0.0.1:9200")));
    System.out.println("SSS");
    System.out.println(client);
    return client;

}

}
=====================Transformer==============================
@component
public class ElasticsearchReadTransform extends DoFn<String,String> implements Serializable {

private ElasticsearchRestClientConfig elasticsearchRestClientConfig = new ElasticsearchRestClientConfig();

@ProcessElement
public void processElement(ProcessContext pc) {
    String msg =  pc.element();
    String response = searchByProductId("ItemA26","INV360_Org_A27");
    pc.output(response);
}

public String searchByProductId(String productId, String orgId) {
    SearchRequest searchRequest = new SearchRequest();
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    QueryBuilder queryBuilder = QueryBuilders
            .boolQuery()
            .must(QueryBuilders
                    .matchQuery("inventory_audit.PRODUCT_ID", productId)
            );

    searchSourceBuilder.query(queryBuilder);

    searchRequest.source(searchSourceBuilder);
    SearchResponse response = null;
    try{
        response = elasticsearchRestClientConfig.client().search(searchRequest, RequestOptions.DEFAULT);
    }catch (Exception e){

    }

    return response.toString();
}

}

Please try to use StackOverflow or user@beam.apache.org in the future for questions related to usage.

Can you please give us one example project of using Elasticsearch in apache beam with both read, write, and search functionalities...
I tried as you but getting below error:
Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryRequest

@nithinsridhar and @nitinsridar are the same users

I'm Using below version which is not working:
Apache beam version = 2.19.0
Elasticsearch = 7.5.1

This might be the reason I guess it's not working.

I tried below versions: and its working
Apache beam = 2.13.0
Elasticsearch = 6.0.0