如何在 C++ 中使用 Elasticsearch 的详细指南,包括适用场景、完整代码示例和测试代码。由于 Elasticsearch 没有官方的 C++ 客户端,我们将使用 libcurl(HTTP 请求库)与 Elasticsearch 的 RESTful API 交互,并结合 nlohmann/json 库解析 JSON 数据。这是一种常见的 C++ 集成方式,适用于需要直接与 Elasticsearch API 通信的场景。以下内容涵盖环境搭建、代码实现、测试用例及注意事项。


一、适用场景

Elasticsearch 是一个分布式全文搜索引擎,结合 C++ 使用时,适用于以下场景:

  1. 高性能日志分析

    • 场景:实时分析服务器或应用程序日志(如游戏服务器、物联网设备日志)。
    • 示例:C++ 应用程序收集设备日志,通过 Elasticsearch REST API 存储并查询。
  2. 全文搜索

    • 场景:构建高效的搜索功能(如企业内部文档搜索、实时日志搜索)。
    • 示例:C++ 后端接收用户查询,调用 Elasticsearch API 返回搜索结果。
  3. 数据分析

    • 场景:对大规模数据进行聚合分析(如传感器数据统计、用户行为分析)。
    • 示例:C++ 程序从 Elasticsearch 获取聚合结果,生成报表。
  4. 实时监控

    • 场景:监控系统性能指标(如网络流量、服务器负载)。
    • 示例:C++ 程序通过 Elasticsearch 查询 Metricbeat 采集的指标,触发告警。
  5. 嵌入式系统

    • 场景:在资源受限的嵌入式设备中集成轻量级 Elasticsearch 客户端。
    • 示例:物联网设备通过 C++ 向 Elasticsearch 发送传感器数据。

二、环境准备

1. 安装 Elasticsearch

  • 下载并安装 Elasticsearch(推荐最新版本,如 8.x):Elasticsearch 下载
  • 默认运行在 http://localhost:9200
  • 可选:安装 Kibana 用于调试和可视化查询。

2. 依赖库

  • libcurl:用于发送 HTTP 请求。
    • Ubuntu:sudo apt-get install libcurl4-openssl-dev
    • macOS:brew install curl
    • Windows:下载预编译的 libcurl 或通过 vcpkg 安装(vcpkg install curl)。
  • nlohmann/json:用于解析和生成 JSON 数据。
    • 通过 GitHub 获取或使用包管理器:
      git clone https://github.com/nlohmann/json.git
      
      或通过 vcpkg:vcpkg install nlohmann-json

3. 创建 C++ 项目

  • 使用 CMake 组织项目,确保跨平台兼容。
  • 示例项目:ElasticsearchCppDemo
mkdir ElasticsearchCppDemo
cd ElasticsearchCppDemo
mkdir src tests
touch CMakeLists.txt src/main.cpp src/ElasticsearchClient.hpp src/ElasticsearchClient.cpp

4. 配置 CMake

编辑 CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(ElasticsearchCppDemo)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# 查找 libcurl
find_package(CURL REQUIRED)
include_directories(${CURL_INCLUDE_DIRS})

# 添加 nlohmann/json
include_directories(${CMAKE_SOURCE_DIR}/third_party/json/include)

# 主程序
add_executable(ElasticsearchDemo src/main.cpp src/ElasticsearchClient.cpp)
target_link_libraries(ElasticsearchDemo ${CURL_LIBRARIES})

# 测试程序
enable_testing()
add_executable(TestElasticsearch tests/test_elasticsearch.cpp src/ElasticsearchClient.cpp)
target_link_libraries(TestElasticsearch ${CURL_LIBRARIES})
add_test(NAME ElasticsearchTests COMMAND TestElasticsearch)
  • 说明:假设 nlohmann/json 的头文件位于 third_party/json/include。根据实际路径调整。

三、完整代码示例

以下是一个完整的 C++ 代码示例,展示如何通过 REST API 与 Elasticsearch 交互,包括创建索引、插入文档、查询、更新、删除和聚合分析。我们以商品管理场景为例。

