mirror of
https://gitee.com/qpy-solutions/tracker-v2.git
synced 2025-05-18 18:48:25 +08:00
add: alert module; update: settings & loaction
This commit is contained in:
parent
c7901a7387
commit
d25a3229e8
@ -1,6 +1,71 @@
|
||||
|
||||
import _thread
|
||||
from queue import Queue
|
||||
from usr import settings
|
||||
from usr.logging import getLogger
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
||||
ALERTCODE = {
|
||||
30001: 'sw_fault_alert',
|
||||
30002: 'sw_low_power_alert',
|
||||
30004: 'sw_sim_out_alert',
|
||||
30005: 'sw_disassemble_alert',
|
||||
# 30006: 'sw_shock_alert', # TODO: NOT USED
|
||||
40000: 'sw_drive_behavior_alert',
|
||||
50001: 'sw_sos_alert',
|
||||
}
|
||||
|
||||
DRIVEBEHAVIORCODE = {
|
||||
40001: 'quick_start',
|
||||
40002: 'quick_stop',
|
||||
40003: 'quick_turn_left',
|
||||
40004: 'quick_turn_right',
|
||||
40005: 'over_speed',
|
||||
40006: 'low_speed',
|
||||
}
|
||||
|
||||
|
||||
class AlertMonitorError(Exception):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
||||
|
||||
|
||||
def alert_process(argv):
|
||||
'''
|
||||
alert_signals_queue data format
|
||||
|
||||
(300001, {})
|
||||
|
||||
(400000, {40001: True})
|
||||
'''
|
||||
self = argv
|
||||
while True:
|
||||
data = self.alert_signals_queue.get()
|
||||
if data:
|
||||
log.info('alert_signals_queue data: ', data)
|
||||
if ALERTCODE.get(data[0]):
|
||||
current_settings = settings.settings.get()
|
||||
alert_status = current_settings.get('app', {}).get(ALERTCODE.get(data[0]))
|
||||
if alert_status:
|
||||
self.read_cb(ALERTCODE.get(data[0]), data[1])
|
||||
else:
|
||||
log.warn('%s status is %s' % (ALERTCODE.get(data[0]), alert_status))
|
||||
else:
|
||||
log.error('altercode (%s) is not exists. alert info: %s' % data)
|
||||
|
||||
|
||||
class AlertMonitor(object):
|
||||
'''
|
||||
Recv alert signals and process them
|
||||
'''
|
||||
pass
|
||||
def __init__(self, read_cb):
|
||||
self.read_cb = read_cb
|
||||
self.alert_signals_queue = Queue(maxsize=64)
|
||||
_thread.start_new_thread(alert_process, (self,))
|
||||
|
||||
def post_alert(self, alert_code, alert_info):
|
||||
self.alert_signals_queue.put((alert_code, alert_info))
|
||||
|
@ -1,5 +1,6 @@
|
||||
|
||||
import ure
|
||||
import utime
|
||||
import _thread
|
||||
import cellLocator
|
||||
from wifilocator import wifilocator
|
||||
@ -125,66 +126,84 @@ def loc_worker(argv):
|
||||
while True:
|
||||
trigger = self.trigger_queue.get()
|
||||
if trigger:
|
||||
data = self.read()
|
||||
data = None
|
||||
retry = 0
|
||||
while retry < 3:
|
||||
data = self.read()
|
||||
if data:
|
||||
break
|
||||
else:
|
||||
retry += 1
|
||||
utime.sleep(1)
|
||||
log.debug('location data info:', data)
|
||||
if data and self.read_cb:
|
||||
self.read_cb(data)
|
||||
|
||||
|
||||
class Location(GPS, CellLocator, WiFiLocator):
|
||||
gps_enabled = False
|
||||
cellLoc_enabled = False
|
||||
wifiLoc_enabled = False
|
||||
|
||||
def __init__(self, read_cb, **kw):
|
||||
current_settings = settings.current_settings
|
||||
class Location(object):
|
||||
gps = None
|
||||
cellLoc = None
|
||||
wifiLoc = None
|
||||
|
||||
def __init__(self, read_cb):
|
||||
self.read_cb = read_cb
|
||||
|
||||
if current_settings['app']['loc_method'] & settings.default_values_app._loc_method.gps:
|
||||
if 'gps_cfg' in kw:
|
||||
super(GPS, self).__init__(kw['gps_cfg'])
|
||||
self.gps_enabled = True
|
||||
else:
|
||||
raise ValueError('Invalid gps init parameters.')
|
||||
|
||||
if current_settings['app']['loc_method'] & settings.default_values_app._loc_method.cell:
|
||||
if 'cellLocator_cfg' in kw:
|
||||
super(CellLocator, self).__init__(kw['cellLocator_cfg'])
|
||||
self.cellLoc_enabled = True
|
||||
else:
|
||||
raise ValueError('Invalid cell-locator init parameters.')
|
||||
|
||||
if current_settings['app']['loc_method'] & settings.default_values_app._loc_method.wifi:
|
||||
if 'wifiLocator_cfg' in kw:
|
||||
super(WiFiLocator, self).__init__(kw['wifiLocator_cfg'])
|
||||
self.wifiLoc_enabled = True
|
||||
else:
|
||||
raise ValueError('Invalid wifi-locator init parameters.')
|
||||
|
||||
self.trigger_queue = Queue(maxsize=64)
|
||||
_thread.start_new_thread(loc_worker, (self,))
|
||||
|
||||
def _locater_init(self):
|
||||
current_settings = settings.settings.get()
|
||||
locator_init_params = current_settings['sys']['locator_init_params']
|
||||
|
||||
if current_settings['app']['loc_method'] & settings.default_values_app._loc_method.gps:
|
||||
if self.gps is None:
|
||||
if 'gps_cfg' in locator_init_params:
|
||||
self.gps = GPS(locator_init_params['gps_cfg'])
|
||||
else:
|
||||
raise ValueError('Invalid gps init parameters.')
|
||||
else:
|
||||
self.gps = None
|
||||
|
||||
if current_settings['app']['loc_method'] & settings.default_values_app._loc_method.cell:
|
||||
if self.cellLoc is None:
|
||||
if 'cellLocator_cfg' in locator_init_params:
|
||||
self.cellLoc = CellLocator(locator_init_params['cellLocator_cfg'])
|
||||
else:
|
||||
raise ValueError('Invalid cell-locator init parameters.')
|
||||
else:
|
||||
self.cellLoc = None
|
||||
|
||||
if current_settings['app']['loc_method'] & settings.default_values_app._loc_method.wifi:
|
||||
if self.wifiLoc is None:
|
||||
if 'wifiLocator_cfg' in locator_init_params:
|
||||
self.wifiLoc = WiFiLocator(locator_init_params['wifiLocator_cfg'])
|
||||
else:
|
||||
raise ValueError('Invalid wifi-locator init parameters.')
|
||||
else:
|
||||
self.wifiLoc = None
|
||||
|
||||
def read(self):
|
||||
if self.gps_enabled:
|
||||
self._locater_init()
|
||||
|
||||
if self.gps:
|
||||
data = []
|
||||
r = super(GPS, self).read_location_GxRMC()
|
||||
r = self.gps.read_location_GxRMC()
|
||||
if r:
|
||||
data.append(r)
|
||||
|
||||
r = super(GPS, self).read_location_GxGGA()
|
||||
r = self.gps.read_location_GxGGA()
|
||||
if r:
|
||||
data.append(r)
|
||||
|
||||
if len(data):
|
||||
return (settings.default_values_app._loc_method.gps, data)
|
||||
|
||||
if self.cellLoc_enabled:
|
||||
data = super(CellLocator, self).read()
|
||||
if self.cellLoc:
|
||||
data = self.cellLoc.read()
|
||||
if data:
|
||||
return (settings.default_values_app._loc_method.cell, data)
|
||||
|
||||
if self.wifiLoc_enabled:
|
||||
data = super(WiFiLocator, self).read()
|
||||
if self.wifiLoc:
|
||||
data = self.wifiLoc.read()
|
||||
if data:
|
||||
return (settings.default_values_app._loc_method.wifi, data)
|
||||
|
||||
|
15
code/main.py
15
code/main.py
@ -10,6 +10,8 @@ version = '1.0.0'
|
||||
|
||||
tracker = None
|
||||
|
||||
current_settings = settings.settings.get()
|
||||
|
||||
|
||||
def loc_read_cb(data):
|
||||
if data:
|
||||
@ -23,7 +25,15 @@ def loc_read_cb(data):
|
||||
data_type = tracker.remote.DATA_LOCA_NON_GPS
|
||||
tracker.remote.post_data(data_type, loc_data)
|
||||
|
||||
tracker = Tracker(loc_read_cb, **settings.current_settings['sys']['locator_init_params'])
|
||||
|
||||
def alert_read_cb(data):
|
||||
if data:
|
||||
data_type = tracker.remote.DATA_NON_LOCA
|
||||
alert_data = {data[0]: data[1]}
|
||||
tracker.remote.post_data(data_type, alert_data)
|
||||
|
||||
|
||||
tracker = Tracker(loc_read_cb, alert_read_cb)
|
||||
|
||||
|
||||
def loc_timer_cb(argv):
|
||||
@ -31,9 +41,6 @@ def loc_timer_cb(argv):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
settings.init()
|
||||
current_settings = settings.current_settings
|
||||
|
||||
if (current_settings['app']['loc_mode'] & settings.default_values_app._loc_mode.cycle) \
|
||||
and current_settings['app']['loc_cycle_period']:
|
||||
loc_timer = osTimer()
|
||||
|
@ -38,6 +38,8 @@ object_model = [
|
||||
(20, ('disassemble_alert', 'rw'))
|
||||
]
|
||||
|
||||
object_model_code = {i[1][0]: i[0] for i in object_model}
|
||||
|
||||
|
||||
class QuecThing(object):
|
||||
def __init__(self, pk, ps, dk, ds, downlink_queue):
|
||||
@ -59,22 +61,21 @@ class QuecThing(object):
|
||||
def post_data(self, data_type, data):
|
||||
if data_type == DATA_NON_LOCA:
|
||||
for k, v in data.items():
|
||||
for om in object_model:
|
||||
if k == om[1][0]:
|
||||
if v:
|
||||
if quecIot.phymodelReport(1, {om[0]: v}):
|
||||
res = self.post_result_wait_queue.get()
|
||||
if res:
|
||||
v = {}
|
||||
continue
|
||||
else:
|
||||
self.rm_empty_data(data)
|
||||
return False
|
||||
else:
|
||||
self.rm_empty_data(data)
|
||||
return False
|
||||
else:
|
||||
if object_model_code.get(k) is not None and v:
|
||||
# Event Data Format From object_mode_code
|
||||
if isinstance(v, dict):
|
||||
v = {object_model_code.get(ik) if object_model_code.get(ik) else ik: iv for ik, iv in v.items()}
|
||||
if quecIot.phymodelReport(1, {object_model_code.get(k): v}):
|
||||
res = self.post_result_wait_queue.get()
|
||||
if res:
|
||||
v = {}
|
||||
continue
|
||||
else:
|
||||
self.rm_empty_data(data)
|
||||
return False
|
||||
else:
|
||||
self.rm_empty_data(data)
|
||||
return False
|
||||
self.rm_empty_data(data)
|
||||
return True
|
||||
elif data_type == DATA_LOCA_GPS:
|
||||
|
@ -14,9 +14,8 @@ from usr.battery import Battery
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
||||
current_settings = settings.current_settings
|
||||
|
||||
if current_settings['sys']['cloud'] == settings.default_values_sys._cloud.quecIot:
|
||||
if settings.settings.get()['sys']['cloud'] == settings.default_values_sys._cloud.quecIot:
|
||||
from usr.quecthing import QuecThing
|
||||
from usr.quecthing import DATA_NON_LOCA, DATA_LOCA_NON_GPS, DATA_LOCA_GPS
|
||||
|
||||
@ -66,7 +65,7 @@ class DownLinkOption(object):
|
||||
self.remote = remote
|
||||
self.controller = Controller(self.remote)
|
||||
|
||||
def row_data(self, *args, **kwargs):
|
||||
def raw_data(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def object_model(self, *args, **kwargs):
|
||||
@ -74,7 +73,7 @@ class DownLinkOption(object):
|
||||
|
||||
for arg in args:
|
||||
if hasattr(settings.default_values_app, arg[0]):
|
||||
set_res = settings.set(arg[0], arg[1])
|
||||
set_res = settings.settings.set(arg[0], arg[1])
|
||||
log.debug('key: %s, val: %s, set_res: %s', (arg[0], arg[1], set_res))
|
||||
if setting_flag == 0:
|
||||
setting_flag = 1
|
||||
@ -84,12 +83,12 @@ class DownLinkOption(object):
|
||||
pass
|
||||
|
||||
if setting_flag:
|
||||
settings.save()
|
||||
settings.settings.save()
|
||||
|
||||
def query(self, *args, **kwargs):
|
||||
for arg in args:
|
||||
if hasattr(settings.default_values_app, arg):
|
||||
settings.query(self.remote, 'app', arg)
|
||||
settings.settings.query(self.remote, 'app', arg)
|
||||
elif hasattr(self.controller, arg):
|
||||
getattr(self.controller, arg)(*('r'))
|
||||
else:
|
||||
@ -104,7 +103,8 @@ def downlink_process(argv):
|
||||
Data format should be unified at the process module file of its own before put to downlink_queue.
|
||||
|
||||
Data format:
|
||||
TODO: =====================
|
||||
('object_model', [('phone_num', '123456789'),...])
|
||||
('query', ['phone_num',...])
|
||||
'''
|
||||
data = self.downlink_queue.get()
|
||||
log.debug('downlink_queue data:', data)
|
||||
@ -118,9 +118,6 @@ def downlink_process(argv):
|
||||
else:
|
||||
# TODO: Raise Error OR Conntinue
|
||||
raise RemoteError('DownLinkOption has no accribute %s.' % option_attr)
|
||||
'''
|
||||
TODO: processing for settings or control commands from downlink channel
|
||||
'''
|
||||
|
||||
|
||||
def uplink_process(argv):
|
||||
@ -215,14 +212,15 @@ class Remote(object):
|
||||
def __init__(self):
|
||||
self.downlink_queue = Queue(maxsize=64)
|
||||
self.uplink_queue = Queue(maxsize=64)
|
||||
cloud_init_params = settings.current_settings['sys']['cloud_init_params']
|
||||
current_settings = settings.settings.get()
|
||||
cloud_init_params = current_settings['sys']['cloud_init_params']
|
||||
if current_settings['sys']['cloud'] == settings.default_values_sys._cloud.quecIot:
|
||||
self.cloud = QuecThing(cloud_init_params['PK'], cloud_init_params['PS'], cloud_init_params['DK'], cloud_init_params['DS'], self.downlink_queue)
|
||||
self.DATA_NON_LOCA = DATA_NON_LOCA
|
||||
self.DATA_LOCA_NON_GPS = DATA_LOCA_NON_GPS
|
||||
self.DATA_LOCA_GPS = DATA_LOCA_GPS
|
||||
else:
|
||||
raise settings.Error('Current cloud (0x%X) not supported!' % current_settings['sys']['cloud'])
|
||||
raise settings.SettingsError('Current cloud (0x%X) not supported!' % current_settings['sys']['cloud'])
|
||||
|
||||
_thread.start_new_thread(downlink_process, (self,))
|
||||
_thread.start_new_thread(uplink_process, (self,))
|
||||
|
219
code/settings.py
219
code/settings.py
@ -12,27 +12,29 @@ log = getLogger(__name__)
|
||||
|
||||
tracker_settings_file = '/usr/tracker_settings.json'
|
||||
|
||||
current_settings = {}
|
||||
current_settings_app = {}
|
||||
current_settings_sys = {}
|
||||
|
||||
_settings_lock = _thread.allocate_lock()
|
||||
|
||||
|
||||
def settings_lock(func_name):
|
||||
def settings_lock_fun(func):
|
||||
def wrapperd_fun(*args, **kwargs):
|
||||
if not _settings_lock.locked():
|
||||
if _settings_lock.acquire():
|
||||
source_fun = func(*args, **kwargs)
|
||||
_settings_lock.release()
|
||||
return source_fun
|
||||
else:
|
||||
log.warn('_settings_lock acquire falied. func: %s, args: %s' % (func_name, args))
|
||||
def settings_lock(func):
|
||||
def wrapperd_fun(*args, **kwargs):
|
||||
if not _settings_lock.locked():
|
||||
if _settings_lock.acquire():
|
||||
source_fun = func(*args, **kwargs)
|
||||
_settings_lock.release()
|
||||
return source_fun
|
||||
else:
|
||||
log.warn('_settings_lock is locked. func: %s, args: %s' % (func_name, args))
|
||||
return wrapperd_fun
|
||||
return settings_lock_fun
|
||||
log.warn('_settings_lock acquire falied. func: %s, args: %s' % (func.__name__, args))
|
||||
else:
|
||||
log.warn('_settings_lock is locked. func: %s, args: %s' % (func.__name__, args))
|
||||
return wrapperd_fun
|
||||
|
||||
|
||||
class SettingsError(Exception):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
||||
|
||||
|
||||
class default_values_app(object):
|
||||
@ -165,8 +167,7 @@ class default_values_sys(object):
|
||||
|
||||
@staticmethod
|
||||
def _get_locator_init_params(loc_method):
|
||||
global current_settings
|
||||
locator_init_params = current_settings.get('sys', {}).get('locator_init_params', {})
|
||||
locator_init_params = {}
|
||||
|
||||
if loc_method & default_values_app._loc_method.gps:
|
||||
locator_init_params['gps_cfg'] = default_values_sys._gps_cfg
|
||||
@ -179,8 +180,7 @@ class default_values_sys(object):
|
||||
|
||||
@staticmethod
|
||||
def _get_cloud_init_params(cloud):
|
||||
global current_settings
|
||||
cloud_init_params = current_settings.get('sys', {}).get('cloud_init_params', {})
|
||||
cloud_init_params = {}
|
||||
|
||||
if cloud & default_values_sys._cloud.quecIot:
|
||||
cloud_init_params = default_values_sys._quecIot
|
||||
@ -204,116 +204,107 @@ class default_values_sys(object):
|
||||
return cloud_init_params
|
||||
|
||||
|
||||
@settings_lock('settings.init')
|
||||
def init():
|
||||
global current_settings
|
||||
class Settings(object):
|
||||
|
||||
default_values_sys.locator_init_params = default_values_sys._get_locator_init_params(default_values_app.loc_method)
|
||||
default_values_sys.cloud_init_params = default_values_sys._get_cloud_init_params(default_values_sys.cloud)
|
||||
def __init__(self):
|
||||
self.current_settings = {}
|
||||
self.current_settings_app = {}
|
||||
self.current_settings_sys = {}
|
||||
self.init()
|
||||
|
||||
default_settings_app = {k: v for k, v in default_values_app.__dict__.items() if not k.startswith('_')}
|
||||
default_settings_sys = {k: v for k, v in default_values_sys.__dict__.items() if not k.startswith('_')}
|
||||
default_settings = {'app': default_settings_app, 'sys': default_settings_sys}
|
||||
@settings_lock
|
||||
def init(self):
|
||||
default_values_sys.locator_init_params = default_values_sys._get_locator_init_params(default_values_app.loc_method)
|
||||
default_values_sys.cloud_init_params = default_values_sys._get_cloud_init_params(default_values_sys.cloud)
|
||||
|
||||
if not ql_fs.path_exists(tracker_settings_file):
|
||||
with open(tracker_settings_file, 'w') as f:
|
||||
ujson.dump(default_settings, f)
|
||||
current_settings = dict(default_settings)
|
||||
else:
|
||||
with open(tracker_settings_file, 'r') as f:
|
||||
current_settings = ujson.load(f)
|
||||
default_settings_app = {k: v for k, v in default_values_app.__dict__.items() if not k.startswith('_')}
|
||||
default_settings_sys = {k: v for k, v in default_values_sys.__dict__.items() if not k.startswith('_')}
|
||||
default_settings = {'app': default_settings_app, 'sys': default_settings_sys}
|
||||
|
||||
if not ql_fs.path_exists(tracker_settings_file):
|
||||
with open(tracker_settings_file, 'w') as f:
|
||||
ujson.dump(default_settings, f)
|
||||
self.current_settings = dict(default_settings)
|
||||
else:
|
||||
with open(tracker_settings_file, 'r') as f:
|
||||
self.current_settings = ujson.load(f)
|
||||
|
||||
@settings_lock('settings.get')
|
||||
def get():
|
||||
global current_settings
|
||||
return current_settings
|
||||
@settings_lock
|
||||
def get(self):
|
||||
return self.current_settings
|
||||
|
||||
@settings_lock
|
||||
def query(self, remote, set_type, set_key):
|
||||
log.debug('remote: %s, set_type: %s, set_key: %s' % (remote, set_type, set_key))
|
||||
remote.post_data(remote.DATA_NON_LOCA, {set_key: self.current_settings.get(set_type, {}).get(set_key)})
|
||||
|
||||
@settings_lock('settings.query')
|
||||
def query(remote, set_type, set_key):
|
||||
global current_settings
|
||||
log.debug('remote: %s, set_type: %s, set_key: %s' % (remote, set_type, set_key))
|
||||
remote.post_data(remote.DATA_NON_LOCA, {set_key: current_settings.get(set_type, {}).get(set_key)})
|
||||
|
||||
|
||||
@settings_lock('settings.set')
|
||||
def set(opt, val):
|
||||
global current_settings
|
||||
|
||||
if opt in current_settings['app']:
|
||||
if opt == 'phone_num':
|
||||
if not isinstance(val, str):
|
||||
@settings_lock
|
||||
def set(self, opt, val):
|
||||
if opt in self.current_settings['app']:
|
||||
if opt == 'phone_num':
|
||||
if not isinstance(val, str):
|
||||
return False
|
||||
# TODO: This ure not work in EC600N
|
||||
pattern = ure.compile(r'^(?:(?:\+)86)?1[3-9]\d\d\d\d\d\d\d\d\d$')
|
||||
if pattern.search(val):
|
||||
self.current_settings['app'][opt] = val
|
||||
return True
|
||||
return False
|
||||
# TODO: This ure not work in EC600N
|
||||
pattern = ure.compile(r'^(?:(?:\+)86)?1[3-9]\d\d\d\d\d\d\d\d\d$')
|
||||
if pattern.search(val):
|
||||
current_settings['app'][opt] = val
|
||||
|
||||
elif opt == 'loc_method':
|
||||
if not isinstance(val, int):
|
||||
return False
|
||||
if val > default_values_app._loc_method.all:
|
||||
return False
|
||||
self.current_settings['app'][opt] = val
|
||||
self.current_settings['sys']['locator_init_params'] = default_values_sys._get_locator_init_params(val)
|
||||
return True
|
||||
return False
|
||||
|
||||
elif opt == 'loc_method':
|
||||
if not isinstance(val, int):
|
||||
return False
|
||||
if val > default_values_app._loc_method.all:
|
||||
return False
|
||||
current_settings['app'][opt] = val
|
||||
current_settings['sys']['locator_init_params'] = default_values_sys._get_locator_init_params(val)
|
||||
return True
|
||||
elif opt == 'loc_mode':
|
||||
if not isinstance(val, int):
|
||||
return False
|
||||
if val > default_values_app._loc_mode.all:
|
||||
return False
|
||||
self.current_settings['app'][opt] = val
|
||||
return True
|
||||
|
||||
elif opt == 'loc_mode':
|
||||
if not isinstance(val, int):
|
||||
return False
|
||||
if val > default_values_app._loc_mode.all:
|
||||
return False
|
||||
current_settings['app'][opt] = val
|
||||
return True
|
||||
elif opt == 'loc_cycle_period':
|
||||
if not isinstance(val, int):
|
||||
return False
|
||||
if val < 1:
|
||||
return False
|
||||
self.current_settings['app'][opt] = val
|
||||
return True
|
||||
|
||||
elif opt == 'loc_cycle_period':
|
||||
if not isinstance(val, int):
|
||||
return False
|
||||
if val < 1:
|
||||
return False
|
||||
current_settings['app'][opt] = val
|
||||
return True
|
||||
elif opt == 'low_power_alert_threshold' or opt == 'low_power_shutdown_threshold':
|
||||
if not isinstance(val, int):
|
||||
return False
|
||||
if val < 0 or val > 100:
|
||||
return False
|
||||
self.current_settings['app'][opt] = val
|
||||
return True
|
||||
|
||||
elif opt == 'low_power_alert_threshold' or opt == 'low_power_shutdown_threshold':
|
||||
if not isinstance(val, int):
|
||||
return False
|
||||
if val < 0 or val > 100:
|
||||
return False
|
||||
current_settings['app'][opt] = val
|
||||
return True
|
||||
elif opt in (
|
||||
'sw_ota', 'sw_ota_auto_upgrade', 'sw_voice_listen', 'sw_voice_record',
|
||||
'sw_fault_alert', 'sw_low_power_alert', 'sw_over_speed_alert',
|
||||
'sw_sim_out_alert', 'sw_disassemble_alert', 'sw_drive_behavior_alert'):
|
||||
if not isinstance(val, bool):
|
||||
return False
|
||||
self.current_settings['app'][opt] = val
|
||||
return True
|
||||
|
||||
elif opt in (
|
||||
'sw_ota', 'sw_ota_auto_upgrade', 'sw_voice_listen', 'sw_voice_record',
|
||||
'sw_fault_alert', 'sw_low_power_alert', 'sw_over_speed_alert',
|
||||
'sw_sim_out_alert', 'sw_disassemble_alert', 'sw_drive_behavior_alert'):
|
||||
if not isinstance(val, bool):
|
||||
else:
|
||||
return False
|
||||
current_settings['app'][opt] = val
|
||||
return True
|
||||
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
@settings_lock
|
||||
def save(self):
|
||||
with open(tracker_settings_file, 'w') as f:
|
||||
ujson.dump(self.current_settings, f)
|
||||
|
||||
@settings_lock('settings.save')
|
||||
def save():
|
||||
with open(tracker_settings_file, 'w') as f:
|
||||
ujson.dump(current_settings, f)
|
||||
@settings_lock
|
||||
def reset(self):
|
||||
uos.remove(tracker_settings_file)
|
||||
|
||||
|
||||
@settings_lock('settings.reset')
|
||||
def reset():
|
||||
uos.remove(tracker_settings_file)
|
||||
|
||||
|
||||
class Error(Exception):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
||||
settings = Settings()
|
||||
|
@ -1,13 +1,15 @@
|
||||
|
||||
from usr.location import Location
|
||||
from usr.location import Remote
|
||||
from usr.sensor import Sensor
|
||||
from usr.led import LED
|
||||
from usr.remote import Remote
|
||||
# from usr.sensor import Sensor
|
||||
# from usr.led import LED
|
||||
from usr.alert import AlertMonitor
|
||||
|
||||
|
||||
class Tracker():
|
||||
def __init__(self, loc_read_cb, **kw):
|
||||
self.locator = Location(loc_read_cb, **kw)
|
||||
def __init__(self, loc_read_cb, alert_read_cb, **kw):
|
||||
self.remote = Remote()
|
||||
self.sensor = Sensor()
|
||||
self.led = LED()
|
||||
self.locator = Location(loc_read_cb)
|
||||
# self.sensor = Sensor()
|
||||
# self.led = LED()
|
||||
self.alert = AlertMonitor(alert_read_cb)
|
||||
|
Loading…
x
Reference in New Issue
Block a user