在Spring Boot项目中整合Elasticsearch时,要确保Spring Boot版本与Elasticsearch客户端版本兼容。本文使用的是Elasticsearch 6.2.2和Spring Boot 2.1。

1.添加依赖

        在pom.xml文件中添加Spring Data Elasticsearch的依赖

	    <!-- Spring Boot starter for Spring Data Elasticsearch (repositories, ElasticsearchTemplate) -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
			<version>2.1.0.RELEASE</version>
		</dependency>
		<!-- Elasticsearch high-level REST client; the version is kept equal to the 6.2.2 server used in this article -->
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>elasticsearch-rest-high-level-client</artifactId>
			<version>6.2.2</version>
		</dependency>

2.配置application.yml

spring:
  data:
    elasticsearch:
      repositories:
        # Enable Spring Data Elasticsearch repository support
        enabled: true
      # Node address used by Spring Data Elasticsearch. Note this is port 9300, not the 9200 HTTP
      # port used by the REST client in the Java code below - presumably the transport port; confirm.
      cluster-nodes:  127.0.0.1:9300

3.创建配置类

import org.apache.http.HttpHost;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import javax.annotation.PostConstruct;
/**
 * Spring configuration that declares the Elasticsearch client, template,
 * converter and mapping-context beans.
 */
@Configuration
// Declares that this configuration should be applied before RedisConfig.
// NOTE(review): RedisConfig is declared elsewhere; confirm the ordering is actually needed/effective.
@AutoConfigureBefore(RedisConfig.class)
@EnableCaching
public class ElasticsearchConfig {

    /**
     * High-level REST client pointing at the local node over HTTP.
     * Because no bean name is given on {@code @Bean}, the bean is named after the method.
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        HttpHost localNode = new HttpHost("127.0.0.1", 9200, "http");
        return new RestHighLevelClient(RestClient.builder(localNode));
    }

    /**
     * Sets the {@code es.set.netty.runtime.available.processors} system property to
     * {@code "false"} once this configuration object has been constructed.
     * NOTE(review): presumably avoids a Netty processor-count conflict with another
     * Netty-based client in the application - confirm.
     */
    @PostConstruct
    void init() {
        final String key = "es.set.netty.runtime.available.processors";
        System.setProperty(key, "false");
    }

    /**
     * Template bean registered under the explicit name {@code elasticsearchTemplate}.
     *
     * @param client    Elasticsearch client supplied by the application context
     * @param converter converter used by the template
     * @return the template
     * @throws IllegalStateException wrapping any exception raised while creating the template
     */
    @Bean(name = "elasticsearchTemplate")
    public ElasticsearchTemplate elasticsearchTemplate(Client client,ElasticsearchConverter converter) {
        ElasticsearchTemplate template;
        try {
            template = new ElasticsearchTemplate(client, converter);
        } catch (Exception cause) {
            throw new IllegalStateException(cause);
        }
        return template;
    }

    /**
     * Converter backed by the given mapping context.
     *
     * @param mappingContext mapping context bean
     * @return a {@link MappingElasticsearchConverter}
     */
    @Bean
    public ElasticsearchConverter elasticsearchConverter(
            SimpleElasticsearchMappingContext mappingContext) {
        ElasticsearchConverter converter = new MappingElasticsearchConverter(mappingContext);
        return converter;
    }

    /**
     * Mapping context shared by the converter.
     *
     * @return a new {@link SimpleElasticsearchMappingContext}
     */
    @Bean
    public SimpleElasticsearchMappingContext mappingContext() {
        SimpleElasticsearchMappingContext context = new SimpleElasticsearchMappingContext();
        return context;
    }
}

4.工具类

package com.isdepci.project.elasticsearch.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.util.StringUtil;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.Charset;

