RG3.PF.Quartz.StartUsed

SchedulerConfig

/************************************************************************************************************
* file    :     SchedulerConfig.cs
* author  : cbg
* function: Main configuration entity for a scheduler job (SchedulerConfig)
* history : created by CBG 03/06/2021
************************************************************************************************************/
using Newtonsoft.Json.Linq;
using RG3.PF.Abstractions.Entity;
using RG3.PF.Abstractions.Interfaces;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RG3.PF.Quartz.StartUsed.Entity
{
    /// <summary>
    /// Main configuration of a scheduled job.
    /// Besides the plain settings it exposes read-only helpers (<see cref="SelectSqlV"/>,
    /// <see cref="SaveSqlV"/>, <see cref="DeleteSqlV"/>, <see cref="ColumnsV"/>) that assemble
    /// the final SQL text / column list from the configured fragments.
    /// </summary>
    public class SchedulerConfig : IEntity
    {
        /// <summary>
        /// Offset in seconds. Per the original note: when set, the service is started this long
        /// after the current time.
        /// </summary>
        public double? Seconds { get; set; }

        /// <summary>
        /// Start offset in minutes (original note: "start the service after this many minutes").
        /// NOTE(review): the consumer of this value is not visible here - confirm the exact meaning.
        /// </summary>
        public long StartMinutes { get; set; }

        /// <summary>
        /// End offset in minutes.
        /// </summary>
        public long EndMinutes { get; set; }

        /// <summary>
        /// Hour value used for search/query time handling.
        /// NOTE(review): the original comment read "whether the current time uses UTC", but the
        /// property is numeric - presumably an hour offset; confirm against the caller.
        /// </summary>
        public long UtcSearchHour { get; set; }

        /// <summary>
        /// Hour value used for data time handling.
        /// NOTE(review): same caveat as <see cref="UtcSearchHour"/> - presumably an hour offset.
        /// </summary>
        public long UtcDataHour { get; set; }

        /// <summary>
        /// Parameters passed with the URL request.
        /// </summary>
        public string PostData { get; set; }

        /// <summary>
        /// Service title.
        /// </summary>
        public string Title { get; set; }

        /// <summary>
        /// Message texts printed while the service is running.
        /// </summary>
        public SchedulerMsg Msg { get; set; }

        /// <summary>
        /// Database connection name.
        /// </summary>
        public string ConnName { get; set; }

        /// <summary>
        /// Whether this job is enabled.
        /// </summary>
        public bool Enabled { get; set; }

        /// <summary>
        /// Number of minutes data is cached.
        /// The misspelled name is part of the public contract and is intentionally left unchanged.
        /// </summary>
        public int CahceMinutes { get; set; }

        /// <summary>
        /// Timer / job execution settings.
        /// </summary>
        public SchedulerJob Job { get; set; }

        /// <summary>
        /// Bulk-insert configuration.
        /// </summary>
        public SchedulerBulk Bulk { get; set; }

        /// <summary>
        /// Query SQL fragments (value / sortId).
        /// </summary>
        public IEnumerable<TreeData> SelectSql { get; set; }

        /// <summary>
        /// Delete SQL fragments (value / sortId).
        /// </summary>
        public IEnumerable<TreeData> DeleteSql { get; set; }

        /// <summary>
        /// Save SQL fragments (value / sortId).
        /// </summary>
        public IEnumerable<TreeData> SaveSql { get; set; }

        /// <summary>
        /// Directory that holds the source parameter files.
        /// </summary>
        public string FromParamsDir { get; set; }

        /// <summary>
        /// File-name pattern of the source parameter files (for example ".json").
        /// </summary>
        public string FromParamsSuffix { get; set; }

        /// <summary>
        /// Comma-separated list of all parameters, used when building / substituting SQL statements.
        /// </summary>
        public string Columns { get; set; }

        /// <summary>
        /// Query statement assembled from <see cref="SelectSql"/>: all fragment values concatenated
        /// in ascending SortId order. Returns null when no fragments are configured.
        /// </summary>
        public string SelectSqlV
        {
            get
            {
                if (SelectSql == null || !SelectSql.Any()) return null;
                return ConcatBySortId(SelectSql);
            }
        }

        /// <summary>
        /// Save statement assembled from <see cref="SaveSql"/>: all fragment values concatenated
        /// in ascending SortId order. Returns null when no fragments are configured.
        /// </summary>
        public string SaveSqlV
        {
            get
            {
                if (SaveSql == null || !SaveSql.Any()) return null;
                return ConcatBySortId(SaveSql);
            }
        }

        /// <summary>
        /// Delete statement assembled from <see cref="DeleteSql"/>. Unlike the select/save variants,
        /// only fragments that are enabled and have a non-empty value are used.
        /// Returns null when no fragments are configured, and an empty string when fragments exist
        /// but none of them passes the filter.
        /// </summary>
        public string DeleteSqlV
        {
            get
            {
                if (DeleteSql == null || !DeleteSql.Any()) return null;
                IEnumerable<TreeData> usable = DeleteSql.Where(item => item.Enabled == true && item.Value != null && !string.IsNullOrEmpty(item.Value.ToString()));
                return ConcatBySortId(usable);
            }
        }

        /// <summary>
        /// <see cref="Columns"/> split on commas into individual column / parameter names.
        /// Names are trimmed so that a list written as "a, b" yields "b" rather than " b";
        /// blank entries are dropped. Returns an empty list when <see cref="Columns"/> is null or empty.
        /// </summary>
        public IEnumerable<string> ColumnsV
        {
            get
            {
                if (string.IsNullOrEmpty(Columns)) return new List<string>();
                return Columns.Split(',', System.StringSplitOptions.RemoveEmptyEntries)
                    .Select(name => name.Trim())
                    .Where(name => name.Length > 0)
                    .ToList();
            }
        }

        /// <summary>
        /// Push management configuration.
        /// </summary>
        public IEnumerable<SubModule> SubModule { get; set; }

        /// <summary>
        /// Virtual path where the response result is stored.
        /// </summary>
        public string ResponseData { get; set; }

        /// <summary>
        /// Concatenates the values of the given fragments in ascending SortId order.
        /// </summary>
        /// <param name="fragments">Fragments to join; must not be null.</param>
        /// <returns>The concatenated text (empty when there are no fragments).</returns>
        private static string ConcatBySortId(IEnumerable<TreeData> fragments)
        {
            StringBuilder sql = new StringBuilder();
            foreach (var fragment in fragments.OrderBy(part => part.SortId))
            {
                sql.Append(fragment.Value);
            }
            return sql.ToString();
        }
    }
}

