RG3.PF.Quartz.StartUsed
SchedulerConfig
/************************************************************************************************************
* file : SchedulerConfig.cs
* author : cbg
* function:
* history : created by CBG 03/06/2021
************************************************************************************************************/
using Newtonsoft.Json.Linq;
using RG3.PF.Abstractions.Entity;
using RG3.PF.Abstractions.Interfaces;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace RG3.PF.Quartz.StartUsed.Entity
{
    /// <summary>
    /// Main configuration entity for a scheduled job.
    /// </summary>
    public class SchedulerConfig : IEntity
    {
        /// <summary>
        /// Offset in seconds; when not null, the service is started this long after the current time.
        /// </summary>
        public double? Seconds { get; set; }

        /// <summary>
        /// Number of minutes after which the service is started.
        /// </summary>
        public long StartMinutes { get; set; }

        /// <summary>
        /// End minutes.
        /// </summary>
        public long EndMinutes { get; set; }

        /// <summary>
        /// Original note: whether the current time uses UTC.
        /// NOTE(review): the property is numeric, so it is presumably an hour offset applied to
        /// query conditions rather than a flag — confirm against the consumers.
        /// </summary>
        public long UtcSearchHour { get; set; }

        /// <summary>
        /// Original note: (data) whether the current time uses UTC.
        /// NOTE(review): numeric as well; presumably an hour offset applied to data values — confirm.
        /// </summary>
        public long UtcDataHour { get; set; }

        /// <summary>
        /// Parameters passed along with the URL.
        /// </summary>
        public string PostData { get; set; }

        /// <summary>
        /// Service title.
        /// </summary>
        public string Title { get; set; }

        /// <summary>
        /// Message templates shown while the service runs.
        /// </summary>
        public SchedulerMsg Msg { get; set; }

        /// <summary>
        /// Database connection name.
        /// </summary>
        public string ConnName { get; set; }

        /// <summary>
        /// Whether the job is enabled.
        /// </summary>
        public bool Enabled { get; set; }

        /// <summary>
        /// Number of minutes data is cached.
        /// The misspelt name is kept on purpose: it is public and renaming it would break callers
        /// and configuration binding.
        /// </summary>
        public int CahceMinutes { get; set; }

        /// <summary>
        /// Timer execution frequency.
        /// </summary>
        public SchedulerJob Job { get; set; }

        /// <summary>
        /// Bulk-insert configuration.
        /// </summary>
        public SchedulerBulk Bulk { get; set; }

        /// <summary>
        /// Query statement fragments (value / sortId).
        /// </summary>
        public IEnumerable<TreeData> SelectSql { get; set; }

        /// <summary>
        /// Delete statement fragments (value / sortId).
        /// </summary>
        public IEnumerable<TreeData> DeleteSql { get; set; }

        /// <summary>
        /// Save statement fragments (value / sortId).
        /// </summary>
        public IEnumerable<TreeData> SaveSql { get; set; }

        /// <summary>
        /// Directory holding the source parameter files.
        /// </summary>
        public string FromParamsDir { get; set; }

        /// <summary>
        /// File-name pattern of the source parameter files (for example one ending in .json).
        /// </summary>
        public string FromParamsSuffix { get; set; }

        /// <summary>
        /// Comma-separated list of all parameters, used when composing the SQL statements.
        /// </summary>
        public string Columns { get; set; }

        /// <summary>
        /// The query statement: all <see cref="SelectSql"/> fragments concatenated in ascending
        /// SortId order, or null when there are no fragments.
        /// </summary>
        public string SelectSqlV
        {
            get
            {
                if (SelectSql == null || !SelectSql.Any()) return null;
                return JoinFragments(SelectSql);
            }
        }

        /// <summary>
        /// The save statement: all <see cref="SaveSql"/> fragments concatenated in ascending
        /// SortId order, or null when there are no fragments.
        /// </summary>
        public string SaveSqlV
        {
            get
            {
                if (SaveSql == null || !SaveSql.Any()) return null;
                return JoinFragments(SaveSql);
            }
        }

        /// <summary>
        /// The delete statement: the enabled, non-empty <see cref="DeleteSql"/> fragments
        /// concatenated in ascending SortId order. Returns null when <see cref="DeleteSql"/> is
        /// null or empty, and an empty string when every fragment is filtered out.
        /// </summary>
        public string DeleteSqlV
        {
            get
            {
                if (DeleteSql == null || !DeleteSql.Any()) return null;
                IEnumerable<TreeData> usable = DeleteSql.Where(item => item.Enabled == true && item.Value != null && !string.IsNullOrEmpty(item.Value.ToString()));
                return JoinFragments(usable);
            }
        }

        /// <summary>
        /// The individual entries of <see cref="Columns"/>, split on commas. Each entry is trimmed
        /// and blank entries are dropped, so a value such as "@a, @b" yields "@a" and "@b" rather
        /// than "@a" and " @b". Returns an empty collection when <see cref="Columns"/> is null or empty.
        /// </summary>
        public IEnumerable<string> ColumnsV
        {
            get
            {
                if (string.IsNullOrEmpty(Columns)) return new List<string>();
                return Columns
                    .Split(',', System.StringSplitOptions.RemoveEmptyEntries)
                    .Select(name => name.Trim())
                    .Where(name => name.Length > 0)
                    .ToArray();
            }
        }

        /// <summary>
        /// Push management configuration.
        /// </summary>
        public IEnumerable<SubModule> SubModule { get; set; }

        /// <summary>
        /// Virtual path where the response result is stored.
        /// </summary>
        public string ResponseData { get; set; }

        /// <summary>
        /// Concatenates the Value of every fragment in ascending SortId order.
        /// </summary>
        private static string JoinFragments(IEnumerable<TreeData> fragments)
        {
            StringBuilder sql = new StringBuilder();
            foreach (var fragment in fragments.OrderBy(part => part.SortId))
            {
                sql.Append(fragment.Value);
            }
            return sql.ToString();
        }
    }
}
quartz-job-monitor-his-space.json json映射文件
{
"scheduler": {
"quartz-job-monitor-his-space": {
"title": "监控采集(his)(空间)",
// Message templates shown while the service runs
// NOTE(review): "processError" references {0} and {2} but not {1} — confirm this matches the arguments supplied by the code that formats it.
"msg": {
"start": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss}服务启动中",
"error": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss}服务启动失败",
"success": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss}服务启动成功",
"process": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss},操作时间{1}s,下次执行时间{2:yyyy/MM/dd HH:mm:ss},入库{3}条",
"processError": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss},异常说明:{2}"
},
// Database connection name
"connName": "ConnIdc",
"enabled": true,
"cahceMinutes": 1440,
// Start of the sync window: number of minutes to push back from the current time
"startMinutes": 3,
// Number of minutes into the future from the current time (the original comment says "start" again; presumably the end of the sync window — confirm)
"endMinutes": 0,
// Offset applied to the query conditions
"utcSearchHour": 0,
// Offset applied to the data conditions
"utcDataHour": 0,
// Job scheduling information
"job": {
"name": "quartz-job-monitor-his-space",
"group": "quartz-group-idc",
//"withIntervalInSeconds": 310,
"withIntervalInSeconds": 310
},
"bulk": {
// Cross-server
"local": false,
// Path of the generated batch-operation SQL file; used by multitasks-sql
"sqlPath": "~/App_Temp/db-async-sql/{fileName}.sql",
// if (doMethod.Contains("bulk")) await DoAsyncDb(pfGlobal, schedulerConfig, dicData);
// else if (doMethod.Contains("db")) _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.SaveSqlV, dicData);
// if (doMethod.Contains("sql")) await DoAsyncSql(pfGlobal, schedulerConfig, dicData);
// if (doMethod.Contains("json"))
// await DoAsyncJson(pfGlobal, schedulerConfig, dicData);
// "db": "bulk",
"doMethod": "db",
// Bulk insert: path of the temporary exchange file
"tempPathCsv": "~/App_Temp/db-csv/{fileName}.csv",
// No index needed; used as a temporary table
"tableName": "idc_resource_data_temp"
// Whether to delete the temporary file ~/App_Temp/.csv after the bulk insert completes (no setting follows this comment)
},
// All parameters, used to compose/replace the SQL statements
// NOTE(review): saveSql below references @unit, which is not listed here — confirm whether it should be.
"columns": "@rl_id, @status, @server_time, @resource_id, @real_value, @event_count,@alias, @save_time",
"selectSql": [
{
"value": "select resource_id from idc_resource_info",
"sortId": 1
},
{
"value": " union ",
"sortId": 5
},
{
"value": "select monitor_id from idc_resource_monitor",
"sortId": 10
},
{
"value": " union ",
"sortId": 15
},
{
"value": "select space_id from idc_resource_space",
"sortId": 20
}
],
"saveSql": [
//{
// "value": " if not exists (select 1 from t where server_time=@server_time and resource_id=@resource_id ) ",
// "sortId": 5
//},
{
"value": "insert into idc_resource_data_his(",
"sortId": 10
},
{
"value": "rl_id, status, server_time, resource_id, real_value, event_count, alias, save_time,sys_id,unit",
"sortId": 15
},
{
"value": ") values(",
"sortId": 20
},
// NOTE(review): the last value below is the bare identifier `unit`, while the ON DUPLICATE KEY UPDATE clause uses the @unit parameter — confirm whether @unit was intended here.
{
"value": "@rl_id, @status, @server_time, @resource_id, @real_value, @event_count,@alias, @save_time,'gj',unit",
"sortId": 25
},
{
"value": ") ",
"sortId": 30
},
//{
// "value": " else ",
// "sortId": 35
//},
//{
// "value": "update idc_resource_data_{table} set ",
// "sortId": 40
//},
{
"value": " ON DUPLICATE KEY UPDATE ",
"sortId": 38
},
{
"value": " status=@status,real_value=@real_value,event_count=@event_count,alias=@alias,server_time=@server_time,unit=@unit;",
"sortId": 45
}
//,
//{
// "value": "where server_time=@server_time and resource_id=@resource_id ",
// "sortId": 50
//}
],
"deleteSql": [
],
// Parameters to pass (path of the request body file)
"postData": "~/Config/Scheduler/idc/quartz-job-monitor-space_body.json",
// Response result data
"responseData": "~/App_Data/idc/Scheduler/ResponseData/quartz-job-monitor-lzy-space.json",
"fromParamsDir":"",
"fromParamsSuffix":""
}
}
}
doMethod 源码参考
/// <summary>
/// Persists the collected rows using the strategy named by <c>schedulerConfig.Bulk.DoMethod</c>.
/// </summary>
/// <remarks>
/// The strategies are tested in this order and the first match wins:
/// "db" together with both FromParamsDir and FromParamsSuffix set (rows are read from the
/// matching files instead of <paramref name="dicData"/>), "bulk", "db", "sql", "json".
/// Both "db" paths first run the configured delete statement, if there is one. Failures of the
/// delete or save statements are written to the console and are not rethrown.
/// </remarks>
/// <param name="pfGlobal">Global parameter object handed through to the database helper.</param>
/// <param name="schedulerConfig">Job configuration supplying the strategy, connection name and SQL.</param>
/// <param name="dicData">Rows to persist, one dictionary per row.</param>
/// <param name="table">Value substituted for the "{table}" placeholder in the delete and save SQL (his/last).</param>
/// <returns>
/// The value returned by the selected strategy; 0 when no strategy matches, when the strategy is
/// not configured, or when a "db" save fails.
/// </returns>
public async Task<int> DoAsync(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, IEnumerable<Dictionary<string, object>> dicData, string table)
{
    // A missing Bulk section or DoMethod selects no strategy, same as an unrecognised value.
    var doMethod = schedulerConfig.Bulk?.DoMethod;
    if (doMethod == null)
    {
        return 0;
    }

    if (doMethod.Contains("db") && !string.IsNullOrEmpty(schedulerConfig.FromParamsDir) && !string.IsNullOrEmpty(schedulerConfig.FromParamsSuffix))
    {
        DeleteHistory(pfGlobal, schedulerConfig, table);
        try
        {
            DirectoryInfo dir = new DirectoryInfo(PhyPathUtil.ConvertVirToPhyPath(schedulerConfig.FromParamsDir));
            FileInfo[] files = dir.GetFiles(schedulerConfig.FromParamsSuffix);
            int resultC = 0;
            foreach (var file in files)
            {
                string content = FileUtil.ReadFileAndContentByVirUrl(file.FullName);
                // Only files whose text contains a quoted "data" key are imported.
                if (!string.IsNullOrEmpty(content) && content.Contains("\"data\""))
                {
                    var jobject = JsonUtil.Deserialize<JObject>(content);
                    List<Dictionary<string, object>> rows = new List<Dictionary<string, object>>();
                    JTokenUtil.ConvertJArrayToDictionary(jobject.SelectToken("data") as JArray, rows, new Dictionary<string, object>());
                    resultC += _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.SaveSqlV.Replace("{table}", table), rows);
                }
            }
            return resultC;
        }
        catch (Exception ex)
        {
            // Any failure discards the running total and reports 0.
            Console.WriteLine($"【插入异常】{schedulerConfig.Title},{ex.Message}条");
            return 0;
        }
    }

    if (doMethod.Contains("bulk"))
    {
        return await DoAsyncDb(pfGlobal, schedulerConfig, dicData);
    }

    if (doMethod.Contains("db"))
    {
        DeleteHistory(pfGlobal, schedulerConfig, table);
        try
        {
            return _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.SaveSqlV.Replace("{table}", table), dicData);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"【插入异常】{schedulerConfig.Title},{ex.Message}条");
            return 0;
        }
    }

    if (doMethod.Contains("sql"))
    {
        return await DoAsyncSql(pfGlobal, schedulerConfig, dicData);
    }

    if (doMethod.Contains("json"))
    {
        return await DoAsyncJson(pfGlobal, schedulerConfig, dicData);
    }

    return 0;
}

