.net 8 添加swagger以及批量index,批量删除 elasticsearch,update,批量update
【代码】.net 8 添加swagger以及批量index,批量删除 elasticsearch。
·
想法:
数据从 Kafka 拉取后再推送到 Elasticsearch,因此这里不使用对象映射(强类型实体),而是直接处理原始 JSON;Kafka 后续再接入。
- 安装依赖包
> Elastic.Clients.Elasticsearch 9.2.2
> Newtonsoft.Json 13.0.4
> Swashbuckle.AspNetCore 10.0.1
- 写helper
using Elastic.Clients.Elasticsearch;
using Elastic.Clients.Elasticsearch.Core.Bulk;
using Elastic.Clients.Elasticsearch.Nodes;
using Elastic.Clients.Elasticsearch.QueryDsl;
using Elastic.Transport;
using Newtonsoft.Json.Linq;
using System.Net;
using System.Text.Json;
namespace ElasticsearchBulkIndex
{
    /// <summary>
    /// Small helper around <see cref="ElasticsearchClient"/> that exposes bulk index,
    /// bulk delete, partial update, upsert and bulk update operations against a single index.
    /// </summary>
    public class EsHelper
    {
        /// <summary>Index targeted by every operation in this helper.</summary>
        private const string IndexName = "ellis";

        /// <summary>Name of the document field that carries the Elasticsearch document id.</summary>
        private const string IdField = "id";

        private readonly ElasticsearchClient _esClient;

        /// <summary>
        /// Builds the client. The endpoint and credentials can be overridden with the
        /// <c>ES_URL</c>, <c>ES_USERNAME</c> and <c>ES_PASSWORD</c> environment variables;
        /// when they are not set the original demo values are used.
        /// </summary>
        public EsHelper()
        {
            // NOTE(review): the fallback literals are demo credentials kept for backward
            // compatibility; real deployments should supply them via the environment.
            var endpoint = Environment.GetEnvironmentVariable("ES_URL") ?? "http://192.168.220.130:31200";
            var userName = Environment.GetEnvironmentVariable("ES_USERNAME") ?? "elastic";
            var password = Environment.GetEnvironmentVariable("ES_PASSWORD") ?? "ellischen";

            var node = new SingleNodePool(new Uri(endpoint));
            var settings = new ElasticsearchClientSettings(node)
                .Authentication(new BasicAuthentication(userName, password))
                .EnableDebugMode() // keeps DebugInformation populated for the Console output below
                .DefaultIndex(IndexName);
            _esClient = new ElasticsearchClient(settings);
        }

        /// <summary>
        /// Indexes all documents in one bulk request. When a document has an <c>id</c>
        /// property (string or number) it is used as the document id; otherwise no id is set.
        /// </summary>
        /// <param name="jsonList">Documents to index.</param>
        /// <returns>
        /// The number of items in the bulk response (this includes items that failed),
        /// or 0 when the list is empty or the response carries no items.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="jsonList"/> is null.</exception>
        public async Task<int> BulkIndex(List<JsonElement> jsonList)
        {
            ArgumentNullException.ThrowIfNull(jsonList);
            if (jsonList.Count == 0)
            {
                // Nothing to send; skip the round-trip.
                return 0;
            }

            var operations = new List<IBulkOperation>(jsonList.Count);
            foreach (var json in jsonList)
            {
                operations.Add(new BulkIndexOperation<JsonElement>(json)
                {
                    Id = ExtractId(json)
                });
            }

            var bulkRequest = new BulkRequest(IndexName)
            {
                Operations = operations
            };

            BulkResponse response = await _esClient.BulkAsync(bulkRequest);
            Console.WriteLine(response.ElasticsearchServerError);
            Console.WriteLine(response.DebugInformation);

            // Items can be null when the call failed before a response body was parsed.
            return response.Items?.Count ?? 0;
        }

        /// <summary>
        /// Deletes the documents with the given ids using a delete-by-query with an ids query.
        /// </summary>
        /// <param name="ids">Ids of the documents to delete.</param>
        /// <returns>True when the request succeeded (or there was nothing to delete).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="ids"/> is null.</exception>
        public async Task<bool> BulkDelete(List<string> ids)
        {
            ArgumentNullException.ThrowIfNull(ids);
            if (ids.Count == 0)
            {
                // An empty ids query matches nothing; skip the round-trip.
                return true;
            }

            var delete = new DeleteByQueryRequest(IndexName)
            {
                Query = new IdsQuery()
                {
                    Values = new Ids(ids)
                }
            };

            DeleteByQueryResponse deleteResponse = await _esClient.DeleteByQueryAsync(delete);
            Console.WriteLine(deleteResponse.DebugInformation);

            bool succeeded = deleteResponse.IsSuccess();
            Console.WriteLine(succeeded);
            return succeeded;
        }

