简要描述:

  • 详见:

https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/websockets?view=aspnetcore-5.0

https://docs.microsoft.com/zh-cn/aspnet/signalr/overview/guide-to-the-api/hubs-api-guide-server

https://github.com/mqttjs/MQTT.js

  • 通知接口采用 WebSocket 技术,用于大屏展示、解决缓存问题、第三方集成等场景。
版本号 制定人 制定日期 修订日期
v3 陈碧贵 2019-01-20 xxxx-xx-xx

请求URL:

请求方式:

  • GET

请求头:

参数名 是否必须 类型 说明
XownerId string 项目唯一ID,对应bo_project {ownerId}
XsysId string 所属系统 对应 bo_system 表
XuserFromFirstShareId string 一级分享用户ID, bo_user user_id
XuserFromSecondShareId string 二级分享用户ID, bo_user user_id
XverifyApi string 加密规则encryptByDES(`${newGuid()}
XfilterAreaCode string 行政区编码, 对应 bo_sys_area area_code
Content-Type: string application/json; charset=utf-8 请求类型
Authorization string 当前用户认证信息,通过登录接口获取 Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6

请求参数:



参数名 是否必须 类型 说明
_nodejs string common-data 对应 /Config/AppNodeJs/Plugins/fn-common-data.js,通过 nodejs 插件对响应结果进行二次处理。可选值:common-data 普通数据(小写驼峰);common-data-under 普通数据(下划线)

返回示例:

正确时返回:

{
    options: {},
    result: "{\"lastDate\":\"2021-12-09T08:02:31.3064024+08:00\",\"ops\":{\"queues\":\"bt|mall-business|manager\"}}"
}

错误时返回:

{
    "code": 0,
    "message": "",
    "error": {
      "errorCode": null,
      "errorText": null
    },
    "success": false
}

返回参数说明:

参数名 类型 说明
data object result:ops:queues
cacheType string redis/sqlite/local 数据来源缓存 的类型(便于开发调试和性能优化)
cacheDate date 最后数据获取时间

初始化:将 Redis 变更推送到 WebSocket

  // Injected push service (ISubPush) used for the initialization call below.
  private readonly ISubPush _isubPush;

 // Describe the owner, system and module of the current request.
 var pf = new PFGlobalParameter
 {
     OwnerId = RequestPFUtil.GetOwnerId(httpContext),
     SysId = RequestPFUtil.GetSysId(httpContext),
     ModuleKey = mkey
 };
 // Initialize the push for that owner/system/module with an empty SubResult.
 _isubPush.SubInit<SubResult>(new SubResult(), pf);

ws前端发送


 // Send to the server: invoke the hub method 'send' with the owner id, the module key and the payload.
    that.connection.invoke('send', options.ownerId, options.mkey, value);

ws后端发送

/**
         * Server-side send: notifies the front end that a module's data has changed.
         *
         * owner = ownerId
         * mkey  = mkey
         *
         * Example of the object that is serialized into the json argument:
         *
         *     var result = new
         *     {
         *         lastDate = DateTime.Now,
         *         userId = pf.UserId,
         *         ownerId = pf.OwnerId,
         *         sysId = pf.SysId,
         *         moduleKey = pf.ModuleKey,
         *         reponseField = pf.ReponseField
         *     };
         */
        public async Task Send(string owner, string mkey, string json)
        {
            // The event name combines the owner and the module key; clients register
            // a handler under exactly this name.
            var eventName = $"owner-{owner}-mkey-{mkey}";

            // Broadcast the payload to every connected client.
            await Clients.All.SendAsync(eventName, json);
        }

redis后端发送

// Build the change notification and publish it on the module's Redis channel.
// (The original sample opened a "/**" comment here that was never closed,
// which commented out the whole snippet.)
        SubResult result = new SubResult
        {
            LastDate = DateTime.Now,
            UserId = pf.UserId, // alternatively: _authProvider.GetCurrentUserId(pf.OwnerId, pf.SysId)
            OwnerId = pf.OwnerId,
            SysId = pf.SysId,
            ModuleKey = pf.ModuleKey,
            ReponseField = pf.ReponseField
        };
        // The channel name follows the same "owner-{ownerId}-mkey-{moduleKey}" pattern used by the WebSocket events.
        _subscribePublishRedis.Publish<SubResult>(pf, $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", result);

Job中推送使用

/// <summary>
/// Job entry point: publishes a <c>SubResult</c> notification through
/// <c>ISubscribePublishRedis</c> for every configured sub-module.
/// </summary>
/// <param name="context">
/// Job execution context. Its JobDataMap must contain the scheduler configuration
/// under "_sc" and the service provider under "_serviceProvider".
/// </param>
public async Task Execute(IJobExecutionContext context)
        {
            var config = (SchedulerConfig)context.JobDetail.JobDataMap["_sc"];
            if (config.Enabled != true)
            {
                // The job is disabled: report it (the literal means "service not enabled") and stop.
                await Console.Out.WriteLineAsync(string.Format(config.Msg.ProcessError, DateTime.Now, 0, "服务未开启"));
                return;
            }

            // Cast to the interface rather than the concrete ServiceProvider so any
            // IServiceProvider implementation stored in the map works.
            var services = (IServiceProvider)context.JobDetail.JobDataMap["_serviceProvider"];
            try
            {
                var publisher = services.GetService<ISubscribePublishRedis>();

                foreach (var item in config.SubModule)
                {
                    // Skip incomplete entries. Params is dereferenced below, so a missing
                    // Params must be skipped too instead of aborting the whole loop.
                    if (item.Params == null
                        || string.IsNullOrEmpty(item.Channel)
                        || string.IsNullOrEmpty(item.OwnerId)
                        || string.IsNullOrEmpty(item.Mkey))
                    {
                        continue;
                    }

                    var parameters = item.Params;
                    publisher.Publish<SubResult>(parameters, item.Channel, new SubResult
                    {
                        LastDate = DateTime.Now,
                        UserId = parameters.UserId,
                        OwnerId = parameters.OwnerId,
                        SysId = parameters.SysId,
                        ModuleKey = parameters.ModuleKey,
                        ReponseField = parameters.ReponseField
                    });
                }
            }
            catch (Exception e)
            {
                // Persist the failure to a per-run log file and echo it to the console.
                await FileUtil.CreateFileAndContentAsync($"~/App_Logs/quartz/{DateTime.Now.ToFileTime()}.json", e.Message);
                Console.WriteLine($"【{config.Title}】[{e.Message}]" + JsonUtil.Serialize(e.StackTrace));
            }
        }

后端发送案例


    // Notification payload: when the change happened and which user/owner/system/module it concerns.
    var payload = new
    {
        lastDate = DateTime.Now,
        userId = pf.UserId, // alternatively: _authProvider.GetCurrentUserId(pf.OwnerId, pf.SysId)
        ownerId = pf.OwnerId,
        sysId = pf.SysId,
        moduleKey = pf.ModuleKey,
        reponseField = pf.ReponseField
    };

    // Only push when WebSocket support is switched on in configuration.
    var webSocketOpen = _configuration.GetValue<bool>("websoket:open");
    if (webSocketOpen)
    {
        var eventName = $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}";
        _hubContext.Clients.All.SendAsync(eventName, JsonUtil.Serialize(payload));
    }

后端注入的服务


 /// <summary>
    /// Subscription/push provider: the <see cref="ISubPush"/> implementation.
    /// This excerpt shows only the injected services.
    /// </summary>
    public class SubPushProvider : ISubPush
    {
        private readonly ILogger<SubPushProvider> _logger;
        private readonly IConfiguration _configuration;
        private readonly DesService _desService;
        private readonly IHubContext<MkeyHub> _hubContext;
        // An IAuthProvider dependency is currently disabled (not injected).

        /// <summary>
        /// Stores the services supplied by dependency injection.
        /// </summary>
        /// <param name="logger">Logger for this provider.</param>
        /// <param name="configuration">Application configuration.</param>
        /// <param name="desService">DES service.</param>
        /// <param name="hubContext">Hub context for <c>MkeyHub</c>.</param>
        public SubPushProvider(
            ILogger<SubPushProvider> logger,
            IConfiguration configuration,
            DesService desService,
            IHubContext<MkeyHub> hubContext)
        {
            _logger = logger;
            _configuration = configuration;
            _desService = desService;
            _hubContext = hubContext;
        }
    }

通知订阅参考代码 :

pull: function (callback, options = {
    ownerId: "rg",
    mkey: "bo-enum-sign"
  }) {
    // debugger;
    let that = this;
    if (!that.connection || that.connection.state === 0) {
      that.build();
    }
    // 服务器推送过来的信息
    // Create a function that the hub can call to broadcast messages.
    that.connection.on(`owner-${options.ownerId}-mkey-${options.mkey}`, function (json) {
      callback({
        options: options,
        result: json
      });
      console.log("websoket-pull-success");
    });
  }

通知订阅和发送使用

methods: {
    // handleClick(tab, event) {
    //   console.log(tab, event);
    // },
    onWsPull() {
      let that = this;
      that.$reqWs.pull(
        (res) => {
          // console.log(res);
          that.wsdata.push(JSON.parse(res.result));
        },
        {
          ownerId: "rg",
          mkey: "bo-enum-sign",
        }
      );
    },
    onWsSend() {
      let that = this;
      that.$reqWs.send(
        JSON.stringify({ name: "测试" + that.$verifyGuid.guid() }),
        {
          ownerId: "rg",
          mkey: "bo-enum-sign",
        }
      );
},

通知订阅和发送使用

methods: {
    // handleClick(tab, event) {
    //   console.log(tab, event);
    // },
    onWsPull() {
      let that = this;
      that.$reqWs.pull(
        (res) => {
          // console.log(res);
          that.wsdata.push(JSON.parse(res.result));
        },
        {
          ownerId: "rg",
          mkey: "bo-enum-sign",
        }
      );
    },
    onWsSend() {
      let that = this;
      that.$reqWs.send(
        JSON.stringify({ name: "测试" + that.$verifyGuid.guid() }),
        {
          ownerId: "rg",
          mkey: "bo-enum-sign",
        }
      );
},

redis消息订阅

全局订阅服务端源码

private readonly ISubscribePublish _subscribePublishRedis;

/// <summary>
/// Creates the provider with the Redis publish/subscribe service.
/// </summary>
/// <param name="subscribePublishRedis">Redis publish/subscribe service used to publish and subscribe to change notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="subscribePublishRedis"/> is null.</exception>
public SubPushProvider(ISubscribePublishRedis subscribePublishRedis)
{
    // Fail at construction time rather than on the first publish.
    _subscribePublishRedis = subscribePublishRedis ?? throw new ArgumentNullException(nameof(subscribePublishRedis));
}
// Publish the change through Redis when the Redis message queue is switched on.
if (_configuration.GetValue<bool>("messageQ:redis:open"))
{
    _subscribePublishRedis.Publish<SubResult>(pf, $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", result);

    // Publish to the additional subscription channels as well. A null check is enough:
    // iterating an empty sequence is a no-op, so the former Any()/Count() tests only
    // enumerated the sequence two extra times.
    if (pf.SubMkey != null)
    {
        foreach (var item in pf.SubMkey)
        {
            // Missing owner/module values fall back to those of the main channel.
            string key = $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}";
            _subscribePublishRedis.Publish<SubResult>(pf, key, result);
        }
    }
}

配置开启redis订阅发布

//消息队列  发布  订阅
  "messageQ": {
    "redis": {
      "open": true,
      "connection": "127.0.0.1:6379,password=redis-123-!@#",
      "password": null,
      "instance": "Redis",
      "database": 8
    }
  },

订阅消息

// Subscribe to messages on the channel; the callback receives the channel name and the published result.
 _subscribePublishRedis.Subscribe(pf, $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}", (channel, result) =>
 {
     // The result is cast to SubResult for use by the handler.
     var subResult = (SubResult)result;
     // React only to messages that arrived on this module's own channel.
     if (channel == $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}")
     {
         // Handle the notification here (left empty in this sample).

     }
 });

推送源码(WebSocket、Redis)

/// <summary>
        /// Pushes a change notification for a module (usually after records were modified)
        /// over WebSocket and/or Redis, then caches the serialized value on disk.
        /// The work runs on a background task; failures are logged, never thrown to the caller.
        /// </summary>
        /// <typeparam name="T">Type of <paramref name="value"/>; only ResultOps&lt;object&gt; values are processed.</typeparam>
        /// <param name="value">Usually the ids of the modified records, e.g. {data:[]}.</param>
        /// <param name="pf">Global parameters identifying owner, system and module ($"{OwnerId}JJ{SysId}JJ{ModuleKey}").</param>
        public void Sub<T>(T value, PFGlobalParameter pf)
        {
            try
            {
                // Fire-and-forget. Everything inside the task is wrapped in its own try/catch
                // because the outer catch only sees failures of scheduling the task.
                Task.Run(async () =>
                {
                    if (!(value is ResultOps<object>))
                    {
                        return;
                    }

                    try
                    {
                        string content = JsonUtil.Serialize(value, true, true, true);
                        string fileName = MD5.MD5Encrypt(content);
                        try
                        {
                            await PublishChangeAsync(pf);
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "【websoket错误】RG3.BO.Core.Provider/SubPushProvider/Sub<T>");
                        }

                        // The cache files are written even when the push above failed.
                        FileUtil.CreateFileAndContentVirPath($"~/App_Caches/Ops/{fileName}.json", content);
                        FileUtil.CreateFileAndContentVirPath($"~/App_Caches/OpsKey/{pf.CacheKey}.json", content);
                    }
                    catch (Exception inner)
                    {
                        _logger.LogError(inner, "【订阅错误】RG3.BO.Core.Provider/SubPushProvider/Sub<T>");
                    }
                });
            }
            catch (Exception e)
            {
                _logger.LogError(e, "【订阅错误】RG3.BO.Core.Provider/SubPushProvider/Sub<T>");
            }
        }

        /// <summary>
        /// Builds the notification and delivers it through the transports enabled in configuration.
        /// </summary>
        private async Task PublishChangeAsync(PFGlobalParameter pf)
        {
            SubResult result = new SubResult
            {
                LastDate = DateTime.Now,
                UserId = pf.UserId, // alternatively: _authProvider.GetCurrentUserId(pf.OwnerId, pf.SysId)
                OwnerId = pf.OwnerId,
                SysId = pf.SysId,
                ModuleKey = pf.ModuleKey,
                ReponseField = pf.ReponseField
            };
            string[] channels = GetChannels(pf);
            bool webSocketOpen = _configuration.GetValue<bool>("websoket:open");
            bool redisOpen = _configuration.GetValue<bool>("messageQ:redis:open");

            // WebSocket only: push directly to the clients.
            // NOTE(review): the original condition was "!websoket:open && websoket:open", which is
            // always false, so this branch never ran. It is presumably meant to be the counterpart
            // of the "websoket:open && messageQ:redis:open" branch below - confirm.
            if (webSocketOpen && !redisOpen)
            {
                await TrySendToWebSocketAsync(channels, result);
            }

            // WebSocket + Redis: subscribe to the Redis channels and forward whatever is
            // published there to the WebSocket clients.
            // NOTE(review): this subscribes again on every call; unless Subscribe de-duplicates
            // handlers, clients may receive repeated pushes - confirm.
            if (webSocketOpen && redisOpen)
            {
                try
                {
                    foreach (var channel in channels)
                    {
                        _subscribePublishRedis.Subscribe<SubResult>(pf, channel, (subscribedChannel, resultT) =>
                        {
                            _hubContext.Clients.All.SendAsync(subscribedChannel, JsonUtil.Serialize(resultT));
                        });
                    }
                }
                catch (Exception ec)
                {
                    _logger.LogError(ec, "【websoket错误】RG3.BO.Core.Provider/SubPushProvider/Sub<T>");
                    // Subscribing failed: fall back to pushing directly over WebSocket.
                    await TrySendToWebSocketAsync(channels, result);
                }
            }

            // Publish the notification through Redis.
            if (redisOpen)
            {
                foreach (var channel in channels)
                {
                    _subscribePublishRedis.Publish<SubResult>(pf, channel, result);
                }
            }
        }

        /// <summary>
        /// Returns the main channel followed by the additional channels from SubMkey (ordered by SortId).
        /// </summary>
        private static string[] GetChannels(PFGlobalParameter pf)
        {
            var primary = new[] { $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}" };
            if (pf.SubMkey == null)
            {
                return primary;
            }

            return primary
                .Concat(pf.SubMkey
                    .OrderBy(temp => temp.SortId)
                    // Missing owner/module values fall back to those of the main channel.
                    .Select(item => $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}"))
                .ToArray();
        }

        /// <summary>
        /// Sends the serialized result to all WebSocket clients on every channel; failures are logged, not thrown.
        /// </summary>
        private async Task TrySendToWebSocketAsync(string[] channels, SubResult result)
        {
            try
            {
                var payload = JsonUtil.Serialize(result);
                // Start all sends, then await them so a failed send is observed and logged.
                await Task.WhenAll(channels.Select(channel => _hubContext.Clients.All.SendAsync(channel, payload)));
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "【websoket错误】RG3.BO.Core.Provider/SubPushProvider/Sub<T>");
            }
        }

前端订阅

<!DOCTYPE html>
<html>
<head>
    <title>SignalR Simple Chat</title>
    <style type="text/css">
        .container {
            background-color: #99CCFF;
            border: thick solid #808080;
            padding: 20px;
            margin: 20px;
        }
    </style>
</head>
<body>
    <div class="container">
        <input type="text" id="message" />
        <input type="button" id="sendmessage" value="Send" />
        <ul id="discussion"></ul>
    </div>
    <!--Script references. -->
    <!--<script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/3.1.7/signalr.min.js"></script>-->
    <!--Reference the SignalR library. -->
    <script type="text/javascript" src="http://localhost:8800/plugins/signalr/1.1.4/signalr.min.js"></script>
    <!--Add script to update the page and send messages.-->
    <script type="text/javascript">
        document.addEventListener('DOMContentLoaded', function () {

            var messageInput = document.getElementById('message');

            // Get the user name and store it to prepend to messages.
            var name = prompt('Enter your name:', '');
            // Set initial focus to message input box.
            messageInput.focus();

            // Start the connection.
            var connection = new signalR.HubConnectionBuilder()
                .withUrl('/ws/api/v3/mkey')
                .build();

            // Create a function that the hub can call to broadcast messages.
            connection.on(`owner-bt-mkey-bt-company`, function (owner,mkey, json) {
                // Html encode display name and message.
                var encodedName = owner + mkey;
                var encodedMsg = json;
                // Add the message to the page.
                var liElement = document.createElement('li');
                liElement.innerHTML = '<strong>' + encodedName + '</strong>:  ' + encodedMsg;
                document.getElementById('discussion').appendChild(liElement);
            });

            // Transport fallback functionality is now built into start.
            connection.start()
                .then(function () {
                    console.log('connection started');
                    document.getElementById('sendmessage').addEventListener('click', function (event) {
                        // Call the Send method on the hub.
                        connection.invoke('send', "bt","mkey-bt-company", messageInput.value);

                        // Clear text box and reset focus for next comment.
                        messageInput.value = '';
                        messageInput.focus();
                        event.preventDefault();
                    });
                })
                .catch(error => {
                    console.error(error.message);
                });
        });
    </script>
</body>
</html>
文档更新时间: 2023-06-04 16:52   作者:admin