/// <summary>
/// Runs the configured delete statement, if there is one, before new rows are written.
/// A failure is written to the console and swallowed so the save step still runs.
/// </summary>
/// <remarks>
/// The original code carried a commented-out check against information_schema.INNODB_TRX
/// ahead of this step; that check is not executed.
/// </remarks>
private void DeleteHistory(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, string table)
{
    // DeleteSqlV is rebuilt on every access, so read it once.
    var deleteSql = schedulerConfig.DeleteSqlV;
    if (ValidateUtil.ValidateExistsValue(deleteSql))
    {
        try
        {
            Dictionary<string, object> dicParams = new Dictionary<string, object>();
            RequestPFUtil.ConvertDicBySystemParamDate(dicParams);
            int resultT = _db.Execute(pfGlobal, schedulerConfig.ConnName, deleteSql.Replace("{table}", table), dicParams);
            Console.WriteLine($"【删除】{schedulerConfig.Title},{resultT}条");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"【删除异常】{schedulerConfig.Title},{ex.Message}条");
        }
    }
}
#region Direct database insert
/// <summary>
/// Copies the rows into a DataTable named after <c>Bulk.TableName</c>, with one column per entry
/// of <c>ColumnsV</c>, and hands it to the database helper's batch insert.
/// </summary>
/// <param name="pfGlobal">Global parameter object handed through to the database helper.</param>
/// <param name="schedulerConfig">Job configuration supplying table name, columns, connection name and log template.</param>
/// <param name="dicData">Rows to insert; each row is indexed by every configured column name.</param>
/// <returns>The value returned by the batch insert.</returns>
private async Task<int> DoAsyncDb(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, IEnumerable<Dictionary<string, object>> dicData)
{
    // ColumnsV re-splits the Columns string on every access, so read it once
    // instead of once per row.
    List<string> columns = new List<string>(schedulerConfig.ColumnsV);

    DataTable dt = new DataTable();
    dt.TableName = schedulerConfig.Bulk.TableName;
    foreach (var column in columns)
    {
        dt.Columns.Add(column);
    }
    foreach (var item in dicData)
    {
        DataRow dr = dt.NewRow();
        foreach (var column in columns)
        {
            dr[column] = item[column];
        }
        dt.Rows.Add(dr);
    }

    DateTime dateS = DateTime.Now;
    int resultT = _db.BatchInsert(pfGlobal, schedulerConfig.ConnName, dt);
    DateTime dateE = DateTime.Now;
    double elapsedSeconds = (dateE - dateS).TotalSeconds;

    try
    {
        Console.WriteLine(string.Format(schedulerConfig.Msg.Process, DateTime.Now, elapsedSeconds, resultT));
    }
    catch (FormatException)
    {
        // Only three arguments are supplied here; a template that references {3} or higher
        // makes string.Format throw. The insert has already completed, so log a plain line
        // rather than fail the job.
        Console.WriteLine($"【{schedulerConfig.Title}】{DateTime.Now:yyyy/MM/dd HH:mm:ss},操作时间{elapsedSeconds}s,入库{resultT}条");
    }

    return await Task.FromResult(resultT);
}
#endregion Direct database insert
文档更新时间: 2021-08-22 08:00 作者:admin