1. 项目结构

  • ElasticsearchClient.hpp:定义 Elasticsearch 客户端接口。
  • ElasticsearchClient.cpp:实现 HTTP 请求和 JSON 处理。
  • main.cpp:主程序,演示 CRUD 和聚合。
  • tests/test_elasticsearch.cpp:测试代码。

2. 代码实现

(1)ElasticsearchClient.hpp
#ifndef ELASTICSEARCH_CLIENT_HPP
#define ELASTICSEARCH_CLIENT_HPP

#include <string>
#include <curl/curl.h>
#include <nlohmann/json.hpp>

class ElasticsearchClient {
public:
    ElasticsearchClient(const std::string& host = "http://localhost:9200");
    ~ElasticsearchClient();

    // 创建索引
    bool createIndex(const std::string& index, const nlohmann::json& mapping);

    // 插入文档
    bool indexDocument(const std::string& index, const std::string& id, const nlohmann::json& document);

    // 查询文档
    nlohmann::json search(const std::string& index, const nlohmann::json& query);

    // 更新文档
    bool updateDocument(const std::string& index, const std::string& id, const nlohmann::json& document);

    // 删除文档
    bool deleteDocument(const std::string& index, const std::string& id);

    // 聚合分析
    nlohmann::json aggregate(const std::string& index, const nlohmann::json& query);

private:
    std::string host_;
    CURL* curl_;
    static size_t WriteCallback(void* contents, size_t size, size_t nmemb, std::string* userp);

    // 执行 HTTP 请求
    bool performRequest(const std::string& method, const std::string& url, const std::string& data, std::string& response);
};

#endif
(2)ElasticsearchClient.cpp
#include "ElasticsearchClient.hpp"
#include <iostream>

ElasticsearchClient::ElasticsearchClient(const std::string& host) : host_(host) {
    curl_ = curl_easy_init();
    if (!curl_) {
        throw std::runtime_error("Failed to initialize libcurl");
    }
}

ElasticsearchClient::~ElasticsearchClient() {
    if (curl_) {
        curl_easy_cleanup(curl_);
    }
}

size_t ElasticsearchClient::WriteCallback(void* contents, size_t size, size_t nmemb, std::string* userp) {
    userp->append((char*)contents, size * nmemb);
    return size * nmemb;
}

bool ElasticsearchClient::performRequest(const std::string& method, const std::string& url, const std::string& data, std::string& response) {
    CURLcode res;
    response.clear();

    curl_easy_setopt(curl_, CURLOPT_URL, url.c_str());
    curl_easy_setopt(curl_, CURLOPT_CUSTOMREQUEST, method.c_str());
    curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, WriteCallback);
    curl_easy_setopt(curl_, CURLOPT_WRITEDATA, &response);

    if (!data.empty()) {
        curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, data.c_str());
        struct curl_slist* headers = nullptr;
        headers = curl_slist_append(headers, "Content-Type: application/json");
        curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, headers);
    }

    res = curl_easy_perform(curl_);
    if (res != CURLE_OK) {
        std::cerr << "HTTP request failed: " << curl_easy_strerror(res) << std::endl;
        return false;
    }

    long response_code;
    curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &response_code);
    return response_code >= 200 && response_code < 300;
}

bool ElasticsearchClient::createIndex(const std::string& index, const nlohmann::json& mapping) {
    std::string url = host_ + "/" + index;
    std::string response;
    return performRequest("PUT", url, mapping.dump(), response);
}

bool ElasticsearchClient::indexDocument(const std::string& index, const std::string& id, const nlohmann::json& document) {
    std::string url = host_ + "/" + index + "/_doc/" + id;
    std::string response;
    return performRequest("PUT", url, document.dump(), response);
}

