/// <summary>
        /// 
        /// </summary>
        /// <param name="pf"></param>
        /// <param name="dteData"></param>
        /// <param name="shredConfig">key:column; value:alias/field</param>
        /// <returns></returns>
        public async Task Save(PFGlobalParameter pf, List<DbTableEsData> dteData, ShredConfig shredConfig)
        {
            var esConfig = _configurationBuilderService.GetEsConfig(pf);
            string indexName = $"{pf.OwnerId}-{pf.ModuleKey}";
            var client = _esClientProvider.GetClient(indexName, esConfig);

            if (shredConfig.PrimaryColumn == null || string.IsNullOrEmpty(shredConfig.PrimaryColumn.Field))
            {
                _logger.LogError("【shredConfig:primaryColumn:field】未配置。");

                return;

            }
            if (shredConfig.MapperColumn == null)
            {
                shredConfig.MapperColumn = new List<DataKeyValue>();
            }

            if (!client.Indices.Exists(pf.ModuleKey).Exists)
            {
                client.CreateIndex<DbTableEsData>(indexName);
            }

            BulkDescriptor descriptor = new BulkDescriptor();

            int resultU = 0;
            int resultI = 0;
            int resultD = 0;

            try
            {
                foreach (var item in dteData)
                {
                    item.Id = item.Data[shredConfig.PrimaryColumn.Field].ToString();

                    var itemD = new Dictionary<string, object>();

                    itemD["id"] = item.Id;
                    itemD["shredType"] = item.ShredType;
                    itemD["connName"] = item.ConnName ?? shredConfig.ConnName;
                    itemD["connIndex"] = item.ConnIndex;
                    itemD["dbNameSuffix"] = item.DbNameSuffix;
                    itemD["tableNameSuffix"] = item.TableNameSuffix;
                    itemD["sql"] = item.Sql;

                    if (item.Data.ContainsKey("create_user_id"))
                    {
                        itemD["createUserId"] = item.Data["create_user_id"];
                    }
                    else if (item.Data.ContainsKey("createUserId"))
                    {
                        itemD["createUserId"] = item.Data["createUserId"];
                    }
                    if (item.Data.ContainsKey("modify_user_id"))
                    {
                        itemD["modifyUserId"] = item.Data["modify_user_id"];
                    }
                    else if (item.Data.ContainsKey("modifyUserId"))
                    {
                        itemD["modifyUserId"] = item.Data["modifyUserId"];
                    }

                    if (item.Data.ContainsKey("createDate"))
                    {
                        itemD["createDate"] = item.Data["createDate"];
                    }
                    else if (item.Data.ContainsKey("create_date"))
                    {
                        itemD["createDate"] = item.Data["create_date"];
                    }
                    else if (item.Data.ContainsKey("create_time"))
                    {
                        itemD["createDate"] = item.Data["create_time"];
                    }
                    else if (item.Data.ContainsKey("createTime"))
                    {
                        itemD["createDate"] = item.Data["createTime"];
                    }

                    if (item.Data.ContainsKey("modifyDate"))
                    {
                        itemD["modifyDate"] = item.Data["modifyDate"];
                    }
                    else if (item.Data.ContainsKey("modify_date"))
                    {
                        itemD["modifyDate"] = item.Data["modify_date"];
                    }
                    else if (item.Data.ContainsKey("modify_time"))
                    {
                        itemD["modifyDate"] = item.Data["modify_time"];
                    }
                    else if (item.Data.ContainsKey("modifyTime"))
                    {
                        itemD["modifyDate"] = item.Data["modifyTime"];
                    }


                    foreach (var column in shredConfig.MapperColumn)
                    {
                        if (!item.Data.ContainsKey(column.Field)) continue;
                        itemD[column.Key] = item.Data[column.Field];
                    }

                    if ((item.Data.ContainsKey("batO") && item.Data["batO"].ToString() == "d") || (item.Data.ContainsKey("rmt") && item.Data["rmt"].ToString() == "delete"))
                    {

                        var responseD = client.Delete<dynamic>(item.Id);
                        if (responseD.IsValid)
                        {
                            resultD++;
                        }
                        else
                        {
                            _logger.LogError(responseD.OriginalException, "【RG3.DO.Elasticsearch.Providers】【delete】IsValid异常");
                        }
                    }
                    else
                    {

                        var documentExists = client.DocumentExists<dynamic>(item.Id);
                        if (documentExists.Exists)
                        {
                            var responseU = await client.UpdateAsync<dynamic>(item.Id, x => x.Doc(itemD));
                            if (responseU.IsValid)
                            {
                                resultU++;
                            }
                            else
                            {
                                _logger.LogError(responseU.OriginalException, "【RG3.DO.Elasticsearch.Providers】【update】IsValid异常");
                            }
                        }
                        else
                        {

                            descriptor.Index<dynamic>(op => op.Document(itemD).Index(indexName).Id(item.Id));
                            resultI++;
                        }

                    }
                }

                if (resultI == 0) return;
                var response = await client.BulkAsync(descriptor);
                if (!response.IsValid)
                {
                    _logger.LogError(response.OriginalException, "【RG3.DO.Elasticsearch.Providers】【insert】IsValid异常");
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "【RG3.DO.Elasticsearch.Providers】【all】异常");
            }
        }
文档更新时间: 2021-09-14 22:17   作者:admin