同步数据方式


/// <summary>
        /// 要同步到数据
        /// </summary>
        /// <param name="pfGlobal"></param>
        /// <param name="schedulerConfig"></param>
        /// <param name="dicData"></param>
        /// <param name="table">his/last</param>
        /// <returns></returns>
        public async Task<int> DoAsync(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, IEnumerable<Dictionary<string, object>> dicData, string table)
        {
                var doMethod = schedulerConfig.Bulk.DoMethod;
            if (doMethod.Contains("result") && (!string.IsNullOrEmpty(schedulerConfig.ResponseDataLs)))
            {
                Console.WriteLine($"【交互结果】【{DateTime.Now}{schedulerConfig.ResponseDataLs}");
                FileUtil.CreateFileAndContentAsync(schedulerConfig.ResponseDataLs, JsonUtil.Serialize(new { success = true, data = dicData }, false, false, false));
            }

            if (doMethod.Contains("db-del"))
            {
                //count == 0 &&  int count = Convert.ToInt32(_db.QueryDataTable(pfGlobal, schedulerConfig.ConnName, "select count(*) trx_started from information_schema.INNODB_TRX where trx_started is not null", new { }).Rows[0][0]);
                if (!ValidateUtil.ValidateExistsValue(schedulerConfig.DeleteSqlV))
                {
                    Console.WriteLine($"【删除】【{DateTime.Now}{schedulerConfig.Title},DeleteSql未配置");
                    return await Task.FromResult(0);
                }
                int resultT = 0;
                try
                {
                    //执行对应历史数据删除
                    Dictionary<string, object> dicParams = new Dictionary<string, object>();
                    RequestPFUtil.ConvertDicBySystemParamDate(dicParams);
                    resultT = _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.DeleteSqlV.Replace("{table}", table), dicParams);
                    Console.WriteLine($"【删除】【{DateTime.Now}{schedulerConfig.Title}{resultT}条");
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"【删除异常】【{DateTime.Now}{schedulerConfig.Title}{ex.Message}条");
                }
                return await Task.FromResult(resultT);
            }
            else if (doMethod.Contains("db") && !string.IsNullOrEmpty(schedulerConfig.FromParamsDir) && !string.IsNullOrEmpty(schedulerConfig.FromParamsSuffix))
            {
                try
                {
                    DirectoryInfo dir = new DirectoryInfo(PhyPathUtil.ConvertVirToPhyPath(schedulerConfig.FromParamsDir));
                    FileInfo[] finfo = dir.GetFiles(schedulerConfig.FromParamsSuffix);
                    int resultC = 0;
                    foreach (var item in finfo)
                    {
                        string valueT = FileUtil.ReadFileAndContentByVirUrl(item.FullName);
                        if (!string.IsNullOrEmpty(valueT) && valueT.Contains("\\"data\\""))
                        {
                            var jobject = JsonUtil.Deserialize<JObject>(valueT);
                            List<Dictionary<string, object>> dicT = new List<Dictionary<string, object>>();
                            JTokenUtil.ConvertJArrayToDictionary(jobject.SelectToken("data") as JArray, dicT, new Dictionary<string, object>());
                            resultC = resultC + _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.SaveSqlV.Replace("{table}", table), dicT);

                        }
                    }
                    return await Task.FromResult(resultC);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"【插入异常】{schedulerConfig.Title}{ex.Message}条");
                    return await Task.FromResult(0);
                }
                //return await Task.FromResult(0);
            }
            else if (doMethod.Contains("bulk"))
            {
                return await DoAsyncDb(pfGlobal, schedulerConfig, dicData);
            }
            else if (doMethod.Contains("db"))
            {
                try
                {
                    int result = 0;
                    if (schedulerConfig.SaveSqlV != null)
                    {
                        result = _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.SaveSqlV.Replace("{table}", table), dicData);
                    }
                    return await Task.FromResult(result);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"【插入异常】{schedulerConfig.Title}{ex.Message}条");
                    return await Task.FromResult(0);
                }
                //return await Task.FromResult(0);
            }
            else if (doMethod.Contains("sql"))
            {
                return await DoAsyncSql(pfGlobal, schedulerConfig, dicData);
            }

            if (doMethod.Contains("json"))
            {
                return await DoAsyncJson(pfGlobal, schedulerConfig, dicData);
            }
            return await Task.FromResult(0);
        }
文档更新时间: 2021-09-01 08:00   作者:admin