bool ElasticsearchClient::updateDocument(const std::string& index, const std::string& id, const nlohmann::json& document) {
    std::string url = host_ + "/" + index + "/_doc/" + id + "/_update";
    nlohmann::json update_body = {{"doc", document}};
    std::string response;
    return performRequest("POST", url, update_body.dump(), response);
}

bool ElasticsearchClient::deleteDocument(const std::string& index, const std::string& id) {
    std::string url = host_ + "/" + index + "/_doc/" + id;
    std::string response;
    return performRequest("DELETE", url, "", response);
}

nlohmann::json ElasticsearchClient::search(const std::string& index, const nlohmann::json& query) {
    std::string url = host_ + "/" + index + "/_search";
    std::string response;
    if (performRequest("POST", url, query.dump(), response)) {
        try {
            return nlohmann::json::parse(response);
        } catch (const std::exception& e) {
            std::cerr << "JSON parse error: " << e.what() << std::endl;
        }
    }
    return {};
}

nlohmann::json ElasticsearchClient::aggregate(const std::string& index, const nlohmann::json& query) {
    std::string url = host_ + "/" + index + "/_search";
    std::string response;
    if (performRequest("POST", url, query.dump(), response)) {
        try {
            return nlohmann::json::parse(response);
        } catch (const std::exception& e) {
            std::cerr << "JSON parse error: " << e.what() << std::endl;
        }
    }
    return {};
}
(3)main.cpp
#include "ElasticsearchClient.hpp"
#include <nlohmann/json.hpp>
#include <iostream>

using json = nlohmann::json;

int main() {
    ElasticsearchClient client("http://localhost:9200");

    // 1. 创建索引
    json mapping = {
        {"settings", {
            {"number_of_shards", 3},
            {"number_of_replicas", 1}
        }},
        {"mappings", {
            {"properties", {
                {"name", {{"type", "text"}, {"analyzer", "ik_max_word"}}},
                {"category", {{"type", "keyword"}}},
                {"price", {{"type", "double"}}},
                {"create_time", {{"type", "date"}, {"format", "yyyy-MM-dd HH:mm:ss"}}}
            }}
        }}
    };
    if (client.createIndex("products", mapping)) {
        std::cout << "Index created successfully\n";
    } else {
        std::cerr << "Failed to create index\n";
    }

    // 2. 插入文档
    json doc1 = {
        {"name", "华为手机"},
        {"category", "手机"},
        {"price", 2999.99},
        {"create_time", "2025-10-06 12:00:00"}
    };
    json doc2 = {
        {"name", "小米手机"},
        {"category", "手机"},
        {"price", 1999.99},
        {"create_time", "2025-10-06 12:00:00"}
    };
    json doc3 = {
        {"name", "苹果笔记本"},
        {"category", "笔记本"},
        {"price", 9999.99},
        {"create_time", "2025-10-06 12:00:00"}
    };
    client.indexDocument("products", "1", doc1);
    client.indexDocument("products", "2", doc2);
    client.indexDocument("products", "3", doc3);
    std::cout << "Documents indexed successfully\n";

    // 3. 查询文档(全文搜索)
    json search_query = {
        {"query", {
            {"match", {
                {"name", "手机"}
            }}
        }}
    };
    json search_result = client.search("products", search_query);
    if (!search_result.empty()) {
        std::cout << "Search results:\n";
        for (const auto& hit : search_result["hits"]["hits"]) {
            std::cout << hit["_source"].dump(2) << "\n";
        }
    }

    // 4. 更新文档
    json update_doc = {{"price", 3499.99}};
    if (client.updateDocument("products", "1", update_doc)) {
        std::cout << "Document updated successfully\n";
    }

    // 5. 删除文档
    if (client.deleteDocument("products", "2")) {
        std::cout << "Document deleted successfully\n";
    }

    // 6. 聚合分析
    json agg_query = {
        {"size", 0},
        {"aggs", {
            {"by_category", {
                {"terms", {{"field", "category"}, {"size", 10}}},
                {"aggs", {
                    {"avg_price", {{"avg", {{"field", "price"}}}}}
                }}
            }}
        }}
    };
    json agg_result = client.aggregate("products", agg_query);
    if (!agg_result.empty()) {
        std::cout << "Aggregation results:\n";
        for (const auto& bucket : agg_result["aggregations"]["by_category"]["buckets"]) {
            std::cout << "Category: " << bucket["key"].get<std::string>()
                      << ", Avg Price: " << bucket["avg_price"]["value"].get<double>() << "\n";
        }
    }

    return 0;
}