quartz-job-monitor-his-space.json json映射文件


{
  "scheduler": {
    "quartz-job-monitor-his-space": {
      "title": "监控采集(his)(空间)",
      // Messages printed while the service runs
      "msg": {
        "start": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss}服务启动中",
        "error": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss}服务启动失败",
        "success": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss}服务启动成功",
        "process": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss},操作时间{1}s,下次执行时间{2:yyyy/MM/dd HH:mm:ss},入库{3}条",
        "processError": "【监控采集(his)(空间)】{0:yyyy/MM/dd HH:mm:ss},异常说明:{2}"
      },
      // Database connection name
      "connName": "ConnIdc",
      "enabled": true,
      "cahceMinutes": 1440,
      // Sync start: number of minutes pushed back from the current time
      "startMinutes": 3,
      // Number of minutes into the future from the current time (presumably the sync window end)
      "endMinutes": 0,
      // Offset applied to query conditions
      "utcSearchHour": 0,
      // Offset applied to data conditions
      "utcDataHour": 0,
      // Job scheduling settings
      "job": {
        "name": "quartz-job-monitor-his-space",
        "group": "quartz-group-idc",
        //"withIntervalInSeconds": 310,
        "withIntervalInSeconds": 310
      },
      "bulk": {
        // Cross-server
        "local": false,
        // Path of the generated bulk-operation SQL script; used by multitasks-sql
        "sqlPath": "~/App_Temp/db-async-sql/{fileName}.sql",
        // How doMethod is interpreted by the consumer:
        //   if (doMethod.Contains("bulk")) await DoAsyncDb(pfGlobal, schedulerConfig, dicData);
        //   else if (doMethod.Contains("db"))  _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.SaveSqlV, dicData);
        //   if (doMethod.Contains("sql"))  await DoAsyncSql(pfGlobal, schedulerConfig, dicData);
        //   if (doMethod.Contains("json"))
        //   await DoAsyncJson(pfGlobal, schedulerConfig, dicData);
        //  "db": "bulk",
        "doMethod": "db",
        // Bulk insert: temporary exchange file path
        "tempPathCsv": "~/App_Temp/db-csv/{fileName}.csv",
        // Used as a temporary table; no indexes needed
        "tableName": "idc_resource_data_temp"
        // Whether to delete the temp file (~/App_Temp/.csv) after the bulk insert completes (no key is set for this here)
      },
      // All parameters; used to build / substitute the SQL statements
      "columns": "@rl_id, @status, @server_time, @resource_id, @real_value, @event_count,@alias, @save_time",
      "selectSql": [
        {
          "value": "select resource_id from idc_resource_info",
          "sortId": 1
        },
        {
          "value": " union ",
          "sortId": 5
        },
        {
          "value": "select monitor_id from idc_resource_monitor",
          "sortId": 10
        },
        {
          "value": "  union ",
          "sortId": 15
        },
        {
          "value": "select space_id from idc_resource_space",
          "sortId": 20
        }
      ],
      "saveSql": [
        //{
        //  "value": " if not exists (select 1 from t where server_time=@server_time and resource_id=@resource_id )  ",
        //  "sortId": 5
        //},
        {
          "value": "insert into idc_resource_data_his(",
          "sortId": 10
        },
        {
          "value": "rl_id, status, server_time, resource_id, real_value, event_count, alias, save_time,sys_id,unit",
          "sortId": 15
        },
        {
          "value": ") values(",
          "sortId": 20
        },
        {
          // The last value is the bound parameter @unit, matching "unit=@unit" in the
          // ON DUPLICATE KEY UPDATE fragment below. A bare "unit" here would refer to the
          // column itself and store its default on a fresh insert.
          "value": "@rl_id, @status, @server_time, @resource_id, @real_value, @event_count,@alias, @save_time,'gj',@unit",
          "sortId": 25
        },
        {
          "value": ") ",
          "sortId": 30
        },
        //{
        //  "value": "   else   ",
        //  "sortId": 35
        //},

        //{
        //  "value": "update idc_resource_data_{table} set ",
        //  "sortId": 40
        //},
        {
          "value": " ON DUPLICATE KEY UPDATE  ",
          "sortId": 38
        },
        {
          "value": " status=@status,real_value=@real_value,event_count=@event_count,alias=@alias,server_time=@server_time,unit=@unit;",
          "sortId": 45
        }
        //,
        //{
        //  "value": "where server_time=@server_time and resource_id=@resource_id ",
        //  "sortId": 50
        //}
      ],
      "deleteSql": [
      ],
      // Parameters to send
      "postData": "~/Config/Scheduler/idc/quartz-job-monitor-space_body.json",
      // Response result data
      "responseData": "~/App_Data/idc/Scheduler/ResponseData/quartz-job-monitor-lzy-space.json",
      "fromParamsDir":"",
      "fromParamsSuffix":""
    }
  }
}