public class EsBackupUtils {
    // Backup location; sent as the "location" setting when the snapshot repository is created
    public static String ES_BAK_PATH = "D:/elasticsearch-6.2.2/backups/es_backup";
    // Base URL of the Elasticsearch HTTP endpoint that the requests in this class are sent to
    public static String ES_URL = "http://127.0.0.1:9200";
    // Name of the snapshot (backup) repository
    public static String ES_REPO_NAME = "es_backup";

    /**
     * Lists the indices together with selected metadata.
     *
     * <p>Each entry always carries {@code name}, {@code state}, {@code numberOfShards},
     * {@code numberOfReplicas} and {@code index}. When metadata is available for the index,
     * {@code aliases}, {@code version} and {@code setting} are added as well; otherwise
     * placeholder values are used.
     *
     * @return one JSON object per index name returned by the get-index request
     */
    public List<JSONObject> selectIndicesList() {
        ImmutableOpenMap<String, IndexMetaData> metaByName = client.admin().cluster()
                .prepareState().get().getState().getMetaData().getIndices();
        String[] names = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices();
        List<JSONObject> rows = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            String indexName = names[i];
            IndexMetaData meta = metaByName.get(indexName);
            JSONObject row = new JSONObject();
            row.put("name", indexName);
            if (meta != null) {
                row.put("state", meta.getState());
                // The number of shards can only be chosen when the index is created.
                row.put("numberOfShards", meta.getNumberOfShards());
                // The number of replicas per shard can be changed later.
                row.put("numberOfReplicas", meta.getNumberOfReplicas());
                row.put("index", meta.getIndex());
                row.put("aliases", meta.getAliases());
                row.put("version", meta.getVersion());
                row.put("setting", meta.getSettings().toString());
            } else {
                // No metadata found for this name: fill in placeholders.
                row.put("state", "NULL");
                row.put("numberOfShards", 0);
                row.put("numberOfReplicas", 0);
                row.put("index", "");
            }
            rows.add(row);
        }
        return rows;
    }

    /**
     * Backs up indices into a new snapshot.
     *
     * <ol>
     *   <li>Check whether the snapshot repository exists.</li>
     *   <li>Create the repository when the check reports that it does not exist.</li>
     *   <li>Create a snapshot named {@code snapshot_<DateUtils.dateTimeNow()>}.</li>
     * </ol>
     *
     * @param indices names of the indices to back up; when {@code null} or empty, all
     *                indices are selected through the {@code _all} multi-index expression
     * @param type    not used by this method
     * @return {@code null} on success, otherwise a message describing the failure
     */
    public String backupsByIndices(String[] indices,int type) {
        // Only code 1 ("repository does not exist") triggers creation. Codes 2/3 (check failed)
        // fall through, and any resulting problem is reported by createSnapshot below.
        if (isSnapshotRepoExists() == 1) {
            String repoError = createSnapshotRepo();
            if (StringUtils.isNotEmpty(repoError)) {
                return repoError;
            }
        }
        String backupNo = "snapshot_" + DateUtils.dateTimeNow();
        // The original fallback referenced an undefined variable; "_all" selects every index.
        String indicesName = (indices != null && indices.length > 0)
                ? String.join(",", indices)
                : "_all";
        String msg = createSnapshot(backupNo, indicesName);
        if (StringUtils.isNotEmpty(msg)) {
            return msg;
        }
        //TODO record the backup information

        return null;
    }


    /**
     * Checks whether the snapshot repository {@code ES_REPO_NAME} exists.
     *
     * @return <ul>
     *           <li>0 - the repository exists</li>
     *           <li>1 - the repository does not exist (HTTP 404, or the name is missing from the response)</li>
     *           <li>2 - the request returned any other non-200 status</li>
     *           <li>3 - an exception occurred while sending the request or reading the response</li>
     *         </ul>
     */
    public static int isSnapshotRepoExists(){
        HttpGet get = new HttpGet(ES_URL + "/_snapshot/" + ES_REPO_NAME);
        // try-with-resources closes both the response and the client on every path;
        // the previous code called get.clone(), which released nothing.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(get)) {
            int status = response.getStatusLine().getStatusCode();
            if (status == HttpStatus.SC_NOT_FOUND) {
                return 1;
            }
            if (status != HttpStatus.SC_OK) {
                return 2;
            }
            String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
            JSONObject json = JSON.parseObject(result);
            return json.containsKey(ES_REPO_NAME) ? 0 : 1;
        } catch (Exception e) {
            return 3;
        }
    }
    /**
     * Creates the snapshot repository {@code ES_REPO_NAME} of type {@code fs}, using
     * {@code ES_BAK_PATH} as its {@code location} setting.
     *
     * @return {@code null} on success, otherwise a message describing the failure
     */
    public static String createSnapshotRepo()   {
        // Use the shared constant so the repository created here is the one the other methods address.
        HttpPut put = new HttpPut(ES_URL + "/_snapshot/" + ES_REPO_NAME);
        JSONObject settingObj = new JSONObject();
        settingObj.put("location", ES_BAK_PATH);
        JSONObject sendObj = new JSONObject();
        sendObj.put("type", "fs");
        sendObj.put("settings", settingObj);
        put.setEntity(new StringEntity(sendObj.toJSONString(), ContentType.APPLICATION_JSON));
        // try-with-resources closes both the response and the client on every path.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(put)) {
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                return "创建备份仓库失败!" + response.getStatusLine();
            }
            String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
            // Read the flag from the parsed JSON instead of searching the raw text for substrings.
            JSONObject retObj = JSON.parseObject(result);
            if (retObj == null || !retObj.getBooleanValue("acknowledged")) {
                return "创建备份仓库失败!" + result;
            }
            return null;
        } catch (Exception e) {
            return "创建备份仓库异常!" + e.getMessage();
        }
    }
    /**
     * Creates a snapshot in the repository {@code ES_REPO_NAME} and waits for it to complete
     * ({@code wait_for_completion=true}).
     *
     * @param snapshotName name of the snapshot to create
     * @param indicesName  comma-separated index names to include; must not be empty
     * @return {@code null} when the response reports a snapshot with the requested name,
     *         otherwise a message describing the failure
     */
    public static String createSnapshot(String snapshotName,String  indicesName)  {
        if (StringUtil.isEmpty(indicesName)) {
            return "未选择要备份的索引";
        }
        HttpPut put = new HttpPut(ES_URL+"/_snapshot/"+ES_REPO_NAME+"/"+snapshotName+"?wait_for_completion=true");
        // Long socket timeout because the call blocks until the snapshot has finished.
        put.setConfig(RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(360000)
                .build());
        // Build the body with the JSON library so special characters in the names are escaped.
        JSONObject body = new JSONObject();
        body.put("indices", indicesName);
        put.setEntity(new StringEntity(body.toJSONString(), ContentType.APPLICATION_JSON));
        // try-with-resources closes both the response and the client on every path.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(put)) {
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                return "创建快照失败!" + response.getStatusLine();
            }
            String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
            JSONObject retObj = JSON.parseObject(result);
            JSONObject snapshotObj = retObj == null ? null : retObj.getJSONObject("snapshot");
            // A missing "snapshot" object is reported as a failure rather than as a NullPointerException.
            if (snapshotObj != null && snapshotName != null
                    && snapshotName.equals(snapshotObj.getString("snapshot"))) {
                return null;
            }
            return "创建快照失败!" + result ;
        } catch (Exception e) {
            return "创建快照异常!" + e.getMessage() ;
        }
    }
    /**
     * Fetches the details of one snapshot.
     * Equivalent to: {@code curl -XGET http://192.168.43.125:9200/_snapshot/es_backup/snapshot_1}
     *
     * @param snapshotName name of the snapshot to look up
     * @return the first element of the response's {@code snapshots} array as a JSON string;
     *         on failure, a JSON object string whose {@code msg} field describes the problem
     */
    public static String selectSnapshot(String snapshotName)  {
        HttpGet get = new HttpGet(ES_URL+"/_snapshot/"+ES_REPO_NAME+"/"+snapshotName);
        get.setConfig(RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(360000)
                .build());
        JSONObject failure = new JSONObject();
        // try-with-resources closes both the response and the client on every path.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(get)) {
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                failure.put("msg","查询快照失败!" + response.getStatusLine());
                return failure.toJSONString();
            }
            String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
            JSONObject retObj = JSON.parseObject(result);
            JSONArray snapshots = retObj == null ? null : retObj.getJSONArray("snapshots");
            // A missing or empty array is reported as a failed lookup, not as an exception.
            if (snapshots != null && !snapshots.isEmpty()) {
                return JSONObject.toJSONString(snapshots.get(0));
            }
            failure.put("msg","查询快照失败!" + result);
            return failure.toJSONString();
        } catch (Exception e) {
            // The message now names the query operation; it previously said "delete" (copy-paste slip).
            failure.put("msg", "查询快照异常!" + e.getMessage());
            return failure.toJSONString();
        }
    }

    /**
     * Restores a snapshot.
     * Equivalent to: {@code curl -XPOST http://192.168.43.125:9200/_snapshot/es_backup/snapshot_1/_restore}
     *
     * @param snapshotName name of the snapshot to restore
     * @return {@code null} when the response contains {@code "accepted": true},
     *         otherwise a message describing the failure
     */
    public static String restoreSnapshot(String snapshotName)  {
        HttpPost post = new HttpPost(ES_URL+"/_snapshot/"+ES_REPO_NAME+"/"+snapshotName+"/_restore");
        post.setConfig(RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(360000)
                .build());
        // try-with-resources closes both the response and the client on every path.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(post)) {
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                return "数据还原失败!" + response.getStatusLine();
            }
            String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
            JSONObject retObj = JSON.parseObject(result);
            // getBooleanValue yields false for a missing key, avoiding a null unboxing.
            if (retObj != null && retObj.getBooleanValue("accepted")) {
                return null;
            }
            return "数据还原失败!" + result ;
        } catch (Exception e) {
            return "数据还原异常!" + e.getMessage() ;
        }
    }

    /**
     * Deletes a snapshot from the repository {@code ES_REPO_NAME}.
     *
     * @param snapshotName name of the snapshot to delete
     * @return {@code null} when the snapshot was deleted or was not found (HTTP 404),
     *         otherwise a message describing the failure
     */
    public static String deleteSnapshot(String snapshotName)  {
        HttpDelete del = new HttpDelete(ES_URL+"/_snapshot/"+ES_REPO_NAME+"/"+snapshotName);
        del.setConfig(RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(360000)
                .build());
        // try-with-resources closes both the response and the client on every path.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(del)) {
            int status = response.getStatusLine().getStatusCode();
            if (status == HttpStatus.SC_NOT_FOUND) {
                // Nothing to delete counts as success.
                return null;
            }
            if (status != HttpStatus.SC_OK) {
                return "删除快照失败!" + response.getStatusLine();
            }
            String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
            JSONObject retObj = JSON.parseObject(result);
            // getBooleanValue yields false for a missing key, avoiding a null unboxing.
            if (retObj != null && retObj.getBooleanValue("acknowledged")) {
                return null;
            }
            return "删除快照失败!" + result ;
        } catch (Exception e) {
            return "删除快照异常!" + e.getMessage() ;
        }
    }


    /**
     * Opens or closes an index.
     * <pre>
     *  POST http://localhost:9200/&lt;index_name&gt;/_close
     *  POST http://localhost:9200/&lt;index_name&gt;/_open
     * </pre>
     *
     * @param type      1 to open the index, 2 to close it
     * @param indexName name of the index to operate on
     * @return {@code null} when the response contains {@code "acknowledged": true},
     *         otherwise a message describing the failure
     */
    public static String openCloseIndices(int type,String indexName)  {
        // Validate the operation first so that no HTTP client is created for an invalid request.
        String url ;
        if (type == 1) {
            url = ES_URL+"/"+indexName+"/_open";
        } else if (type == 2) {
            url = ES_URL+"/"+indexName+"/_close";
        } else {
            return "未知操作状态";
        }
        HttpPost post = new HttpPost(url);
        post.setConfig(RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(360000)
                .build());
        // try-with-resources closes both the response and the client on every path.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(post)) {
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                return "索引操作失败!" + response.getStatusLine();
            }
            String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
            JSONObject retObj = JSON.parseObject(result);
            // getBooleanValue yields false for a missing key, avoiding a null unboxing.
            if (retObj != null && retObj.getBooleanValue("acknowledged")) {
                return null;
            }
            return "索引操作失败!" + result ;
        } catch (Exception e) {
            return "索引操作异常!" + e.getMessage() ;
        }
    }


    /**
     * Sets {@code index.max_result_window}, the maximum number of rows a query may page through.
     * <pre>
     * PUT &lt;index name&gt;/_settings
     * {
     *   "index":{
     *     "max_result_window":1000000
     *   }
     * }
     * </pre>
     *
     * @param indexName index to update; when empty, the request goes to {@code /_settings}
     *                  without an index name
     * @param rowsNum   new value for {@code max_result_window}
     * @return {@code null} when the response contains {@code "acknowledged": true},
     *         otherwise a message describing the failure
     */
    public static String settingMaxRows(String indexName,int rowsNum)  {
        String url ;
        if (StringUtils.isEmpty(indexName)) {
            url = ES_URL+"/_settings";
        } else {
            url = ES_URL+"/"+indexName+"/_settings";
        }
        HttpPut put = new HttpPut(url);
        put.setConfig(RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(360000)
                .build());
        put.setEntity(new StringEntity("{\"index\":{\"max_result_window\":"+rowsNum+" }}", ContentType.APPLICATION_JSON));
        // try-with-resources closes both the response and the client on every path.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(put)) {
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                return "设置最大查询数失败!" + response.getStatusLine();
            }
            String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
            JSONObject retObj = JSON.parseObject(result);
            // getBooleanValue yields false for a missing key, avoiding a null unboxing.
            if (retObj != null && retObj.getBooleanValue("acknowledged")) {
                return null;
            }
            return "设置最大查询数失败!" + result ;
        } catch (Exception e) {
            return "设置最大查询数异常!" + e.getMessage() ;
        }
    }

}