3. 测试代码

(1)tests/test_elasticsearch.cpp
#include "ElasticsearchClient.hpp"
#include <nlohmann/json.hpp>
#include <cassert>
#include <iostream>

using json = nlohmann::json;

void test_create_and_search() {
    ElasticsearchClient client("http://localhost:9200");
    std::string index = "products_test";

    // 1. 删除现有索引
    client.performRequest("DELETE", "http://localhost:9200/" + index, "", std::string());

    // 2. 创建索引
    json mapping = {
        {"mappings", {
            {"properties", {
                {"name", {{"type", "text"}}},
                {"category", {{"type", "keyword"}}},
                {"price", {{"type", "double"}}}
            }}
        }}
    };
    assert(client.createIndex(index, mapping));

    // 3. 插入测试数据
    json doc = {
        {"name", "测试手机"},
        {"category", "手机"},
        {"price", 1999.99}
    };
    assert(client.indexDocument(index, "test1", doc));

    // 4. 查询测试
    json query = {
        {"query", {
            {"match", {
                {"name", "手机"}
            }}
        }}
    };
    json result = client.search(index, query);
    assert(!result.empty());
    assert(result["hits"]["hits"].size() == 1);
    assert(result["hits"]["hits"][0]["_source"]["name"] == "测试手机");

    std::cout << "Test create_and_search passed\n";
}

void test_update() {
    ElasticsearchClient client("http://localhost:9200");
    std::string index = "products_test";

    // 1. 插入测试数据
    json doc = {
        {"name", "测试笔记本"},
        {"category", "笔记本"},
        {"price", 4999.99}
    };
    assert(client.indexDocument(index, "test2", doc));

    // 2. 更新价格
    json update_doc = {{"price", 5999.99}};
    assert(client.updateDocument(index, "test2", update_doc));

    // 3. 查询验证
    json query = {
        {"query", {
            {"term", {
                {"_id", "test2"}
            }}
        }}
    };
    json result = client.search(index, query);
    assert(!result.empty());
    assert(result["hits"]["hits"][0]["_source"]["price"] == 5999.99);

    std::cout << "Test update passed\n";
}

int main() {
    test_create_and_search();
    test_update();
    return 0;
}

4. 编译和运行

  1. 安装依赖
    • 确保 libcurl 和 nlohmann/json 已安装。
    • 将 nlohmann/json 的 include 目录复制到项目 third_party/json/include
  2. 编译项目
    mkdir build
    cd build
    cmake ..
    make
    
  3. 运行主程序
    ./ElasticsearchDemo
    
  4. 运行测试
    ./TestElasticsearch
    

四、代码说明

1. 客户端实现

  • ElasticsearchClient:封装 libcurl 的 HTTP 请求,支持 PUT、POST、DELETE 方法。
  • JSON 处理:使用 nlohmann/json 解析和生成 JSON 数据,简化与 Elasticsearch API 的交互。
  • HTTP 请求
    • 使用 performRequest 方法统一处理 HTTP 请求。
    • 通过 WriteCallback 捕获响应数据。

2. CRUD 操作

  • 创建索引:发送 PUT 请求,设置分片、副本和字段映射(支持 IK 分词器)。
  • 插入文档:PUT 请求到 /{index}/_doc/{id}
  • 查询文档:POST 请求到 /{index}/_search,支持全文搜索(match)和精确匹配(term)。
  • 更新文档:POST 请求到 /{index}/_doc/{id}/_update,仅更新指定字段。
  • 删除文档:DELETE 请求到 /{index}/_doc/{id}

