Spring Boot整合Elasticsearch
在Spring Boot项目中整合Elasticsearch,要确保Springboot版本与Elasticsearch客户端兼容。我用的是Elasticsearch 6.2.2版本,Springboot 2.1版本。
·
在Spring Boot项目中整合Elasticsearch,要确保Springboot版本与Elasticsearch客户端兼容。我用的是Elasticsearch 6.2.2版本,Springboot 2.1版本
1.添加依赖
在pom.xml文件中添加Spring Data Elasticsearch的依赖
<!-- elasticsearch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.2</version>
</dependency>
2.配置application.yml
spring:
data:
elasticsearch:
repositories:
enabled: true
cluster-nodes: 127.0.0.1:9300
3.创建配置类
import org.apache.http.HttpHost;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import javax.annotation.PostConstruct;
/**
* ES的配置类
*/
@Configuration
//指定RedisConfig文件
@AutoConfigureBefore(RedisConfig.class)
@EnableCaching
public class ElasticsearchConfig {
/**
* 如果@Bean没有指定bean的名称,那么这个bean的名称就是方法名
*/
@Bean
public RestHighLevelClient restHighLevelClient() {
return new RestHighLevelClient(
RestClient.builder(
new HttpHost("127.0.0.1", 9200, "http")
)
);
}
@PostConstruct
void init() {
System.setProperty("es.set.netty.runtime.available.processors", "false");
}
@Bean(name = "elasticsearchTemplate")
public ElasticsearchTemplate elasticsearchTemplate(Client client,ElasticsearchConverter converter) {
try {
return new ElasticsearchTemplate(client, converter);
} catch (Exception ex) {
throw new IllegalStateException(ex);
}
}
@Bean
public ElasticsearchConverter elasticsearchConverter(
SimpleElasticsearchMappingContext mappingContext) {
return new MappingElasticsearchConverter(mappingContext);
}
@Bean
public SimpleElasticsearchMappingContext mappingContext() {
return new SimpleElasticsearchMappingContext();
}
}
4.工具类
package com.isdepci.project.elasticsearch.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.util.StringUtil;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.Charset;
public class EsBackupUtils {
//备份地址
public static String ES_BAK_PATH = "D:/elasticsearch-6.2.2/backups/es_backup";
//ES地址
public static String ES_URL = "http://127.0.0.1:9200";
//备份仓库
public static String ES_REPO_NAME = "es_backup";
/**
* 查询索引列表
*/
public List<JSONObject> selectIndicesList() {
ImmutableOpenMap<String, IndexMetaData> indices = client.admin().cluster()
.prepareState().get().getState().getMetaData().getIndices();
String[] indicesNames = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices();
List<JSONObject> result = new ArrayList<>();
for (String name : indicesNames) {
JSONObject item = new JSONObject();
item.put("name", name);
IndexMetaData info = indices.get(name);
if (info == null) {
item.put("state", "NULL");
item.put("numberOfShards", 0);
item.put("numberOfReplicas", 0);
item.put("index", "");
result.add(item);
continue;
}
item.put("state", info.getState());
//索引要做多少个分片,只能在创建索引时指定,后期无法修改。
item.put("numberOfShards", info.getNumberOfShards());
//每个分片有多少个副本,后期可以动态修改
item.put("numberOfReplicas", info.getNumberOfReplicas());
item.put("index", info.getIndex());
item.put("aliases", info.getAliases());
item.put("version", info.getVersion());
item.put("setting", info.getSettings().toString());
// item.put("dataInfo", JSONObject.toJSONString(info));
result.add(item);
}
return result;
}
/**
* 备份索引
* 1.判断快照仓库是否存在
* 2.没有则创建快照仓库
* 3.创建快照
*/
public String backupsByIndices(String[] indices,int type) {
int repoExists = isSnapshotRepoExists();
if (repoExists == 1) {
String msg = createSnapshotRepo();
if (StringUtils.isNotEmpty(msg)) {
return msg;
}
}
String backupNo = "snapshot_" + DateUtils.dateTimeNow();
String indicesName;
if(indices!=null&&indices.length>0){
indicesName = String.join(",", indices);
}else {
indicesName = String.join(",", indicesNames);
}
String msg = createSnapshot(backupNo, indicesName);
if (StringUtils.isNotEmpty(msg)) {
return msg;
}
//TODO 记录备份信息
return null;
}
/**
* 判断快照仓库是否存在
* int 0存在仓库
* 1不存在仓库
* 2获取失败
* 3获取异常
*/
public static int isSnapshotRepoExists(){
CloseableHttpClient client = HttpClients.createDefault();
HttpGet get = new HttpGet( ES_URL+"/_snapshot/" + ES_REPO_NAME);
CloseableHttpResponse response = null;
try {
response = client.execute(get);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
return 1;
}
return 2;
}
String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
JSONObject json = JSON.parseObject(result);
if (json.containsKey(ES_REPO_NAME)) {
return 0;
}
return 1;
} catch (Exception e) {
return 3;
} finally {
try {
get.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
if(response!=null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 创建快照仓库
*/
public static String createSnapshotRepo() {
CloseableHttpClient client = HttpClients.createDefault();
HttpPut put = new HttpPut(ES_URL+"/_snapshot/es_backup");
CloseableHttpResponse response = null;
try {
JSONObject sendObj = new JSONObject();
sendObj.put("type", "fs");
JSONObject settingObj = new JSONObject();
settingObj.put("location",ES_BAK_PATH);
sendObj.put("settings", settingObj);
ByteArrayEntity entity = new ByteArrayEntity(sendObj.toJSONString().getBytes("UTF-8"));
entity.setContentType("application/json");
put.setEntity(entity);
response = client.execute(put);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
return "创建备份仓库失败!" + response.getStatusLine();
}
String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
if (result.indexOf("acknowledged") == -1 || result.indexOf("true") == -1) {
return "创建备份仓库失败!" + result;
}
return null;
} catch (Exception e) {
return "创建备份仓库异常!" + e.getMessage();
} finally {
try {
put.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
if(response!=null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 创建快照
*
* @param snapshotName:快照名称
*/
public static String createSnapshot(String snapshotName,String indicesName) {
if(StringUtil.isEmpty(indicesName)){
return "未选择要备份的索引";
}
CloseableHttpClient client = HttpClients.createDefault();
HttpPut put = new HttpPut(ES_URL+"/_snapshot/"+ES_REPO_NAME+"/"+snapshotName+"?wait_for_completion=true");
//特殊操作,设置超时
RequestConfig.Builder requestConfig = RequestConfig.custom();
requestConfig.setConnectTimeout(5000);
requestConfig.setConnectionRequestTimeout(5000);
requestConfig.setSocketTimeout(360000);
put.setConfig(requestConfig.build());
CloseableHttpResponse response = null;
put.setEntity(new StringEntity("{\"indices\":\""+indicesName+"\"}", ContentType.APPLICATION_JSON));
try {
response = client.execute(put);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
return "创建快照失败!" + response.getStatusLine();
}
String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
JSONObject retObj = JSON.parseObject(result);
JSONObject snapshotObj = retObj.getJSONObject("snapshot");
if (snapshotObj.containsKey("snapshot") && snapshotObj.getString("snapshot").equals(snapshotName)) {
return null;
}
return "创建快照失败!" + result ;
} catch (Exception e) {
return "创建快照异常!" + e.getMessage() ;
} finally {
try {
put.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
if(response!=null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 查看备份明细
* curl -XGET http://192.168.43.125:9200/_snapshot/es_backup/snapshot_1
*/
public static String selectSnapshot(String snapshotName) {
CloseableHttpClient client = HttpClients.createDefault();
HttpGet get = new HttpGet(ES_URL+"/_snapshot/"+ES_REPO_NAME+"/"+snapshotName);
//特殊操作,设置超时
RequestConfig.Builder requestConfig = RequestConfig.custom();
requestConfig.setConnectTimeout(5000);
requestConfig.setConnectionRequestTimeout(5000);
requestConfig.setSocketTimeout(360000);
get.setConfig(requestConfig.build());
CloseableHttpResponse response = null;
JSONObject result1 = new JSONObject();
try {
response = client.execute(get);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
result1.put("msg","查询快照失败!" + response.getStatusLine());
return result1.toJSONString();
}
String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
JSONObject retObj = JSON.parseObject(result);
JSONArray snapshots = retObj.getJSONArray("snapshots");
if(snapshots.size()>0){
return JSONObject.toJSONString(snapshots.get(0));
}
result1.put("msg","查询快照失败!" + result);
return result1.toJSONString();
} catch (Exception e) {
result1.put("msg", "删除快照异常!" + e.getMessage());
return result1.toJSONString();
} finally {
try {
get.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
if(response!=null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 还原
* curl -XPOST http://192.168.43.125:9200/_snapshot/es_backup/snapshot_1/_restore
*/
public static String restoreSnapshot(String snapshotName) {
CloseableHttpClient client = HttpClients.createDefault();
HttpPost post = new HttpPost(ES_URL+"/_snapshot/"+ES_REPO_NAME+"/"+snapshotName+"/_restore");
//特殊操作,设置超时
RequestConfig.Builder requestConfig = RequestConfig.custom();
requestConfig.setConnectTimeout(5000);
requestConfig.setConnectionRequestTimeout(5000);
requestConfig.setSocketTimeout(360000);
post.setConfig(requestConfig.build());
CloseableHttpResponse response = null;
try {
response = client.execute(post);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
return "数据还原失败!" + response.getStatusLine();
}
String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
JSONObject retObj = JSON.parseObject(result);
Boolean snapshotObj = retObj.getBoolean("accepted");
if (snapshotObj) {
return null;
}
return "数据还原失败!" + result ;
} catch (Exception e) {
return "数据还原异常!" + e.getMessage() ;
} finally {
try {
post.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
if(response!=null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 删除
*/
public static String deleteSnapshot(String snapshotName) {
CloseableHttpClient client = HttpClients.createDefault();
HttpDelete del = new HttpDelete(ES_URL+"/_snapshot/"+ES_REPO_NAME+"/"+snapshotName);
//特殊操作,设置超时
RequestConfig.Builder requestConfig = RequestConfig.custom();
requestConfig.setConnectTimeout(5000);
requestConfig.setConnectionRequestTimeout(5000);
requestConfig.setSocketTimeout(360000);
del.setConfig(requestConfig.build());
CloseableHttpResponse response = null;
try {
response = client.execute(del);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
return null;
}else if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
return "删除快照失败!" + response.getStatusLine();
}
String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
JSONObject retObj = JSON.parseObject(result);
Boolean snapshotObj = retObj.getBoolean("acknowledged");
if (snapshotObj) {
return null;
}
return "删除快照失败!" + result ;
} catch (Exception e) {
return "删除快照异常!" + e.getMessage() ;
} finally {
try {
del.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
if(response!=null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 开启关闭索引
* 1开启 2关闭
* POST http://localhost:9200/<index_name>/_close
* POST http://localhost:9200/<index_name>/_open
*/
public static String openCloseIndices(int type,String indexName) {
CloseableHttpClient client = HttpClients.createDefault();
String url ;
if(type==1){
url = ES_URL+"/"+indexName+"/_open";
}else if(type==2){
url = ES_URL+"/"+indexName+"/_close";
}else{
return "未知操作状态";
}
HttpPost post = new HttpPost(url);
//特殊操作,设置超时
RequestConfig.Builder requestConfig = RequestConfig.custom();
requestConfig.setConnectTimeout(5000);
requestConfig.setConnectionRequestTimeout(5000);
requestConfig.setSocketTimeout(360000);
post.setConfig(requestConfig.build());
CloseableHttpResponse response = null;
try {
response = client.execute(post);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
return "索引操作失败!" + response.getStatusLine();
}
String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
JSONObject retObj = JSON.parseObject(result);
Boolean snapshotObj = retObj.getBoolean("acknowledged");
if (snapshotObj) {
return null;
}
return "索引操作失败!" + result ;
} catch (Exception e) {
return "索引操作异常!" + e.getMessage() ;
} finally {
try {
post.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
if(response!=null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 设置查询数量
* ip:端口/索引名称/_settings
*
* PUT 索引名称/_settings
* {
* "index":{
* "max_result_window":1000000
* }
* }
*
*/
public static String settingMaxRows(String indexName,int rowsNum) {
CloseableHttpClient client = HttpClients.createDefault();
String url ;
if(StringUtils.isEmpty(indexName)){
url = ES_URL+"/_settings";
}else{
url = ES_URL+"/"+indexName+"/_settings";
}
HttpPut put = new HttpPut(url);
//特殊操作,设置超时
RequestConfig.Builder requestConfig = RequestConfig.custom();
requestConfig.setConnectTimeout(5000);
requestConfig.setConnectionRequestTimeout(5000);
requestConfig.setSocketTimeout(360000);
put.setConfig(requestConfig.build());
put.setEntity(new StringEntity("{\"index\":{\"max_result_window\":"+rowsNum+" }}", ContentType.APPLICATION_JSON));
CloseableHttpResponse response = null;
try {
response = client.execute(put);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
return "设置最大查询数失败!" + response.getStatusLine();
}
String result = EntityUtils.toString(response.getEntity(), Charset.forName("UTF-8"));
JSONObject retObj = JSON.parseObject(result);
Boolean snapshotObj = retObj.getBoolean("acknowledged");
if (snapshotObj) {
return null;
}
return "设置最大查询数失败!" + result ;
} catch (Exception e) {
return "设置最大查询数异常!" + e.getMessage() ;
} finally {
try {
put.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
if(response!=null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
5.实体类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(indexName = "plc_his")
public class PlcPointHisVo implements Serializable {
@Id
private String id;
//标识
@Field(type = FieldType.Keyword)
private String plcAddress;
//时间戳
@Field(type = FieldType.Keyword)
private Long timestamp;
//值
@Field(type = FieldType.Text, index = false)
private String val;
}
6.ElasticsearchRepository
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface PlcPointHisVoSearch extends ElasticsearchRepository<PlcPointHisVo,String> {
/**
* 自定义方法里By后面的字符串,必须存在泛型类ArticleEsDto里,而且可以用and连接;
* 比如 findByTitle、findByContent、findByTitleAndContent
* List<ArticleEsDto> findByP(String title);//会报错
* 否则报如下错:
* Caused by: org.springframework.data.mapping.PropertyReferenceException:
* No property p found for type ArticleEsDto! Did you mean 'id'?
*
*/
Page<PlcPointHisVo> getPageByTimestampBetween(long time, long time1, PageRequest of);
Page<PlcPointHisVo> getPageByPlcAddressAndTimestampBetween(String code, long time, long time1, PageRequest of);
Page<PlcPointHisVo> getPageByPlcAddressInAndTimestampBetween(String[] codes, long time, long time1, PageRequest of);
}
7.实体类操作Controller
@RestController
@RequestMapping("/es/plcPoint")
public class EsPlcPointController extends BaseController {
@Autowired
private PlcPointHisVoSearch plcPointHisVoSearch;
@GetMapping("history")
public TableDataInfo selectHistoryData(String code, Integer pageNum, Integer pageSize, Long startTime,Long endTime) {
if(pageNum==null||pageNum<1){
pageNum = 0;
}else {
pageNum = pageNum-1;
}
if(pageSize==null){
pageSize = 10;
}
Sort sort = new Sort(Sort.Direction.ASC, "timestamp");
Page<PlcPointHisVo> page;
if(StringUtils.isEmpty(code)){
page = plcPointHisVoSearch.getPageByTimestampBetween(startTime,endTime, PageRequest.of(pageNum,pageSize,sort));
}else {
page = plcPointHisVoSearch.getPageByPlcAddressAndTimestampBetween(code,startTime,endTime,PageRequest.of(pageNum,pageSize,sort));
}
TableDataInfo result = getDataTable(page.getContent());
result.setTotal(page.getTotalElements());
return result;
}
}
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)