# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #!/usr/bin/env python # -*- coding: utf-8 -*- """ @file :dtu_transaction.py @author :elian.wang@quectel.com @brief :Dtu transaction related interfaces @version :0.1 @date :2022-08-04 11:33:23 @copyright :Copyright (c) 2022 """ import log import sim import net import usys import ujson import utime import modem import _thread from misc import Power from usr.modules.common import Singleton from usr.modules.logging import getLogger from usr.modules.serial import Serial from usr.modules.remote import RemotePublish from usr.modules.history import History from usr.settings import settings from usr.settings import PROJECT_NAME, PROJECT_VERSION, DEVICE_FIRMWARE_NAME, DEVICE_FIRMWARE_VERSION log = getLogger(__name__) class DownlinkTransaction(Singleton): """Data downlink:Receive data from the cloud and send it to serial """ def __init__(self): self.__serial = None def add_module(self, module, callback=None): if isinstance(module, Serial): self.__serial = module return True return False def __get_sub_topic_id(self, topic): """Locate the topic id from the cloud setting Args: topic (str): topic in mqtt protocol Returns: str: topic id """ cloud_name = settings.current_settings["system_config"]["cloud"] cloud_config = settings.current_settings.get(cloud_name + "_config") if cloud_config == None: raise Exception("Cloud config parameter error") for k, v in cloud_config.get("subscribe").items(): if topic == v: return k def downlink_main(self, *args, **kwargs): """Parsing cloud data, send to serial port Args: args (tuple): Not use kwargs (dict): The data received by the cloud,contains topic and data """ # Get mqtt protocol message id cloud_type = settings.current_settings["system_config"]["cloud"] if cloud_type in ["aliyun", "txyun", "hwyun", "mqtt_private_cloud"]: msg_id = self.__get_sub_topic_id(kwargs.get("topic")) if msg_id == None: raise Exception("Not found correct topic id") elif cloud_type == "quecthing": msg_id = kwargs.get("pkgid") elif cloud_type == "tcp_private_cloud": msg_id = None else: raise Exception("This cloud is not support now") if isinstance(kwargs["data"], bytes): data = kwargs["data"].decode() elif isinstance(kwargs["data"], dict): data = ujson.dumps(kwargs["data"]) elif isinstance(kwargs["data"], str): data = kwargs["data"] else: data = str(kwargs["data"]) if cloud_type == "tcp_private_cloud": packed_data = data else: packed_data = "%s,%s,%s".encode('utf-8') % (str(msg_id), str(len(data)), data) # Send packed data through serial self.__serial.write(packed_data) class OtaTransaction(Singleton): """Device firmware OTA and project file OTA transaction """ def __init__(self): self.__remote_pub = None def __remote_ota_check(self): if not self.__remote_pub: raise TypeError("self.__remote_pub is not registered.") return self.__remote_pub.cloud_ota_check() def __remote_ota_action(self, action, module): if not self.__remote_pub: raise TypeError("self.__remote_pub is not registered.") return self.__remote_pub.cloud_ota_action(action, module) def __remote_device_report(self): if not self.__remote_pub: raise TypeError("self.__remote_pub is not registered.") return self.__remote_pub.cloud_device_report() def add_module(self, module, callback=None): if isinstance(module, RemotePublish): self.__remote_pub = module return True return False def ota_check(self): """After powering on, release module information and check for update """ print("ota_check") try: if settings.current_settings["system_config"]["base_function"]["fota"]: self.__remote_ota_check() self.__remote_device_report() utime.sleep(1) except Exception as e: log.error("periodic_ota_check fault", e) def event_ota_plain(self, *args, **kwargs): """Determine the parameters and perform the OTA plan Args: args (tuple): Ota parameters sent from the cloud kwargs (dict): None """ log.debug("ota_plain args: %s, kwargs: %s" % (str(args), str(kwargs))) current_settings = settings.get() if current_settings["system_config"]["cloud"] == "quecthing": if args and args[0]: if args[0][0] == "ota_cfg": module = args[0][1].get("componentNo") target_version = args[0][1].get("targetVersion") if module == DEVICE_FIRMWARE_NAME and current_settings["system_config"]["base_function"]["fota"] == True: source_version = DEVICE_FIRMWARE_VERSION elif module == PROJECT_NAME and current_settings["system_config"]["base_function"]["sota"] == True: source_version = PROJECT_VERSION else: return if target_version != source_version: self.__remote_ota_action(action=1, module=module) elif current_settings["system_config"]["cloud"] == "aliyun": if args and args[0]: if args[0][0] == "ota_cfg": module = args[0][1].get("module") target_version = args[0][1].get("version") if module == DEVICE_FIRMWARE_NAME and current_settings["system_config"]["base_function"]["fota"] == 1: source_version = DEVICE_FIRMWARE_VERSION elif module == PROJECT_NAME and current_settings["system_config"]["base_function"]["sota"] == 1: source_version = PROJECT_VERSION else: return if target_version != source_version: self.__remote_ota_action(action=1, module=module) else: log.error("Current Cloud Not Supported OTA!") class UplinkTransaction(Singleton): """Data uplink: read data from the serial and send it to cloud """ def __init__(self): self.__remote_pub = None self.__serial = None self.__history = None self.__gui_tools_interac = None self.__parse_data = "" self.__send_to_cloud_data = [] def __remote_post_data(self, data=None, topic_id=None): if not self.__remote_pub: raise TypeError("self.__remote_pub is not registered.") return self.__remote_pub.post_data(data, topic_id) def __get_pub_topic_id_list(self): """Get the publish topic id list in setting Returns: str: publist topic id list """ cloud_name = settings.current_settings["system_config"]["cloud"] cloud_config = settings.current_settings.get(cloud_name + "_config") if cloud_config == None: raise Exception("Cloud config parameter error") elif cloud_name == "quecthing": return ["0"] else: return cloud_config.get("subscribe").keys() def __parse(self): """Recursive parse uart data """ params_list = self.__parse_data.split(",", 2) if len(params_list) < 3: # The received data is not a complete frame return topic_id = params_list[0] data_len = params_list[1] msg_data = params_list[2] pub_topic_id_list = self.__get_pub_topic_id_list() if int(data_len) > len(msg_data): # The received data is not a complete frame return elif int(data_len) < len(msg_data): # The received data may contain part of the data of another frame if topic_id in pub_topic_id_list: self.__send_to_cloud_data.append((topic_id, msg_data[:int(data_len)])) # Frame format: ,, frame_len = len(topic_id)+len(data_len)+int(data_len)+2 self.__parse_data = self.__parse_data[frame_len:] self.__parse() else: self.__parse_data = "" # Read data format eroor,restart read return else: self.__parse_data = "" if topic_id in pub_topic_id_list: self.__send_to_cloud_data.append((topic_id, msg_data)) else: return def __mqtt_protocol_uart_data_parse(self, data): """When cloud is mqtt protocol, parse uart data. Args: data (str): Data read from uart Returns: list: topic_id and data tuple list """ self.__parse_data += data self.__send_to_cloud_data = [] self.__parse() def __send_mqtt_data(self, data): for send_data in data: self.__remote_post_data(data=send_data[1], topic_id=send_data[0]) utime.sleep_ms(10) def __uplink_data(self, data): """Parsing uart data, send data to cloud Args: data (bytes): data read from uart """ gui_tool_ack = self.__gui_tools_interac.parse_serial_data(data) if gui_tool_ack: # GUI tools command data self.__serial.write(gui_tool_ack) return if settings.current_settings["system_config"]["cloud"] == "tcp_private_cloud": self.__remote_post_data(data=data) else: try: self.__mqtt_protocol_uart_data_parse(data) if len(self.__send_to_cloud_data) != 0: _thread.start_new_thread(self.__send_mqtt_data, (self.__send_to_cloud_data,)) except Exception as e: log.error(e) def __post_history_data(self, data): """Post history data to cloud Args: data (str): history data Returns: True: Successfully post False:Failure to post """ log.info("post_history_data") try: if settings.current_settings["system_config"]["cloud"] == "tcp_private_cloud_config": return self.__remote_post_data(data=data) else: # In this case, topic id is not used specified, topic id default 0. return self.__remote_post_data(data=data, topic_id=0) except Exception as e: log.error(e) return False def add_module(self, module, callback=None): if isinstance(module, RemotePublish): self.__remote_pub = module return True elif isinstance(module, Serial): self.__serial = module return True elif isinstance(module, History): self.__history = module return True elif isinstance(module, GuiToolsInteraction): self.__gui_tools_interac = module return True return False def uplink_main(self): """Read serial data, parse and upload to the cloud """ while 1: # Read uart data read_byte = self.__serial.read(nbytes=1024, timeout=100) if read_byte: try: self.__uplink_data(read_byte) except Exception as e: usys.print_exception(e) log.error("Parse uart data error: %s" % e) def report_history(self): """Report history data to cloud Returns: boolen: True: Successfully post False:Failure to post """ if not self.__history: raise TypeError("self.__history is not registered.") res = True hist = self.__history.read() print("hist[data]:", hist["data"]) if hist["data"]: pt_count = 0 for i, data in enumerate(hist["data"]): pt_count += 1 if not self.__post_history_data(data): res = False break hist["data"] = hist["data"][pt_count:] if hist["data"]: # Flush data in hist-dictionary to tracker_data.hist file. self.__history.write(hist["data"]) return res class GuiToolsInteraction(): def __init__(self): self.__query_command = { 0: "get_imei", 1: "get_number", 2: "get_csq", 3: "get_cur_config", 4: "get_iccid", } self.__basic_setting_command = { 255: "restart", 50: "set_fota", 51: "set_sota", 52: "set_history_data", 53: "set_uart_conf", 54: "set_cloud_conf", 55: "retore_factory_setting", } def __get_imei(self, code, data): return {"code": code, "data": modem.getDevImei(), "status": 1} def __get_number(self, code, data): log.info(sim.getPhoneNumber()) return {"code": code, "data": sim.getPhoneNumber(), "status": 1} def __get_csq(self, code, data): return {"code": code, "data": net.csqQueryPoll(), "status": 1} def __get_cur_config(self, code, data): log.info("get_cur_config") current_settings = settings.get() return {"code": code, "data": current_settings, "status": 1} def __restart(self, code, data): log.info("Restarting...") Power.powerRestart() def __set_fota(self, code, data): try: settings.set("fota", data["fota"]) settings.save() return {"code": code, "status": 1} except Exception as e: log.error("e = {}".format(e)) return {"code": code, "status": 0} def __set_sota(self, code, data): try: settings.set("sota", data["sota"]) settings.save() return {"code": code, "status": 1} except Exception as e: log.error("e = {}".format(e)) return {"code": code, "status": 0} def __set_history_data(self, code, data): try: settings.set("offline_storage", data["offline_storage"]) settings.save() return {"code": code, "status": 1} except Exception as e: log.error("e = {}".format(e)) return {"code": code, "status": 0} def __set_uart_conf(self, code, data): try: uart_conf = data["uart_config"] if not isinstance(uart_conf, dict): raise Exception("Data type error") settings.set("uart_config", uart_conf) settings.save() return {"code": code, "status": 1} except Exception as e: log.error("e = {}".format(e)) return {"code": code, "status": 0} def __set_cloud_conf(self, code, data): try: cloud_type = data["cloud_type"] cloud_conf = data["cloud_conf"] if not isinstance(cloud_conf, dict): raise Exception("Data type error") settings.set(cloud_type+"_config", cloud_conf) settings.set("cloud", cloud_type) settings.save() return {"code": code, "status": 1} except Exception as e: log.error("e = {}".format(e)) return {"code": code, "status": 0} def __retore_factory_setting(self, code, data): try: settings.reset() Power.powerRestart() return {"code": code, "status": 1} except Exception as e: log.error("e = {}".format(e)) return {"code": code, "status": 0} def __exec_command_code(self, cmd_code, data=None): if cmd_code in self.__query_command.keys(): try: cmd = "__" + self.__query_command.get(cmd_code) func = getattr(self, cmd) ret = func(cmd_code, data) except Exception as e: log.error("search_command_func_code_list:", e) elif cmd_code in self.__basic_setting_command.keys(): try: cmd = "__" + self.__basic_setting_command.get(cmd_code) func = getattr(self, cmd) ret = func(cmd_code, data) except Exception as e: log.error("basic_setting_command_list:", e) else: log.error("Command code error") ret = {"code": cmd_code, "status": 0, "error": "Command code error"} return ret def parse_serial_data(self, serial_data): """Parse uart data in the format specified by the GUI Args: gui_data (bytes): data read from uart sid (str): uart channel id Returns: True: GUI data was successfully obtained False: get GUI data failed """ print("serial data:", serial_data) data_list = serial_data.split(",", 2) if len(data_list) != 3: log.info("DTU CMD list length validate fail. CMD Parse end.") return "" gui_code = data_list[0] if gui_code != "99": return "" data_length = data_list[1] msg_data = data_list[2] try: data_len_int = int(data_length) except: log.error("DTU CMD data error.") return "" if len(msg_data) > data_len_int: log.error("DTU CMD length validate failed.") return "" elif len(msg_data) < data_len_int: log.info("Msg length shorter than length") return "" try: data = ujson.loads(msg_data) except Exception as e: log.error(e) return "" cmd_code = data.get("cmd_code") # No command code was obtained if cmd_code is None: return "" params_data = data.get("data") rec = self.__exec_command_code(int(cmd_code), data=params_data) rec_str = ujson.dumps(rec) rec_format = "99,{},{}".format(len(rec_str), rec_str) print("GUI CMD SUCCESS") return rec_format