简要描述:
- 详见:
https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/websockets?view=aspnetcore-5.0
https://docs.microsoft.com/zh-cn/aspnet/signalr/overview/guide-to-the-api/hubs-api-guide-server
https://github.com/mqttjs/MQTT.js
- 通知接口采用websoket技术,用于大屏展示、缓存问题、第三方集成等。
版本号 | 制定人 | 制定日期 | 修订日期 |
---|---|---|---|
v3 | 陈碧贵 | 2019-01-20 | xxxx-xx-xx |
请求URL:
http://{url参数}/ws/api/v3/mkey
请求方式:
- GET
请求头:
参数名 | 是否必须 | 类型 | 说明 |
---|---|---|---|
XownerId | 是 | string | 项目唯一ID,对应bo_project {ownerId} |
XsysId | 否 | string | 所属系统 对应 bo_system 表 |
XuserFromFirstShareId | string | 否 | 一级分享用户ID, bo_user user_id |
XuserFromSecondShareId | string | 否 | 二级分享用户ID, bo_user user_id |
XverifyApi | 是 | string | 加密规则encryptByDES(`${newGuid()} |
XfilterAreaCode | 否 | string | 行政区编码, 对应 bo_sys_area area_code |
Content-Type: | 是 | string | application/json; charset=utf-8 请求类型 |
Authorization | 是 | string | 当前用户认证信息,通过登录接口获取 Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6 |
请求参数:
参数名 | 是否必须 | 类型 | 说明 |
---|---|---|---|
_nodejs | 是 | string | common-data 对应/Config/AppNodeJs/Plugins/fn- common-data.js, 通过nodejs插件,对响应结果进行二次处理 common-data 普通数据(小写驼峰)common-data-under 普通数据下划线 |
返回示例:
正确时返回:
{
options: {},
result: "{"lastDate":"2021-12-09T08:02:31.3064024+08:00","ops":{"queues":"bt|mall-business|manager"}}"
}
错误时返回:
{
"code": 0,
"message": ""
"error":{
errorCode:null,
errorText:null
}
"success": false,
}
返回参数说明:
参数名 | 类型 | 说明 |
---|---|---|
data | object | result:ops:queues |
cacheType | string | redis/sqlite/local 数据来源缓存 的类型(便于开发调试和性能优化) |
cacheDate | date | 最后数据获取时间 |
初始化redis 变更推送到websoket
private readonly ISubPush _isubPush;
var pf = new PFGlobalParameter { OwnerId = RequestPFUtil.GetOwnerId(httpContext), SysId = RequestPFUtil.GetSysId(httpContext) ,ModuleKey=mkey};
_isubPush.SubInit<SubResult>(new SubResult { }, pf);
ws前端发送
// 发送到服务器
that.connection.invoke('send', options.ownerId, options.mkey, value);
ws后端发送
/**
* 服务端发送消息 模块 数据变更进行通知到前端
*
* owner=ownerId
* mkey=mkey
var result = new
{
lastDate = DateTime.Now,
userId = pf.UserId,
ownerId = pf.OwnerId,
sysId = pf.SysId,
moduleKey = pf.ModuleKey,
reponseField = pf.ReponseField
};
*/
public async Task Send(string owner, string mkey, string json)
{
// Call the broadcastMessage method to update clients.
await Clients.All.SendAsync($"owner-{owner}-mkey-{mkey}", json);
}
redis后端发送
/**
SubResult result = new SubResult
{
LastDate = DateTime.Now,
UserId = pf.UserId,// _authProvider.GetCurrentUserId(pf.OwnerId, pf.SysId),
OwnerId = pf.OwnerId,
SysId = pf.SysId,
ModuleKey = pf.ModuleKey,
ReponseField = pf.ReponseField
};
_subscribePublishRedis.Publish<SubResult>(pf, $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", result);
Job中推送使用
public async Task Execute(IJobExecutionContext context)
{
var _sc = (SchedulerConfig)context.JobDetail.JobDataMap["_sc"];
if (_sc.Enabled != true)
{
await Console.Out.WriteLineAsync(string.Format(_sc.Msg.ProcessError, DateTime.Now, 0, "服务未开启"));
return;
}
var _configuration = (IConfiguration)context.JobDetail.JobDataMap["_configuration"];
var _serviceProvider = (ServiceProvider)context.JobDetail.JobDataMap["_serviceProvider"];
try
{
var _sub = _serviceProvider.GetService<ISubscribePublishRedis>();
foreach (var item in _sc.SubModule)
{
if (string.IsNullOrEmpty(item.Channel) || string.IsNullOrEmpty(item.OwnerId) || string.IsNullOrEmpty(item.Mkey)) continue;
_sub.Publish<SubResult>(item.Params, item.Channel, new SubResult
{
LastDate = DateTime.Now,
UserId = item.Params.UserId,// _authProvider.GetCurrentUserId(pf.OwnerId, pf.SysId),
OwnerId = item.Params.OwnerId,
SysId = item.Params.SysId,
ModuleKey = item.Params.ModuleKey,
ReponseField = item.Params.ReponseField
});
}
}
catch (Exception e)
{
await FileUtil.CreateFileAndContentAsync($"~/App_Logs/quartz/{DateTime.Now.ToFileTime()}.json", e.Message);
Console.WriteLine($"【{_sc.Title}】[{e.Message}]" + JsonUtil.Serialize(e.StackTrace));
}
}
后端发送案例
var result = new
{
lastDate = DateTime.Now,
userId = pf.UserId,// _authProvider.GetCurrentUserId(pf.OwnerId, pf.SysId),
ownerId = pf.OwnerId,
sysId = pf.SysId,
moduleKey = pf.ModuleKey,
reponseField = pf.ReponseField
};
if (_configuration.GetValue<bool>("websoket:open"))
{
_hubContext.Clients.All.SendAsync($"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", JsonUtil.Serialize(result));
}
后端注入的服务
/// <summary>
/// 订阅实现
/// </summary>
public class SubPushProvider : ISubPush
{
private readonly DesService _desService;
private readonly IConfiguration _configuration;
private readonly IHubContext<MkeyHub> _hubContext;
private readonly ILogger<SubPushProvider> _logger;
//private readonly IAuthProvider _authProvider;
public SubPushProvider(ILogger<SubPushProvider> logger, IConfiguration configuration, DesService desService, IHubContext<MkeyHub> hubContext)
{
_configuration = configuration;
_desService = desService;
_hubContext = hubContext;
_logger = logger;
// IAuthProvider authProvider,
//_authProvider = authProvider;
}
}
通知订阅参考代码 :
pull: function (callback, options = {
ownerId: "rg",
mkey: "bo-enum-sign"
}) {
// debugger;
let that = this;
if (!that.connection || that.connection.state === 0) {
that.build();
}
// 服务器推送过来的信息
// Create a function that the hub can call to broadcast messages.
that.connection.on(`owner-${options.ownerId}-mkey-${options.mkey}`, function (json) {
callback({
options: options,
result: json
});
console.log("websoket-pull-success");
});
}
通知订阅和发送使用
methods: {
// handleClick(tab, event) {
// console.log(tab, event);
// },
onWsPull() {
let that = this;
that.$reqWs.pull(
(res) => {
// console.log(res);
that.wsdata.push(JSON.parse(res.result));
},
{
ownerId: "rg",
mkey: "bo-enum-sign",
}
);
},
onWsSend() {
let that = this;
that.$reqWs.send(
JSON.stringify({ name: "测试" + that.$verifyGuid.guid() }),
{
ownerId: "rg",
mkey: "bo-enum-sign",
}
);
},
通知订阅和发送使用
methods: {
// handleClick(tab, event) {
// console.log(tab, event);
// },
onWsPull() {
let that = this;
that.$reqWs.pull(
(res) => {
// console.log(res);
that.wsdata.push(JSON.parse(res.result));
},
{
ownerId: "rg",
mkey: "bo-enum-sign",
}
);
},
onWsSend() {
let that = this;
that.$reqWs.send(
JSON.stringify({ name: "测试" + that.$verifyGuid.guid() }),
{
ownerId: "rg",
mkey: "bo-enum-sign",
}
);
},
redis消息订阅
全局订阅服务端源码
private readonly ISubscribePublish _subscribePublishRedis;
/// <summary>
///
/// </summary>
/// <param name="logger"></param>
/// <param name="configuration"></param>
/// <param name="desService"></param>
/// <param name="hubContext"></param>
public SubPushProvider(ISubscribePublishRedis subscribePublishRedis)
{
_subscribePublishRedis = subscribePublishRedis;
}
if (_configuration.GetValue<bool>("messageQ:redis:open"))
{
_subscribePublishRedis.Publish<SubResult>(pf, $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", result);
//发布其它订阅信息
if (pf.SubMkey != null && pf.SubMkey.Any() && pf.SubMkey.Count() > 0)
{
foreach (var item in pf.SubMkey)
{
string key = $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}";
_subscribePublishRedis.Publish<SubResult>(pf, key, result);
}
}
}
配置开启redis订阅发布
//消息队列 发布 订阅
"messageQ": {
"redis": {
"open": true,
"connection": "127.0.0.1:6379,password=redis-123-!@#",
"password": null,
"instance": "Redis",
"database": 8
}
},
订阅消息
//订阅消息
_subscribePublishRedis.Subscribe(pf, $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}", (channel, result) =>
{
var subResult = (SubResult)result;
if (channel == $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}")
{
}
});
推送源码websoket、redis
/// <summary>
/// 订阅,通常是修改后记录的id
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="value">通常为Id {data:[]}</param>
/// <param name="pf">$"{OwnerId}JJ{SysId}JJ{ModuleKey}";</param>
public void Sub<T>(T value, PFGlobalParameter pf)
{
try
{
Task.Run(() =>
{
if (value is ResultOps<object>)
{
string content = JsonUtil.Serialize(value, true, true, true);
string fileName = MD5.MD5Encrypt(content);
try
{
SubResult result = new SubResult
{
LastDate = DateTime.Now,
UserId = pf.UserId,// _authProvider.GetCurrentUserId(pf.OwnerId, pf.SysId),
OwnerId = pf.OwnerId,
SysId = pf.SysId,
ModuleKey = pf.ModuleKey,
ReponseField = pf.ReponseField
};
//如果有web
if (!_configuration.GetValue<bool>("websoket:open") && _configuration.GetValue<bool>("websoket:open"))
{
_hubContext.Clients.All.SendAsync($"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", JsonUtil.Serialize(result));
//发布其它订阅信息
if (pf.SubMkey != null && pf.SubMkey.Any() && pf.SubMkey.Count() > 0)
{
foreach (var item in pf.SubMkey.OrderBy(temp => temp.SortId))
{
string key = $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}";
_hubContext.Clients.All.SendAsync(key, JsonUtil.Serialize(result));
}
}
}
//订阅推送信息
try
{
if (_configuration.GetValue<bool>("websoket:open") && _configuration.GetValue<bool>("messageQ:redis:open"))
{
//订阅消息, redis更新,推送到 websoket
_subscribePublishRedis.Subscribe<SubResult>(pf, $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", (channel, resultT) =>
{
_hubContext.Clients.All.SendAsync(channel, JsonUtil.Serialize(resultT));
});
//发布其它订阅信息
if (pf.SubMkey != null && pf.SubMkey.Any() && pf.SubMkey.Count() > 0)
{
foreach (var item in pf.SubMkey.OrderBy(temp => temp.SortId))
{
string key = $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}";
//订阅消息, redis更新,推送到 websoket
_subscribePublishRedis.Subscribe<SubResult>(pf, key, (channel, resultT) =>
{
_hubContext.Clients.All.SendAsync(channel, JsonUtil.Serialize(resultT));
});
}
}
}
}
catch (Exception ec)
{
if (_configuration.GetValue<bool>("websoket:open"))
{
_hubContext.Clients.All.SendAsync($"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", JsonUtil.Serialize(result));
//发布其它订阅信息
if (pf.SubMkey != null && pf.SubMkey.Any() && pf.SubMkey.Count() > 0)
{
foreach (var item in pf.SubMkey.OrderBy(temp => temp.SortId))
{
string key = $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}";
_hubContext.Clients.All.SendAsync(key, JsonUtil.Serialize(result));
}
}
}
_logger.LogError(ec, "【websoket错误】RG3.BO.Core.Provider/SubPushProvider/Sub<T>");
}
//通过redis发布信息
if (_configuration.GetValue<bool>("messageQ:redis:open"))
{
_subscribePublishRedis.Publish<SubResult>(pf, $"owner-{pf.OwnerId}-mkey-{pf.ModuleKey}", result);
//发布其它订阅信息
if (pf.SubMkey != null && pf.SubMkey.Any() && pf.SubMkey.Count() > 0)
{
foreach (var item in pf.SubMkey.OrderBy(temp => temp.SortId))
{
string key = $"owner-{item.OwnerId ?? pf.OwnerId}-mkey-{item.Mkey ?? pf.ModuleKey}";
_subscribePublishRedis.Publish<SubResult>(pf, key, result);
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "【websoket错误】RG3.BO.Core.Provider/SubPushProvider/Sub<T>");
}
FileUtil.CreateFileAndContentVirPath($"~/App_Caches/Ops/{fileName}.json", content);
FileUtil.CreateFileAndContentVirPath($"~/App_Caches/OpsKey/{pf.CacheKey}.json", content);
}
});
}
catch (Exception e)
{
_logger.LogError(e, "【订阅错误】RG3.BO.Core.Provider/SubPushProvider/Sub<T>");
}
}
前端订阅
<!DOCTYPE html>
<html>
<head>
<title>SignalR Simple Chat</title>
<style type="text/css">
.container {
background-color: #99CCFF;
border: thick solid #808080;
padding: 20px;
margin: 20px;
}
</style>
</head>
<body>
<div class="container">
<input type="text" id="message" />
<input type="button" id="sendmessage" value="Send" />
<ul id="discussion"></ul>
</div>
<!--Script references. -->
<!--<script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/3.1.7/signalr.min.js"></script>-->
<!--Reference the SignalR library. -->
<script type="text/javascript" src="http://localhost:8800/plugins/signalr/1.1.4/signalr.min.js"></script>
<!--Add script to update the page and send messages.-->
<script type="text/javascript">
document.addEventListener('DOMContentLoaded', function () {
var messageInput = document.getElementById('message');
// Get the user name and store it to prepend to messages.
var name = prompt('Enter your name:', '');
// Set initial focus to message input box.
messageInput.focus();
// Start the connection.
var connection = new signalR.HubConnectionBuilder()
.withUrl('/ws/api/v3/mkey')
.build();
// Create a function that the hub can call to broadcast messages.
connection.on(`owner-bt-mkey-bt-company`, function (owner,mkey, json) {
// Html encode display name and message.
var encodedName = owner + mkey;
var encodedMsg = json;
// Add the message to the page.
var liElement = document.createElement('li');
liElement.innerHTML = '<strong>' + encodedName + '</strong>: ' + encodedMsg;
document.getElementById('discussion').appendChild(liElement);
});
// Transport fallback functionality is now built into start.
connection.start()
.then(function () {
console.log('connection started');
document.getElementById('sendmessage').addEventListener('click', function (event) {
// Call the Send method on the hub.
connection.invoke('send', "bt","mkey-bt-company", messageInput.value);
// Clear text box and reset focus for next comment.
messageInput.value = '';
messageInput.focus();
event.preventDefault();
});
})
.catch(error => {
console.error(error.message);
});
});
</script>
</body>
</html>
文档更新时间: 2023-06-04 16:52 作者:admin