/elasticsearch-java-demo

<artifactId>elasticsearch-java</artifactId> 使用示例

Primary Language: Java

elasticsearch-java 使用示例

maven 依赖

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>7.17.17</version>
</dependency>

ESUtil.java

package com.wangrui027.utils;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.Builder;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Convenience wrapper around the Elasticsearch Java API client.
 *
 * <p>An instance is configured through the Lombok-generated builder (or {@link #of(Function)}).
 * The underlying client is created lazily on the first call to {@link #getClient()} and
 * released by {@link #close()}. Most operations come in two flavours: one that takes an explicit
 * index name and one that uses the configured default {@code indicesName}.
 *
 * <p>The lazy initialisation in {@link #getClient()} is not guarded by a lock, so an instance
 * should be initialised from a single thread.
 */
@Builder
public class ESUtil implements AutoCloseable {

    /**
     * Host name or IP address of the ES server.
     */
    private String ip;

    /**
     * Port of the ES server; defaults to 9200.
     */
    @Builder.Default
    private Integer port = 9200;

    /**
     * HTTP scheme used to reach the server; defaults to "http".
     */
    @Builder.Default
    private String scheme = "http";

    /**
     * ES user name; basic authentication is only sent when both user name and password are set.
     */
    private String username;

    /**
     * ES password; basic authentication is only sent when both user name and password are set.
     */
    private String password;

    /**
     * Default index name. When only one index is used, set this once and call the overloads
     * that do not take an index name.
     */
    private String indicesName;

    /**
     * Transport backing the client; kept so that {@link #close()} can release it.
     */
    private final AtomicReference<ElasticsearchTransport> transport = new AtomicReference<>(null);

    /**
     * Lazily created client instance.
     */
    private final AtomicReference<ElasticsearchClient> client = new AtomicReference<>(null);

    /**
     * Creates an instance by letting the caller configure a fresh builder.
     *
     * @param fn function that configures and returns the builder
     * @return the built instance
     */
    public static ESUtil of(Function<ESUtilBuilder, ESUtilBuilder> fn) {
        return fn.apply(new ESUtilBuilder()).build();
    }

    /**
     * Returns the ES client, creating it (and its transport) on first use.
     *
     * @return the ES client
     */
    public ElasticsearchClient getClient() {
        ElasticsearchClient existing = client.get();
        if (existing != null) {
            return existing;
        }
        RestClientBuilder builder = RestClient
                .builder(new HttpHost(ip, port, scheme))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultHeaders(Collections.singletonList(
                                new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())))
                        // Adds the product header to every *response*.
                        // NOTE(review): presumably a workaround for servers that do not send
                        // X-Elastic-Product themselves - confirm against the target ES version.
                        .addInterceptorLast((HttpResponseInterceptor) (response, context) ->
                                response.addHeader("X-Elastic-Product", "Elasticsearch"))
                );
        if (username != null && password != null) {
            String credentials = username + ":" + password;
            String token = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
            builder.setDefaultHeaders(new Header[]{
                    new BasicHeader("Authorization", "Basic " + token),
            });
        }
        ElasticsearchTransport createdTransport = new RestClientTransport(builder.build(), new JacksonJsonpMapper());
        ElasticsearchClient createdClient = new ElasticsearchClient(createdTransport);
        transport.set(createdTransport);
        client.set(createdClient);
        return createdClient;
    }

    /**
     * Closes the transport, if one was created, and forgets the cached client so that a later
     * {@link #getClient()} call builds a fresh one instead of returning a closed client.
     *
     * @throws IOException if closing the transport fails
     */
    @Override
    public void close() throws IOException {
        // Clear the references first so a failed close never leaves a half-closed client cached.
        ElasticsearchTransport current = transport.getAndSet(null);
        client.set(null);
        if (current != null) {
            current.close();
        }
    }

    /**
     * Pings the ES server.
     *
     * @return the boolean value of the ping response
     * @throws IOException if the request fails
     */
    public boolean ping() throws IOException {
        BooleanResponse response = getClient().ping();
        return response.value();
    }

    /**
     * Checks whether an index exists.
     *
     * @param indicesName index name
     * @return whether the index exists
     * @throws IOException if the request fails
     */
    public boolean indicesExist(String indicesName) throws IOException {
        ElasticsearchIndicesClient indicesClient = getClient().indices();
        BooleanResponse response = indicesClient.exists(ExistsRequest.of(e -> e.index(indicesName)));
        return response.value();
    }

    /**
     * Checks whether the default index exists.
     *
     * @return whether the index exists
     * @throws IOException if the request fails
     */
    public boolean indicesExist() throws IOException {
        return indicesExist(indicesName);
    }

    /**
     * Creates an index without a request body.
     *
     * @param indicesName index name
     * @return whether the creation was acknowledged
     * @throws IOException if the request fails
     */
    public boolean indicesCreate(String indicesName) throws IOException {
        return indicesCreate(indicesName, null);
    }

    /**
     * Creates the default index without a request body.
     *
     * @return whether the creation was acknowledged
     * @throws IOException if the request fails
     */
    public boolean indicesCreate() throws IOException {
        return indicesCreate(indicesName);
    }

    /**
     * Creates an index from a JSON request body.
     *
     * @param indicesName index name
     * @param jsonBody    request body of the create-index call
     * @return whether the creation was acknowledged
     * @throws IOException if the request fails
     */
    public boolean indicesCreateWithJson(String indicesName, String jsonBody) throws IOException {
        return indicesCreate(indicesName, jsonBody);
    }

    /**
     * Creates the default index from a JSON request body.
     *
     * @param jsonBody request body of the create-index call
     * @return whether the creation was acknowledged
     * @throws IOException if the request fails
     */
    public boolean indicesCreateWithJson(String jsonBody) throws IOException {
        return indicesCreate(indicesName, jsonBody);
    }

    /**
     * Creates an index, optionally from a JSON request body.
     *
     * @param indicesName index name
     * @param jsonBody    request body of the create-index call; {@code null} to send none
     * @return whether the creation was acknowledged
     * @throws IOException if the request fails
     */
    public boolean indicesCreate(String indicesName, String jsonBody) throws IOException {
        ElasticsearchIndicesClient indicesClient = getClient().indices();
        CreateIndexRequest.Builder builder = new CreateIndexRequest.Builder().index(indicesName);
        if (jsonBody != null) {
            try (InputStream is = new ByteArrayInputStream(jsonBody.getBytes(StandardCharsets.UTF_8))) {
                builder.withJson(is);
            }
        }
        CreateIndexResponse response = indicesClient.create(builder.build());
        return response.acknowledged();
    }

    /**
     * Deletes an index.
     *
     * @param indicesName index name
     * @return whether the deletion was acknowledged
     * @throws IOException if the request fails
     */
    public boolean indicesDelete(String indicesName) throws IOException {
        ElasticsearchIndicesClient indicesClient = getClient().indices();
        DeleteIndexResponse response = indicesClient.delete(DeleteIndexRequest.of(e -> e.index(indicesName)));
        return response.acknowledged();
    }

    /**
     * Deletes the default index.
     *
     * @return whether the deletion was acknowledged
     * @throws IOException if the request fails
     */
    public boolean indicesDelete() throws IOException {
        return indicesDelete(indicesName);
    }


    /**
     * Counts the documents in an index.
     *
     * @param indicesName index name; when {@code null} the default index is used
     * @return number of documents
     * @throws IOException           if the request fails
     * @throws IllegalStateException if neither an index name nor a default index is available
     */
    public long count(String indicesName) throws IOException {
        // Resolve the fallback here, once. Delegating back to count() would recurse forever
        // when the default index is not configured either.
        String target = indicesName != null ? indicesName : this.indicesName;
        if (target == null) {
            throw new IllegalStateException(
                    "No index name given and no default indicesName configured; use countAll() to count every index");
        }
        CountResponse response = getClient().count(e -> e.index(target));
        return response.count();
    }

    /**
     * Counts the documents in the default index.
     *
     * @return number of documents
     * @throws IOException           if the request fails
     * @throws IllegalStateException if no default index is configured
     */
    public long count() throws IOException {
        return count(indicesName);
    }

    /**
     * Counts the documents on the whole ES server (count request without an index).
     *
     * @return number of documents
     * @throws IOException if the request fails
     */
    public long countAll() throws IOException {
        CountResponse response = getClient().count();
        return response.count();
    }

    /**
     * Saves (indexes) a document.
     *
     * @param indicesName index name
     * @param object      document to save
     * @param id          document ID; the sibling overloads pass {@code null} when no ID is supplied
     * @return document ID reported by the response
     * @throws IOException if the request fails
     */
    public <T> String save(String indicesName, T object, String id) throws IOException {
        IndexResponse response = getClient().index(e -> e
                .index(indicesName)
                .id(id)
                .document(object)
        );
        return response.id();
    }

    /**
     * Saves a document without supplying an ID.
     *
     * @param indicesName index name
     * @param object      document to save
     * @return document ID reported by the response
     * @throws IOException if the request fails
     */
    public <T> String save(String indicesName, T object) throws IOException {
        return save(indicesName, object, null);
    }

    /**
     * Saves a document into the default index.
     *
     * @param object document to save
     * @param id     document ID
     * @return document ID reported by the response
     * @throws IOException if the request fails
     */
    public <T> String save(T object, String id) throws IOException {
        return save(indicesName, object, id);
    }

    /**
     * Saves a document into the default index without supplying an ID.
     *
     * @param object document to save
     * @return document ID reported by the response
     * @throws IOException if the request fails
     */
    public <T> String save(T object) throws IOException {
        return save(indicesName, object, null);
    }

    /**
     * Updates a document with the given (partial) document.
     *
     * @param indicesName index name
     * @param id          document ID
     * @param object      document content; must not be {@code null} because its class is used
     *                    as the document type of the request
     * @return {@code true} only when the response result is {@code Updated}
     * @throws IOException if the request fails
     */
    public <T> boolean update(String indicesName, String id, T object) throws IOException {
        UpdateResponse<?> response = getClient().update(UpdateRequest.of(ur -> ur
                .index(indicesName)
                .id(id)
                .doc(object)), object.getClass());
        return Result.Updated.equals(response.result());
    }

    /**
     * Updates a document in the default index.
     *
     * @param id     document ID
     * @param object document content
     * @return {@code true} only when the response result is {@code Updated}
     * @throws IOException if the request fails
     */
    public <T> boolean update(String id, T object) throws IOException {
        return update(indicesName, id, object);
    }

    /**
     * Deletes a document.
     *
     * @param indicesName index name
     * @param id          document ID
     * @return {@code true} only when the response result is {@code Deleted}
     * @throws IOException if the request fails
     */
    public boolean delete(String indicesName, String id) throws IOException {
        DeleteResponse response = getClient().delete(e -> e
                .index(indicesName)
                .id(id)
        );
        return Result.Deleted.equals(response.result());
    }

    /**
     * Deletes a document from the default index.
     *
     * @param id document ID
     * @return {@code true} only when the response result is {@code Deleted}
     * @throws IOException if the request fails
     */
    public boolean delete(String id) throws IOException {
        return delete(indicesName, id);
    }

    /**
     * Deletes several documents with one bulk request.
     *
     * @param indicesName index name
     * @param ids         IDs of the documents to delete
     * @return the bulk response, or {@code null} when {@code ids} is {@code null} or empty
     *         (no request is sent in that case, mirroring the batch save behaviour)
     * @throws IOException if the request fails
     */
    public BulkResponse delete(String indicesName, Collection<String> ids) throws IOException {
        if (ids == null || ids.isEmpty()) {
            return null;
        }
        List<BulkOperation> operations = ids.stream()
                .map(id -> BulkOperation.of(bo -> bo
                        .delete(e -> e
                                .index(indicesName)
                                .id(id))))
                .collect(Collectors.toList());
        return bulk(operations);
    }

    /**
     * Deletes several documents from the default index with one bulk request.
     *
     * @param ids IDs of the documents to delete
     * @return the bulk response, or {@code null} when {@code ids} is {@code null} or empty
     * @throws IOException if the request fails
     */
    public BulkResponse delete(Collection<String> ids) throws IOException {
        return delete(indicesName, ids);
    }

    /**
     * Saves several documents with one bulk request, without supplying IDs.
     *
     * @param indicesName index name
     * @param list        documents to save
     * @return the bulk response, or {@code null} when {@code list} is {@code null} or empty
     * @throws IOException if the request fails
     */
    public <T> BulkResponse save(String indicesName, List<T> list) throws IOException {
        // The cast selects the List-based overload; a bare null would be ambiguous with
        // save(String, T, String).
        return save(indicesName, list, (List<String>) null);
    }

    /**
     * Saves several documents with one bulk request.
     *
     * @param indicesName index name
     * @param list        documents to save
     * @param ids         document IDs, positionally matching {@code list}; {@code null} to
     *                    supply no IDs
     * @return the bulk response, or {@code null} when {@code list} is {@code null} or empty
     * @throws IOException              if the request fails
     * @throws IllegalArgumentException if {@code ids} is given but its size differs from {@code list}
     */
    public <T> BulkResponse save(String indicesName, List<T> list, List<String> ids) throws IOException {
        if (list == null || list.isEmpty()) {
            return null;
        }
        if (ids != null && ids.size() != list.size()) {
            throw new IllegalArgumentException("ids 集合大小和 list 集合大小不一致,bulk 中止");
        }
        List<BulkOperation> operations = new ArrayList<>(list.size());
        for (int i = 0; i < list.size(); i++) {
            String id = ids == null ? null : ids.get(i);
            T document = list.get(i);
            operations.add(BulkOperation.of(bo -> bo.index(e -> e
                    .index(indicesName)
                    .id(id)
                    .document(document)
            )));
        }
        return bulk(operations);
    }

    /**
     * Executes a bulk request made of the given operations.
     *
     * @param operations bulk operations
     * @return the bulk response
     * @throws IOException if the request fails
     */
    public BulkResponse bulk(List<BulkOperation> operations) throws IOException {
        return getClient().bulk(BulkRequest.of(br -> br.operations(operations)));
    }

    /**
     * Saves several documents into the default index, without supplying IDs.
     *
     * @param list documents to save
     * @return the bulk response, or {@code null} when {@code list} is {@code null} or empty
     * @throws IOException if the request fails
     */
    public <T> BulkResponse save(List<T> list) throws IOException {
        return save(indicesName, list, (List<String>) null);
    }

    /**
     * Saves several documents into the default index.
     *
     * @param list documents to save
     * @param ids  document IDs, positionally matching {@code list}
     * @return the bulk response, or {@code null} when {@code list} is {@code null} or empty
     * @throws IOException if the request fails
     */
    public <T> BulkResponse save(List<T> list, List<String> ids) throws IOException {
        return save(indicesName, list, ids);
    }

    /**
     * Fetches a document by ID.
     *
     * @param indicesName index name
     * @param id          document ID
     * @param tClass      type the document source is mapped to
     * @return the source of the get response
     * @throws IOException if the request fails
     */
    public <T> T getById(String indicesName, String id, Class<T> tClass) throws IOException {
        GetResponse<T> response = getClient().get((gr -> gr.index(indicesName).id(id)), tClass);
        return response.source();
    }

    /**
     * Fetches a document by ID from the default index.
     *
     * @param id     document ID
     * @param tClass type the document source is mapped to
     * @return the source of the get response
     * @throws IOException if the request fails
     */
    public <T> T getById(String id, Class<T> tClass) throws IOException {
        return getById(indicesName, id, tClass);
    }

    /**
     * Searches documents by the given request.
     *
     * @param searchRequest search request
     * @param tClass        type each hit source is mapped to
     * @return the source of every hit, in the order the hits were returned
     * @throws IOException if the request fails
     */
    public <T> List<T> search(SearchRequest searchRequest, Class<T> tClass) throws IOException {
        SearchResponse<T> response = getClient().search(searchRequest, tClass);
        return response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
    }

}