import ujson
import utime
import _thread
from umqtt import MQTTClient
from usr.modules.logging import getLogger
from usr.modules.common import CloudObservable

log = getLogger(__name__)


class MqttIot(CloudObservable):
    """Generic MQTT cloud client.

    This class extends CloudObservable and provides:
        1. Cloud connect and disconnect
        2. Publishing data to the cloud
        3. Subscribing to data from the cloud

    Attribute:
        pub_topic_dict: maps a topic id to the topic used by through_post_data
        sub_topic_dict: maps a topic id to a topic subscribed to on connect
        conn_type: cloud name, always "mqtt"

    Run step:
        1. cloud = MqttIot(server, qos, port, clean_session, client_id, pub_topic, sub_topic)
        2. cloud.addObserver(RemoteSubscribe)
        3. cloud.init()
        4. cloud.post_data(data)
        5. cloud.close()
    """

    def __init__(self, server, qos, port, clean_session, client_id,
                 pub_topic=None, sub_topic=None, life_time=120):
        """Init the parent class and store the connection parameters.

        Parameter:
            server: broker address handed to MQTTClient
            qos: QoS level used when publishing
            port: broker port handed to MQTTClient
            clean_session: clean-session flag used on connect
            client_id: MQTT client id
            pub_topic: topic-id -> topic mapping for publishing;
                defaults to {"0": "/python/mqtt/pub"}
            sub_topic: topic-id -> topic mapping for subscribing;
                defaults to {"0": "/python/mqtt/sub"}
            life_time: keepalive value handed to MQTTClient
        """
        super().__init__()
        self.conn_type = "mqtt"
        # Credential slots. Nothing in this class assigns them, so the
        # client is created with user=None and password=None.
        self.__pk = None
        self.__ps = None
        self.__dk = None
        self.__ds = None
        self.__server = server
        self.__qos = qos
        self.__port = port
        self.__mqtt = None
        self.__clean_session = clean_session
        self.__life_time = life_time
        self.__client_id = client_id
        self.__password = None
        if pub_topic is None:
            self.pub_topic_dict = {"0": "/python/mqtt/pub"}
        else:
            self.pub_topic_dict = pub_topic
        if sub_topic is None:
            self.sub_topic_dict = {"0": "/python/mqtt/sub"}
        else:
            self.sub_topic_dict = sub_topic

    def __subscribe_topic(self):
        """Subscribe to every topic in sub_topic_dict and log each failure."""
        for usr_sub_topic in self.sub_topic_dict.values():
            # Subscriptions always use QoS 0; the qos given to the
            # constructor is only applied when publishing.
            if self.__mqtt.subscribe(usr_sub_topic, qos=0) == -1:
                log.error("Topic [%s] Subscribe Falied." % usr_sub_topic)

    def __sub_cb(self, topic, data):
        """mqtt subscribe topic callback

        Forwards each received message to the observers as a "raw_data"
        event carrying {"topic": ..., "data": ...}.

        Parameter:
            topic: topic the message arrived on
            data: message payload; parsed as JSON when possible
        """
        if not isinstance(topic, str):
            topic = topic.decode()
        try:
            data = ujson.loads(data)
        except Exception:
            # Payload is not JSON: hand it to the observers unchanged.
            pass
        try:
            self.notifyObservers(self, "raw_data", {"topic": topic, "data": data})
        except Exception as e:
            log.error("{}".format(e))

    def __listen(self, client=None):
        """Poll one client for incoming messages.

        The loop ends once the client is no longer the current one
        (after close() or a re-init), so a reconnect does not leave an
        extra thread polling the new client.

        Parameter:
            client: client to poll; defaults to the current client
        """
        if client is None:
            client = self.__mqtt
        while client is not None and self.__mqtt is client:
            try:
                client.wait_msg()
            except Exception as e:
                # An unhandled error would end the thread anyway; log it
                # so the failure is visible.
                log.error("mqtt listen error: %s" % e)
                break
            utime.sleep_ms(100)

    def __start_listen(self):
        """Start a new thread to listen to the cloud publish"""
        # Pass the client explicitly so the thread stays bound to the
        # connection it was started for.
        _thread.start_new_thread(self.__listen, (self.__mqtt,))

    def init(self, enforce=False):
        """mqtt connect and subscribe topic

        Parameter:
            enforce:
                True: force a new connection and re-subscribe
                False: return True straight away if already connected

        Return:
            True: Success
            False: Failed
        """
        log.debug("[init start] enforce: %s" % enforce)
        if enforce is False and self.__mqtt is not None:
            status = self.get_status()
            log.debug("self.get_status(): %s" % status)
            if status:
                return True
        # Release any previous client first; close() does nothing when
        # there is none and never raises.
        self.close()
        log.debug(
            "mqtt init. self.__client_id: %s, self.__password: %s, self.__dk: %s, self.__ds: %s"
            % (self.__client_id, self.__password, self.__dk, self.__ds)
        )
        self.__mqtt = MQTTClient(
            client_id=self.__client_id,
            server=self.__server,
            port=self.__port,
            user=self.__dk,
            password=self.__password,
            keepalive=self.__life_time,
            ssl=False,
        )
        try:
            self.__mqtt.connect(clean_session=self.__clean_session)
        except Exception as e:
            log.error("mqtt connect error: %s" % e)
        else:
            self.__mqtt.set_callback(self.__sub_cb)
            self.__subscribe_topic()
            log.debug("mqtt n_subscribe_topic")
            self.__start_listen()
            log.debug("mqtt start.")
        status = self.get_status()
        log.debug("self.get_status(): %s" % status)
        return status

    def close(self):
        """Disconnect from the broker and drop the client.

        Does nothing when no client exists. A disconnect error is logged
        rather than raised, so init() can always go on to reconnect.
        """
        client = self.__mqtt
        if client is None:
            return
        # Clearing the reference first lets the listener thread bound to
        # this client leave its loop.
        self.__mqtt = None
        try:
            client.disconnect()
        except Exception as e:
            log.error("mqtt disconnect error: %s" % e)

    def get_status(self):
        """Get mqtt connect status

        Return:
            True -- connected (the client reports state 0)
            False -- not connected, no client, or the query failed
        """
        if self.__mqtt is None:
            return False
        try:
            return self.__mqtt.get_mqttsta() == 0
        except Exception:
            return False

    def through_post_data(self, data, topic_id):
        """Publish data to the topic registered under topic_id.

        Parameter:
            data: payload to publish
            topic_id: key into pub_topic_dict

        Return:
            True: published
            False: unknown topic id or publish failed
        """
        try:
            topic = self.pub_topic_dict[topic_id]
        except (KeyError, IndexError, TypeError):
            # Resolve the topic before publishing so a bad id cannot
            # raise again while the failure is being reported.
            log.error("mqtt publish failed. unknown topic id: %s" % (topic_id,))
            return False
        try:
            self.__mqtt.publish(topic, data, self.__qos)
        except Exception:
            log.error("mqtt publish topic %s failed. data: %s" % (topic, data))
            return False
        return True

    def post_data(self, data):
        """No-op in this class; data is ignored and None is returned."""
        # NOTE(review): presumably kept so this class offers the same
        # methods as other cloud clients -- confirm against callers.
        pass

    def ota_request(self):
        """No-op in this class; returns None."""
        pass

    def ota_action(self, action, module=None):
        """No-op in this class; both arguments are ignored."""
        pass

    def device_report(self):
        """No-op in this class; returns None."""
        pass