引言

尽管Elasticsearch官方在8.x版本中推荐使用新的elasticsearch-java客户端,但许多遗留项目仍依赖elasticsearch-rest-high-level-client(以下简称High Level Client)。本文将手把手教你开发一个Spring Boot Starter,集成High Level Client,并封装通用工具类,支持索引管理、文档操作和复杂查询。。


第一部分:High Level Rest Client与Spring Boot集成背景
1.1 High Level Client的定位与现状
  • 功能特性:High Level Client是Elasticsearch 7.x及以下版本的官方Java客户端,提供丰富的API(如IndexRequestSearchRequest),支持同步/异步操作。
  • 版本兼容性:适用于ES 7.x版本,在8.x中已被标记为Deprecated,但仍是许多项目的技术选型。
  • 适用场景:需要直接操作ES原生API的复杂查询场景,或从旧版本迁移的过渡期项目。
1.2 开发目标
  • 简化配置:通过Starter一键注入RestHighLevelClient实例。
  • 工具类封装:提供CRUD、批量操作、条件查询等通用方法。
  • 扩展性:支持多集群连接、请求日志拦截、连接池优化。

第二部分:搭建Elasticsearch Starter项目
2.1 项目初始化与依赖管理

创建Maven项目es-highlevel-spring-boot-starter,添加以下依赖:

<dependencies>
    <!-- Spring Boot基础依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
        <version>2.7.0</version>
    </dependency>
    <!-- Elasticsearch High Level Client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.17.7</version>
    </dependency>
    <!-- 连接池优化 -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
</dependencies>
2.2 定义属性配置类

通过@ConfigurationProperties绑定ES连接参数:

@ConfigurationProperties(prefix = "spring.elasticsearch")
public class ElasticsearchProperties {
    private String[] hosts = {"localhost:9200"};
    private String username;
    private String password;
    private int connectTimeout = 5000; // 连接超时时间(毫秒)
    private int socketTimeout = 10000; // 请求超时时间
    
    // Getter和Setter省略
}
2.3 自动配置类实现

创建ElasticsearchAutoConfiguration,初始化RestHighLevelClient

@Configuration
@EnableConfigurationProperties(ElasticsearchProperties.class)
@ConditionalOnClass(RestHighLevelClient.class)
public class ElasticsearchAutoConfiguration {

    @Bean(destroyMethod = "close")
    @ConditionalOnMissingBean
    public RestHighLevelClient restHighLevelClient(ElasticsearchProperties properties) {
        // 解析主机配置
        List<HttpHost> httpHosts = Arrays.stream(properties.getHosts())
                .map(host -> {
                    String[] parts = host.split(":");
                    return new HttpHost(parts[0], Integer.parseInt(parts[1]), "http");
                })
                .collect(Collectors.toList());

        // 配置连接池和认证
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]))
                .setRequestConfigCallback(requestConfigBuilder -> 
                    requestConfigBuilder
                        .setConnectTimeout(properties.getConnectTimeout())
                        .setSocketTimeout(properties.getSocketTimeout())
                )
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // 基础认证
                    if (StringUtils.hasText(properties.getUsername())) {
                        CredentialsProvider provider = new BasicCredentialsProvider();
                        provider.setCredentials(AuthScope.ANY,
                                new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword()));
                        httpClientBuilder.setDefaultCredentialsProvider(provider);
                    }
                    // 连接池配置
                    return httpClientBuilder
                            .setMaxConnTotal(50)
                            .setMaxConnPerRoute(10);
                });

        return new RestHighLevelClient(builder);
    }
}
2.4 注册自动配置

注意:springboot3以后设置spring.factories.

src/main/resources/META-INF/spring.factories中添加:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.eshighlevel.ElasticsearchAutoConfiguration

第三部分:封装Elasticsearch工具类
3.1 工具类设计目标
  • 简化API调用:封装高频操作(如索引文档、批量插入、复杂查询)。
  • 异常统一处理:捕获ES异常并转换为业务异常。
  • 支持泛型:自动序列化/反序列化POJO。
3.2 核心工具类实现
@Component
public class ElasticsearchTemplate {