5.实体类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;

/**
 * Elasticsearch document for one historical PLC point value, stored in the
 * {@code plc_his} index.
 *
 * <p>NOTE: {@code timestamp} is mapped as a numeric {@code long}. An index that was already
 * created with the previous keyword mapping has to be recreated or reindexed for the new
 * mapping to take effect.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(indexName = "plc_his")
public class PlcPointHisVo implements Serializable {
    /** Document id. */
    @Id
    private String id;
    /** Identifier (address) of the PLC point; keyword field. */
    @Field(type = FieldType.Keyword)
    private String plcAddress;
    /**
     * Timestamp of the value. Mapped as a long so that range queries and sorting
     * compare numerically rather than as strings.
     */
    @Field(type = FieldType.Long)
    private Long timestamp;
    /** The recorded value; declared with {@code index = false}. */
    @Field(type = FieldType.Text, index = false)
    private String val;

}

6.ElasticsearchRepository

import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;

/**
 * Spring Data Elasticsearch repository for {@link PlcPointHisVo} documents.
 *
 * <p>Query methods are derived from their names: every property named after {@code By}
 * must exist on {@link PlcPointHisVo}, and properties can be combined with {@code And}
 * (for example {@code PlcAddressAndTimestampBetween}). Referring to a property that does
 * not exist fails at startup with
 * {@code org.springframework.data.mapping.PropertyReferenceException: No property ... found for type ...}.
 */