doMethod 源码参考

  /// <summary>
        /// Persists the collected data using the strategy selected by <c>schedulerConfig.Bulk.DoMethod</c>.
        /// </summary>
        /// <remarks>
        /// Strategies are tested in this order and the first match wins:
        /// "db" together with a configured FromParamsDir/FromParamsSuffix (rows are read from the
        /// matching parameter files), "bulk", "db" (rows come from <paramref name="dicData"/>),
        /// "sql", "json". The two "db" variants first run the configured delete statement, if any.
        /// Database errors in the "db" variants are logged to the console and reported as 0 rows.
        /// </remarks>
        /// <param name="pfGlobal">Global parameter object handed through to the data-access calls.</param>
        /// <param name="schedulerConfig">Job configuration (connection name, SQL fragments, bulk settings).</param>
        /// <param name="dicData">Rows to store, one dictionary per row.</param>
        /// <param name="table">Value substituted for the "{table}" placeholder in the SQL text (his/last).</param>
        /// <returns>The row count reported by the chosen strategy; 0 when nothing matched or an error was logged.</returns>
        public async Task<int> DoAsync(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, IEnumerable<Dictionary<string, object>> dicData, string table)
        {
            var doMethod = schedulerConfig.Bulk?.DoMethod;
            if (doMethod == null)
            {
                // No bulk section / no strategy configured: nothing to do.
                return 0;
            }

            bool hasParamFiles = !string.IsNullOrEmpty(schedulerConfig.FromParamsDir) && !string.IsNullOrEmpty(schedulerConfig.FromParamsSuffix);

            if (doMethod.Contains("db") && hasParamFiles)
            {
                DeleteHistoryIfConfigured(pfGlobal, schedulerConfig, table);
                return SaveFromParamFiles(pfGlobal, schedulerConfig, table);
            }

            if (doMethod.Contains("bulk"))
            {
                return await DoAsyncDb(pfGlobal, schedulerConfig, dicData);
            }

            if (doMethod.Contains("db"))
            {
                DeleteHistoryIfConfigured(pfGlobal, schedulerConfig, table);
                try
                {
                    return _db.Execute(pfGlobal, schedulerConfig.ConnName, schedulerConfig.SaveSqlV.Replace("{table}", table), dicData);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"【插入异常】{schedulerConfig.Title}{ex.Message}");
                    return 0;
                }
            }

            if (doMethod.Contains("sql"))
            {
                return await DoAsyncSql(pfGlobal, schedulerConfig, dicData);
            }

            if (doMethod.Contains("json"))
            {
                return await DoAsyncJson(pfGlobal, schedulerConfig, dicData);
            }

            return 0;
        }

        /// <summary>
        /// Runs the configured delete statement (historical data clean-up) when one is present.
        /// Failures are logged and swallowed so that the subsequent save is still attempted.
        /// </summary>
        private void DeleteHistoryIfConfigured(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, string table)
        {
            // DeleteSqlV rebuilds the statement on every access, so read it once.
            string deleteSql = schedulerConfig.DeleteSqlV;
            if (!ValidateUtil.ValidateExistsValue(deleteSql))
            {
                return;
            }

            try
            {
                Dictionary<string, object> dicParams = new Dictionary<string, object>();
                RequestPFUtil.ConvertDicBySystemParamDate(dicParams);
                int deleted = _db.Execute(pfGlobal, schedulerConfig.ConnName, deleteSql.Replace("{table}", table), dicParams);

                Console.WriteLine($"【删除】{schedulerConfig.Title}{deleted}条");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"【删除异常】{schedulerConfig.Title}{ex.Message}");
            }
        }

        /// <summary>
        /// Reads every file in FromParamsDir that matches FromParamsSuffix and saves the rows found
        /// under its "data" array with the configured save statement.
        /// Any error is logged and reported as 0 rows.
        /// </summary>
        private int SaveFromParamFiles(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, string table)
        {
            try
            {
                DirectoryInfo dir = new DirectoryInfo(PhyPathUtil.ConvertVirToPhyPath(schedulerConfig.FromParamsDir));
                string saveSql = null;
                int saved = 0;
                foreach (FileInfo file in dir.GetFiles(schedulerConfig.FromParamsSuffix))
                {
                    string content = FileUtil.ReadFileAndContentByVirUrl(file.FullName);
                    // Cheap pre-check: only parse files that contain a "data" property at all.
                    if (string.IsNullOrEmpty(content) || !content.Contains("\"data\""))
                    {
                        continue;
                    }

                    var jobject = JsonUtil.Deserialize<JObject>(content);
                    JArray rows = jobject?.SelectToken("data") as JArray;
                    if (rows == null)
                    {
                        // "data" is missing at the root or is not an array: nothing to insert from this file.
                        continue;
                    }

                    List<Dictionary<string, object>> dicT = new List<Dictionary<string, object>>();
                    JTokenUtil.ConvertJArrayToDictionary(rows, dicT, new Dictionary<string, object>());

                    // Build the statement once, on first use; it is identical for every file.
                    if (saveSql == null)
                    {
                        saveSql = schedulerConfig.SaveSqlV.Replace("{table}", table);
                    }
                    saved += _db.Execute(pfGlobal, schedulerConfig.ConnName, saveSql, dicT);
                }

                return saved;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"【插入异常】{schedulerConfig.Title}{ex.Message}");
                return 0;
            }
        }


        #region Direct bulk insert
        /// <summary>
        /// Copies <paramref name="dicData"/> into a DataTable named after the configured bulk table
        /// (one column per entry of <c>schedulerConfig.ColumnsV</c>) and hands it to <c>_db.BatchInsert</c>.
        /// </summary>
        /// <param name="pfGlobal">Global parameter object handed through to the data-access call.</param>
        /// <param name="schedulerConfig">Job configuration supplying table name, columns, connection name and log template.</param>
        /// <param name="dicData">Rows to insert; each dictionary must contain a key for every configured column.</param>
        /// <returns>The value returned by <c>_db.BatchInsert</c>.</returns>
        private async Task<int> DoAsyncDb(PFGlobalParameter pfGlobal, SchedulerConfig schedulerConfig, IEnumerable<Dictionary<string, object>> dicData)
        {
            // ColumnsV re-splits the column string on every access; materialize it once.
            List<string> columns = new List<string>(schedulerConfig.ColumnsV);

            DataTable dt = new DataTable();
            dt.TableName = schedulerConfig.Bulk.TableName;
            foreach (var column in columns)
            {
                dt.Columns.Add(column);
            }
            foreach (var item in dicData)
            {
                DataRow dr = dt.NewRow();
                foreach (var column in columns)
                {
                    dr[column] = item[column];
                }
                dt.Rows.Add(dr);
            }

            DateTime dateS = DateTime.Now;
            int resultT = _db.BatchInsert(pfGlobal, schedulerConfig.ConnName, dt);
            DateTime dateE = DateTime.Now;
            double elapsedSeconds = (dateE - dateS).TotalSeconds;

            // The rows are already stored at this point, so a missing or mismatched log template
            // (e.g. one that references more placeholders than the three values supplied here)
            // must not turn a successful insert into a failure.
            string template = schedulerConfig.Msg?.Process;
            string line = null;
            if (!string.IsNullOrEmpty(template))
            {
                try
                {
                    line = string.Format(template, DateTime.Now, elapsedSeconds, resultT);
                }
                catch (FormatException)
                {
                    // Fall through to the built-in message below.
                }
            }
            Console.WriteLine(line ?? $"【{schedulerConfig.Title}】{DateTime.Now:yyyy/MM/dd HH:mm:ss},操作时间{elapsedSeconds}s,入库{resultT}条");

            // Kept as an awaited completed task so the async signature has an await.
            return await Task.FromResult(resultT);
        }
        #endregion Direct bulk insert

文档更新时间: 2021-08-22 08:00   作者:admin