        /// <summary>
        /// Partially updates an existing document: the fields in <paramref name="doc"/>
        /// are sent as the partial document of an update request.
        /// (The method name keeps its original spelling so existing callers still compile.)
        /// </summary>
        /// <param name="doc">Fields to update.</param>
        /// <param name="id">Id of the document to update.</param>
        /// <returns>True when the update request succeeded.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="doc"/> or <paramref name="id"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="id"/> is empty.</exception>
        public async Task<bool> ParticalUpdateByID(Dictionary<string, object> doc, string id)
        {
            ArgumentNullException.ThrowIfNull(doc);
            ArgumentException.ThrowIfNullOrEmpty(id);

            UpdateResponse<object> updateResponse = await _esClient.UpdateAsync<object, object>(
                index: IndexName,
                id: id,
                u => u
                    .Doc(doc)
            );
            Console.WriteLine(updateResponse.DebugInformation);
            return updateResponse.IsSuccess();
        }

        /// <summary>
        /// Inserts the document when it does not exist, otherwise partially updates it.
        /// </summary>
        /// <param name="doc">Fields to insert or update.</param>
        /// <param name="id">Id of the document.</param>
        /// <returns>True when the request succeeded.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="doc"/> or <paramref name="id"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="id"/> is empty.</exception>
        public async Task<bool> PartialUpsertByID(Dictionary<string, object> doc, string id)
        {
            ArgumentNullException.ThrowIfNull(doc);
            ArgumentException.ThrowIfNullOrEmpty(id);

            UpdateResponse<object> updateResponse = await _esClient.UpdateAsync<object, object>(
                index: IndexName,
                id: id,
                u => u
                    .Doc(doc)
                    // doc_as_upsert makes Elasticsearch use the partial document itself as the
                    // upsert document. Upsert(true) would instead send the boolean `true`
                    // as the document to insert.
                    .DocAsUpsert(true)
            );
            Console.WriteLine(updateResponse.DebugInformation);
            return updateResponse.IsSuccess();
        }

        /// <summary>
        /// Partially updates many documents in one bulk request. Every dictionary must
        /// contain an <c>id</c> entry, which is used as the document id.
        /// </summary>
        /// <param name="pairs">Partial documents, each carrying its own <c>id</c>.</param>
        /// <returns>True when the bulk request succeeded (or there was nothing to update).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="pairs"/> is null.</exception>
        /// <exception cref="KeyNotFoundException">A document has no usable <c>id</c> entry.</exception>
        public async Task<bool> BulkUpdate(List<Dictionary<string, object>> pairs)
        {
            ArgumentNullException.ThrowIfNull(pairs);
            if (pairs.Count == 0)
            {
                // Nothing to send; skip the round-trip.
                return true;
            }

            // Fail fast with a descriptive message before any network I/O happens.
            foreach (var pair in pairs)
            {
                GetRequiredId(pair);
            }

            BulkResponse bulkResponse = await _esClient.BulkAsync(u => u
                .Index(IndexName)
                .UpdateMany<Dictionary<string, object>>(pairs, (bu, d) => bu.Doc(d).Id((Id)GetRequiredId(d))));
            Console.WriteLine(bulkResponse.DebugInformation);
            return bulkResponse.IsSuccess();
        }

        /// <summary>
        /// Reads the <c>id</c> property of a JSON document as a string.
        /// Returns null when the element is not an object, has no <c>id</c>,
        /// or the <c>id</c> is neither a string nor a number.
        /// </summary>
        private static string? ExtractId(JsonElement json)
        {
            // TryGetProperty throws on non-object elements, so check the kind first.
            if (json.ValueKind != JsonValueKind.Object || !json.TryGetProperty(IdField, out var id))
            {
                return null;
            }

            return id.ValueKind switch
            {
                JsonValueKind.String => id.GetString(),
                JsonValueKind.Number => id.TryGetInt64(out var number)
                    ? number.ToString(System.Globalization.CultureInfo.InvariantCulture)
                    : id.GetRawText(),
                _ => null
            };
        }

        /// <summary>
        /// Returns the string form of the <c>id</c> entry of a partial document,
        /// throwing when the entry is missing or has no usable value.
        /// </summary>
        private static string GetRequiredId(Dictionary<string, object> doc)
        {
            if (doc is not null
                && doc.TryGetValue(IdField, out var raw)
                && raw?.ToString() is { Length: > 0 } text)
            {
                return text;
            }

            throw new KeyNotFoundException(
                $"Every document passed to {nameof(BulkUpdate)} must contain a non-empty '{IdField}' entry.");
        }
    }
}
- DI以及配置swagger
....
// MVC controllers.
builder.Services.AddControllers();

// Elasticsearch helper (one shared instance) plus the services Swagger needs.
builder.Services
    .AddSingleton<EsHelper>()
    .AddEndpointsApiExplorer()
    .AddSwaggerGen();
....
app.UseAuthorization();

// Serve the generated OpenAPI document and the Swagger UI under /swagger.
app.UseSwagger();
app.UseSwaggerUI(options => options.RoutePrefix = "swagger");

app.MapControllers();
app.Run();
- 修改 launchSettings.json
"launchUrl": "swagger"
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)