# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #!/usr/bin/env python # -*- coding: utf-8 -*- """ @file :tracker_quec.py @author :Jack Sun (jack.sun@quectel.com) @brief : @version :1.0.0 @date :2022-10-31 11:15:57 @copyright :Copyright (c) 2022 """ import gc import utime import modem from machine import I2C from misc import Power from usr.modules.battery import Battery from usr.modules.history import History from usr.modules.logging import getLogger from usr.modules.net_manage import NetManage from usr.modules.mpower import LowEnergyManage from usr.modules.temp_humidity_sensor import TempHumiditySensor from usr.modules.quecthing import QuecObjectModel, QuecThing, QuecOTA from usr.modules.location import NMEAParse, GPS, CellLocator, WiFiLocator from usr.settings_user import UserConfig from usr.settings import Settings, PROJECT_NAME, PROJECT_VERSION, DEVICE_FIRMWARE_NAME, DEVICE_FIRMWARE_VERSION, LOWENERGYMAP log = getLogger(__name__) class Tracker: def __init__(self): self.__settings = None self.__cell = None self.__wifi = None self.__gps = None self.__nmea_parse = None self.__battery = None self.__history = None self.__net_manage = None self.__quec_ota = None self.__quec_cloud = None self.__quec_objmodel = None self.__low_energy_manage = None self.__temp_humidity_sensor = None self.__wakeup = True def __format_loc_method(self, data): loc_method = "%04d" % int(bin(data)[2:]) _loc_method = { "gps": bool(int(loc_method[-1])), "cell": bool(int(loc_method[-2])), "wifi": bool(int(loc_method[-3])), } return _loc_method def __get_local_time(self): return str(utime.mktime(utime.localtime()) * 1000) def __get_location(self): res = {} loc_method = self.__settings.get()["user_cfg"]["loc_method"] if loc_method & UserConfig._loc_method.gps: gps_res = self.__gps.read(retry=300) if gps_res[0] == 0: self.__nmea_parse.set_gps_data(gps_res[1]) res['gps'] = [self.__nmea_parse.GxRMC, self.__nmea_parse.GxGGA, self.__nmea_parse.GxVTG] return res if loc_method & UserConfig._loc_method.cell: res["cell"] = ["LBS"] return res if loc_method & UserConfig._loc_method.wifi: res["wifi"] = ["LBS"] return res return res def __init_report_data(self, power_switch=True): _data = {} _settings = self.__settings.get() _data.update({ "power_switch": power_switch, "local_time": self.__get_local_time(), "gps_mode": _settings["loc_cfg"]["gps_cfg"]["gps_mode"] }) _data.update(_settings["user_cfg"]) _data.update({"loc_method": self.__format_loc_method(_settings["user_cfg"]["loc_method"])}) _data.update(self.__get_location()) temperature, humidity = self.__temp_humidity_sensor.read() _data.update({ "temperature": temperature if temperature is not None else "", "humidity": humidity if humidity is not None else "", }) self.__battery.set_temp(temperature if temperature is not None else 20) _data.update({ "energy": self.__battery.energy, "voltage": self.__battery.voltage, }) _data.update(self.__init_alarm_data(_data)) log.debug("report_data: %s" % _data) return _data def __init_alarm_data(self, data): alarm_data = {} _settings = self.__settings.get() if _settings["user_cfg"]["sw_low_power_alert"]: if data["energy"] <= _settings["user_cfg"]["low_power_alert_threshold"]: alarm_data["low_power_alert"] = {"local_time": self.__get_local_time()} if data["energy"] <= _settings["user_cfg"]["low_power_shutdown_threshold"]: alarm_data["power_switch"] = False self.__wakeup = False if _settings["user_cfg"]["sw_over_speed_alert"] and data.get("gps"): vtg_data = self.__nmea_parse.GxVTGData alarm_data["current_speed"] = float(vtg_data[7]) if vtg_data else -1.0 if alarm_data["current_speed"] >= _settings["user_cfg"]["over_speed_threshold"]: alarm_data["over_speed_alert"] = {"local_time": self.__get_local_time()} net_status = self.__net_manage.status alarm_data["device_module_status"] = {} alarm_data["device_module_status"]["net"] = 1 if net_status == (3, 1) else 0 if self.__gps or self.__cell or self.__wifi: alarm_data["device_module_status"]["location"] = 1 if data.get("gps") or data.get("cell") or data.get("wifi") else 0 if self.__temp_humidity_sensor: alarm_data["device_module_status"]["temp_sensor"] = 1 if data.get("temperature") is not None and data.get("humidity") is not None else 0 # TODO: Add light-Sensor, G-Senor, Mike modules. if 0 in alarm_data["device_module_status"].values(): alarm_data["fault_alert"] = {"local_time": self.__get_local_time()} if net_status[0] == 1: alarm_data["sim_abnormal_alert"] = {"local_time": self.__get_local_time()} return alarm_data def __data_report(self, data): res = False if self.__cloud_conn_status(): for _method in ["gps", "cell", "wifi"]: if data.get(_method): _res = self.__quec_cloud.loc_report(data[_method], mode=_method) log.debug("Quec %s report %s" % (_method, "success" if _res else "falied")) if _res: data.pop(_method) _data = self.__quec_objmodel.convert_to_server(data) log.debug("objmodel_report data: %s" % str(_data)) res = self.__quec_cloud.objmodel_report(_data) log.debug("Quec object model report %s." % ("success" if res else "falied")) if not res: self.__history.write([data]) return res def __history_report(self): if self.__history and self.__quec_cloud and self.__quec_objmodel: history_data = self.__history.read()["data"] if history_data: for index, _data in enumerate(history_data): if not self.__data_report(data=_data): break self.__history.write(history_data[index + 1:]) def __set_config(self, data): _settings = self.__settings.get() for k, v in data.items(): mode = "" if k in _settings["user_cfg"].keys(): mode = "user_cfg" if k == "user_ota_action": if _settings["user_cfg"]["sw_ota"] and not _settings["user_cfg"]["sw_ota_auto_upgrade"]: if not (_settings["user_cfg"]["ota_status"]["upgrade_status"] == 1 and _settings["user_cfg"]["user_ota_action"] == -1): continue if k == "loc_method": v = (int(v.get("wifi", 0)) << 2) + (int(v.get("cell", 0)) << 1) + int(v.get("gps", 0)) res = self.__settings.set(mode, k, v) if k == "user_ota_action": self.__quec_cloud.ota_search() elif k in _settings["cloud_cfg"].keys(): mode = "cloud_cfg" res = self.__settings.set(mode, k, v) else: log.warn("Key %s is not find in settings. Value: %s" % (k, v)) if mode: log.debug("Settings set %s %s to %s %s" % (mode, k, v, "success" if res else "falied")) self.__settings.save() def __set_objmodel(self, data): data = self.__quec_objmodel.convert_to_client(data) log.debug("set_objmodel data: %s" % str(data)) self.__set_config(data) if "power_switch" in data.keys(): _data = self.__init_report_data(power_switch=bool(data["power_switch"])) self.__data_report(_data) if bool(data["power_switch"]) is False: Power.powerDown() if "power_restart" in data.keys(): _data = self.__init_report_data(power_switch=False) self.__data_report(_data) Power.powerRestart() if "work_cycle_period" in data.keys(): self.__low_energy_manage.stop() self.__low_energy_manage.set_period(data["work_cycle_period"]) method = self.init_low_energy_method(data["work_cycle_period"]) self.__low_energy_manage.set_method(method) self.__low_energy_manage.set_callback(self.running) self.__low_energy_manage.start() def __query_objmodel(self, data): objmodel_codes = [self.__quec_objmodel.id_code.get(i) for i in data if self.__quec_objmodel.id_code.get(i)] log.debug("query_objmodel ids: %s, codes: %s" % (str(data, str(objmodel_codes)))) report_data = self.__init_report_data() self.__data_report(report_data) def __set_ota_status(self, target_module, target_version, status): ota_status = self.__settings.get()["user_cfg"]["ota_status"] ota_info = {} pass_flag = False if ota_status["sys_target_version"] == "--" and ota_status["app_target_version"] == "--": if target_module == DEVICE_FIRMWARE_NAME: if target_version != DEVICE_FIRMWARE_VERSION: ota_info["upgrade_module"] = 1 ota_info["sys_target_version"] = target_version else: pass_flag = True elif target_module == PROJECT_NAME: if target_version != PROJECT_VERSION: ota_info["upgrade_module"] = 2 ota_info["app_target_version"] = target_version else: pass_flag = True if pass_flag is False: ota_info["upgrade_status"] = status ota_status.update(ota_info) self.__set_config({"ota_status": ota_status}) def __ota_plain_check(self, target_module, target_version, battery_limit, min_signal_intensity, use_space): _settings = self.__settings.get() if _settings["user_cfg"]["sw_ota"]: self.__set_ota_status(target_module, target_version, 1) if _settings["user_cfg"]["sw_ota_auto_upgrade"] or _settings["user_cfg"]["user_ota_action"] != -1: if _settings["user_cfg"]["sw_ota_auto_upgrade"]: _ota_action = 1 else: if _settings["user_cfg"]["user_ota_action"] != -1: _ota_action = _settings["user_cfg"]["user_ota_action"] else: return upgrade_module = 1 if target_module == DEVICE_FIRMWARE_NAME else 2 source_version = DEVICE_FIRMWARE_VERSION if target_module == DEVICE_FIRMWARE_NAME else PROJECT_VERSION if _settings['user_cfg']['ota_status']['upgrade_module'] == upgrade_module and \ _settings['user_cfg']['ota_status']['upgrade_status'] <= 1 and \ target_version != source_version: if _ota_action == 1: # TODO: Check battery_limit, min_signal_intensity, use_space pass self.__quec_cloud.ota_action(action=_ota_action) else: self.__quec_cloud.ota_action(action=0) def __ota(self, errcode, data): if errcode == 10700 and data: data = eval(data) target_module = data[0] # source_version = data[1] target_version = data[2] battery_limit = data[3] min_signal_intensity = data[4] use_space = data[5] self.__ota_plain_check(target_module, target_version, battery_limit, min_signal_intensity, use_space) elif errcode == 10701: data = eval(data) target_module = data[0] length = data[1] md5 = data[2] self.__set_ota_status(None, None, 2) self.__quec_ota.set_ota_info(length, md5) elif errcode == 10702: self.__set_ota_status(None, None, 2) elif errcode == 10703: data = eval(data) target_module = data[0] length = data[1] start_addr = data[2] piece_length = data[3] self.__set_ota_status(None, None, 2) self.__quec_ota.start_ota(start_addr, piece_length) elif errcode == 10704: self.__set_ota_status(None, None, 3) elif errcode == 10705: self.__set_ota_status(None, None, 4) elif errcode == 10706: self.__set_ota_status(None, None, 4) def __cloud_conn_status(self): if not self.__quec_cloud.status: self.__net_manage.reconnect() if not self.__quec_cloud.status: disconn_res = self.__quec_cloud.disconnect() conn_res = self.__quec_cloud.connect() log.debug("Quec cloud reconnect. disconnect: %s connect: %s" % (disconn_res, conn_res)) return self.__quec_cloud.status def __init_ota_status(self): _settings = self.__settings.get() ota_status = _settings["user_cfg"]["ota_status"] log.debug("ota_status_init ota_status: %s" % str(ota_status)) save_flag = False if ota_status["sys_target_version"] != "--": if ota_status["sys_target_version"] == DEVICE_FIRMWARE_VERSION: if ota_status["upgrade_status"] != 3: ota_status["upgrade_status"] = 3 save_flag = True else: if ota_status["upgrade_status"] != 4: ota_status["upgrade_status"] = 4 save_flag = True if ota_status["app_target_version"] != "--": if ota_status["app_target_version"] == PROJECT_VERSION: if ota_status["upgrade_status"] != 3: ota_status["upgrade_status"] = 3 save_flag = True else: if ota_status["upgrade_status"] != 4: ota_status["upgrade_status"] = 4 save_flag = True if save_flag: self.__set_config({"ota_status": ota_status}) def add_module(self, module): if isinstance(module, Settings): self.__settings = module return True elif isinstance(module, Battery): self.__battery = module return True elif isinstance(module, GPS): self.__gps = module return True elif isinstance(module, History): self.__history = module return True elif isinstance(module, NetManage): self.__net_manage = module return True elif isinstance(module, TempHumiditySensor): self.__temp_humidity_sensor = module return True elif isinstance(module, LowEnergyManage): self.__low_energy_manage = module return True elif isinstance(module, QuecObjectModel): self.__quec_objmodel = module return True elif isinstance(module, QuecThing): self.__quec_cloud = module return True elif isinstance(module, QuecOTA): self.__quec_ota = module return True elif isinstance(module, NMEAParse): self.__nmea_parse = module return True elif isinstance(module, CellLocator): self.__cell = module return True elif isinstance(module, WiFiLocator): self.__wifi = module return True return False def running(self, args): # Init device ota infomation. self.__init_ota_status() # Check device net. if not self.__net_manage.status: if self.__net_manage.wait_connect() != (3, 1): self.__net_manage.reconnect() _settings = self.__settings.get() # QuecIot connect and save device secret. if _settings["cloud_cfg"]["dk"] and not _settings["cloud_cfg"]["ds"] and self.__quec_cloud.device_secret: self.__set_config({"ds": self.__quec_cloud.device_secret}) # Histort report self.__history_report() # Data report power_switch = True if args != "POWERDOWN" else False report_data = self.__init_report_data(power_switch=power_switch) self.__data_report(report_data) # OTA status reset if _settings["user_cfg"]["ota_status"]["upgrade_status"] in (3, 4): cfg_data = { "ota_status": { "sys_current_version": DEVICE_FIRMWARE_VERSION, "sys_target_version": "--", "app_current_version": PROJECT_VERSION, "app_target_version": "--", "upgrade_module": 0, "upgrade_status": 0, } } if _settings["user_cfg"]["user_ota_action"] != -1: cfg_data["user_ota_action"] = -1 self.__set_config(cfg_data) # Device version report and OTA plain search if self.__cloud_conn_status(): _res = self.__quec_cloud.device_report() log.debug("Quec device report %s" % "success" if _res else "falied") _res = self.__quec_cloud.ota_search() log.debug("Quec ota search %s" % "success" if _res else "falied") # Device need to powerdown and not to wakeup if self.__wakeup: _res = self.__low_energy_manage.start() log.debug("Module start low enenery manage %s." % ("success" if _res else "falied")) else: _res = self.__low_energy_manage.stop() log.debug("Module stop low enenery manage %s." % ("success" if _res else "falied")) # Device powerdown if report_data["power_switch"] is False: Power.powerDown() gc.collect() def execute(self, args): if args[0] == 5 and args[1] == 10200: log.debug("transparent data: %s" % args[1]) elif args[0] == 5 and args[1] == 10210: self.__set_objmodel(args[2]) elif args[0] == 5 and args[1] == 10220: self.__query_objmodel(args[2]) elif args[0] == 7: log.debug("QuecIot OTA errcode[%s] data[%s]" % tuple(args[1:])) self.__ota(*args[1:]) else: log.error("Mode %s is not support. data: %s" % (str(args[0]), str(args[1]))) def init_low_energy_method(self, period): device_model = modem.getDevModel() support_methods = [_method for _method in LOWENERGYMAP.keys() if device_model in LOWENERGYMAP[_method]] method = "NULL" if support_methods: if period >= self.__settings.get()["user_cfg"]["work_mode_timeline"]: if "PSM" in support_methods: method = "PSM" elif "POWERDOWN" in support_methods: method = "POWERDOWN" elif "PM" in support_methods: method = "PM" else: if "PM" in support_methods: method = "PM" log.debug("init_low_energy_method: %s" % method) return method def main(): log.info("PROJECT_NAME: %s, PROJECT_VERSION: %s" % (PROJECT_NAME, PROJECT_VERSION)) log.info("DEVICE_FIRMWARE_NAME: %s, DEVICE_FIRMWARE_VERSION: %s" % (DEVICE_FIRMWARE_NAME, DEVICE_FIRMWARE_VERSION)) # 初始化配置参数模块 settings = Settings() _settings = settings.get() # 初始化电池模块 battery = Battery() # 初始化历史信息模块 history = History() # 初始化网络管理模块 net_manage = NetManage(PROJECT_NAME, PROJECT_VERSION) # 初始化温湿度传感器模块 temp_humidity_sensor = TempHumiditySensor(I2C.I2C1, I2C.STANDARD_MODE) # 初始化低功耗管理模块 low_energy_manage = LowEnergyManage() # 初始化GPS原始数据解析模块 nema_parse = NMEAParse() # 初始化GPS模块 gps = GPS(**_settings["loc_cfg"]["gps_cfg"]) # 初始化基站定位模块 cell = CellLocator(**_settings["loc_cfg"]["cell_cfg"]) # 初始化Wifi定位模块 wifi = WiFiLocator(**_settings["loc_cfg"]["wifi_cfg"]) # 初始化移远云与OTA模块 quec_ota = QuecOTA() # 初始化移远云物模型模块,用于解析云端上下行数据 quec_objmodel = QuecObjectModel() _cloud_cfg = dict(_settings["cloud_cfg"]) _cloud_cfg["fw_name"] = DEVICE_FIRMWARE_NAME _cloud_cfg["fw_version"] = DEVICE_FIRMWARE_VERSION quec_cloud = QuecThing(**_cloud_cfg) # Tracker 实例化对象 tracker = Tracker() # 注册功能模块 tracker.add_module(settings) tracker.add_module(battery) tracker.add_module(history) tracker.add_module(net_manage) tracker.add_module(temp_humidity_sensor) tracker.add_module(low_energy_manage) tracker.add_module(quec_objmodel) tracker.add_module(quec_cloud) tracker.add_module(quec_ota) tracker.add_module(nema_parse) tracker.add_module(gps) tracker.add_module(cell) tracker.add_module(wifi) # 云端设置下行消息回调函数 quec_cloud.set_callback(tracker.execute) quec_cloud.connect() # 低功耗设置唤醒回调函数 low_energy_manage.set_callback(tracker.running) low_energy_manage.set_period(_settings["user_cfg"]["work_cycle_period"]) low_energy_manage.set_method(tracker.init_low_energy_method(_settings["user_cfg"]["work_cycle_period"])) # 启动Tracker业务功能(循环设备状态检测与信息上报, 进入低功耗模式) tracker.running(None) if __name__ == "__main__": main()