Go语言实现的 Elasticsearch 分片查询与数据聚合

概述

本文档提供了一个Go语言编写的示例,演示了如何使用分片查询从Elasticsearch中高效地检索大量数据,并通过通道机制将结果汇总。该示例经过处理以保护敏感信息,适用于教学和文档编写目的。主要内容包括:

  • 数据结构定义:定义了用于表示单条记录的数据传输对象(DTO)。
  • 变量声明:声明了存储查询结果的切片及用于goroutine同步的等待组。
  • 初始化消息通道:创建了一个有容量限制的消息通道,用于在不同goroutine之间传递数据。
  • 分片查询与数据处理:展示了如何使用循环对数据进行分片查询,并通过独立的goroutine执行查询操作。
  • 等待所有goroutine完成:确保所有查询操作完成后,关闭消息通道并汇总结果。
package main

import (
    // 注意:这里省略了具体的导入包,你需要根据实际情况添加
)

// 假设DTO是某种数据传输对象结构体,请根据实际情况定义或保持原样。
type SloDTO struct {
    // 字段定义...
}

// 示例代码中使用的占位符变量和常量,请根据实际情况调整。
var (
    sloTraceDates []SloDTO
    sliceWaitGroup sync.WaitGroup
)
messagesChan := make(chan []*json.RawMessage, 60) // 通道用于接收消息

// maxSlice 和 maxPageSize 应根据实际情况设定
for i := 0; i < maxSlice; i++ {
    // 准备分片查询,使用了假设性的NewSliceQuery函数
    sliceQuery := NewSliceQuery().Id(i).Max(maxSlice)
    sliceWaitGroup.Add(1)
    go func(sliceQuery *SliceQuery) {
        defer sliceWaitGroup.Done()
        // 使用假设性的数据库客户端滚动查询数据,实际使用时请替换为真实客户端和方法调用
        scrollCli := dbClient.Scroll("your_index_name").Type("data_type").
            Query(yourQueryCondition).Size(yourMaxPageSize).Slice(sliceQuery)
        var scrollID string
        for {
            searchResult, err := scrollCli.ScrollId(scrollID).Do(context.Background())
            if err == io.EOF {
                break
            }
            if err != nil {
                // 错误日志记录,klog应替换为你实际使用的日志库
                klog.Errorf("scroll query error: %v", err)
                return
            }
            scrollID = searchResult.ScrollId
            var batchMessages []*json.RawMessage
            for _, hit := range searchResult.Hits.Hits {
                batchMessages = append(batchMessages, hit.Source)
            }
            messagesChan <- batchMessages
            // 打印语句可以根据需要保留或修改
            fmt.Println("processing data...")
        }
    }(sliceQuery)
}

// 等待所有goroutine完成,并在完成后关闭通道
go func() {
    sliceWaitGroup.Wait()
    defer close(messagesChan)
}()

var allMessages []*json.RawMessage
for batch := range messagesChan {
    allMessages = append(allMessages, batch...)
}

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