@Component
public interface PlcPointHisVoSearch extends ElasticsearchRepository<PlcPointHisVo,String> {

    /**
     * Pages through documents whose timestamp lies between the two bounds.
     *
     * @param time  lower timestamp bound
     * @param time1 upper timestamp bound
     * @param of    paging and sorting information
     * @return the requested page
     */
    Page<PlcPointHisVo> getPageByTimestampBetween(long time, long time1, Pageable of);

    /**
     * Pages through documents of one PLC point whose timestamp lies between the two bounds.
     *
     * @param code  PLC address to match
     * @param time  lower timestamp bound
     * @param time1 upper timestamp bound
     * @param of    paging and sorting information
     * @return the requested page
     */
    Page<PlcPointHisVo> getPageByPlcAddressAndTimestampBetween(String code, long time, long time1, Pageable of);

    /**
     * Pages through documents of several PLC points whose timestamp lies between the two bounds.
     *
     * @param codes PLC addresses to match
     * @param time  lower timestamp bound
     * @param time1 upper timestamp bound
     * @param of    paging and sorting information
     * @return the requested page
     */
    Page<PlcPointHisVo> getPageByPlcAddressInAndTimestampBetween(String[] codes, long time, long time1, Pageable of);

}

7.实体类操作Controller

/**
 * REST controller exposing PLC point history stored in Elasticsearch.
 */
