同步数据方式
/// <summary>
/// Processes one batch of data to synchronize, choosing the action from the
/// keywords contained in <c>schedulerConfig.Bulk.DoMethod</c>.
/// <para>
/// If the method contains "result" and <c>ResponseDataLs</c> is set, the batch is first
/// serialized as <c>{ success, data }</c> and handed to <c>FileUtil.CreateFileAndContentAsync</c>.
/// After that exactly one of the following runs, tested in this order:
/// "db-del" (execute <c>DeleteSqlV</c>), "db" with <c>FromParamsDir</c>/<c>FromParamsSuffix</c>
/// set (execute <c>SaveSqlV</c> for the "data" array of every matching file), "bulk"
/// (<c>DoAsyncDb</c>), "db" (execute <c>SaveSqlV</c> with <paramref name="dicData"/>),
/// "sql" (<c>DoAsyncSql</c>), "json" (<c>DoAsyncJson</c>).
/// </para>
/// </summary>
/// <param name="pfGlobal">Global parameter object passed through to the database layer and the delegated handlers.</param>
/// <param name="schedulerConfig">Scheduler configuration supplying the method keywords, connection name, SQL templates and file locations.</param>
/// <param name="dicData">The rows to synchronize.</param>
/// <param name="table">Text substituted for the <c>{table}</c> placeholder in the configured SQL (his/last).</param>
/// <returns>
/// The value returned by the database call or delegated handler; 0 when nothing matched,
/// when the required SQL is not configured, or when the database work threw (the failure
/// is written to the console and not rethrown).
/// </returns>
public async Task<int> DoAsync(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, IEnumerable<Dictionary<string, object>> dicData, string table)
{
    // A missing Bulk section or DoMethod matches none of the keywords below,
    // so the call falls through to the final "return 0" instead of throwing.
    string doMethod = schedulerConfig.Bulk?.DoMethod ?? string.Empty;

    if (doMethod.Contains("result") && !string.IsNullOrEmpty(schedulerConfig.ResponseDataLs))
    {
        Console.WriteLine($"【交互结果】【{DateTime.Now}】{schedulerConfig.ResponseDataLs}");
        // NOTE(review): the result of this call is neither awaited nor observed. If
        // CreateFileAndContentAsync returns a Task, a write failure would go unnoticed — confirm.
        FileUtil.CreateFileAndContentAsync(schedulerConfig.ResponseDataLs, JsonUtil.Serialize(new { success = true, data = dicData }, false, false, false));
    }

    // "db-del" must be tested before the "db" branches because it also contains "db".
    if (doMethod.Contains("db-del"))
    {
        if (!ValidateUtil.ValidateExistsValue(schedulerConfig.DeleteSqlV))
        {
            Console.WriteLine($"【删除】【{DateTime.Now}】{schedulerConfig.Title},DeleteSql未配置");
            return 0;
        }

        int deletedCount = 0;
        try
        {
            // Run the configured delete statement with the parameters filled in by
            // ConvertDicBySystemParamDate.
            var deleteParams = new Dictionary<string, object>();
            RequestPFUtil.ConvertDicBySystemParamDate(deleteParams);
            deletedCount = _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.DeleteSqlV.Replace("{table}", table), deleteParams);
            Console.WriteLine($"【删除】【{DateTime.Now}】{schedulerConfig.Title},{deletedCount}条");
        }
        catch (Exception ex)
        {
            // Best effort: the failure is logged and the count gathered so far is returned.
            Console.WriteLine($"【删除异常】【{DateTime.Now}】{schedulerConfig.Title},{ex.Message}");
        }
        return deletedCount;
    }

    if (doMethod.Contains("db") && !string.IsNullOrEmpty(schedulerConfig.FromParamsDir) && !string.IsNullOrEmpty(schedulerConfig.FromParamsSuffix))
    {
        try
        {
            // Same guard as the direct "db" branch: without a save statement there is nothing to execute.
            if (schedulerConfig.SaveSqlV == null)
            {
                return 0;
            }
            string saveSql = schedulerConfig.SaveSqlV.Replace("{table}", table);

            var sourceDir = new DirectoryInfo(PhyPathUtil.ConvertVirToPhyPath(schedulerConfig.FromParamsDir));
            int savedCount = 0;
            foreach (FileInfo file in sourceDir.GetFiles(schedulerConfig.FromParamsSuffix))
            {
                string content = FileUtil.ReadFileAndContentByVirUrl(file.FullName);
                // Only files whose text contains a quoted "data" property name are imported.
                if (string.IsNullOrEmpty(content) || !content.Contains("\"data\""))
                {
                    continue;
                }

                var parsed = JsonUtil.Deserialize<JObject>(content);
                var rows = new List<Dictionary<string, object>>();
                JTokenUtil.ConvertJArrayToDictionary(parsed.SelectToken("data") as JArray, rows, new Dictionary<string, object>());
                savedCount += _db.Execute(pfGlobal, schedulerConfig.ConnName, saveSql, rows);
            }
            return savedCount;
        }
        catch (Exception ex)
        {
            // Any failure reports 0, even if earlier files in the loop were already executed.
            Console.WriteLine($"【插入异常】{schedulerConfig.Title},{ex.Message}");
            return 0;
        }
    }

    if (doMethod.Contains("bulk"))
    {
        return await DoAsyncDb(pfGlobal, schedulerConfig, dicData);
    }

    if (doMethod.Contains("db"))
    {
        try
        {
            if (schedulerConfig.SaveSqlV == null)
            {
                return 0;
            }
            return _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.SaveSqlV.Replace("{table}", table), dicData);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"【插入异常】{schedulerConfig.Title},{ex.Message}");
            return 0;
        }
    }

    if (doMethod.Contains("sql"))
    {
        return await DoAsyncSql(pfGlobal, schedulerConfig, dicData);
    }

    if (doMethod.Contains("json"))
    {
        return await DoAsyncJson(pfGlobal, schedulerConfig, dicData);
    }

    return 0;
}
文档更新时间: 2021-09-01 08:00 作者:admin