import ujson,utime,_thread,sys_bus
from umqtt import MQTTClient
from usr.modules.logging import getLogger
from usr.modules.common import numiter, option_lock, CloudObservable,UtilsDataEncryption,TimeResolver
from usr.modules.common import Singleton
# Module-level logger shared by everything in this file.
# NOTE(review): the logger is named "TimerService" although this module
# implements the cloud MQTT client -- confirm the name is intended.
log = getLogger("TimerService")
class CloudIOT(CloudObservable):
    """Generic MQTT cloud client.

    This class extends CloudObservable and provides:
        1. Cloud connect and disconnect
        2. Publishing data to the cloud
        3. Subscribing to data from the cloud

    Attribute:
        conn_type: cloud connection type, always "mqtt"

    Run step:
        1. cloud = CloudIOT(client_id, settings)
        2. cloud.addObserver(RemoteSubscribe)
        3. cloud.init()
        4. cloud.post_data(data)
        5. cloud.close()
    """

    def __init__(self, client_id, settings):
        """Read the connection parameters, topics and credentials from settings.

        Parameter:
            client_id: MQTT client id; also substituted into the topic
                templates and the generated user name
            settings: object whose `current_settings` mapping provides the
                "cloud" section and the "PRODUCT_CODE" entry
        """
        super().__init__()
        self.conn_type = "mqtt"
        self.__settings = settings
        cloudConfig = self.__settings.current_settings.get("cloud")
        self.__productCode = self.__settings.current_settings.get("PRODUCT_CODE")
        self.__server = cloudConfig.get("host", None)
        self.__port = cloudConfig.get("port", 1883)
        self.__mqtt = None
        self.__clean_session = cloudConfig.get("cleanSession", False)
        self.__life_time = cloudConfig.get("lifeTime", 120)
        self.__client_id = client_id
        self.__key = cloudConfig.get("userName", None)
        self.__secret = cloudConfig.get("secret", None)
        # Topic templates take two positional fields: product code, client id.
        self.__pubTopic = cloudConfig.get("pubTopic").format(self.__productCode, client_id)
        self.__subTopic = cloudConfig.get("subTopic").format(self.__productCode, client_id)
        self.__qos = cloudConfig.get("qos", 0)
        # NOTE(review): the timestamp is computed once here and reused on every
        # reconnect; the timezone value is multiplied by 3600 and added to the
        # local-time epoch -- confirm this is what the broker expects.
        ts = utime.mktime(utime.localtime()) + (utime.getTimeZone()) * 60 * 60
        # cloud userName
        self.__userName = "{}_{}_{}_{}".format(client_id, self.__productCode, self.__key, ts)
        self.__password = UtilsDataEncryption.md5_32_U("{}_{}".format(self.__userName, self.__secret))

    def init(self, enforce=False):
        """Connect to the broker, subscribe the topic and start listening.

        Parameter:
            enforce:
                True: always tear down any existing client and reconnect
                False: return True immediately if already connected

        Return:
            True: connected
            False: not connected
        """
        log.debug("[init start] enforce: %s" % enforce)
        if enforce is False and self.__mqtt is not None:
            connected = self.get_status()
            log.debug("self.get_status(): %s" % connected)
            if connected:
                return True

        if self.__mqtt is not None:
            # Best effort: a dead or never-connected client may fail to
            # disconnect, which must not prevent the reconnect below.
            try:
                self.close()
            except Exception as e:
                log.error("mqtt close error: %s" % e)

        # The password is deliberately not logged.
        log.debug("mqtt init. self.__client_id: %s, self.__userName: %s" % (self.__client_id, self.__userName))
        self.__mqtt = MQTTClient(client_id=self.__client_id, server=self.__server, port=self.__port,
                                 user=self.__userName, password=self.__password,
                                 keepalive=self.__life_time, ssl=False)
        try:
            self.__mqtt.connect(clean_session=self.__clean_session)
        except Exception as e:
            log.error("mqtt connect error: %s" % e)
        else:
            self.__mqtt.set_callback(self.__sub_cb)
            self.__subscribe_topic()
            log.debug("========mqtt n_subscribe_topic")
            self.__start_listen()
            log.debug("mqtt start.")

        connected = self.get_status()
        log.debug("self.get_status(): %s" % connected)
        return connected

    def __subscribe_topic(self):
        """Subscribe the configured topic and log an error when it fails."""
        # NOTE(review): subscription uses a fixed qos=0 while publishing uses
        # the configured qos -- confirm this asymmetry is intended.
        if self.__mqtt.subscribe(self.__subTopic, qos=0) == -1:
            log.error("Topic [%s] Subscribe Falied." % self.__subTopic)

    def __sub_cb(self, topic, data):
        """mqtt subscribe topic callback

        Parameter:
            topic: topic the message arrived on (bytes)
            data: raw message payload; forwarded as the parsed object when it
                is valid JSON, otherwise forwarded unchanged
        """
        topic = topic.decode()
        try:
            log.debug("Cloud收到数据:{}\r\n{}".format(topic, data))
            data = ujson.loads(data)
        except Exception as e:
            # Deliberate best effort: the payload is forwarded as received.
            log.debug("payload not parsed as JSON, forwarding raw data: %s" % e)
        try:
            sys_bus.publish("CloudDataProcess", data)
        except Exception as e:
            log.error("{}".format(e))

    def __listen(self, client):
        """Wait for messages on `client` for as long as it is the active client.

        The loop ends as soon as `client` is replaced or closed, so a
        reconnect never leaves two listener threads on the same client.
        """
        try:
            while client is self.__mqtt:
                client.wait_msg()
                utime.sleep_ms(100)
        except Exception as e:
            log.error("mqtt listen error: %s" % e)

    def __start_listen(self):
        """Start a new thread listening on the current client."""
        _thread.start_new_thread(self.__listen, (self.__mqtt,))

    def get_status(self):
        """Get mqtt connect status

        Return:
            True -- connected (client exists and get_mqttsta() returns 0)
            False -- not connected, no client, or the status query failed
        """
        client = self.__mqtt
        if client is None:
            return False
        try:
            return client.get_mqttsta() == 0
        except Exception:
            return False

    def close(self):
        """Disconnect from the broker and drop the client.

        Does nothing when no client exists. An error raised by disconnect()
        still propagates, but the client reference is dropped either way so
        the listener thread stops and the next init() starts clean.
        """
        client = self.__mqtt
        if client is None:
            return
        try:
            client.disconnect()
        finally:
            self.__mqtt = None

    def post_data(self, data):
        """Publish data on the configured publish topic.

        Parameter:
            data: payload passed to the mqtt client unchanged

        Return:
            True: published
            False: no client, or publishing raised an exception
        """
        client = self.__mqtt
        if client is None:
            log.error("mqtt publish topic %s failed. data: %s, error: mqtt not connected" % (self.__pubTopic, data))
            return False
        try:
            log.info("up,{},{}".format(self.__pubTopic, data))
            client.publish(self.__pubTopic, data, self.__qos)
        except Exception as e:
            log.error("mqtt publish topic %s failed. data: %s, error: %s" % (self.__pubTopic, data, e))
            return False
        return True

    def post_data2(self, serviceName, jsonData):
        """Wrap jsonData with UpdataPack() and publish it; returns post_data()'s result."""
        return self.post_data(self.UpdataPack(serviceName, jsonData))

    def ota_request(self):
        """Placeholder; OTA request is not implemented."""
        pass

    def ota_action(self, action, module=None):
        """Placeholder; OTA action is not implemented."""
        pass

    def device_report(self):
        """Placeholder; device report is not implemented."""
        pass

    def UpdataPack(self, serviceName, jsonData):
        """Build the upload message string for one service.

        Parameter:
            serviceName: stored under "service_id"
            jsonData: stored under "data"

        Return:
            The message dict formatted inside square brackets.
        """
        # NOTE(review): `configCloudiot` is not defined or imported in the
        # visible part of this file -- confirm where it comes from.
        # Work on a copy so the shared settings template is not mutated.
        msg = dict(configCloudiot.current_settings.get("Cloud"))
        msg["service_id"] = serviceName
        msg["data"] = jsonData
        msg["event_time"] = TimeResolver.getUtcTime()
        # NOTE(review): this embeds the dict's str() form, not ujson.dumps()
        # output -- confirm the receiving service accepts it.
        return "[{}]".format(msg)

    def UploadData(self):
        """Upload all periodic data; currently only the gateway basic info."""
        self.UploadData_GatewayBasicInfo()

    def UploadData_GatewayBasicInfo(self):
        """Publish the gateway basic info stamped with the current UTC time.

        Return:
            Result of post_data2(): True on success, False on failure.
        """
        # NOTE(review): `ModelGateway` is not defined or imported in the
        # visible part of this file -- confirm where it comes from.
        server = ModelGateway()
        # Copy so the timestamp is not written back into the stored settings.
        er = dict(server.current_settings.get("Gateway_BasicInfo"))
        er["currentDateTime"] = TimeResolver.getUtcTime()
        return self.post_data2(server.current_settings.get("cloud_service_id"), er)
class CloudOTA:
    """Placeholder for cloud OTA support; holds no state and has no behavior yet."""

    def __init__(self) -> None:
        """Create an empty CloudOTA instance."""