在 Elasticsearch 中,聚合(Aggregations)是一种强大的功能,它允许你对数据集执行复杂的分析和统计操作。通过聚合,你可以获得关于数据的深入见解,如计数、平均值、分组等。下面我们将详细介绍如何使用 Java 操作 Elasticsearch 实现聚合查询。

1. 理解聚合

Elasticsearch 的聚合可以分为两大类:

  • 桶聚合(Bucket Aggregation):将文档分成不同的桶(bucket),每个桶代表一个分类或范围。常见的类型包括 terms(按字段值分组)、range(数值范围)、date_histogram(时间序列)等。

  • 度量聚合(Metric Aggregation):对桶内的文档计算统计数据,如 avg(平均值)、sum(总和)、min(最小值)、max(最大值)、stats(综合统计信息)等。

聚合可以嵌套,即在一个桶内再进行另一个聚合,从而实现更复杂的数据分析。

2. 引入依赖

确保你的项目中包含了最新的 Elasticsearch Java API Client 依赖。如果你还没有添加,请参考之前的教程中的依赖部分。

3. 编写聚合查询代码

我们将展示如何构建并执行一个简单的聚合查询,假设我们有一个用户表,并且想要根据用户的年龄进行分组,并计算每一年龄段的用户数量。

创建客户端实例

首先,确保你有一个可用的 Elasticsearch 客户端实例。可以参考之前教程中的 ElasticsearchClientFactory 类来创建。

import co.elastic.clients.elasticsearch.ElasticsearchClient;
// ... (其他必要的导入)

public class ElasticsearchClientFactory {
    public static ElasticsearchClient createClient() {
        // 创建低级别的 RestClient
        RestClient restClient = RestClient.builder(
            new HttpHost("localhost", 9200)
        ).build();

        // 使用 RestClient 创建高级别的 Elasticsearch 客户端
        ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper()
        );

        return new ElasticsearchClient(transport);
    }
}
定义聚合查询

接下来,我们定义一个方法来执行聚合查询。这里我们将使用 terms 聚合来按年龄分组,并计算每个年龄段的用户数量。

import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Aggregation;
import co.elastic.clients.elasticsearch.core.search.Aggregations;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch._types.aggregations.TermsAggregation;
import co.elastic.clients.elasticsearch._types.aggregations.Bucket;
import java.util.List;
import java.util.Map;

public class AggregationService {

    private final ElasticsearchClient client;

    public AggregationService(ElasticsearchClient client) {
        this.client = client;
    }

    /**
     * 执行按年龄分组的聚合查询,并返回结果。
     */
    public void executeAgeGroupAggregation() throws IOException {
        // 构建搜索请求
        SearchRequest searchRequest = new SearchRequest.Builder()
            .index("users")
            .size(0) // 不需要返回具体的文档内容
            .aggregations("age_groups", 
                new TermsAggregation.Builder()
                    .field("age")
                    .build())
            .build();

        // 执行搜索请求
        SearchResponse<Map> response = client.search(searchRequest, Map.class);

        // 解析聚合结果
        if (response.aggregations() != null) {
            Aggregations aggregations = response.aggregations();
            if (aggregations.containsKey("age_groups")) {
                Aggregation ageGroupsAgg = aggregations.get("age_groups");
                if (ageGroupsAgg instanceof TermsAggregation) {
                    TermsAggregation termsAgg = (TermsAggregation) ageGroupsAgg;
                    List<Bucket<?>> buckets = termsAgg.buckets();
                    for (Bucket<?> bucket : buckets) {
                        System.out.println("Age: " + bucket.key() + ", Count: " + bucket.docCount());
                    }
                }
            }
        }
    }
}
调用聚合服务

最后,在适当的地方调用 executeAgeGroupAggregation 方法来执行聚合查询并输出结果。

public class MainApplication {

    public static void main(String[] args) throws IOException {
        try (ElasticsearchClient client = ElasticsearchClientFactory.createClient()) {
            AggregationService aggregationService = new AggregationService(client);
            aggregationService.executeAgeGroupAggregation();
        }
    }
}

4. 更多聚合示例

除了上述简单的 terms 聚合外,Elasticsearch 还支持多种类型的聚合,以下是一些常见的例子:

4.1 平均值聚合 (avg)

计算所有文档中某个数值字段的平均值。

SearchRequest searchRequest = new SearchRequest.Builder()
    .index("users")
    .size(0)
    .aggregations("average_age", 
        new AverageAggregation.Builder()
            .field("age")
            .build())
    .build();
4.2 数值范围聚合 (range)

将数值字段按指定范围分组。

SearchRequest searchRequest = new SearchRequest.Builder()
    .index("users")
    .size(0)
    .aggregations("age_ranges", 
        new RangeAggregation.Builder()
            .field("age")
            .ranges(r -> r
                .ranges(List.of(
                    new RangeAggregation.Range.Builder().from(0).to(20).build(),
                    new RangeAggregation.Range.Builder().from(20).to(40).build(),
                    new RangeAggregation.Range.Builder().from(40).to(Double.POSITIVE_INFINITY).build()
                ))
            )
            .build())
    .build();
4.3 时间直方图聚合 (date_histogram)

按时间间隔对日期字段进行分组。

SearchRequest searchRequest = new SearchRequest.Builder()
    .index("users")
    .size(0)
    .aggregations("signup_date_histogram", 
        new DateHistogramAggregation.Builder()
            .field("signup_date")
            .calendarInterval(DateHistogramAggregation.CalendarInterval.Month)
            .build())
    .build();
4.4 嵌套聚合

在一个桶内再进行另一个聚合。

SearchRequest searchRequest = new SearchRequest.Builder()
    .index("users")
    .size(0)
    .aggregations("age_groups", 
        new TermsAggregation.Builder()
            .field("age")
            .aggregations("average_salary", 
                new AverageAggregation.Builder()
                    .field("salary")
                    .build())
            .build())
    .build();
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