@RestController
@RequestMapping("/es/plcPoint")
public class EsPlcPointController extends BaseController {

    /** Page size used when the caller supplies none or a value below 1. */
    private static final int DEFAULT_PAGE_SIZE = 10;

    @Autowired
    private PlcPointHisVoSearch plcPointHisVoSearch;

    /**
     * Returns one page of history records ordered by ascending timestamp.
     *
     * @param code      PLC address to filter by; when empty, all addresses are returned
     * @param pageNum   1-based page number; {@code null} or values below 1 select the first page
     * @param pageSize  page size; {@code null} or values below 1 fall back to {@value #DEFAULT_PAGE_SIZE}
     * @param startTime lower timestamp bound; {@code null} means no lower bound (0)
     * @param endTime   upper timestamp bound; {@code null} means no upper bound ({@code Long.MAX_VALUE})
     * @return table data with the page content and the total number of matching records
     */
    @GetMapping("history")
    public TableDataInfo selectHistoryData(String code, Integer pageNum, Integer pageSize, Long startTime,Long endTime) {
        // Spring Data pages are 0-based while the API is 1-based.
        int pageIndex = (pageNum == null || pageNum < 1) ? 0 : pageNum - 1;
        // PageRequest.of rejects sizes below 1, so fall back to the default instead of failing.
        int size = (pageSize == null || pageSize < 1) ? DEFAULT_PAGE_SIZE : pageSize;
        // The repository takes primitive longs; substitute open bounds for missing
        // parameters instead of failing with a NullPointerException on unboxing.
        long from = startTime == null ? 0L : startTime;
        long to = endTime == null ? Long.MAX_VALUE : endTime;

        PageRequest pageRequest = PageRequest.of(pageIndex, size, Sort.by(Sort.Direction.ASC, "timestamp"));
        Page<PlcPointHisVo> page;
        if (StringUtils.isEmpty(code)) {
            page = plcPointHisVoSearch.getPageByTimestampBetween(from, to, pageRequest);
        } else {
            page = plcPointHisVoSearch.getPageByPlcAddressAndTimestampBetween(code, from, to, pageRequest);
        }
        TableDataInfo result = getDataTable(page.getContent());
        result.setTotal(page.getTotalElements());
        return result;
    }
}

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