网络端

  1. using MQTT;
  2. using MQTTnet.Client;
  3. using MQTTnet.Protocol;
  4. using MQTTnet;
  5. using MQTTnet.Server;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Text;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using MQTTnet.Client.Receiving;
  12. namespace MQTT
  13. {
  14. internal class Program
  15. {
  16. static void Main(string[] args)
  17. {
  18. MqttServerClass serverClass = new MqttServerClass();
  19. serverClass.StartMqttServer().Wait();
  20. Console.ReadLine();
  21. }
  22. }
  23. public static class Config
  24. {
  25. public static int Port { get; set; } = 1883;
  26. public static string UserName { get; set; } = "cyw";
  27. public static string Password { get; set; } = "123456";
  28. }
  29. public class UserInstance
  30. {
  31. public string ClientId { get; set; }
  32. public string UserName { get; set; }
  33. public string Password { get; set; }
  34. }
  35. public class MqttServerClass
  36. {
  37. private IMqttServer mqttServer;
  38. private List<MqttApplicationMessage> messages = new List<MqttApplicationMessage>();
  39. public async Task StartMqttServer()
  40. {
  41. try
  42. {
  43. if (mqttServer == null)
  44. {
  45. var optionsBuilder = new MqttServerOptionsBuilder()
  46. .WithDefaultEndpoint()
  47. .WithDefaultEndpointPort(Config.Port)
  48. .WithConnectionValidator(
  49. c =>
  50. {
  51. var flag = c.Username == Config.UserName && c.Password == Config.Password;
  52. if (!flag)
  53. {
  54. c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  55. return;
  56. }
  57. //设置代码为 Success
  58. c.ReasonCode = MqttConnectReasonCode.Success;
  59. //instances.Add(new UserInstance() //缓存到内存的List集合当中
  60. //{
  61. // ClientId = c.ClientId,
  62. // UserName = c.Username,
  63. // Password = c.Password
  64. //});
  65. })
  66. //订阅拦截器
  67. .WithSubscriptionInterceptor(
  68. c =>
  69. {
  70. if (c == null) return;
  71. c.AcceptSubscription = true;
  72. })
  73. //应用程序消息拦截器
  74. .WithApplicationMessageInterceptor(
  75. c =>
  76. {
  77. if (c == null) return;
  78. c.AcceptPublish = true;
  79. })
  80. //clean sesison是否生效
  81. .WithPersistentSessions();
  82. mqttServer = new MqttFactory().CreateMqttServer();
  83. //客户端断开连接拦截器
  84. //mqttServer.UseClientDisconnectedHandler(c =>
  85. //{
  86. // //var user = instances.FirstOrDefault(t => t.ClientId == c.ClientId);
  87. // //if (user != null)
  88. // //{
  89. // // instances.Remove(user);
  90. // //}
  91. //});
  92. //服务开始
  93. mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
  94. //服务停止
  95. mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);
  96. //客户端连接
  97. mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
  98. //客户端断开连接(此事件会覆盖拦截器)
  99. mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
  100. //客户端订阅
  101. mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate(OnMqttServerClientSubscribedTopic);
  102. //客户端取消订阅
  103. mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
  104. //服务端收到消息
  105. mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServerApplicationMessageReceived);
  106. await mqttServer.StartAsync(optionsBuilder.Build());
  107. //主动发送消息到客户端
  108. //await mqttServer.PublishAsync(new
  109. // MqttApplicationMessage
  110. //{
  111. // Topic = "testtopic",
  112. // Payload = Encoding.UTF8.GetBytes("dsdsd")
  113. //});
  114. //mqttServer.GetClientStatusAsync();
  115. //mqttServer.GetRetainedApplicationMessagesAsync();
  116. //mqttServer.GetSessionStatusAsync();
  117. }
  118. }
  119. catch (Exception ex)
  120. {
  121. Console.WriteLine($"MQTT Server start fail.>{ex.Message}");
  122. }
  123. }
  124. private void OnMqttServerStarted(EventArgs e)
  125. {
  126. if (mqttServer.IsStarted)
  127. {
  128. Console.WriteLine("MQTT服务启动完成!");
  129. }
  130. }
  131. private void OnMqttServerStopped(EventArgs e)
  132. {
  133. if (!mqttServer.IsStarted)
  134. {
  135. Console.WriteLine("MQTT服务停止完成!");
  136. }
  137. }
  138. private void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e)
  139. {
  140. Console.WriteLine($"客户端[{e.ClientId}]已连接");
  141. }
  142. private void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e)
  143. {
  144. Console.WriteLine($"客户端[{e.ClientId}]已断开连接!");
  145. }
  146. private void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
  147. {
  148. Console.WriteLine($"客户端[{e.ClientId}]已成功订阅主题[{e.TopicFilter}]!");
  149. }
  150. private void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
  151. {
  152. Console.WriteLine($"客户端[{e.ClientId}]已成功取消订阅主题[{e.TopicFilter}]!");
  153. }
  154. private void OnMqttServerApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
  155. {
  156. messages.Add(e.ApplicationMessage);
  157. Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
  158. Console.WriteLine($"客户端[{e.ClientId}]>> Topic[{e.ApplicationMessage.Topic}] Payload[{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[] { })}] Qos[{e.ApplicationMessage.QualityOfServiceLevel}] Retain[{e.ApplicationMessage.Retain}]");
  159. }
  160. }
  161. }