3. 聚合分析

  • 发送聚合查询,统计每个分类的平均价格。
  • 使用 terms 聚合分组,嵌套 avg 聚合计算平均值。

4. 测试代码

  • 使用 assert 验证索引创建、文档插入、查询和更新功能。
  • 每次测试前删除测试索引,确保环境干净。

五、注意事项

  1. Elasticsearch 版本

    • 示例基于 Elasticsearch 8.x,确保 REST API 兼容。
    • 若使用 7.x,需调整 Mapping(移除 _type)。
  2. 中文分词

    • 安装 IK 分词器以支持中文搜索:
      ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.14.0/elasticsearch-analysis-ik-8.14.0.zip
      
    • 配置 ik_max_wordik_smart 模式。
  3. 错误处理

    • 检查 HTTP 响应状态码(200-299 表示成功)。
    • 捕获 JSON 解析异常,确保程序健壮性。
  4. 性能优化

    • 批量操作:使用 _bulk API 批量插入文档,减少请求次数。
      json bulk_data = {
          {{"index", {{"_index", "products"}, {"_id", "1"}}}},
          {{"name", "华为手机"}, {"category", "手机"}, {"price", 2999.99}},
          {{"index", {{"_index", "products"}, {"_id", "2"}}}},
          {{"name", "小米手机"}, {"category", "手机"}, {"price", 1999.99}}
      };
      client.performRequest("POST", host_ + "/_bulk", bulk_data.dump("\n"), response);
      
    • 连接复用:保持 libcurl 的连接复用,避免重复初始化。
  5. 安全性

    • 生产环境使用 HTTPS 和认证:
      curl_easy_setopt(curl_, CURLOPT_USERNAME, "elastic");
      curl_easy_setopt(curl_, CURLOPT_PASSWORD, "your_password");
      

六、扩展功能

1. 异步请求

  • 使用 C++ 异步库(如 boost::asio)实现异步 HTTP 请求,提高并发性能。
  • 示例(需额外依赖 Boost):
    #include <boost/asio.hpp>
    // 实现异步 HTTP 请求逻辑
    

2. 连接池

  • 使用第三方库(如 cpp-httplib)实现连接池,优化多节点访问:
    vcpkg install cpp-httplib
    

3. 错误重试

  • 实现指数退避重试机制,处理网络不稳定:
    bool retryRequest(const std::string& method, const std::string& url, const std::string& data, std::string& response, int retries = 3) {
        for (int i = 0; i < retries; ++i) {
            if (performRequest(method, url, data, response)) return true;
            std::this_thread::sleep_for(std::chrono::milliseconds(1000 * (1 << i)));
        }
        return false;
    }
    

七、常见问题

  1. 连接失败

    • 检查 Elasticsearch 是否运行(curl http://localhost:9200)。
    • 确保防火墙允许 9200 端口。
  2. JSON 解析错误

    • 验证响应数据格式,确保 Elasticsearch 返回有效 JSON。
    • 使用 try-catch 捕获 nlohmann/json 异常。
  3. 性能瓶颈

    • 批量操作:使用 _bulk API 减少请求。
    • 分片优化:设置合理分片数量(20-50GB/分片)。
    • 缓存:启用 Elasticsearch 查询缓存。
  4. 中文搜索问题

    • 确保 IK 分词器正确安装和配置。
    • 测试分词效果:POST /products/_analyze {"analyzer": "ik_max_word", "text": "华为手机"}

八、总结

通过 libcurl 和 nlohmann/json,C++ 可以高效调用 Elasticsearch 的 REST API,实现索引管理、文档 CRUD 和聚合分析。示例代码展示了如何在商品管理场景中集成 Elasticsearch,测试代码确保功能正确性。虽然 C++ 没有官方客户端,但 REST API 的通用性使其易于扩展。

如果你有更具体的需求(如复杂查询、集群管理、性能优化),请提供细节,我可以进一步定制代码或提供优化方案!

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