import ujson
import utime
import _thread
from umqtt import MQTTClient
from usr.modules.logging import getLogger
from usr.modules.common import CloudObservable
log = getLogger(__name__)
class MqttIot(CloudObservable):
"""This is a class for universal mqtt iot.
This class extend CloudObservable.
This class has the following functions:
1. Cloud connect and disconnect
2. Publish data to cloud
3. Subscribe data from cloud
Attribute:
pub_topic_dict: topic dict for publish dtu through data
sub_topic_dict: topic dict for subscribe cloud through data
conn_type:cloud name
Run step:
1. cloud = MqttIot(server, qos, port, clean_session, client_id, pub_topic, sub_topic)
2. cloud.addObserver(RemoteSubscribe)
3. cloud.init()
4. cloud.post_data(data)
5. cloud.close()
"""
def __init__(self, server, qos, port, clean_session, client_id, pub_topic=None, sub_topic=None, life_time=120):
"""
1. Init parent class CloudObservable
2. Init cloud connect params and topic
"""
super().__init__()
self.conn_type = "mqtt"
self.__pk = None
self.__ps = None
self.__dk = None
self.__ds = None
self.__server = server
self.__qos = qos
self.__port = port
self.__mqtt = None
self.__clean_session = clean_session
self.__life_time = life_time
self.__client_id = client_id
self.__password = None
if pub_topic == None:
self.pub_topic_dict = {"0": "/python/mqtt/pub"}
else:
self.pub_topic_dict = pub_topic
if sub_topic == None:
self.sub_topic_dict = {"0": "/python/mqtt/sub"}
else:
self.sub_topic_dict = sub_topic
def __subscribe_topic(self):
for id, usr_sub_topic in self.sub_topic_dict.items():
if self.__mqtt.subscribe(usr_sub_topic, qos=0) == -1:
log.error("Topic [%s] Subscribe Falied." % usr_sub_topic)
def __sub_cb(self, topic, data):
"""mqtt subscribe topic callback
Parameter:
topic: topic info
data: response dictionary info
"""
topic = topic.decode()
try:
data = ujson.loads(data)
except:
pass
try:
self.notifyObservers(self, *("raw_data", {"topic":topic, "data":data} ) )
except Exception as e:
log.error("{}".format(e))
def __listen(self):
while True:
self.__mqtt.wait_msg()
utime.sleep_ms(100)
def __start_listen(self):
"""Start a new thread to listen to the cloud publish
"""
_thread.start_new_thread(self.__listen, ())
def init(self, enforce=False):
"""mqtt connect and subscribe topic
Parameter:
enforce:
True: enfore cloud connect and subscribe topic
False: check connect status, return True if cloud connected
Return:
Ture: Success
False: Failed
"""
log.debug("[init start] enforce: %s" % enforce)
if enforce is False and self.__mqtt is not None:
log.debug("self.get_status(): %s" % self.get_status())
if self.get_status():
return True
if self.__mqtt is not None:
self.close()
log.debug("mqtt init. self.__client_id: %s, self.__password: %s, self.__dk: %s, self.__ds: %s" % (self.__client_id, self.__password, self.__dk, self.__ds))
self.__mqtt = MQTTClient(client_id=self.__client_id, server=self.__server, port=self.__port,
user=self.__dk, password=self.__password, keepalive=self.__life_time, ssl=False)
try:
self.__mqtt.connect(clean_session=self.__clean_session)
except Exception as e:
log.error("mqtt connect error: %s" % e)
else:
self.__mqtt.set_callback(self.__sub_cb)
self.__subscribe_topic()
log.debug("mqtt n_subscribe_topic")
self.__start_listen()
log.debug("mqtt start.")
log.debug("self.get_status(): %s" % self.get_status())
if self.get_status():
return True
else:
return False
def close(self):
self.__mqtt.disconnect()
def get_status(self):
"""Get mqtt connect status
Return:
True -- connect success
False -- connect falied
"""
try:
return True if self.__mqtt.get_mqttsta() == 0 else False
except:
return False
def through_post_data(self, data, topic_id):
try:
self.__mqtt.publish(self.pub_topic_dict[topic_id], data, self.__qos)
except Exception:
log.error("mqtt publish topic %s failed. data: %s" % (self.pub_topic_dict[topic_id], data))
return False
else:
return True
def post_data(self, data):
pass
def ota_request(self):
pass
def ota_action(self, action, module=None):
pass
def device_report(self):
pass