https://blog.csdn.net/dengyaan/article/details/51752327
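For work, I recently had to write a C# tool that uploads data to the cloud over MQTT. I had not used MQTT before, so I ran into quite a few problems along the way. This post goes through them one by one.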
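1. Source library: https://github.com/eclipse/paho.mqtt.m2mqtt
2. Documentation: https://m2mqtt.wordpress.com/m2mqtt_doc/
3. Problem encountered in use

After a network interruption, the MQTT client sometimes does not reconnect automatically.

Solution: monitor the MQTT connection state.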
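1. Add a global static variable, uPLibrary.Networking.M2Mqtt.MQTTConfig.IsSocketRun:

    class MQTTConfig
    {
        // true while the MQTT connection is believed to be alive
        public static bool IsSocketRun = false;
    }

2. Modify the Connect method of the MqttClient class so that it sets IsSocketRun to true once the connection succeeds:

    MQTTConfig.IsSocketRun = true;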
    /// <summary>
    /// Connect to broker
    /// </summary>
    /// <param name="clientId">Client identifier</param>
    /// <param name="username">Username</param>
    /// <param name="password">Password</param>
    /// <param name="willRetain">Will retain flag</param>
    /// <param name="willQosLevel">Will QOS level</param>
    /// <param name="willFlag">Will flag</param>
    /// <param name="willTopic">Will topic</param>
    /// <param name="willMessage">Will message</param>
    /// <param name="cleanSession">Clean session flag</param>
    /// <param name="keepAlivePeriod">Keep alive period</param>
    /// <returns>Return code of CONNACK message from broker</returns>
    public byte Connect(string clientId,
        string username,
        string password,
        bool willRetain,
        byte willQosLevel,
        bool willFlag,
        string willTopic,
        string willMessage,
        bool cleanSession,
        ushort keepAlivePeriod)
    {
        // create CONNECT message
        MqttMsgConnect connect = new MqttMsgConnect(clientId,
            username,
            password,
            willRetain,
            willQosLevel,
            willFlag,
            willTopic,
            willMessage,
            cleanSession,
            keepAlivePeriod,
            (byte)this.ProtocolVersion);

        try
        {
            // connect to the broker
            this.channel.Connect();
        }
        catch (Exception ex)
        {
            throw new MqttConnectionException("Exception connecting to the broker", ex);
        }

        this.lastCommTime = 0;
        this.isRunning = true;
        // added line: mark the connection as alive
        MQTTConfig.IsSocketRun = true;
        this.isConnectionClosing = false;
        // start thread for receiving messages from broker
        Fx.StartThread(this.ReceiveThread);

        ....
3. Next, modify the Ping() method in MqttClient.cs so that it sets IsSocketRun to false when the ping fails:

    /// <summary>
    /// Execute ping to broker for keep alive
    /// </summary>
    /// <returns>PINGRESP message from broker</returns>
    private MqttMsgPingResp Ping()
    {
        MqttMsgPingReq pingreq = new MqttMsgPingReq();
        try
        {
            // broker must send PINGRESP within timeout equal to keep alive period
            return (MqttMsgPingResp)this.SendReceive(pingreq, this.keepAlivePeriod);
        }
        catch (Exception e)
        {
    #if TRACE
            MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
    #endif
            // added line: mark the connection as down
            MQTTConfig.IsSocketRun = false;
            // client must close connection
            this.OnConnectionClosing();
            return null;
        }
    }
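4. Finally, start a monitoring thread when the application initializes at its entry point. The thread checks the flag every 10 seconds, so the client reconnects after the MQTT connection drops. Also remember to wrap these calls in exception handling, so that a failed attempt does not end the thread.

    int i = 0; // number of checks performed so far
    while (true)
    {
        LogWriter.DebugLog(string.Format("Run count {0} IsSocketRun {1}", i, uPLibrary.Networking.M2Mqtt.MQTTConfig.IsSocketRun));
        if (!uPLibrary.Networking.M2Mqtt.MQTTConfig.IsSocketRun)
        {
            // execution reaches this point when the connection is down; reconnect here ...
        }
        i++;
        System.Threading.Thread.Sleep(10000);
    }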
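The polling loop in step 4 can also be packaged so that a failed reconnect attempt never terminates the monitoring thread. The following is only a minimal sketch under stated assumptions: ReconnectMonitor, CheckOnce and Run are hypothetical names that are not part of M2Mqtt, and the two delegates stand in for "read MQTTConfig.IsSocketRun" and "call your own reconnect method".

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of M2Mqtt: polls a connection flag and
// invokes a reconnect callback whenever the flag reports "disconnected".
public sealed class ReconnectMonitor
{
    private readonly Func<bool> isConnected; // e.g. () => MQTTConfig.IsSocketRun
    private readonly Action reconnect;       // e.g. your own method that calls client.Connect(...)
    private readonly TimeSpan interval;      // pause between two checks

    public ReconnectMonitor(Func<bool> isConnected, Action reconnect, TimeSpan interval)
    {
        this.isConnected = isConnected ?? throw new ArgumentNullException(nameof(isConnected));
        this.reconnect = reconnect ?? throw new ArgumentNullException(nameof(reconnect));
        this.interval = interval;
    }

    // Performs a single check. Returns true only if a reconnect was
    // attempted and the callback completed without throwing.
    public bool CheckOnce()
    {
        if (isConnected())
        {
            return false; // still connected, nothing to do
        }
        try
        {
            reconnect();
            return true;
        }
        catch (Exception ex)
        {
            // A failed attempt must not kill the monitor; try again on the next check.
            Console.Error.WriteLine("Reconnect failed: " + ex.Message);
            return false;
        }
    }

    // Runs checks until the token is cancelled; intended for a background thread.
    public void Run(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            CheckOnce();
            // Waits for the interval, but wakes up early if cancellation is requested.
            token.WaitHandle.WaitOne(interval);
        }
    }
}
```

Assuming a Reconnect method of your own, this could be started with new ReconnectMonitor(() => MQTTConfig.IsSocketRun, Reconnect, TimeSpan.FromSeconds(10)) and Run passed to a background thread, which matches the 10-second sleep used above. Passing the flag and the reconnect action as delegates keeps the loop testable without a broker.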
MQTT subscribe

    // create client instance
    MqttClient client = new MqttClient(IPAddress.Parse(MQTT_BROKER_ADDRESS));

    // register to message received
    client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;

    string clientId = Guid.NewGuid().ToString();
    client.Connect(clientId);

    // subscribe to the topic "/home/temperature" with QoS 2
    client.Subscribe(new string[] { "/home/temperature" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });

    ...

    static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        // handle message received
    }
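The handler above receives the payload as a byte array (e.Message), so the subscriber has to decode it the same way the publisher encoded it. Here is a minimal sketch of such a round trip, assuming the value is a double sent as UTF-8 text. TemperaturePayload, Encode and TryDecode are hypothetical names, not part of M2Mqtt, and the use of the invariant culture is my own choice so that the decimal separator does not depend on the machine's regional settings.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical helper, not part of M2Mqtt: converts a numeric reading
// to and from the UTF-8 text payload carried by an MQTT message.
public static class TemperaturePayload
{
    // Encodes the value as culture-independent text, then as UTF-8 bytes.
    public static byte[] Encode(double value)
    {
        string text = value.ToString("R", CultureInfo.InvariantCulture);
        return Encoding.UTF8.GetBytes(text);
    }

    // Decodes a payload produced by Encode. Returns false (and sets value to 0)
    // if the payload is null or is not a valid number.
    public static bool TryDecode(byte[] payload, out double value)
    {
        value = 0;
        if (payload == null)
        {
            return false;
        }
        string text = Encoding.UTF8.GetString(payload);
        return double.TryParse(text, NumberStyles.Float, CultureInfo.InvariantCulture, out value);
    }
}
```

Inside the handler this could be used as TemperaturePayload.TryDecode(e.Message, out double reading), checking the return value before using the reading, since any client may publish malformed data on the topic.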
MQTT publish

    …

    // create client instance
    MqttClient client = new MqttClient(IPAddress.Parse(MQTT_BROKER_ADDRESS));

    string clientId = Guid.NewGuid().ToString();
    client.Connect(clientId);

    string strValue = Convert.ToString(value);

    // publish a message on "/home/temperature" topic with QoS 2
    client.Publish("/home/temperature", Encoding.UTF8.GetBytes(strValue), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
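    ...

---------------------
Author: dengyaan
Source: CSDN
Original: https://blog.csdn.net/dengyaan/article/details/51752327
Copyright notice: This is an original article by the blogger. Please include a link to the post when reposting.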