    @Autowired
    private RestHighLevelClient client;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * 创建索引
     */
    public boolean createIndex(String indexName, Map<String, Object> mapping) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName)
                .mapping(mapping);
        return client.indices().create(request, RequestOptions.DEFAULT).isAcknowledged();
    }

    /**
     * 索引文档(自动生成ID)
     */
    public String indexDocument(String index, Object document) throws IOException {
        IndexRequest request = new IndexRequest(index)
                .source(objectMapper.convertValue(document, Map.class));
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        return response.getId();
    }

    /**
     * 条件查询
     */
    public <T> List<T> search(String index, QueryBuilder queryBuilder, Class<T> clazz) throws IOException {
        SearchRequest request = new SearchRequest(index)
                .source(new SearchSourceBuilder().query(queryBuilder));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        return Arrays.stream(response.getHits().getHits())
                .map(hit -> objectMapper.convertValue(hit.getSourceAsMap(), clazz))
                .collect(Collectors.toList());
    }

    /**
     * 批量插入
     */
    public BulkResponse bulkIndex(String index, List<?> documents) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (Object doc : documents) {
            bulkRequest.add(new IndexRequest(index).source(objectMapper.convertValue(doc, Map.class)));
        }
        return client.bulk(bulkRequest, RequestOptions.DEFAULT);
    }
}
package com.toplion.archives.utils;

import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.http.HttpStatus;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;

/**
 * @author chang
 * @since 2021/8/12
 * es操作工具类
 */

@Slf4j
public class EsUtil {

    //在 nacos.properties 文件中配置的持久化类型模板
    private static final String PERSIST_TYPE_TEMP = "es.support.datasource";

    public static RestHighLevelClient getClient() {
        RestHighLevelClient bean = SpringUtil.getBean(RestHighLevelClient.class);
        bean.indices();
        return bean;
    }


    public static <T> IndexResponse save(String index, T object, String id) {
        IndexResponse response = null;
        String json = JSONObject.toJSONString(object, SerializerFeature.DisableCircularReferenceDetect);
        try {
            RestHighLevelClient client = getClient();
            IndexRequest indexRequest = new IndexRequest(index).source(json, XContentType.JSON);
            if (StringUtils.isNotEmpty(id)) {
                indexRequest.id(id);
            }
            response = client.index(indexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
            log.error(e.getMessage());
            log.error("保存错误, 索引: [{}], 文档: [{}]", index, json);
        }
        return response;
    }
//    updateByQuery
    /**
     * 异步
     *
     * @return
     */
    public static BulkByScrollResponse updateByQuery(String index, Long typeId,QueryBuilder queryBuilder) {
        try {
            RestHighLevelClient client = getClient();
            UpdateByQueryRequest request = new UpdateByQueryRequest(index);
            //设置版本冲突时继续执行
            request.setConflicts("proceed");
            //设置更新完成后刷新索引 ps很重要如果不加可能数据不会实时刷新
            request.setRefresh(true);
            request.setQuery(queryBuilder);
            request.setScript(new Script("ctx._source['documentTypeId']='" + typeId + "'"));
            BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
            return bulkByScrollResponse;
        } catch (Exception e) {
            log.info("当前记录不存在==========");
            e.printStackTrace();
        }
        return null;
    }
    /**
     * 异步
     *
     * @param index
     * @param key
     * @param value
     * @param id
     * @return
     */
    public static UpdateResponse update(String index, String key, Object value, String id) {
        try {
            RestHighLevelClient client = getClient();
            UpdateRequest request = new UpdateRequest();
            request.index(index) //索引名
                    .id(id)//id
                    .doc(
                            XContentFactory.jsonBuilder()
                                    .startObject()
                                    .field(key, value)//要修改的字段 及字段值
                                    .endObject()
                    ).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).retryOnConflict(3).docAsUpsert(true);
            UpdateResponse update = client.update(request, RequestOptions.DEFAULT);
            return update;
        } catch (Exception e) {
            log.info("当前记录不存在==========");
            e.printStackTrace();
        }
        return null;
    }

