/// <summary>
/// Synchronizes a batch of table rows into the Elasticsearch index named
/// <c>{pf.OwnerId}-{pf.ModuleKey}</c>. Rows flagged as deleted are removed, rows whose
/// document already exists are partially updated, and all remaining rows are indexed
/// with a single bulk request at the end.
/// </summary>
/// <param name="pf">Global parameters; supplies the Elasticsearch configuration and the owner id / module key that form the index name.</param>
/// <param name="dteData">Rows to synchronize. A null or empty list is a no-op (the index is still created when missing).</param>
/// <param name="shredConfig">
/// Sharding/mapping configuration. <c>PrimaryColumn.Field</c> names the entry of each row's data that is used
/// as the document id; each <c>MapperColumn</c> entry copies the row value stored under <c>Field</c> into the
/// document under <c>Key</c>.
/// </param>
/// <returns>A task that completes once every row has been processed.</returns>
/// <remarks>
/// Failures while processing rows or sending the bulk request are logged and not rethrown.
/// Exceptions from resolving the configuration/client or from creating the index propagate to the caller.
/// </remarks>
public async Task Save(PFGlobalParameter pf, List<DbTableEsData> dteData, ShredConfig shredConfig)
{
    var esConfig = _configurationBuilderService.GetEsConfig(pf);
    string indexName = $"{pf.OwnerId}-{pf.ModuleKey}";
    var client = _esClientProvider.GetClient(indexName, esConfig);

    // A null config is treated like a config without a primary column: log and give up.
    if (shredConfig?.PrimaryColumn == null || string.IsNullOrEmpty(shredConfig.PrimaryColumn.Field))
    {
        _logger.LogError("【shredConfig:primaryColumn:field】未配置。");
        return;
    }

    if (shredConfig.MapperColumn == null)
    {
        shredConfig.MapperColumn = new List<DataKeyValue>();
    }

    // The existence check must target the same index that is created and written to.
    var indexExists = await client.Indices.ExistsAsync(indexName);
    if (!indexExists.Exists)
    {
        client.CreateIndex<DbTableEsData>(indexName);
    }

    if (dteData == null || dteData.Count == 0)
    {
        return;
    }

    string primaryField = shredConfig.PrimaryColumn.Field;
    var bulkInserts = new BulkDescriptor();
    int queuedInserts = 0;

    try
    {
        foreach (var item in dteData)
        {
            // Without a usable primary key there is no document id; skip this row only,
            // so the rest of the batch is still processed.
            if (item?.Data == null || !item.Data.ContainsKey(primaryField) || item.Data[primaryField] == null)
            {
                _logger.LogError("【RG3.DO.Elasticsearch.Providers】【save】primary key field '{PrimaryField}' is missing or null; record skipped.", primaryField);
                continue;
            }

            item.Id = item.Data[primaryField].ToString();

            if (IsDeleteRequest(item))
            {
                var deleteResponse = await client.DeleteAsync<dynamic>(item.Id);
                if (!deleteResponse.IsValid)
                {
                    _logger.LogError(deleteResponse.OriginalException, "【RG3.DO.Elasticsearch.Providers】【delete】IsValid异常");
                }

                continue;
            }

            // Only rows that are updated or inserted need a document body.
            var document = BuildDocument(item, shredConfig);

            var existsResponse = await client.DocumentExistsAsync<dynamic>(item.Id);
            if (existsResponse.Exists)
            {
                var updateResponse = await client.UpdateAsync<dynamic>(item.Id, x => x.Doc(document));
                if (!updateResponse.IsValid)
                {
                    _logger.LogError(updateResponse.OriginalException, "【RG3.DO.Elasticsearch.Providers】【update】IsValid异常");
                }
            }
            else
            {
                // New documents are queued and sent together after the loop.
                bulkInserts.Index<dynamic>(op => op.Document(document).Index(indexName).Id(item.Id));
                queuedInserts++;
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "【RG3.DO.Elasticsearch.Providers】【all】异常");
    }

    if (queuedInserts == 0)
    {
        return;
    }

    // Flushed outside the loop's try block so that inserts queued before an unexpected
    // failure are still sent (updates/deletes before that point were already applied).
    try
    {
        var bulkResponse = await client.BulkAsync(bulkInserts);
        if (!bulkResponse.IsValid)
        {
            _logger.LogError(bulkResponse.OriginalException, "【RG3.DO.Elasticsearch.Providers】【insert】IsValid异常");
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "【RG3.DO.Elasticsearch.Providers】【all】异常");
    }
}

/// <summary>
/// Returns true when the row carries a delete marker: <c>batO == "d"</c> or <c>rmt == "delete"</c>.
/// </summary>
private static bool IsDeleteRequest(DbTableEsData item)
{
    return (item.Data.ContainsKey("batO") && item.Data["batO"]?.ToString() == "d")
        || (item.Data.ContainsKey("rmt") && item.Data["rmt"]?.ToString() == "delete");
}

/// <summary>
/// Builds the Elasticsearch document for a row: fixed routing metadata, the audit fields
/// (taken from the first source key present in the row), then the configured mapper columns.
/// </summary>
private static Dictionary<string, object> BuildDocument(DbTableEsData item, ShredConfig shredConfig)
{
    var document = new Dictionary<string, object>
    {
        ["id"] = item.Id,
        ["shredType"] = item.ShredType,
        ["connName"] = item.ConnName ?? shredConfig.ConnName,
        ["connIndex"] = item.ConnIndex,
        ["dbNameSuffix"] = item.DbNameSuffix,
        ["tableNameSuffix"] = item.TableNameSuffix,
        ["sql"] = item.Sql
    };

    foreach (var auditField in AuditFieldSources)
    {
        foreach (var sourceKey in auditField.Value)
        {
            if (!item.Data.ContainsKey(sourceKey))
            {
                continue;
            }

            document[auditField.Key] = item.Data[sourceKey];
            break; // first matching source key wins
        }
    }

    // Mapper columns are applied last, so they override any field set above with the same key.
    foreach (var column in shredConfig.MapperColumn)
    {
        if (!item.Data.ContainsKey(column.Field))
        {
            continue;
        }

        document[column.Key] = item.Data[column.Field];
    }

    return document;
}

/// <summary>
/// Document audit field name mapped to the row keys it may be read from, in priority order.
/// </summary>
private static readonly KeyValuePair<string, string[]>[] AuditFieldSources =
{
    new KeyValuePair<string, string[]>("createUserId", new[] { "create_user_id", "createUserId" }),
    new KeyValuePair<string, string[]>("modifyUserId", new[] { "modify_user_id", "modifyUserId" }),
    new KeyValuePair<string, string[]>("createDate", new[] { "createDate", "create_date", "create_time", "createTime" }),
    new KeyValuePair<string, string[]>("modifyDate", new[] { "modifyDate", "modify_date", "modify_time", "modifyTime" })
};
文档更新时间: 2021-09-14 22:17 作者:admin