1. import ujson,utime,_thread,sys_bus
    2. from umqtt import MQTTClient
    3. from usr.modules.logging import getLogger
    4. from usr.modules.common import numiter, option_lock, CloudObservable,UtilsDataEncryption,TimeResolver
    5. from usr.modules.common import Singleton
    6. log = getLogger("TimerService")
    7. class CloudIOT(CloudObservable):
    8. """This is a class for universal mqtt iot.
    9. This class extend CloudObservable.
    10. This class has the following functions:
    11. 1. Cloud connect and disconnect
    12. 2. Publish data to cloud
    13. 3. Subscribe data from cloud
    14. Attribute:
    15. pub_topic_dict: topic dict for publish dtu through data
    16. sub_topic_dict: topic dict for subscribe cloud through data
    17. conn_type:cloud name
    18. Run step:
    19. 1. cloud = MqttIot(server, port, clean_session, client_id, pub_topic, sub_topic)
    20. 2. cloud.addObserver(RemoteSubscribe)
    21. 3. cloud.init()
    22. 4. cloud.post_data(data)
    23. 5. cloud.close()
    24. """
    25. def __init__(self, client_id,settings):
    26. """
    27. 1. Init parent class CloudObservable
    28. 2. Init cloud connect params and topic
    29. """
    30. super().__init__()
    31. self.conn_type = "mqtt"
    32. self.__settings = settings
    33. cloudConfig = self.__settings.current_settings.get("cloud")
    34. self.__productCode = self.__settings.current_settings.get("PRODUCT_CODE")
    35. self.__server = cloudConfig.get("host",None)
    36. self.__port = cloudConfig.get("port",1883)
    37. self.__mqtt = None
    38. self.__clean_session = cloudConfig.get("cleanSession",False)
    39. self.__life_time = cloudConfig.get("lifeTime",120)
    40. self.__client_id = client_id
    41. self.__key = cloudConfig.get("userName",None)
    42. self.__secret = cloudConfig.get("secret",None)
    43. self.__pubTopic = cloudConfig.get("pubTopic").format(self.__productCode ,client_id)
    44. self.__subTopic = cloudConfig.get("subTopic").format(self.__productCode ,client_id)
    45. self.__qos = cloudConfig.get("qos",0)
    46. ts = utime.mktime(utime.localtime())+(utime.getTimeZone())*60*60
    47. #cloud userName
    48. self.__userName = "{}_{}_{}_{}".format(client_id,self.__productCode,self.__key,ts)
    49. self.__password = UtilsDataEncryption.md5_32_U("{}_{}".format(self.__userName,self.__secret))
    50. def init(self, enforce=False):
    51. """mqtt connect and subscribe topic
    52. Parameter:
    53. enforce:
    54. True: enfore cloud connect and subscribe topic
    55. False: check connect status, return True if cloud connected
    56. Return:
    57. Ture: Success
    58. False: Failed
    59. """
    60. log.debug("[init start] enforce: %s" % enforce)
    61. if enforce is False and self.__mqtt is not None:
    62. log.debug("self.get_status(): %s" % self.get_status())
    63. if self.get_status():
    64. return True
    65. if self.__mqtt is not None:
    66. self.close()
    67. log.debug("mqtt init. self.__client_id: %s, self.__userName: %s, self.__password: %s" % (self.__client_id, self.__userName, self.__password))
    68. self.__mqtt = MQTTClient(client_id=self.__client_id, server=self.__server, port=self.__port,
    69. user=self.__userName, password=self.__password, keepalive=self.__life_time, ssl=False)
    70. try:
    71. self.__mqtt.connect(clean_session=self.__clean_session)
    72. except Exception as e:
    73. log.error("mqtt connect error: %s" % e)
    74. else:
    75. self.__mqtt.set_callback(self.__sub_cb)
    76. self.__subscribe_topic()
    77. log.debug("========mqtt n_subscribe_topic")
    78. self.__start_listen()
    79. log.debug("mqtt start.")
    80. log.debug("self.get_status(): %s" % self.get_status())
    81. if self.get_status():
    82. return True
    83. else:
    84. return False
    85. def __subscribe_topic(self):
    86. if self.__mqtt.subscribe(self.__subTopic, qos=0) == -1:
    87. log.error("Topic [%s] Subscribe Falied." % self.__subTopic)
    88. def __sub_cb(self, topic, data):
    89. """mqtt subscribe topic callback
    90. Parameter:
    91. topic: topic info
    92. data: response dictionary info
    93. """
    94. topic = topic.decode()
    95. try:
    96. log.debug("Cloud收到数据:{}\r\n{}".format(topic,data))
    97. data = ujson.loads(data)
    98. except:
    99. pass
    100. try:
    101. sys_bus.publish("CloudDataProcess",data)
    102. except Exception as e:
    103. log.error("{}".format(e))
    104. def __listen(self):
    105. while True:
    106. self.__mqtt.wait_msg()
    107. utime.sleep_ms(100)
    108. def __start_listen(self):
    109. """Start a new thread to listen to the cloud publish
    110. """
    111. _thread.start_new_thread(self.__listen, ())
    112. def get_status(self):
    113. """Get mqtt connect status
    114. Return:
    115. True -- connect success
    116. False -- connect falied
    117. """
    118. try:
    119. return True if self.__mqtt.get_mqttsta() == 0 else False
    120. except:
    121. return False
    122. def close(self):
    123. self.__mqtt.disconnect()
    124. def post_data(self, data):
    125. try:
    126. log.info("up,{},{}".format(self.__pubTopic,data))
    127. self.__mqtt.publish(self.__pubTopic, data, self.__qos)
    128. except Exception:
    129. log.error("mqtt publish topic %s failed. data: %s" % (self.__pubTopic, data))
    130. return False
    131. else:
    132. return True
    133. def post_data2(self, serviceName,jsonData):
    134. return self.post_data(self.UpdataPack(serviceName,jsonData))
    135. def ota_request(self):
    136. pass
    137. def ota_action(self, action, module=None):
    138. pass
    139. def device_report(self):
    140. pass
    141. def UpdataPack(self,serviceName,jsonData):
    142. msg = configCloudiot.current_settings.get("Cloud")
    143. msg["service_id"] = serviceName
    144. msg["data"] = jsonData
    145. msg["event_time"] = TimeResolver.getUtcTime()
    146. return "[{}]".format(msg)
    147. def UploadData(self):
    148. self.UploadData_GatewayBasicInfo()
    149. def UploadData_GatewayBasicInfo(self):
    150. server = ModelGateway()
    151. er = server.current_settings.get("Gateway_BasicInfo")
    152. er["currentDateTime"] = TimeResolver.getUtcTime()
    153. return self.post_data2(server.current_settings.get("cloud_service_id"),er)
    154. class CloudOTA(object):
    155. def __init__(self) -> None:
    156. pass