想法:
从kafka拉数据推送,因此这里不适用OOM,后续接入kafka。

  1. 安装依赖包
   > Elastic.Clients.Elasticsearch      9.2.2    9.2.2 
   > Newtonsoft.Json                    13.0.4   13.0.4
   > Swashbuckle.AspNetCore             10.0.1   10.0.1
  1. 写helper
using Elastic.Clients.Elasticsearch;
using Elastic.Clients.Elasticsearch.Core.Bulk;
using Elastic.Clients.Elasticsearch.Nodes;
using Elastic.Clients.Elasticsearch.QueryDsl;
using Elastic.Transport;
using Newtonsoft.Json.Linq;
using System.Net;
using System.Text.Json;

namespace ElasticsearchBulkIndex
{
    public class EsHelper
    {
        private readonly ElasticsearchClient _esClient;
        public EsHelper()
        {
           var node =  new SingleNodePool(new Uri("http://192.168.220.130:31200"));

           var settings = new ElasticsearchClientSettings(node).Authentication(new BasicAuthentication("elastic", "ellischen"))
             .EnableDebugMode().DefaultIndex("123");
            _esClient = new ElasticsearchClient(settings);

        }


        public async Task<int> BulkIndex(List<JsonElement> jsonList)
        {

            var bulkRequest = new BulkRequest("ellis")
            {
                Operations = new List<IBulkOperation>()
            };

            foreach (var json in jsonList)
            {

                bulkRequest.Operations.Add(
                    new BulkIndexOperation<JsonElement>(json)
                    {
                        Id = json.TryGetProperty("id", out var id)
                             ? id.GetInt32().ToString()
                             : null
                    }
                );
            }

            BulkResponse response = await _esClient.BulkAsync(bulkRequest);

            Console.WriteLine(response.ElasticsearchServerError);
            Console.WriteLine(response.DebugInformation);
            return response.Items.Count;
        }


        public async Task<bool> BulkDelete(List<string> ids)
        {
            var delete = new DeleteByQueryRequest("ellis")
            {
                Query = new IdsQuery()
                {
                    Values=new Ids(ids)
                }
            };


            DeleteByQueryResponse deleteResponse = await _esClient.DeleteByQueryAsync(delete);

            Console.WriteLine(deleteResponse.DebugInformation);


            Console.WriteLine(deleteResponse.IsSuccess());

            return deleteResponse.IsSuccess();
        }



        public async Task<bool> ParticalUpdateByID(Dictionary<string, object> doc, string id)
        {
            UpdateResponse<object> updateResponse = await _esClient.UpdateAsync<object, object>(
                index: "ellis",
                id: id,
                u => u
                    .Doc(doc)
            );


            Console.WriteLine(updateResponse.DebugInformation);

            return updateResponse.IsSuccess();
        }
        /// <summary>
        /// 
        /// 不存在则插入,存在则更新
        /// </summary>
        /// <param name="doc"></param>
        /// <param name="id"></param>
        /// <returns></returns>
        public async Task<bool> PartialUpsertByID(Dictionary<string, object> doc, string id)
        {
            UpdateResponse<object> updateResponse = await _esClient.UpdateAsync<object, object>(
                index: "ellis",
                id: id,
                u => u
                    .Doc(doc)
                    .Upsert(true)
            );

            Console.WriteLine(updateResponse.DebugInformation);

            return updateResponse.IsSuccess();
        }


        public async Task<bool> BulkUpdate(List<Dictionary<string,object>> pairs)
        {
            BulkResponse bulkResponse =  await _esClient.BulkAsync(u => u.Index("ellis").UpdateMany<Dictionary<string, object>>(pairs, (bu, d) => bu.Doc(d).Id((Id)d["id"].ToString())));
            Console.WriteLine(bulkResponse.DebugInformation);

            return bulkResponse.IsSuccess();
        }
    }
}


  1. DI以及配置swagger
....
builder.Services.AddControllers();

builder.Services.AddSingleton<EsHelper>();

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

....
app.UseAuthorization();

app.UseSwagger();

app.UseSwaggerUI(c =>
{
    c.RoutePrefix = "swagger";
});
app.MapControllers();

app.Run();
  1. 修改launchsetting.json
"launchUrl": "swagger"
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