elasticSearch之java客户端详细使用:文档通过ID操作基本API
对目标文档(由 routing 或文档 ID 定位到的主分片),随机选择该主分片或其所有副本分片之一执行查询,且会在多个请求间做「轮询负载均衡」,避免单个分片 / 节点过载。只有当集群中至少有这么多分片(主分片+副本分片)处于活跃状态时,才会执行写操作;,强制文档存储到 / 查询时定位到「唯一的主分片」—— 简单说,就是给文档分配一个 “分片定位标识”,让 ES 不用默认逻辑,而是按你指定的规则找
文章目录
- 一、文档与入门
- 二、 高级客户端的创建与配置API
- 三、文档基本API
- 四、文档API常用核心参数详解
一、文档与入门
1、官网
本文基于7.10.0版本使用与实践。(阿里云es稳定版本就是7.10.0)
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.10.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.0</version>
</dependency>
二、 高级客户端的创建与配置API
1、创建客户端
// 一个 RestHighLevelClient 实例需要一个 REST 低级客户端构建器,构建方式如下:
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
// 多个节点可以写多个地址
new HttpHost("localhost", 9201, "http")));
// 高级客户端将内部创建用于执行请求的低级客户端,该低级客户端维护一个连接池并启动一些线程
// 使用完毕之后需要关掉
client.close();
2、参数设置
(1)设置超时
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200))
.setRequestConfigCallback(
new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(
RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder
// 连接超时 默认1秒
.setConnectTimeout(5000)
// 套接字超时 默认30秒
.setSocketTimeout(60000);
}
});
// 超时也可以通过 RequestOptions 按请求设置,这将覆盖 RestClient 的 customizeRequestConfig。
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(5000)
.setSocketTimeout(60000)
.build();
RequestOptions options = RequestOptions.DEFAULT.toBuilder()
.setRequestConfig(requestConfig)
.build();
(2)设置线程数
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200))
.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultIOReactorConfig(
IOReactorConfig.custom()
// // 设置线程数 默认跟本地核心数有关 Runtime.getRuntime().availableProcessors()
.setIoThreadCount(1)
.build());
}
});
(3)设置认证
// 用户名密码
final CredentialsProvider credentialsProvider =
new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("user", "password"));
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200))
.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider);
}
});
// 基于header的Authorization 头 作为认证方式
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
Header[] defaultHeaders =
new Header[]{new BasicHeader("Authorization",
"Bearer u6iuAxZ0RG1Kcm5jVFI4eU4tZU9aVFEwT2F3")};
builder.setDefaultHeaders(defaultHeaders);
// Elasticsearch API 密钥 认证方式
String apiKeyId = "uqlEyn8B_gQ_jlvwDIvM";
String apiKeySecret = "HxHWk2m4RN-V_qg9cDpuX";
String apiKeyAuth =
Base64.getEncoder().encodeToString(
(apiKeyId + ":" + apiKeySecret)
.getBytes(StandardCharsets.UTF_8));
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
Header[] defaultHeaders =
new Header[]{new BasicHeader("Authorization",
"ApiKey " + apiKeyAuth)};
builder.setDefaultHeaders(defaultHeaders);
三、文档基本API
1、IndexRequest 创建/根据ID更新文档
(1)文档源
// 索引
IndexRequest request = new IndexRequest("posts");
// 请求的文档 ID
request.id("1");
// 以json 字符串形式提供的文档源
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);
// 以 Map 形式提供的文档源,可自动转换为 JSON 格式
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts")
.id("1").source(jsonMap);
// XContentBuilder 对象提供文档源,Elasticsearch 内置帮助器生成 JSON 内容
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts")
.id("1").source(builder);
// 以 Object 键对形式提供的文档源,转换为 JSON 格式
IndexRequest indexRequest = new IndexRequest("posts")
.id("1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
(2)可选参数
// 1、路由值 根据指定的值进行hash 来确定路由到哪个分片
// 通常可以为memberId、订单ID、等等
request.routing("routing");
// 2、等待主分片作为 TimeValue 可用 为当前请求设置超时时间
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
// 3、刷新策略
// NONE("false"), 默认(默认 1 秒) 才会刷新,数据才会被搜索到,性能较好
// IMMEDIATE("true"), 立即刷新,性能较差
// WAIT_UNTIL("wait_for"); 不立即触发刷新,但会等待下一次自然刷新完成后再返回响应(默认最多等 1 秒)。
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// 4、版本号 更新会根据版本号来更新
request.version(2);
// 5、版本类型
// INDEX(0), 默认,若id不存在则创建,存在则更新(覆盖内容,版本号递增)。
// CREATE(1),强制要求仅在文档不存在时执行创建操作,若文档已存在则直接失败。
// UPDATE(2),DELETE(3);
request.opType(DocWriteRequest.OpType.CREATE);
request.opType("create");
// 6、设置pipeline 当通过 setPipeline 为请求指定某个 pipeline(例如名为 my_pipeline)时,Elasticsearch 会按照以下流程处理文档:
// (1)客户端发送文档到 Elasticsearch,请求中指定了 pipeline: my_pipeline;
//(2)Elasticsearch 先将文档传入 my_pipeline,依次执行管道中定义的所有处理器(按顺序执行);
//(3)文档经过预处理后,得到 “加工后的文档”;
//(4)最终将加工后的文档索引到目标分片。
request.setPipeline("pipeline");
(3)执行
// 1、同步执行
// 同步调用可能会在高级 REST 客户端中无法解析 REST 响应、请求超时或类似情况下引发 IOException, 即没有从服务器返回响应。
// 在服务器返回 4xx 或 5xx 错误代码的情况下,高级客户端会尝试解析响应主体错误详细信息,然后抛出一个通用的 ElasticsearchException,并将原始 ResponseException 作为抑制异常添加到其中。
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
// 2、异步执行,需要添加监听
client.indexAsync(request, RequestOptions.DEFAULT, listener);
// 监听示例
listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// 当执行成功完成时调用。
}
@Override
public void onFailure(Exception e) {
// 当整个 IndexRequest 失败时调用。
}
};
(4)返回值
String index = indexResponse.getIndex();
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 处理(如果需要)首次创建文档的情况
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 处理(如果需要)重写文档的情况,因为它已经存在
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// 处理成功分片数小于分片总数的情况
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason(); // 处理潜在故障
}
}
// 异常情况
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.setIfSeqNo(10L)
.setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// 如果存在版本冲突,则会抛出 ElasticsearchException
}
}
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// 如果 opType 设置为 create,并且已经存在具有相同索引和 id 的文档,则会抛出 ElasticsearchException
}
}
(5)实用:根据返回值判断是否成功
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.ElasticsearchException;
import java.io.IOException;
public class EsIndexExample {
public void indexDocument(RestHighLevelClient client) {
// 1. 构建IndexRequest(指定索引名、文档ID)
IndexRequest request = new IndexRequest("my_index");
request.id("1"); // 文档ID(若不指定,ES会自动生成)
String jsonDocument = "{\n" +
" \"name\": \"张三\",\n" +
" \"age\": 30\n" +
"}";
request.source(jsonDocument, XContentType.JSON);
try {
// 2. 执行索引操作,获取响应
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
// 3. 判断操作是否成功
RestStatus status = response.status();
if (status == RestStatus.CREATED || status == RestStatus.OK) {
System.out.println("操作成功!");
// 4. 判断是新增还是修改
if (response.isCreated()) {
System.out.println("操作类型:新增文档");
} else {
System.out.println("操作类型:修改文档");
}
// 可选:获取文档版本号(新增为1,修改后递增)
long version = response.getVersion();
System.out.println("文档版本号:" + version);
} else {
System.out.println("操作失败,状态码:" + status.getStatus());
}
} catch (ElasticsearchException e) {
// 捕获ES服务返回的错误(如索引不存在、字段验证失败等)
System.err.println("操作失败(ES错误):" + e.getMessage());
} catch (IOException e) {
// 捕获IO异常(如网络错误)
System.err.println("操作失败(IO错误):" + e.getMessage());
}
}
}
2、GetRequest 根据ID查询文档所有信息
(1)使用
GetRequest getRequest = new GetRequest(
"posts", // 索引名
"1"); // 文档id
(2)可选参数
// 文档的原始 JSON 数据会被存储在 _source 字段中(默认开启) 例如:_source:{ "name": "张三", "age": 30 }
// 1、禁止在请求的响应中返回文档的源数据(_source 字段),从而减少网络传输的数据量,提升请求效率。
// 对于查询请求(如 SearchRequest):响应中不会包含 hits.hits._source 字段,仅返回文档的元数据(如 _id、_index、_score 等)。
// 对于获取单个文档请求(如 GetRequest):响应中不会包含 _source 字段,仅返回文档是否存在、版本号等元信息。
// 对于更新请求(如 UpdateRequest):响应中不会返回更新后的 _source 数据(默认会返回)。
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
// 默认是FETCH_SOURCE , 默认是会返回(_source 字段)的
// 2、为某个字段配置 _source 字段 的排除
String[] includes = Strings.EMPTY_ARRAY; // 包含的字段,定义一个空的字符串数组 includes,表示 “不主动指定需要包含的字段”(即默认包含所有字段,除非被排除)。
String[] excludes = new String[]{"message"}; // 要排除 message 字段。 不返回
// 第一个参数 true:表示启用 _source 字段的返回(即响应中会包含 _source);
// 第二个参数 includes:指定需要强制包含的字段(空数组表示不额外限制,默认包含所有字段);
// 第三个参数 excludes:指定需要强制排除的字段(这里排除 message 字段)。
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
// 3、检索单独存储的值
// Elasticsearch 中,字段默认不会单独「存储」(store: false),而是嵌入到 _source 字段中
request.storedFields("message"); // 为特定的存储字段配置数据检索功能(这些字段需要被单独存储在相应的映射结构中)
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
String message = getResponse.getField("message").getValue(); // 检索 message 中存储的字段值(前提是该字段必须被单独存储在映射数据中)
// 4、路由查询 通过路由key来手动确认分片,通常可以为memberId、订单ID、等等
request.routing("routing");
// 5、指定 分片选择偏好参数 (详见下面解释)
request.preference("preference");
// 6、是否启用实时性查询
// 默认true 通常默认即可
request.realtime(false);
// 7、在检索文档之前,先执行刷新操作 默认false不会刷新,通常默认即可
request.refresh(true);
// 8.版本号查询
request.version(2);
// 9.版本类型 详细介绍参考下文
request.versionType(VersionType.EXTERNAL);
(3)执行
// 同步执行
// 在同步调用过程中,如果高级 REST 客户端无法解析 REST 响应、请求超时,或者出现其他导致服务器无法返回响应的情况,那么这些调用很可能会抛出错误 IOException。
// 在服务器返回 4xx 或 5xx 错误代码的情况下,高级客户端会尝试解析响应正文中的错误详细信息,然后抛出一个通用的 ElasticsearchException 错误,并将原始的 ResponseException 错误代码作为被隐藏的异常信息一并记录下来。
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
// 异步执行
client.getAsync(request, RequestOptions.DEFAULT, listener);
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
// 成功
}
@Override
public void onFailure(Exception e) {
// 失败
}
};
(4)响应
String index = getResponse.getIndex();
String id = getResponse.getId();
if (getResponse.isExists()) {
long version = getResponse.getVersion();
String sourceAsString = getResponse.getSourceAsString(); // 将文档以 String 格式显示
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // 将文档以 Map<String, Object> 格式显示
byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 文档以 byte[] 显示
} else {
}
GetRequest request = new GetRequest("does_not_exist", "1");
try {
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
// 查找一个不存在的索引,会报404
}
}
try {
GetRequest request = new GetRequest("posts", "1").version(2);
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// 如果版本号不一致,会报版本冲突
}
}
(5)拓展:查询文档是否存在
由于exists()仅返回true或false,我们建议关闭对_source以及任何存储字段的获取功能,这样请求的负担就会减轻一些。
GetRequest getRequest = new GetRequest(
"posts", // 索引
"1"); // 文档ID
getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用对 _source 的获取功能。
getRequest.storedFields("_none_"); // 禁用对存储字段的获取功能。
// 同步执行
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
// 异步执行
ActionListener<Boolean> listener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean exists) {
// 查询成功 true为有该文档,false表示没有该文档
}
@Override
public void onFailure(Exception e) {
}
};
client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
3、GetSourceRequest 根据ID获取源数据
GetRequest 用于获取完整的文档信息(包括元数据和源数据),而 GetSourceRequest 仅用于获取文档的源数据(_source 字段),不包含元数据。
具体请求参数详细含义,参考GetRequest。
// 1、创建request
GetSourceRequest getSourceRequest = new GetSourceRequest(
"posts", // 索引名
"1"); // 文档ID
// 2、可以指定返回的字段和排除的字段
// FetchSourceContext 的第一个参数 fetchSource 必须是 true,否则会抛出 ElasticsearchException 错误
String[] includes = Strings.EMPTY_ARRAY;
// 上下文参数 excludes 和 includes 是可选的(详见 API 文档中的示例)
String[] excludes = new String[]{"postDate"};
getSourceRequest.fetchSourceContext(
new FetchSourceContext(true, includes, excludes));
// 3、路由
getSourceRequest.routing("routing");
// 4、指定 分片选择偏好参数 (详见下面解释)
getSourceRequest.preference("preference");
// 5、是否实时查询 默认true
getSourceRequest.realtime(false);
// 6、查询之前是否刷新索引 默认不刷新
getSourceRequest.refresh(true);
// 7、同步执行
GetSourceResponse response =
client.getSource(getSourceRequest, RequestOptions.DEFAULT);
// 8、异步执行
ActionListener<GetSourceResponse> listener =
new ActionListener<GetSourceResponse>() {
@Override
public void onResponse(GetSourceResponse getResponse) {
// 查询成功
}
@Override
public void onFailure(Exception e) {
// 查询失败
}
};
client.getSourceAsync(request, RequestOptions.DEFAULT, listener);
// 9、获取响应
Map<String, Object> source = response.getSource();
4、DeleteRequest 根据ID删除
(1)参数与使用
// 1、创建request
DeleteRequest request = new DeleteRequest(
"posts", // 索引
"1"); // 文档ID
// 2、路由
request.routing("routing");
// 3、超时时间
request.timeout(TimeValue.timeValueMinutes(2)); // 等待主分片变为可用状态的时间限制为 TimeValue秒
request.timeout("2m"); // 等待主分片变为可用状态的时间限制为 String 格式
// 4、刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// 5、版本号 与版本类型
request.version(2);
request.versionType(VersionType.EXTERNAL);
// 6、同步执行
DeleteResponse deleteResponse = client.delete(
request, RequestOptions.DEFAULT);
// 7、异步执行
listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.deleteAsync(request, RequestOptions.DEFAULT, listener);
(2)响应
String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// 处理成功恢复的碎片数量少于总碎片数量的情况。
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason(); // 应对可能出现的各种故障情况。
}
}
DeleteRequest request = new DeleteRequest("posts", "does_not_exist");
DeleteResponse deleteResponse = client.delete(
request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
// 如果要删除的文档找不到,那就采取相应的措施吧
}
try {
DeleteResponse deleteResponse = client.delete(
new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// 出现的异常表明系统返回了一个版本冲突错误
}
}
(3)实用:根据返回值判断是否成功
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.ElasticsearchException;
import java.io.IOException;
public class EsDeleteExample {
public void deleteDocument(RestHighLevelClient client) {
// 1. 构建DeleteRequest(指定索引名、文档ID)
DeleteRequest request = new DeleteRequest("my_index", "1"); // 索引名、文档ID
try {
// 2. 执行删除操作,获取响应
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
// 3. 判断操作是否执行成功(服务器正常处理请求)
RestStatus status = response.status();
if (status == RestStatus.OK) {
System.out.println("删除操作执行成功!");
// 4. 判断是否实际删除了文档(文档是否存在过)
if (response.isFound()) {
System.out.println("文档已存在,已成功删除");
} else {
System.out.println("文档不存在,未执行删除");
}
// 可选:获取删除后的版本号(即使文档不存在,版本号也可能递增)
long version = response.getVersion();
System.out.println("当前版本号:" + version);
} else {
System.out.println("删除操作失败,状态码:" + status.getStatus());
}
} catch (ElasticsearchException e) {
// 捕获ES服务端错误(如索引不存在、权限不足等)
System.err.println("删除失败(ES错误):" + e.getMessage());
} catch (IOException e) {
// 捕获客户端IO错误(如网络中断)
System.err.println("删除失败(IO错误):" + e.getMessage());
}
}
}
5、UpdateRequest 根据ID更新文档
(1)参数与使用
// 1、创建request
UpdateRequest request = new UpdateRequest(
"posts", // 索引名
"1"); // 文档ID
// 2、可以通过 内联脚本更新
Map<String, Object> parameters = singletonMap("count", 4); // 脚本参数以对象的形式传递,每个对象用 Map 标识
Script inline = new Script(ScriptType.INLINE, "painless",
"ctx._source.field += params.count", parameters); // 使用 painless 语言以及之前的参数来创建一个内联脚本
request.script(inline); // 将脚本设置为更新请求模式
// 3、可以通过 存储脚本的形式 更新
Script stored = new Script(
ScriptType.STORED, null, "increment-field", parameters); // 引用一个以 increment-field 为名称、使用 painless 语言编写的脚本
request.script(stored); // 将脚本设置在更新请求中
// 4、部分更新,基于JSON字符串的格式
UpdateRequest request = new UpdateRequest("posts", "1");
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
// 4、部分更新,基于Map的格式,会自动转为json
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "1")
.doc(jsonMap);
// 4、部分更新,基于XContentBuilder对象的格式
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.timeField("updated", new Date());
builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "1")
.doc(builder);
// 4、部分更新,基于对象键值对的格式,会自动转为json
UpdateRequest request = new UpdateRequest("posts", "1")
.doc("updated", new Date(),
"reason", "daily update");
// 5、upsert操作 如果文档不存在,则新增
// 与部分文档更新类似,upsert文档的内容也可以通过使用能够接受String、Map、XContentBuilder或Object键值对的方法来定义。
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
// 6、设置路由
request.routing("routing");
// 7、超时时间
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
// 8、刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// 9、如果在更新操作的获取数据阶段和索引生成阶段之间,待更新的文档被其他操作修改了,那么更新操作需要重试多少次
request.retryOnConflict(3);
// 10、更新操作后是否返回更新后的文档源数据(即 _source 字段),该功能默认是关闭的。
request.fetchSource(true);
// 11、返回 包含 指定字段
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(
new FetchSourceContext(true, includes, excludes));
// 返回排除指定字段
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(
new FetchSourceContext(true, includes, excludes));
// 12、 基于文档的 _seq_no 和 _primary_term 实现乐观并发控制
request.setIfSeqNo(2L);
request.setIfPrimaryTerm(1L);
// 13、禁用误操作检测,默认true。 通常默认即可
// true:如果更新后的文档没变化,则不做更改
// false:如果更新后的文档没变化,则做更新操作,version+1、_seq_no 、_primary_term 都会变化
request.detectNoop(false);
// 14、无论该文档是否存在,该脚本都必须被执行。也就是说,如果文档还不存在,脚本会负责创建该文档。(默认false)
// 即 使用脚本的upsert操作,但是需要注意脚本的某些语句默认值的设置
request.scriptedUpsert(true);
// 15、 默认false,使用doc时,文本做upsert操作
// 将更新操作中 doc() 定义的内容同时作为 “插入内容”,实现 “当文档存在时用 doc 更新,不存在时用 doc 插入” 的简化逻辑,无需单独定义插入内容
request.docAsUpsert(true);
// 16、高可用 通常默认即可,用于控制写操作(如索引、更新、删除)执行前,需要等待多少个活跃的分片(主分片或副本分片)可用,以此平衡数据安全性(避免数据丢失)和写操作的响应速度。
// 指定了在继续执行更新操作之前,必须保持活跃状态的副本数量。
request.waitForActiveShards(2);
// 提供的分片副本数量:可以是 2 个、3 个或 4 个(默认值为 4 个)
request.waitForActiveShards(ActiveShardCount.ALL);
(2)执行与响应
// 1、同步更新
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
// 2、异步更新
listener = new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
// 成功
}
@Override
public void onFailure(Exception e) {
// 失败
}
};
client.updateAsync(request, RequestOptions.DEFAULT, listener);
// 3、响应结果
String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 处理文档首次被创建的情况。
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 处理文档已被更新的情况。
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
// 处理文档已被删除的情况。
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
// 处理这样一种情况:该文档并未受到更新的影响,也就是说,对该文档并未执行任何操作(即进行了“无操作”处理)。
}
// 4、当通过 `fetchSource` 方法在 UpdateRequest 中启用源文档的获取功能时,响应中会包含更新后的文档的源代码。
GetResult result = updateResponse.getGetResult(); // 将结果以GetResult 类型获取
if (result.isExists()) {
String sourceAsString = result.sourceAsString(); // 获取字符串数据
Map<String, Object> sourceAsMap = result.sourceAsMap(); // 获取map格式数据
byte[] sourceAsBytes = result.source(); // 获取byte[]格式数据
} else {
// 处理这样一种情况:响应中不包含文档的来源信息(默认情况下就是这种情况)
}
// 5、也可以检查碎片是否出现故障
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// 处理成功恢复的碎片数量少于总碎片数量的情况。
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason(); // 应对可能出现的各种故障情况。
}
}
// 6、当对一个不存在的文档执行 UpdateRequest 操作时,系统会返回 404 状态码,并抛出 ElasticsearchException 错误。针对这种情况,需要按照以下方式进行处理
UpdateRequest request = new UpdateRequest("posts", "does_not_exist")
.doc("field", "value");
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
// 处理因文档不存在而引发的异常
}
}
// 7、如果出现版本冲突,系统会抛出 ElasticsearchException 错误
UpdateRequest request = new UpdateRequest("posts", "1")
.doc("field", "value")
.setIfSeqNo(101L)
.setIfPrimaryTerm(200L);
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// 出现的异常表明系统返回了一个版本冲突错误
}
}
(3)实用:根据返回值判断是否成功
UpdateRequest request = new UpdateRequest("index_name", "doc_id");
// 假设设置了更新内容或upsert逻辑
request.doc(jsonBuilder().startObject().field("name", "new_name").endObject());
try {
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
// 1. 无异常,先确认操作的文档和索引正确
String index = response.getIndex();
String id = response.getId();
if (!"index_name".equals(index) || !"doc_id".equals(id)) {
// 异常情况:操作对象错误(通常是代码逻辑问题)
System.out.println("操作对象错误");
return false;
}
// 2. 查看具体结果
Result result = response.getResult();
long version = response.getVersion();
if (result == Result.UPDATED) {
System.out.println("文档已更新,新版本:" + version);
return true; // 业务上视为成功
} else if (result == Result.CREATED) {
System.out.println("文档不存在,已新增,版本:" + version);
return true; // 业务上视为成功
} else if (result == Result.NOOP) {
System.out.println("文档存在,但更新内容无变化,版本未变:" + version);
// 此处根据业务需求判断:若允许“无变更”,则视为成功;否则视为失败
return true; // 示例:视为成功
} else {
// 其他罕见结果(如DELETED,通常不会在UpdateRequest中出现)
System.out.println("未知结果:" + result);
return false;
}
} catch (VersionConflictEngineException e) {
// 版本冲突:文档已被其他操作修改
System.out.println("版本冲突,更新失败:" + e.getMessage());
return false;
} catch (IndexNotFoundException e) {
// 索引不存在
System.out.println("索引不存在,更新失败:" + e.getMessage());
return false;
} catch (Exception e) {
// 其他异常(如连接超时、分片不可用等)
System.out.println("更新失败:" + e.getMessage());
return false;
}
6、TermVectorsRequest 向量API
暂略
7、BulkRequest 批量增删改
BulkRequest 可以通过一个请求来执行多个索引创建、更新和/或删除操作。
(1)参数与使用
// 1、至少需要向批量请求中添加一个操作 可以添加IndexRequest、UpdateRequest、DeleteRequest
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")
.source(XContentType.JSON,"field", "baz"));
// 或者:一个批量请求,其`全局索引`用于所有子请求,除非在子请求中覆盖。此参数为@Nullable,并且只能在 BulkRequest 创建时设置。
BulkRequest defaulted = new BulkRequest("posts");
// 2、超时时间
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
// 3、刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// 4、设置在进行索引/更新/删除操作之前必须处于活动状态的分片副本的数量。
request.waitForActiveShards(2);
// 作为 ActiveShardCount 提供的分片副本数量:可以是 ActiveShardCount.ALL 、 ActiveShardCount.ONE 或 ActiveShardCount.DEFAULT (默认)
request.waitForActiveShards(ActiveShardCount.ALL);
// 5、全局 pipelineId 用于所有子请求,除非在子请求中覆盖
request.pipeline("pipelineId");
// 6、全局路由 ID 用于所有子请求,除非在子请求中覆盖
request.routing("routingId");
(2)执行与响应
// 1、同步执行
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
// 2、异步执行
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// 成功
}
@Override
public void onFailure(Exception e) {
// 失败
}
};
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
// 3、处理响应
for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍历所有操作的结果
// 获取操作的响应结果(是否成功),响应代码可能是 IndexResponse、UpdateResponse 或 DeleteResponse,这些代码实际上都可以被视为表示操作失败的 DocWriteResponse 状态
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
// 处理各种响应
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
// 4、批量响应提供了一种快速判断一个或多个操作是否失败的方法
if (bulkResponse.hasFailures()) {
// 如果至少有一个操作失败,此方法会返回 true
}
// 5、遍历所有的操作结果,以判断这些操作是否失败;如果确实失败了,就需要获取相应的失败原因
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure =
bulkItemResponse.getFailure(); // 失败详细信息
}
}
(3)批量处理器
BulkProcessor 通过提供一个实用工具类,简化了批量 API 的使用方式。该工具类使得索引、更新和删除操作能够在被提交给处理程序时被透明地执行。
为了执行这些请求,BulkProcessor 需要以下组件:
RestHighLevelClient:这个客户端用于执行相关操作。BulkRequest并取回那些东西。BulkResponse
BulkProcessor.Listener:这个监听器会在每次操作之前和之后被调用。BulkRequest执行之时BulkRequest失败了
BulkProcessor.Listener listener = new BulkProcessor.Listener() { // 创建 BulkProcessor.Listener
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// 此方法会在每次执行 BulkRequest之前被调用。
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
// 此方法会在每次执行 BulkRequest之后被调用。
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
// 当 BulkRequest 失败时,就会调用这种方法
}
};
// 通过从 BulkProcessor 中调用 build() 方法来创建 BulkProcessor.Builder。实际上,是 RestHighLevelClient.bulkAsync() 方法在后台负责执行 BulkRequest 的操作。
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener).build();
// BulkProcessor.Builder 提供了用于配置 BulkProcessor 如何处理请求执行的方法:
BulkProcessor.Builder builder = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener);
builder.setBulkActions(500); // 根据当前添加的操作数量来决定何时刷新新的批量请求(默认值为 1000;使用 -1 可禁用此功能)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // 根据当前添加的操作数量来决定何时刷新新的批量请求(默认值为 5MB,使用 -1 可禁用此功能)
builder.setConcurrentRequests(0); //设置允许同时执行的请求数量(默认值为 1;若设置为 0,则仅允许执行一个请求)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //如果达到设定的刷新间隔时间,系统会自动刷新所有尚未处理的BulkRequest任务(默认情况下不设置刷新间隔)
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); //可以设置一个恒定的重试策略:该策略最初会等待 1 秒,然后最多重试 3 次。有关更多选项,请参阅 BackoffPolicy.noBackoff()、BackoffPolicy.constantBackoff() 和 BackoffPolicy.exponentialBackoff()
// 一旦创建了编号为BulkProcessor的请求列表,就可以向其中添加新的请求了
IndexRequest one = new IndexRequest("posts").id("1")
.source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
// 这些请求将由 BulkProcessor 来处理,它会为每个批量请求调用 BulkProcessor.Listener。
// 该监听器提供了用于访问 BulkRequest 和 BulkResponse 的方法。
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// 在每次执行 BulkRequest 之前,该方法会被调用,用于获取在 BulkRequest中将要执行的操作总数。
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (response.hasFailures()) { // 每次执行 BulkRequest 后,该方法都会被调用,用于判断 BulkResponse 中是否包含错误
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
// 如果 BulkRequest 失败,就会调用此方法,通过它可以了解具体的失败原因
logger.error("Failed to execute bulk", failure);
}
};
一旦所有请求都被添加到 BulkProcessor 中,就需要使用两种可用的关闭方法之一来关闭该实例。
可以使用这种方法来等待,直到所有请求都被处理完毕,或者指定的等待时间到期为止
// 如果所有批量请求都已完成,该方法将返回 true;如果所有批量请求在等待时间结束后才完成,该方法将返回 false
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
// 立即关闭
bulkProcessor.close();
// 这两种方法都会在关闭处理器之前清除所有已添加到处理器中的请求,并且还会阻止任何新的请求被添加到处理器中
(4)实用:根据返回值判断是否成功
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
// 执行Bulk请求并获取响应
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
// 1. 整体判断是否有失败
if (bulkResponse.hasFailures()) {
System.out.println("批量操作存在失败的子操作!");
// 2. 遍历每个子操作,打印失败详情
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
BulkItemResponse.Failure failure = item.getFailure();
System.out.println(
"子操作失败:" +
" 索引=" + failure.getIndex() +
" 文档ID=" + failure.getId() +
" 原因=" + failure.getMessage()
);
}
}
} else {
System.out.println("所有子操作均成功执行!");
}
8、MultiGetRequest 批量根据id查询
(1)参数与使用
multiGet 支持与 get API 相同的可选参数。您可以在 Item 中设置这些参数中的大部分。
// 1、一次请求可以添加多个请求项
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item(
"index", // 索引
"example_id")); // 文档ID
request.add(new MultiGetRequest.Item("index", "another_id"));
// 2、禁用source源获取(默认开启)
request.add(new MultiGetRequest.Item("index", "example_id")
.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
// 3、返回字段的过滤
String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "example_id")
.fetchSourceContext(fetchSourceContext));
// 返回字段排除
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[] {"foo", "*r"};
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "example_id")
.fetchSourceContext(fetchSourceContext));
// 4、为特定的存储字段配置数据检索功能(这些字段需要被单独存储在相应的映射结构中)
request.add(new MultiGetRequest.Item("index", "example_id")
.storedFields("foo"));
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse item = response.getResponses()[0];
String value = item.getResponse().getField("foo").getValue(); // 检索 foo 中存储的字段值(前提是该字段必须被单独存储在映射数据中)
// 5、路由和版本
request.add(new MultiGetRequest.Item("index", "with_routing")
.routing("some_routing"));
request.add(new MultiGetRequest.Item("index", "with_version")
.versionType(VersionType.EXTERNAL)
.version(10123L));
// 6、刷新、实时查询等参数
request.preference("some_preference");
request.realtime(false);
request.refresh(true);
(2)执行与获取返回
// 1、同步执行
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
// 2、异步执行
listener = new ActionListener<MultiGetResponse>() {
@Override
public void onResponse(MultiGetResponse response) {
}
@Override
public void onFailure(Exception e) {
}
};
client.mgetAsync(request, RequestOptions.DEFAULT, listener);
// 1、返回值处理
MultiGetItemResponse firstItem = response.getResponses()[0];
assertNull(firstItem.getFailure()); // getFailure 返回空值,因为并没有发生任何错误
GetResponse firstGet = firstItem.getResponse(); // getResponse 会返回 GetResponse
String index = firstItem.getIndex();
String id = firstItem.getId();
if (firstGet.isExists()) {
long version = firstGet.getVersion();
String sourceAsString = firstGet.getSourceAsString(); // json字符串
Map<String, Object> sourceAsMap = firstGet.getSourceAsMap(); // map
byte[] sourceAsBytes = firstGet.getSourceAsBytes(); // byte数组
} else {
// 需要处理文档未找到的情况。请注意,尽管返回的响应代码为 404,但实际上返回的是一个有效的 GetResponse 值,而不是抛出异常。这种响应不包含任何源文档信息,其 isExists 方法会返回 false
}
// 2、当对一个不存在的索引执行某个子请求时,将会抛出异常:getFailure
assertNull(missingIndexItem.getResponse()); // getResponse 的值为 null
Exception e = missingIndexItem.getFailure().getFailure(); // getFailure 并非如此,其中实际上包含了一个 Exception
ElasticsearchException ee = (ElasticsearchException) e; // 其实,那个Exception实际上是一个ElasticsearchException
// TODO status is broken! fix in a followup
// assertEquals(RestStatus.NOT_FOUND, ee.status()); // 其状态码为 NOT_FOUND。如果不是因为使用了“多请求”机制,那么这个请求的结果应该会是 HTTP 404 错误
assertThat(e.getMessage(), // getMessage 阐明了实际的原因,即 no such index
containsString("reason=no such index [missing_index]"));
// 3、发生版本冲突时
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index", "example_id")
.version(1000L));
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse item = response.getResponses()[0];
assertNull(item.getResponse()); // getResponse 的值为 null
Exception e = item.getFailure().getFailure(); // getFailure 并非如此,其中实际上包含了一个 Exception
ElasticsearchException ee = (ElasticsearchException) e; // 其实,那个Exception实际上是一个ElasticsearchException
// TODO status is broken! fix in a followup
// assertEquals(RestStatus.CONFLICT, ee.status()); // 其状态码为 CONFLICT。如果不是因为使用了多次请求(multi-get)的方式,那么这个请求的结果应该会是 HTTP 409 状态码
assertThat(e.getMessage(), // getMessage 阐明了真正的原因
containsString("version conflict, current version [1] is "
+ "different than the one provided [1000]"));
9、ReindexRequest 索引数据复制
该操作需要一个现有的源索引和一个目标索引,这两个索引在请求执行之前可能已经存在,也可能还不存在。Reindex命令并不会尝试创建目标索引,也不会复制源索引的配置信息。在执行_reindex操作之前,您应该先设置好目标索引的相关参数,包括映射关系、分片数量、副本设置等。
暂略。
四、文档API常用核心参数详解
1、查询/写入API- routing
routing的核心作用是:指定「路由键」(routing key),强制文档存储到 / 查询时定位到「唯一的主分片」—— 简单说,就是给文档分配一个 “分片定位标识”,让 ES 不用默认逻辑,而是按你指定的规则找主分片。
查询时的 routing 必须与「写入文档时的 routing 完全一致」:
比如写入时用 routing("user_123") 存储文档,查询时若用 routing("user_456"),会定位到错误的主分片,导致查不到文档(即使文档 ID 正确)。
若写入时未指定 routing:ES 会默认用「文档 ID」作为 routing 值,查询时也无需指定(或指定文档 ID 作为 routing)。
2、查询API - preference
preference 的核心作用是:指定查询 / 获取(不能用于写入请求)请求的「分片选择偏好规则」—— 告诉 Elasticsearch 集群,优先选择哪些节点 / 分片来执行当前请求,从而控制请求的路由逻辑。
preference 只能在「routing 定位的主分片 + 其所有副本分片」中选择,无法跨主分片生效:
比如 routing 定位到主分片 3,preference("_local") 只会在「主分片 3」和「它的副本分片」中找本地节点的分片,不会去其他主分片(如主分片 4)的节点。
当不设置 preference 时(
通常不需要手动指定),ES 的默认规则是:
对目标文档(由 routing 或文档 ID 定位到的主分片),随机选择该主分片或其所有副本分片之一执行查询,且会在多个请求间做「轮询负载均衡」,避免单个分片 / 节点过载。
| 规则值 | 作用说明 | 场景 |
|---|---|---|
| _primary | 只选择「主分片」执行请求。 | 场景:需要获取绝对最新的数据(副本同步可能有微小延迟)。 |
| _replica | 只选择「副本分片」执行请求。 | 场景:分担主分片压力,适合非实时性要求的查询。 |
| _local | 优先选择「当前客户端连接的节点」上的分片(主 / 副均可)。 | 场景:减少跨节点网络传输,提升查询速度。 |
| 自定义字符串 | (如 preference(“user_123”)、preference(“session_456”))ES 会根据该字符串哈希,固定选择某一个分片(主 / 副均可)。 | 场景:保证同一用户 / 会话的查询始终路由到同一分片,利用分片级缓存提升性能;或避免重复查询不同分片导致的结果不一致(如聚合查询)。 |
| _only_nodes:xxx | 只选择指定节点(通过节点 ID / 名称匹配)的分片。 | 场景:运维时指定特定节点执行查询(如测试、故障排查)。 |
与routing的区别:
查询时可以同时指定 routing 和 preference,两者完全不冲突,反而能协同优化查询逻辑 —— 本质是「分工不同、互补协作」:routing:负责「精准定位文档所在的主分片」(缩小查询范围);preference:负责「在已定位的主分片及其副本中,选择哪个分片 / 节点执行查询」(优化执行节点)。
3、GET API独有 - realtime:实时性查询
在 Elasticsearch 的 Get API 中,request.realtime(false) 用于禁用实时性查询,控制 Get 请求是否忽略未刷新(未写入索引分段)的数据。
Elasticsearch 中,文档的索引操作会经历以下步骤:
1.先写入事务日志(translog)(确保数据不丢失);
2.再写入内存索引缓冲区;
3.定期(默认 1 秒)通过 refresh 操作将内存缓冲区的数据写入磁盘上的索引分段(segment),此时数据才会变为“可搜索”状态。
默认情况下,Get API 是实时的(realtime(true)):
它会同时检查事务日志(translog) 和已刷新的索引分段,因此即使文档刚被索引、还未经过 refresh 操作(未写入 segment),也能通过 Get API 查到最新数据(从 translog 中读取)。
当设置 realtime(false) 时:
Get API 会仅查询已通过 refresh 操作写入索引分段(segment)的数据,不再检查事务日志(translog)。这意味着:
如果文档刚被索引但尚未触发 refresh(比如索引后不足 1 秒),此时用 realtime(false) 查询会返回不存在(not found);
只有等待 refresh 完成(或手动触发 _refresh),才能查到该文档。
4、写入刷新(setRefreshPolicy)与读时刷新(refresh)
写入刷新(setRefreshPolicy):
针对 写操作(如 IndexRequest、UpdateRequest、DeleteRequest 等),通过写操作主动触发刷新,让“新写入的数据”快速对后续查询可见。
在写操作执行过程中或完成后,控制是否触发索引的 refresh 操作,确保写入的数据尽快进入“可搜索状态”(即刷新到索引分段 segment 中)。
写入刷新:setRefreshPolicy 的可选值:RefreshPolicy.NONE(默认值):
写操作完成后不触发任何刷新,依赖 Elasticsearch 自身的自动刷新机制(默认每 1 秒一次)。
这是性能最优的选择,因为避免了刷新的额外开销,但新数据需要等待 1 秒左右才能被搜索到。RefreshPolicy.IMMEDIATE:
写操作完成后立即触发一次 refresh,确保数据“写入即能被搜索到”。
实时性最高,但性能开销大:refresh 会生成新的索引分段,频繁触发会导致大量小分段,后续需要合并分段(merge),增加磁盘 I/O 和 CPU 消耗。RefreshPolicy.WAIT_UNTIL:
写操作不会立即触发 refresh,但会等待下一次自动刷新完成后再返回(最多等待 1 秒,即自动刷新间隔)。
平衡了实时性和性能:既保证数据最终会被刷新(返回时已可搜索),又避免了主动触发 refresh 的即时开销。
读时刷新(refresh 参数):
针对 读操作(如 GetRequest、SearchRequest 等),通过读操作主动触发刷新,让“本次查询”能立即看到最新数据(无论写操作是否触发过刷新)。
在读操作执行之前,强制触发一次索引的 refresh 操作,确保本次查询能看到“所有已写入但未刷新的数据”。
读时刷新:refresh 参数的行为:
读操作(如 GetRequest、SearchRequest)中,通过 request.refresh(true) 开启读时刷新,行为是固定的:
执行读操作之前,强制触发一次目标索引的 refresh 操作(无论该索引最近是否刷新过)。
刷新完成后,再执行查询,因此本次查询能看到“所有已写入内存缓冲区或事务日志的未刷新数据”。
5、version与versionType
(1)version
version(版本号)和 versionType(版本控制类型)是用于并发控制的核心参数,用于避免多线程/多进程操作时的文档更新冲突。它们通常配合使用,通过定义“版本比较规则”来决定一个操作(如索引、更新、删除)是否允许执行。
Elasticsearch 为每个文档维护一个版本号(_version 字段),用于标记文档的修改历史:
文档创建时,_version 初始化为 1;
每次对文档执行索引(index)、更新(update)、删除(delete) 操作,_version 会自动递增(如 1→2→3...);version 参数允许用户在操作时指定一个“期望的版本号”,Elasticsearch 会根据 versionType 定义的规则,判断该版本号是否符合条件,从而决定是否允许操作执行。
(2)versionType
versionType(版本控制类型) 定义了“期望的版本号(version 参数)”与文档当前 _version 的比较逻辑,决定操作是否被允许。Elasticsearch 提供多种版本类型,核心差异在于比较规则的不同。
versionType 的具体类型及比较规则
Elasticsearch 的 VersionType 是一个枚举类,包含以下常用类型(不同版本可能略有差异,以 7.x+ 为例):
| 版本类型 | 比较规则(操作允许的条件) | 适用场景 |
|---|---|---|
INTERNAL(默认) |
请求中指定的 version 必须等于 文档当前的 _version |
依赖 Elasticsearch 内部自动管理版本号,适用于仅通过 ES 操作文档的场景。 |
EXTERNAL |
请求中指定的 version(外部版本号)必须大于 文档当前的 _version |
对接外部系统(如 MySQL)的版本号(如自增 ID),确保外部版本更新时 ES 同步更新。 |
EXTERNAL_GTE |
请求中指定的 version(外部版本号)大于或等于 文档当前的 _version |
外部版本号可能重复或非严格递增的场景(比 EXTERNAL 更宽松)。 |
FORCE(不推荐) |
忽略版本检查,强制执行操作(会覆盖任何版本的文档,且 _version 会强制递增) |
极少数需强制覆盖的场景(可能导致数据丢失,谨慎使用)。 |
1.当操作中指定 version 和 versionType 时,Elasticsearch 会先获取文档当前的 _version;
2.根据 versionType 的规则,比较 version 参数与当前 _version;
3.若符合条件,操作执行,_version 按规则更新(如 INTERNAL 递增,EXTERNAL 直接设为指定版本);
4.若不符合条件,操作失败,返回 VersionConflictEngineException(版本冲突)。
6、painless脚本
(1)核心概念
type:脚本类型(INLINE 或 STORED,默认INLINE);
lang:脚本语言(固定为"painless");
source:脚本内容(inline 脚本)或id(存储脚本的 ID);
params:脚本参数(键值对 Map,用于动态传参)。
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import java.util.HashMap;
import java.util.Map;
// 1. 构建带参数的inline脚本
Map<String, Object> params = new HashMap<>();
params.put("increment", 1); // 定义参数
Script inlineScript = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.counter += params.increment", // 脚本内容
params
);
// 2. 构建引用存储脚本的Script(需先创建存储脚本)
Script storedScript = new Script(
ScriptType.STORED,
"painless",
"calculate_discount", // 存储脚本的ID
null // 无参数时可传null
);
(2)更新文档时使用 Painless 脚本
通过UpdateRequest,用脚本修改文档字段(如自增、字段计算等)。
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
// 自增计数器字段
public class UpdateWithScript {
public static void main(String[] args) throws IOException {
// 1. 定义脚本:counter字段自增(初始值为0)
Map<String, Object> params = new HashMap<>();
params.put("step", 2); // 每次增加2
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.counter = (ctx._source.counter ?: 0) + params.step", // 脚本说明:ctx._source代表文档的_source字段,?: 0处理字段不存在的情况(默认初始化为 0)。
params
);
// 2. 构建更新请求
UpdateRequest request = new UpdateRequest("products", "1"); // 索引名+文档ID
request.script(script); // 设置脚本
request.retryOnConflict(3); // 冲突时重试3次
request.timeout(TimeValue.timeValueSeconds(10)); // 超时时间
// 3. 执行更新并处理响应
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
System.out.println("更新结果:" + response.getResult()); // 输出 UPDATED/NOOP
System.out.println("版本号:" + response.getVersion()); // 文档版本号自增
}
}
(3)聚合计算中使用 Painless 脚本
在聚合(如scripted_metric、terms)中,用脚本动态生成聚合键或计算指标。
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
// 计算字段平方和的自定义聚合
public class AggregationWithScript {
public static void main(String[] args) throws IOException {
// 1. 定义脚本化聚合(四阶段:init/map/combine/reduce)
// 脚本说明:doc['value'].value获取字段值,通过四阶段完成分布式计算(适合复杂指标)
ScriptedMetricAggregationBuilder sumOfSquares = AggregationBuilders.scriptedMetric("sum_squares")
.initScript(new Script(ScriptType.INLINE, "painless", "state.sum = 0", null)) // 初始化状态
.mapScript(new Script(ScriptType.INLINE, "painless",
"state.sum += doc['value'].value * doc['value'].value", null)) // 映射阶段计算
.combineScript(new Script(ScriptType.INLINE, "painless", "return state.sum", null)) // 合并分片结果
.reduceScript(new Script(ScriptType.INLINE, "painless",
"double total = 0; for (s in states) { total += s; } return total", null)); // 全局汇总
// 2. 构建搜索请求,添加聚合
SearchRequest searchRequest = new SearchRequest("metrics");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.size(0); // 不返回文档,只需要聚合结果
sourceBuilder.aggregation(sumOfSquares);
searchRequest.source(sourceBuilder);
// 3. 执行搜索并解析结果
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
double total = (double) response.getAggregations().get("sum_squares").getMetricValue();
System.out.println("平方和:" + total);
}
}
(4)排序时使用 Painless 脚本
通过ScriptSortBuilder,基于脚本计算结果排序(如 “价格 × 折扣” 排序)。
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.ScriptSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
// 按 “价格 × 折扣” 升序排序
public class SortWithScript {
public static void main(String[] args) throws IOException {
// 1. 定义排序脚本:计算实际支付价格
Script sortScript = new Script(
ScriptType.INLINE,
"painless",
"doc['price'].value * doc['discount'].value",
null
);
// 2. 构建脚本排序器
ScriptSortBuilder scriptSort = new ScriptSortBuilder(sortScript, ScriptSortBuilder.ScriptSortType.NUMBER)
.order(SortOrder.ASC); // 升序
// 3. 构建搜索请求,添加排序
SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.sort(scriptSort); // 应用排序
sourceBuilder.size(10);
searchRequest.source(sourceBuilder);
// 4. 执行搜索并输出结果
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
response.getHits().forEach(hit -> {
System.out.println("文档ID:" + hit.getId() + ",排序值:" + hit.getSortValues()[0]);
});
}
}
(5)过滤文档时使用 Painless 脚本
通过ScriptQueryBuilder,用脚本返回布尔值过滤文档(如 “库存> 阈值”)。
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.ScriptQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
// 过滤出库存大于 10 的文档
public class FilterWithScript {
public static void main(String[] args) throws IOException {
// 1. 定义过滤脚本:库存>minStock时返回true
Map<String, Object> params = new HashMap<>();
params.put("minStock", 10);
Script filterScript = new Script(
ScriptType.INLINE,
"painless",
"doc['stock'].value > params.minStock",
params
);
// 2. 构建脚本查询
ScriptQueryBuilder scriptQuery = QueryBuilders.scriptQuery(filterScript);
// 3. 构建搜索请求,应用过滤
SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(scriptQuery); // 设置查询条件
sourceBuilder.size(10);
searchRequest.source(sourceBuilder);
// 4. 执行搜索并输出结果
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println("符合条件的文档数:" + response.getHits().getTotalHits().value);
}
}
(6)使用存储脚本(Stored Scripts)
将脚本存储到 ES 中,后续通过 ID 引用,避免重复编写。
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
// 步骤 1:创建存储脚本
public class CreateStoredScript {
public static void main(String[] args) throws IOException {
// 1. 定义存储脚本内容(JSON格式)
String scriptContent = "{" +
"\"lang\": \"painless\"," +
"\"source\": \"ctx._source.final_price = ctx._source.price * ctx._source.discount\"" +
"}";
// 2. 构建存储脚本请求(ID为"calculate_final_price")
PutStoredScriptRequest request = new PutStoredScriptRequest();
request.id("calculate_final_price");
request.content(scriptContent, XContentType.JSON);
// 3. 执行存储脚本
AcknowledgedResponse response = client.putScript(request, RequestOptions.DEFAULT);
System.out.println("存储脚本是否成功:" + response.isAcknowledged()); // true表示成功
}
}
// 步骤 2:引用存储脚本更新文档
// 引用存储脚本(通过ID)
Script storedScript = new Script(
ScriptType.STORED,
"painless",
"calculate_final_price", // 存储脚本ID
null
);
// 构建更新请求
UpdateRequest request = new UpdateRequest("products", "1");
request.script(storedScript);
// 执行更新
client.update(request, RequestOptions.DEFAULT);
7、更新API:retryOnConflict
在 Elasticsearch 的 Java 客户端中,UpdateRequest 的 retryOnConflict(int retryCount) 方法用于处理更新操作中的版本冲突(Version Conflict),其核心作用是:当更新请求因并发冲突失败时,自动重试指定次数。
背景:ES 的版本冲突机制
Elasticsearch 为每个文档维护一个版本号(_version字段),用于控制并发更新的一致性。当两个请求同时尝试更新同一个文档时,可能出现以下情况:
第一个请求成功更新文档,版本号递增(例如从v1变为v2);
第二个请求仍基于旧版本(v1)发起更新,此时 ES 会检测到版本不匹配,返回 版本冲突错误(VersionConflictEngineException),拒绝该更新。
retryOnConflict 的作用:retryOnConflict(retryCount) 允许你设置一个重试次数(例如 retryOnConflict(3))。当更新请求因版本冲突失败时,客户端会自动重新尝试更新操作,最多重试 retryCount 次。
每次重试时,客户端会先获取文档的最新版本号,再基于最新版本执行更新,从而最大程度避免因短暂并发冲突导致的更新失败,减少需要手动处理冲突的场景。
示例场景
假设在高并发场景下,多个线程同时更新同一文档:
若未设置retryOnConflict,第一次冲突时更新会直接失败,需要业务代码手动捕获异常并处理重试;
若设置retryOnConflict(2),则冲突时会自动重试 2 次,若第 3 次(首次 + 2 次重试)仍失败,才会抛出异常。
8、waitForActiveShards:写时高可用
在 Elasticsearch 的 Java 客户端中,waitForActiveShards 是一个用于控制写操作(如索引、更新、删除)可靠性的参数,其核心作用是在执行写操作前,等待指定数量的分片(主分片+副本分片)处于“活跃”状态,以确保数据写入的安全性和可用性。
分片活跃的含义
Elasticsearch 索引的分片分为主分片(primary shard)和副本分片(replica shard)。“活跃”指分片已成功分配到节点,且节点正常运行(可处理请求)。
控制写操作的前置条件
当发送写请求(如 IndexRequest、UpdateRequest)时,waitForActiveShards 会指定一个“最低活跃分片数量”。只有当集群中至少有这么多分片(主分片+副本分片)处于活跃状态时,才会执行写操作;否则,请求会阻塞等待,直到满足条件或超时。
例如:若索引配置为 3 个主分片、每个主分片 1 个副本(共 6 个分片),设置 waitForActiveShards=4,则写操作需等待至少 4 个分片(主+副)活跃后才执行。
平衡可靠性与效率
低数值(如 1):仅需主分片活跃即可执行,速度快,但风险高(若主分片所在节点故障,且无副本可用,数据可能丢失)。
高数值(如 all 或等于总分片数):需所有配置的分片(主+副)都活跃,安全性高,但可能因个别分片不活跃(如节点故障)导致请求阻塞超时。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)