    public static UpdateResponse update(String index, Map map, String id) {
        try {
            RestHighLevelClient client = getClient();
            UpdateRequest request = new UpdateRequest();
            XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject();
            for (Object key : map.keySet()) {
                xContentBuilder.field(key.toString(), map.get(key));
            }
            xContentBuilder.endObject();
            request.index(index) //索引名
                    .id(id)//id
                    .doc(xContentBuilder);
            UpdateResponse update = client.update(request, RequestOptions.DEFAULT);
            return update;
        } catch (Exception e) {
            log.info("当前记录不存在==========");
            e.printStackTrace();
        }
        return null;
    }
    public static BulkByScrollResponse recoverBatchByIds(String index, QueryBuilder queryBuilder) {
        try {
            RestHighLevelClient client = getClient();
            UpdateByQueryRequest request = new UpdateByQueryRequest();
            request.indices(index);
            request.setQuery(queryBuilder);
            request.setBatchSize(20);
            //设置版本冲突时继续执行
            request.setConflicts("proceed");
            //设置更新完成后刷新索引 ps很重要如果不加可能数据不会实时刷新
            request.setRefresh(true);
            request.setScript(new Script("ctx._source['deleteFlag']='0'"));
            BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
            return bulkByScrollResponse;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    public static BulkByScrollResponse updateBatchByIds(String index, QueryBuilder queryBuilder) {
        try {
            RestHighLevelClient client = getClient();
            UpdateByQueryRequest request = new UpdateByQueryRequest();
            request.indices(index);
            request.setQuery(queryBuilder);
            request.setBatchSize(20);
            //设置版本冲突时继续执行
            request.setConflicts("proceed");
            //设置更新完成后刷新索引 ps很重要如果不加可能数据不会实时刷新
            request.setRefresh(true);
            request.setScript(new Script("ctx._source['deleteFlag']='1'"));
            BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
            return bulkByScrollResponse;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 创建索引和映射
     *
     * @param index
     * @param mapping
     * @return
     */
    public static CreateIndexResponse createIndex(String index, Map<String, ?> mapping) {
        RestHighLevelClient client = getClient();
        CreateIndexRequest request = new CreateIndexRequest(index);
        settingBuilder(request);
        request.mapping(mapping);
        CreateIndexResponse createIndexResponse = null;
        try {
            createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return createIndexResponse;
    }

    /**
     * @param
     * @return
     * @author CHENSL
     * @date 2022/1/27 10:16
     */
    public static void settingBuilder(CreateIndexRequest request) {
        request.settings(Settings.builder()
                .put("number_of_shards", 1)
                .put("number_of_replicas", 1)
                .put("analysis.analyzer.special_analyzer.tokenizer", "ngram_tokenizer")
                .put("analysis.tokenizer.ngram_tokenizer.type", "ngram")
                .put("analysis.tokenizer.ngram_tokenizer.min_gram", 1)
                .put("analysis.tokenizer.ngram_tokenizer.max_gram", 2)
        );
    }

    /**
     * 强制创建索引
     *
     * @param index
     * @param mapping
     * @return
     */
    public static CreateIndexResponse forceCreateIndex(String alias, String index, Map<String, ?> mapping) {
        if (isExistsIndex(index)) {
            EsUtil.getIndexTodelete(alias);
        }

        if (mapping == null) {
            throw new RuntimeException("索引:" + index + "数据为空,无法抽取");
        }
        return createIndex(index, mapping);
    }

    public static void getIndexTodelete(String alias) {
        RestHighLevelClient client = getClient();
        GetIndexRequest indexRequest = new GetIndexRequest(alias);
        GetIndexResponse getIndexResponse = null;
        try {
            getIndexResponse = client.indices().get(indexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ElasticsearchException e) {
            e.printStackTrace();
        }
        if (getIndexResponse != null) {
            String[] indices = getIndexResponse.getIndices();
            for (String index : indices) {
                deleteIndex(index);
            }
        }
    }

    /**
     * 创建索引和映射
     *
     * @param index
     * @param mapping
     * @return
     */
    public static CreateIndexResponse createNewIndex(String index, Map<String, ?> mapping) {
        RestHighLevelClient client = getClient();
        CreateIndexRequest request = new CreateIndexRequest(index);
        settingBuilder(request);
        request.mapping(mapping);
        CreateIndexResponse createIndexResponse = null;
        try {
            createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return createIndexResponse;
    }

    /**
     * 创建别名
     */
    public static boolean createAlias(String index, String alias) {
        RestHighLevelClient client = getClient();
        IndicesAliasesRequest request = new IndicesAliasesRequest();
        IndicesAliasesRequest.AliasActions aliasAction =
                new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                        .index(index)
                        .alias(alias);
        request.addAliasAction(aliasAction);
        try {
            AcknowledgedResponse indicesAliasesResponse =
                    client.indices().updateAliases(request, RequestOptions.DEFAULT);
            return indicesAliasesResponse.isAcknowledged();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }


    /**
     * 更新索引
     *
     * @Param indexName:索引名,list: 索引字段
     */
    public static <T> AcknowledgedResponse updateIndex(String index, Map<String, ?> mapping) {
        RestHighLevelClient client = getClient();
        PutMappingRequest request = new PutMappingRequest(index);
        request.source(mapping);
        AcknowledgedResponse putMappingResponse = null;
        try {
            putMappingResponse = client.indices().putMapping(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return putMappingResponse;
    }


    /**
     * 删除索引
     *
     * @param index
     * @return
     */
    public static AcknowledgedResponse deleteIndex(String index) {
        RestHighLevelClient client = getClient();
        DeleteIndexRequest deleteRequest = new DeleteIndexRequest(index);
        AcknowledgedResponse deleteIndexResponse = null;
        try {
            deleteIndexResponse = client.indices().delete(deleteRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return deleteIndexResponse;
    }

    public static <T> BulkResponse deleteByIds(String index, List<T> ids) {
        try {
            RestHighLevelClient client = getClient();
            BulkRequest bulkRequest = new BulkRequest(index);
            for (T id : ids) {
                if (Objects.isNull(id)) {
                    log.error("删除错误, 索引: [{}], 文档ID为空", index);
                    continue;
                }
                DeleteRequest deleteRequest = new DeleteRequest(index);
                deleteRequest.id((String) id);
                bulkRequest.add(deleteRequest);
            }
            BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
            return bulk;
        } catch (IOException e) {
            e.printStackTrace();
            log.error(e.getMessage());
            log.error("删除错误, 索引: [{}], 文档ID: [{}]", index);
        }
        return null;
    }

    public static DeleteResponse delete(String index, String id) {
        DeleteResponse deleteResponse = null;
        try {
            RestHighLevelClient client = getClient();
            DeleteRequest deleteRequest = new DeleteRequest(index);
            deleteRequest.id(id);
            deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
            log.error(e.getMessage());
            log.error("删除错误, 索引: [{}], 文档ID: [{}]", index, id);
        }
        return deleteResponse;
    }


    //根据id查询
    public static <T> T get(String index, String id, Class<T> clzss) {
        try {
            RestHighLevelClient client = getClient();
            GetRequest getRequest = new GetRequest(index, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
            if (getResponse.isExists()) {
                String source = getResponse.getSourceAsString();
                return JSONObject.parseObject(source, clzss);
            }
        } catch (IOException e) {
            e.printStackTrace();
            log.error(e.getMessage());
            log.error("获取错误, 索引: [{}], 文档ID: [{}]", index, id);
        }
        return null;
    }

    /**
     * 判断索引是否存在
     *
     * @Param indexName:索引名
     */
    public static boolean isExistsIndex(String indexName) {
        try {
            RestHighLevelClient client = getClient();
            GetIndexRequest request = new GetIndexRequest(indexName);
            boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
            return exists;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 查询
     *
     * @param index
     * @param searchSourceBuilder
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> List<T> search(String index, SearchSourceBuilder searchSourceBuilder, Class<T> clazz) {
        SearchResponse response = getSearchResponse(index, searchSourceBuilder);
        SearchHits hits = response.getHits();
        List<T> list = new ArrayList<T>((int) hits.getTotalHits().value);
        Iterator<SearchHit> iterator = hits.iterator();
        while (iterator.hasNext()) {
            SearchHit searchHit = iterator.next();
            String source = searchHit.getSourceAsString();
            T t = JSONObject.parseObject(source, clazz);
            list.add(t);
        }
        return list;
    }

    public static <T> Page<T> searchPage(String index, SearchSourceBuilder searchSourceBuilder, Class<T> clazz) {
        SearchResponse response = getSearchResponse(index, searchSourceBuilder);
        SearchHits hits = response.getHits();
        List<T> list = new ArrayList<T>((int) hits.getTotalHits().value);
        Iterator<SearchHit> iterator = hits.iterator();
        while (iterator.hasNext()) {
            SearchHit searchHit = iterator.next();
            String source = searchHit.getSourceAsString();
            T t = JSONObject.parseObject(source, clazz);
            list.add(t);
        }
        Page<T> page = new Page<>();
        page.setRecords(list);
        page.setTotal(hits.getTotalHits().value);
        return page;
    }

    private static SearchResponse getSearchResponse(String index, SearchSourceBuilder searchSourceBuilder) {
        SearchResponse response = null;
        try {
            RestHighLevelClient client = getClient();
            SearchRequest searchRequest = new SearchRequest(index);
            searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH);
            searchRequest.source(searchSourceBuilder);
            response = client.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }


}

3.3 扩展功能:注解自动映射

通过自定义注解实现实体类与索引的自动关联:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EsDocument {
    String indexName();
    String type() default "_doc";
}

在工具类中根据注解处理元数据:

public String getIndexName(Class<?> clazz) {
    EsDocument annotation = clazz.getAnnotation(EsDocument.class);
    return annotation != null ? annotation.indexName() : clazz.getSimpleName().toLowerCase();
}

第四部分:高级功能扩展
4.1 多数据源支持

通过动态配置支持连接多个ES集群:

@Bean("secondaryEsClient")
@ConfigurationProperties(prefix = "spring.elasticsearch.secondary")
public RestHighLevelClient secondaryEsClient() { 
    // 配置逻辑同主数据源
}
4.2 请求日志拦截

记录ES请求的详细日志用于调试:

RestClient restClient = client.getLowLevelClient();
restClient.setRequestLogger(new RequestLogger() {
    @Override
    public void logResponse(HttpRequest request, HttpResponse response, HttpContext context) {
        log.info("ES请求:{} {},响应状态:{}", 
            request.getRequestLine().getMethod(),
            request.getRequestLine().getUri(),
            response.getStatusLine().getStatusCode());
    }
});
4.3 健康检查集成

通过Actuator暴露ES健康状态:

@Bean
public HealthIndicator esHealthIndicator(RestHighLevelClient client) {
    return () -> {
        try {
            boolean alive = client.ping(RequestOptions.DEFAULT);
            return alive ? Health.up().build() : Health.down().build();
        } catch (IOException e) {
            return Health.down(e).build();
        }
    };
}

第五部分:测试与应用
5.1 引入Starter依赖

在业务项目中添加:

<dependency>
    <groupId>com.example</groupId>
    <artifactId>es-highlevel-spring-boot-starter</artifactId>
    <version>1.0.0</version>
</dependency>
5.2 配置文件示例
spring:
  elasticsearch:
    hosts:
      - "es-node1:9200"
      - "es-node2:9200"
    username: "admin"
    password: "securepass"
    connect-timeout: 3000
    socket-timeout: 5000
5.3 业务代码示例
@EsDocument(indexName = "products")
public class Product {
    private String id;
    private String name;
    private Double price;
}

@Service
public class ProductService {
    @Autowired
    private ElasticsearchTemplate esTemplate;

    public void saveProduct(Product product) throws IOException {
        esTemplate.indexDocument(esTemplate.getIndexName(Product.class), product);
    }
}

第六部分:最佳实践与问题排查
6.1 最佳实践
  • 版本对齐:确保ES服务端与客户端版本一致。
  • 连接池调优:根据业务负载调整MaxConnTotalMaxConnPerRoute
  • 索引设计:预先定义Mapping和Settings,避免动态推断。
6.2 常见问题
  • Q:客户端抛出NoNodeAvailableException
    A:检查ES节点地址是否正确,防火墙是否开放端口。
  • Q:批量插入性能差?
    A:增大批量操作的文档数(建议每批500-1000条),或调整线程池配置。
  • Q:如何处理字段类型冲突?
    A:使用@EsDocument预先定义索引Mapping,或在插入数据前创建索引。

第七部分:总结

通过封装elasticsearch-rest-high-level-client的Spring Boot Starter,开发者可以快速集成ES功能,同时通过工具类屏蔽底层API的复杂性。本文提供的方案支持灵活配置、多数据源管理和生产级优化,适用于中大型项目。未来可扩展的方向包括:

  • 异步操作支持:封装RestClient的异步API。
  • 索引生命周期管理:自动创建/更新索引。
  • 监控集成:对接Prometheus监控ES性能指标。
Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