C#MQTT協(xié)議服務(wù)器與客戶端通訊實(shí)現(xiàn)(客戶端包含斷開重連模塊)
1 DLL版本
MQTTnet.DLL版本-2.7.5.0
基于比較老的項(xiàng)目中應(yīng)用的DLL,其他更高版本變化可能較大,謹(jǐn)慎參考。
2 服務(wù)器
開啟服務(wù)器
關(guān)閉服務(wù)器
綁定事件【客戶端連接服務(wù)器事件】
綁定事件【客戶端斷開(服務(wù)器)連接事件】
綁定事件【客戶端訂閱主題事件】
綁定事件【客戶端退訂主題事件】
綁定事件【接收客戶端(發(fā)送)消息事件】
using System;
using System.Net;
using MQTTnet;
using MQTTnet.Server;
namespace Demo_MQTT.Model
{
public class ServerModel
{
private static MqttServer _mqttServer = null;
private readonly Action<string> _callbackLog;
public ServerModel(Action<string> callbackLog)
{
_callbackLog = callbackLog;
}
/// <summary>
/// 綁定客戶端連接服務(wù)器事件
/// </summary>
private void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
{
WriteLog($"客戶端[{e.Client.ClientId}]已連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
}
/// <summary>
/// 綁定客戶端斷開連接事件
/// </summary>
private void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
WriteLog($"客戶端[{e.Client.ClientId}]已斷開連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
}
/// <summary>
/// 綁定客戶端訂閱主題事件
/// </summary>
private void Server_ClientSubscribedTopic(object sensor, MqttClientSubscribedTopicEventArgs e)
{
WriteLog($">>> 客戶端{(lán)e.ClientId}訂閱主題{e.TopicFilter.Topic}");
}
/// <summary>
/// 綁定客戶端退訂主題事件
/// </summary>
/// <param name="e"></param>
private void Server_ClientUnsubscribedTopic(object sensor, MqttClientUnsubscribedTopicEventArgs e)
{
WriteLog($">>> 客戶端{(lán)e.ClientId}退訂主題{e.TopicFilter}");
}
/// <summary>
/// 綁定接收客戶端消息事件
/// </summary>
private void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
WriteLog($"接收到{e.ClientId}發(fā)送來的消息! {DateTime.Now:yyyy-MM-dd HH:mm:ss} {Environment.NewLine}");
}
private void WriteLog(string log)
{
_callbackLog?.Invoke(log);
}
/// <summary>
/// 開啟服務(wù)器
/// </summary>
/// <param name="ip">IP地址</param>
/// <param name="port">端口號(hào)</param>
public void StartServer(string ip, int port)
{
if (_mqttServer == null)
{
var optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))
.WithConnectionBacklog(1000)
.WithDefaultEndpointPort(port);
IMqttServerOptions options = optionsBuilder.Build();
try
{
_mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
_mqttServer.ClientConnected += MqttServer_ClientConnected;
_mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
_mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;
_mqttServer.ClientSubscribedTopic += Server_ClientSubscribedTopic;
_mqttServer.ClientUnsubscribedTopic += Server_ClientUnsubscribedTopic;
_mqttServer.StartAsync(options);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return;
}
WriteLog($"MQTT服務(wù)器啟動(dòng)成功 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
}
}
/// <summary>
/// 關(guān)閉服務(wù)器
/// </summary>
public void CloseServer()
{
_mqttServer?.StopAsync();
}
}
}
3 客戶端
連接服務(wù)器
屬性:客戶端連接狀態(tài)
客戶端斷開重連線程
獲取所有訂閱主題
訂閱主題
退訂主題
發(fā)送消息
綁定事件【客戶端連接服務(wù)器事件】
綁定事件【客戶端斷開(服務(wù)器)連接事件】
綁定事件【客戶端接收消息事件】
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
namespace Demo_MQTT.Model
{
public class ClientModel
{
/// <summary>
/// 記錄所有訂閱主題,用于斷開重連時(shí)重新訂閱主題
/// </summary>
private readonly List<string> _subscribeTopics = new List<string>();
private MqttClient _mqttClient = null;
private string _serverIp;
private int _nServerPort;
private bool _isRunningReConnectThreadStart = false;
private string _clienID;
/// <summary>
/// 接受消息回調(diào)函數(shù),參數(shù):主題,消息內(nèi)容
/// </summary>
private readonly Action<string, byte[]> _callbackReceived;
private readonly Action<string> _callbackLog;
/// <summary>
/// 構(gòu)造函數(shù)
/// </summary>
/// <param name="callbackReceived">接受消息回調(diào)函數(shù),參數(shù):主題,消息內(nèi)容</param>
/// <param name="callbackLog"></param>
public ClientModel(Action<string, byte[]> callbackReceived, Action<string> callbackLog)
{
_callbackReceived = callbackReceived;
_callbackLog = callbackLog;
}
/// <summary>
/// 連接服務(wù)器
/// </summary>
private async void ConnectServer()
{
try
{
if (_mqttClient == null)
{
_mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;
_mqttClient.Connected += (s, a) => WriteLog($"【{_clienID}】已連接到MQTT服務(wù)器!");
_mqttClient.Disconnected += (s, a) => WriteLog($"【{_clienID}】已斷開MQTT連接!");
_mqttClient.ApplicationMessageReceived += (sender, args) =>
{
_callbackReceived?.Invoke(args.ApplicationMessage.Topic, args.ApplicationMessage.Payload);
};
}
if (_mqttClient.IsConnected) return;
IMqttClientOptions options = new MqttClientOptions
{
ChannelOptions = new MqttClientTcpOptions()
{
Server = _serverIp,
Port = _nServerPort
},
CleanSession = true
};
_clienID = options.ClientId;
await _mqttClient.ConnectAsync(options);
if (_mqttClient.IsConnected)
{
ReConnectThreadStart();
SubscribeAsync();
}
}
catch (Exception ex)
{
WriteLog("連接到MQTT服務(wù)器失敗!");
}
}
/// <summary>
/// 客戶端重連服務(wù)器線程-啟動(dòng)
/// </summary>
/// <returns></returns>
private void ReConnectThreadStart()
{
if (_isRunningReConnectThreadStart) return;
if (_mqttClient != null)
{
new Task(() =>
{
_isRunningReConnectThreadStart = true;
Thread.Sleep(5000);
while (true)
{
Thread.Sleep(1000);
if (!IsConnect)
{
WriteLog($"客戶端[{_clienID}]斷開連接,嘗試重新連接服務(wù)器中...");
int i;
for (i = 0; i < 60; i++)
{
if (IsConnect) break;
WriteLog($"嘗試第{i + 1}次連接服務(wù)器");
ConnectServer();
Thread.Sleep(1000);
if (IsConnect) break;
}
_isRunningReConnectThreadStart = i < 60;
}
if (!_isRunningReConnectThreadStart) break;
}
}).Start();
}
}
private void WriteLog(string log)
{
_callbackLog?.Invoke(log);
}
/// <summary>
/// 客戶端連接狀態(tài)
/// </summary>
public bool IsConnect => _mqttClient?.IsConnected == true;
/// <summary>
/// 連接服務(wù)器
/// </summary>
/// <param name="serverIp">服務(wù)器IP</param>
/// <param name="serverPort">服務(wù)器端口</param>
/// <param name="topic"></param>
public async void ConnectServer(string serverIp, int serverPort)
{
_serverIp = serverIp;
_nServerPort = serverPort;
await Task.Run(() => { ConnectServer(); });
}
/// <summary>
/// 關(guān)閉客戶端,斷開客戶端和服務(wù)器的連接
/// </summary>
public void CloseClient()
{
_mqttClient.DisconnectAsync();
}
/// <summary>
/// 發(fā)送消息
/// </summary>
/// <param name="topic">發(fā)送主題</param>
/// <param name="cmd">發(fā)送內(nèi)容</param>
[Obsolete("Obsolete")]
public void PublishAsync(string topic, string cmd)
{
var bytes = Encoding.UTF8.GetBytes(cmd);
var mode = MqttQualityOfServiceLevel.AtMostOnce;
var appMsg = new MqttApplicationMessage(topic, bytes, mode, false);
_mqttClient.PublishAsync(appMsg);//發(fā)送消息
}
/// <summary>
/// 訂閱主題
/// </summary>
/// <param name="topics">主題標(biāo)識(shí)</param>
public void SubscribeAsync(params string[] topics)
{
foreach (var topic in topics)
{
if (!_subscribeTopics.Contains(topic))
{
_subscribeTopics.Add(topic);
}
}
var topicFilters = _subscribeTopics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();
_mqttClient?.SubscribeAsync(topicFilters);
}
/// <summary>
/// 退訂已訂閱主題
/// </summary>
/// <param name="topics">主題標(biāo)識(shí)</param>
public void UnSubscribeAsync(params string[] topics)
{
if (topics == null || topics.Length == 0) return;
var topicFilters = topics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();
_mqttClient.SubscribeAsync(topicFilters);
}
/// <summary>
/// 獲取所有訂閱主題
/// </summary>
public string[] GetAllTopic => _subscribeTopics.ToArray();
}
}
到此這篇關(guān)于C#MQTT協(xié)議服務(wù)器與客戶端通訊實(shí)現(xiàn)(客戶端包含斷開重連模塊)的文章就介紹到這了,更多相關(guān)C#中MQTT服務(wù)器與客戶端通訊內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C#實(shí)現(xiàn)的4種常用數(shù)據(jù)校驗(yàn)方法小結(jié)(CRC校驗(yàn),LRC校驗(yàn),BCC校驗(yàn),累加和校驗(yàn))
本文主要介紹了C#實(shí)現(xiàn)的4種常用數(shù)據(jù)校驗(yàn)方法小結(jié)(CRC校驗(yàn),LRC校驗(yàn),BCC校驗(yàn),累加和校驗(yàn)),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03
C#中的SQLCommand命令與DbTransaction事務(wù)處理
這篇文章介紹了C#中的SQLCommand命令與DbTransaction事務(wù)處理,文中通過示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-05-05
C#窗口轉(zhuǎn)向方式(由一個(gè)窗口,跳轉(zhuǎn)到另一個(gè)窗口)
這篇文章主要介紹了C#窗口轉(zhuǎn)向方式(由一個(gè)窗口,跳轉(zhuǎn)到另一個(gè)窗口)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07

