mirror of
https://gitee.com/qpy-solutions/tracker-v2.git
synced 2025-05-19 11:08:26 +08:00
update: settings; device error handling; device loction check; modules.
This commit is contained in:
parent
85ae15f9df
commit
849a4b3f89
@ -1 +1 @@
|
|||||||
Subproject commit 02f26f83e73cdf090729a32454c600c01225342c
|
Subproject commit 853e2ccafe0a6a715bfd86f4134759ecdbf00c16
|
@ -124,16 +124,16 @@ class Settings(Singleton):
|
|||||||
elif opt in ("sw_ota", "sw_ota_auto_upgrade", "sw_voice_listen", "sw_voice_record",
|
elif opt in ("sw_ota", "sw_ota_auto_upgrade", "sw_voice_listen", "sw_voice_record",
|
||||||
"sw_fault_alert", "sw_low_power_alert", "sw_over_speed_alert",
|
"sw_fault_alert", "sw_low_power_alert", "sw_over_speed_alert",
|
||||||
"sw_sim_abnormal_alert", "sw_disassemble_alert", "sw_drive_behavior_alert"):
|
"sw_sim_abnormal_alert", "sw_disassemble_alert", "sw_drive_behavior_alert"):
|
||||||
if not isinstance(val, bool):
|
if not isinstance(val, bool) and val not in (0, 1):
|
||||||
return False
|
return False
|
||||||
self.current_settings["user_cfg"][opt] = val
|
self.current_settings["user_cfg"][opt] = bool(val)
|
||||||
return True
|
return True
|
||||||
elif opt == "ota_status":
|
elif opt == "ota_status":
|
||||||
if not isinstance(val, dict):
|
if not isinstance(val, dict):
|
||||||
return False
|
return False
|
||||||
self.current_settings["user_cfg"][opt] = val
|
self.current_settings["user_cfg"][opt] = val
|
||||||
return True
|
return True
|
||||||
elif opt in ("user_ota_action", "drive_behavior_code"):
|
elif opt in ("user_ota_action", "drive_behavior_code", "loc_gps_read_timeout", "work_mode_timeline"):
|
||||||
if not isinstance(val, int):
|
if not isinstance(val, int):
|
||||||
return False
|
return False
|
||||||
self.current_settings["sys"][opt] = val
|
self.current_settings["sys"][opt] = val
|
||||||
|
@ -54,6 +54,9 @@ class Collector(Singleton):
|
|||||||
self.__gps_match = GPSMatch()
|
self.__gps_match = GPSMatch()
|
||||||
self.__gps_parse = GPSParse()
|
self.__gps_parse = GPSParse()
|
||||||
|
|
||||||
|
self.__net_status = False
|
||||||
|
self.__loc_status = False
|
||||||
|
|
||||||
def __format_loc_method(self, data):
|
def __format_loc_method(self, data):
|
||||||
"""Decimal to Binary for loc method
|
"""Decimal to Binary for loc method
|
||||||
The first is gps, second is cell, third is wifi from binary right.
|
The first is gps, second is cell, third is wifi from binary right.
|
||||||
@ -349,6 +352,8 @@ class Collector(Singleton):
|
|||||||
|
|
||||||
net_status = self.__devicecheck.net()
|
net_status = self.__devicecheck.net()
|
||||||
location_status = self.__devicecheck.location()
|
location_status = self.__devicecheck.location()
|
||||||
|
self.__net_status = True if net_status == (3, 1) else False
|
||||||
|
self.__loc_status = location_status
|
||||||
temp_status = self.__devicecheck.temp()
|
temp_status = self.__devicecheck.temp()
|
||||||
light_status = self.__devicecheck.light()
|
light_status = self.__devicecheck.light()
|
||||||
triaxial_status = self.__devicecheck.triaxial()
|
triaxial_status = self.__devicecheck.triaxial()
|
||||||
@ -422,17 +427,18 @@ class Collector(Singleton):
|
|||||||
device_data.update({"loc_method": self.__format_loc_method(current_settings["user_cfg"]["loc_method"])})
|
device_data.update({"loc_method": self.__format_loc_method(current_settings["user_cfg"]["loc_method"])})
|
||||||
|
|
||||||
# Get cloud location data
|
# Get cloud location data
|
||||||
loc_info = self.__read_location()
|
if self.__loc_status:
|
||||||
cloud_loc = self.__read_cloud_location(loc_info)
|
loc_info = self.__read_location()
|
||||||
device_data.update(cloud_loc)
|
cloud_loc = self.__read_cloud_location(loc_info)
|
||||||
gps_data = loc_info.get(_loc_method.gps)
|
device_data.update(cloud_loc)
|
||||||
if gps_data:
|
gps_data = loc_info.get(_loc_method.gps)
|
||||||
gga_satellite = self.__gps_parse.GxGGA_satellite_num(self.__gps_match.GxGGA(gps_data))
|
if gps_data:
|
||||||
log.debug("GxGGA Satellite Num %s" % gga_satellite)
|
gga_satellite = self.__gps_parse.GxGGA_satellite_num(self.__gps_match.GxGGA(gps_data))
|
||||||
gsv_satellite = self.__gps_parse.GxGSV_satellite_num(self.__gps_match.GxGSV(gps_data))
|
log.debug("GxGGA Satellite Num %s" % gga_satellite)
|
||||||
log.debug("GxGSV Satellite Num %s" % gsv_satellite)
|
gsv_satellite = self.__gps_parse.GxGSV_satellite_num(self.__gps_match.GxGSV(gps_data))
|
||||||
# Get gps speed
|
log.debug("GxGSV Satellite Num %s" % gsv_satellite)
|
||||||
device_data.update(self.__check_speed(gps_data))
|
# Get gps speed
|
||||||
|
device_data.update(self.__check_speed(gps_data))
|
||||||
|
|
||||||
# Get battery energy
|
# Get battery energy
|
||||||
battery_data = self.__read_battery()
|
battery_data = self.__read_battery()
|
||||||
@ -580,7 +586,9 @@ class Collector(Singleton):
|
|||||||
|
|
||||||
def event_query(self, *args, **kwargs):
|
def event_query(self, *args, **kwargs):
|
||||||
"""Hanle quering object model downlink message from cloud."""
|
"""Hanle quering object model downlink message from cloud."""
|
||||||
return self.device_data_report()
|
power_switch = kwargs.get("power_switch", 1)
|
||||||
|
power_switch = bool(power_switch)
|
||||||
|
return self.device_data_report(power_switch=power_switch)
|
||||||
|
|
||||||
def event_ota_plain(self, *args, **kwargs):
|
def event_ota_plain(self, *args, **kwargs):
|
||||||
"""Hanle OTA plain from cloud."""
|
"""Hanle OTA plain from cloud."""
|
||||||
@ -649,11 +657,12 @@ class Collector(Singleton):
|
|||||||
log.debug("RRPC data: %s" % data)
|
log.debug("RRPC data: %s" % data)
|
||||||
self.__controller.remote_rrpc_response(message_id, data)
|
self.__controller.remote_rrpc_response(message_id, data)
|
||||||
|
|
||||||
def power_switch(self, onoff=None):
|
def power_switch(self, onoff=1):
|
||||||
"""Control device power"""
|
"""Control device power"""
|
||||||
if not self.__controller:
|
if not self.__controller:
|
||||||
raise TypeError("self.__controller is not registered.")
|
raise TypeError("self.__controller is not registered.")
|
||||||
|
|
||||||
|
onoff = bool(onoff)
|
||||||
self.event_query(power_switch=onoff)
|
self.event_query(power_switch=onoff)
|
||||||
if onoff is False:
|
if onoff is False:
|
||||||
self.__controller.power_down()
|
self.__controller.power_down()
|
||||||
@ -749,30 +758,36 @@ class Collector(Singleton):
|
|||||||
|
|
||||||
def low_engery_option(self, low_energy_method):
|
def low_engery_option(self, low_energy_method):
|
||||||
"""Business option after low energy waking up."""
|
"""Business option after low energy waking up."""
|
||||||
|
log.debug("start low_engery_option")
|
||||||
if not self.__controller:
|
if not self.__controller:
|
||||||
raise TypeError("self.__controller is not registered.")
|
raise TypeError("self.__controller is not registered.")
|
||||||
|
|
||||||
self.report_history()
|
self.report_history()
|
||||||
self.__controller.remote_device_report()
|
report_flag = True
|
||||||
self.__controller.remote_ota_check()
|
|
||||||
current_settings = settings.get()
|
current_settings = settings.get()
|
||||||
if current_settings["user_cfg"]["work_mode"] == UserConfig._work_mode.intelligent:
|
if current_settings["user_cfg"]["work_mode"] == UserConfig._work_mode.intelligent:
|
||||||
speed_info = self.__check_speed()
|
# TODO: Check speed by sensor
|
||||||
if speed_info.get("current_speed") > 0:
|
if self.__loc_status:
|
||||||
self.device_data_report()
|
loc_info = self.__read_location()
|
||||||
else:
|
gps_data = loc_info.get(_loc_method.gps)
|
||||||
self.device_data_report()
|
speed_info = self.__check_speed(gps_data)
|
||||||
|
if speed_info.get("current_speed") <= 0:
|
||||||
|
report_flag = False
|
||||||
|
|
||||||
|
if report_flag is True:
|
||||||
|
self.device_status_check()
|
||||||
|
if self.__net_status:
|
||||||
|
self.__controller.remote_device_report()
|
||||||
|
self.__controller.remote_ota_check()
|
||||||
|
|
||||||
# Check battery low enery power down.
|
# Check battery low enery power down.
|
||||||
self.__check_battery_low_energy_power_down()
|
self.__check_battery_low_energy_power_down()
|
||||||
|
|
||||||
self.__controller.low_energy_start()
|
self.__controller.low_energy_start()
|
||||||
|
|
||||||
if low_energy_method == "PSM":
|
if low_energy_method == "POWERDOWN":
|
||||||
# TODO: PSM option.
|
|
||||||
pass
|
|
||||||
elif low_energy_method == "POWERDOWN":
|
|
||||||
self.__controller.power_down()
|
self.__controller.power_down()
|
||||||
|
log.debug("end low_engery_option")
|
||||||
|
|
||||||
def thing_services(self, data):
|
def thing_services(self, data):
|
||||||
log.debug("thing_services data: %s" % str(data))
|
log.debug("thing_services data: %s" % str(data))
|
||||||
|
@ -57,6 +57,11 @@ class DeviceCheck(object):
|
|||||||
sleep_time = 1
|
sleep_time = 1
|
||||||
|
|
||||||
while retry < 5:
|
while retry < 5:
|
||||||
|
if retry > 0:
|
||||||
|
retry += 1
|
||||||
|
utime.sleep(sleep_time)
|
||||||
|
sleep_time *= 2
|
||||||
|
|
||||||
if current_settings["user_cfg"].get("loc_method"):
|
if current_settings["user_cfg"].get("loc_method"):
|
||||||
loc_method = current_settings["user_cfg"].get("loc_method")
|
loc_method = current_settings["user_cfg"].get("loc_method")
|
||||||
elif current_settings["sys"]["base_cfg"]["LocConfig"]:
|
elif current_settings["sys"]["base_cfg"]["LocConfig"]:
|
||||||
@ -64,13 +69,13 @@ class DeviceCheck(object):
|
|||||||
else:
|
else:
|
||||||
loc_method = 7
|
loc_method = 7
|
||||||
|
|
||||||
gps_data = self.__locator.read(loc_method)
|
loc_info = self.__locator.read(loc_method)
|
||||||
|
for k, v in loc_info.items():
|
||||||
|
gps_data = v
|
||||||
|
if gps_data:
|
||||||
|
break
|
||||||
if gps_data:
|
if gps_data:
|
||||||
break
|
break
|
||||||
else:
|
|
||||||
retry += 1
|
|
||||||
utime.sleep(sleep_time)
|
|
||||||
sleep_time *= 2
|
|
||||||
|
|
||||||
if gps_data:
|
if gps_data:
|
||||||
return True
|
return True
|
||||||
|
Loading…
x
Reference in New Issue
Block a user