demo.tracker-v2/code/remote.py
2022-03-16 11:38:50 +08:00

387 lines
14 KiB
Python

import uos
import utime
import ql_fs
import ujson
import _thread
from misc import Power
from queue import Queue
import usr.settings as settings
from usr.battery import Battery
from usr.common import Singleton
from usr.logging import getLogger
if settings.settings.get()['sys']['cloud'] == settings.default_values_sys._cloud.quecIot:
from usr.quecthing import QuecThing
from usr.quecthing import DATA_NON_LOCA, DATA_LOCA_NON_GPS, DATA_LOCA_GPS
log = getLogger(__name__)
class RemoteError(Exception):
    """Exception type used by the remote (cloud link) code in this module.

    The object given to the constructor is stored on ``value`` and its
    ``repr()`` is used as the message text.
    """

    def __init__(self, value):
        # Keep the payload so that handlers can inspect it.
        self.value = value

    def __str__(self):
        # Same text as repr() of the stored payload.
        return '%r' % (self.value,)
class ControllerError(Exception):
    """Exception type raised by ``Controller`` handlers.

    The object given to the constructor is stored on ``value``; ``str()``
    of the exception is the ``repr()`` of that object.
    """

    def __init__(self, value):
        # Remember what was reported so it can be rendered later.
        self.value = value

    def __str__(self):
        text = repr(self.value)
        return text
class Controller(Singleton):
    """Handlers for control items that are not plain application settings.

    Every handler takes ``perm`` as its first argument: ``'r'`` for a read
    (query) request and ``'w'`` for a write request. ``DownLinkOption`` calls
    write handlers as ``handler('w', value)`` and read handlers as
    ``handler('r')``, so every handler accepts both call shapes.
    """

    def __init__(self, tracker):
        # The tracker gives access to the remote link, the LEDs and reports.
        self.tracker = tracker

    def power_switch(self, perm, flag=None):
        """Report the power switch state or act on a power on/off request.

        :param perm: ``'r'`` to report the state, ``'w'`` to apply ``flag``.
        :param flag: ``True`` sends a machine info report; ``False`` sends the
            report, turns both LEDs off and powers the device down. Any other
            value is ignored.
        :raises ControllerError: if ``perm`` is neither ``'r'`` nor ``'w'``.
        """
        if perm == 'r':
            self.tracker.remote.post_data(self.tracker.remote.DATA_NON_LOCA, {'power_switch': True})
        elif perm == 'w':
            if flag is True:
                self.tracker.machine_info_report()
            elif flag is False:
                self.tracker.machine_info_report(power_switch=flag)
                # NOTE(review): presumably gives the report time to leave the
                # device before powering down - confirm.
                utime.sleep(3)
                self.tracker.energy_led.period = None
                self.tracker.energy_led.switch(0)
                self.tracker.running_led.period = None
                self.tracker.running_led.switch(0)
                Power.powerDown()
        else:
            raise ControllerError('Controller switch permission error %s.' % perm)

    def energy(self, perm, value=None):
        """Report the battery energy; the item is read-only.

        :param perm: must be ``'r'``.
        :param value: ignored. Accepted only so that a write request made as
            ``energy('w', value)`` reaches the permission check below instead
            of failing with ``TypeError`` on the argument count.
        :raises ControllerError: if ``perm`` is not ``'r'``.
        """
        if perm == 'r':
            battery_energy = Battery().energy()
            self.tracker.remote.post_data(self.tracker.remote.DATA_NON_LOCA, {'energy': battery_energy})
        else:
            raise ControllerError('Controller energy permission error %s.' % perm)

    def user_ota_action(self, perm, action=None):
        """Forward the user's OTA decision to the cloud.

        :param perm: only ``'w'`` has an effect; other values are ignored.
        :param action: ``False`` calls ``cloud_ota_action(0)``, ``True`` calls
            ``cloud_ota_action(1)``. Defaults to ``None`` (no action) so that a
            read request made as ``user_ota_action('r')`` does not fail with
            ``TypeError``.
        """
        if perm == 'w':
            if action is False:
                self.tracker.remote.cloud_ota_action(0)
            elif action is True:
                self.tracker.remote.cloud_ota_action(1)

    def ota_status(self, perm, status=None):
        """Report or store the OTA status.

        :param perm: ``'r'`` posts ``settings['sys']['ota_status']`` to the
            cloud; ``'w'`` stores ``status`` in the settings and saves them.
            Other values are ignored.
        :param status: new status for a write; ``None`` leaves it unchanged.
        """
        if perm == 'r':
            current_settings = settings.settings.get()
            ota_status = current_settings['sys']['ota_status']
            self.tracker.remote.post_data(self.tracker.remote.DATA_NON_LOCA, {'ota_status': ota_status})
        elif perm == 'w':
            if status is not None:
                settings.settings.set('ota_status', status)
                settings.settings.save()
class DownLinkOption(object):
    """Dispatch targets for messages taken from the downlink queue.

    The first element of a downlink message names one of the methods below;
    the second element is unpacked as that method's positional arguments.
    """

    def __init__(self, tracker):
        self.tracker = tracker
        self.controller = Controller(self.tracker)

    def raw_data(self, *args, **kwargs):
        """Accept raw (pass-through) data; currently does nothing."""
        pass

    def object_model(self, *args, **kwargs):
        """Apply write requests.

        :param args: ``(key, value)`` pairs. A key that is an attribute of
            ``settings.default_values_app`` is written to the settings; a key
            that names a ``Controller`` attribute is called as
            ``handler('w', value)``. A key may match both.
        """
        settings_changed = False
        for arg in args:
            key = arg[0]
            if hasattr(settings.default_values_app, key):
                set_res = settings.settings.set(key, arg[1])
                # Interpolate here: the values used to be passed as a separate
                # tuple argument, so the placeholders were never filled in.
                log.debug('key: %s, val: %s, set_res: %s' % (key, arg[1], set_res))
                settings_changed = True
            if hasattr(self.controller, key):
                getattr(self.controller, key)('w', arg[1])
        # Save once after the whole batch, and only if something was set.
        if settings_changed:
            settings.settings.save()

    def query(self, *args, **kwargs):
        """Answer read requests.

        :param args: item names. An application setting is posted back with
            its current value; otherwise a matching ``Controller`` handler is
            called as ``handler('r')``. Unknown names are ignored.
        """
        for arg in args:
            if hasattr(settings.default_values_app, arg):
                current_settings = settings.settings.get()
                self.tracker.remote.post_data(
                    self.tracker.remote.DATA_NON_LOCA,
                    {arg: current_settings.get('app', {}).get(arg)}
                )
            elif hasattr(self.controller, arg):
                # Called directly; the former ``*('r')`` unpacked a string.
                getattr(self.controller, arg)('r')

    def ota_plain(self, *args, **kwargs):
        """Start the cloud OTA action when both ``sw_ota`` and
        ``sw_ota_auto_upgrade`` are enabled in the application settings."""
        current_settings = settings.settings.get()
        if current_settings['app']['sw_ota'] and current_settings['app']['sw_ota_auto_upgrade']:
            self.tracker.remote.cloud_ota_action()
def downlink_process(argv):
    """Thread body: dispatch messages from ``downlink_queue`` forever.

    Receives data from quecIot, AliYun or another server. The data format
    must be unified by the cloud's own process module before it is put into
    ``downlink_queue``:

        ('object_model', [('phone_num', '123456789'), ...])
        ('query', ['phone_num', ...])

    A message that cannot be processed (malformed, unknown option, or a
    handler that raises) is logged and dropped. Letting the exception
    propagate would end this thread, and no later message would ever be
    handled.

    :param argv: the ``Remote`` instance that owns the queue.
    """
    self = argv
    while True:
        # Waiting on the queue stays outside the try block so that a broken
        # queue still surfaces instead of spinning in this loop.
        data = self.downlink_queue.get()
        log.debug('downlink_queue data:', data)
        try:
            option_attr = data[0]
            args = data[1]
            DownLinkOptionObj = DownLinkOption(tracker=self.tracker)
            if not hasattr(DownLinkOptionObj, option_attr):
                raise RemoteError('DownLinkOption has no attribute %s.' % option_attr)
            option_fun = getattr(DownLinkOptionObj, option_attr)
            option_fun(*args)
            if self.remote_read_cb:
                self.remote_read_cb(*data)
            else:
                log.warn('Remote read callback is not defined.')
        except Exception as e:
            # Thread boundary: report the failure and keep serving.
            log.warn('Downlink message dropped: %s' % repr(e))
def uplink_process(argv):
    """Thread body: post queued data to the cloud, buffering failures on disk.

    Each round:

    1. Data stored in the history file is posted first.
    2. If an entry still fails after three attempts, posting stops and
       everything waiting in ``uplink_queue`` is appended to the history.
    3. The history file is rewritten if its content changed.
    4. The thread then waits for a new message in ``uplink_queue``, posts it,
       and stores it in the history file if the post fails.

    :param argv: the ``Remote`` instance that owns the queue, the cloud
        client and the history file.
    """
    self = argv
    while True:
        need_refresh = False
        # Read the data that could not be sent to the server in time.
        hist = self.read_history()
        try:
            for key, value in hist.items():
                # Sensor/device info, cell/wifi locator data or GPS data.
                if key == 'non_loca':
                    data_type = self.DATA_NON_LOCA
                elif key == 'loca_non_gps':
                    data_type = self.DATA_LOCA_NON_GPS
                elif key == 'loca_gps':
                    data_type = self.DATA_LOCA_GPS
                else:
                    continue
                # Always work on the head of the list. Popping by index while
                # enumerating the same list skipped every other entry.
                while value:
                    ntry = 0
                    # Try at most 3 times to post the entry.
                    while not self.cloud.post_data(data_type, value[0]):
                        ntry += 1
                        if ntry >= 3:
                            # Maybe a network error: stop posting and go to
                            # the exception handler.
                            raise RemoteError('Data post failed.')
                        utime.sleep(1)
                    value.pop(0)  # Posted successfully, drop it from history.
                    need_refresh = True
        except Exception:
            # Move everything waiting in uplink_queue into the history.
            while self.uplink_queue.size():
                msg = self.uplink_queue.get()
                if not msg:
                    continue
                if msg[0] == self.DATA_NON_LOCA:
                    key = 'non_loca'
                elif msg[0] == self.DATA_LOCA_NON_GPS:
                    key = 'loca_non_gps'
                elif msg[0] == self.DATA_LOCA_GPS:
                    key = 'loca_gps'
                else:
                    continue
                # The history may not contain this key yet; indexing it
                # directly raised KeyError out of this handler.
                hist.setdefault(key, []).append(msg[1])
                need_refresh = True
        finally:
            if need_refresh:
                # Flush the changed history back to the history file.
                self.refresh_history(hist)
                need_refresh = False

        # If history data remains, put an empty message into uplink_queue so
        # that the get() below returns at once and the history is processed
        # again immediately. Without it the history would only be retried
        # after new data arrives. (The original author questioned whether
        # this is necessary.)
        if len(hist.get('non_loca', [])) + len(hist.get('loca_non_gps', [])) + len(hist.get('loca_gps', [])):
            self.uplink_queue.put(())

        # Wait for new data coming into uplink_queue.
        msg = self.uplink_queue.get()
        if msg and msg[0] in (self.DATA_NON_LOCA, self.DATA_LOCA_NON_GPS, self.DATA_LOCA_GPS):
            if not self.cloud.post_data(msg[0], msg[1]):
                self.add_history(msg[0], msg[1])
class Remote(Singleton):
    """Link between the tracker and the cloud.

    Posts data to the cloud, starts the uplink and downlink worker threads,
    and keeps data that could not be posted in a history file.
    """

    # File holding the data that has not been posted to the cloud yet.
    _history = '/usr/tracker_data.hist'

    def __init__(self, tracker, remote_read_cb=None):
        """Create the cloud client and start both worker threads.

        :param tracker: tracker object handed on to the downlink handlers.
        :param remote_read_cb: optional callable invoked by the downlink
            thread after a message has been handled.
        :raises settings.SettingsError: if the configured cloud is not quecIot.
        """
        self.tracker = tracker
        self.remote_read_cb = remote_read_cb
        self.downlink_queue = Queue(maxsize=64)
        self.uplink_queue = Queue(maxsize=64)
        # True: post_data() sends synchronously; False: it only queues.
        self.block_io = True

        current_settings = settings.settings.get()
        cloud_init_params = current_settings['sys']['cloud_init_params']
        if current_settings['sys']['cloud'] == settings.default_values_sys._cloud.quecIot:
            self.cloud = QuecThing(cloud_init_params['PK'], cloud_init_params['PS'], cloud_init_params['DK'], cloud_init_params['DS'], self.downlink_queue)
            self.DATA_NON_LOCA = DATA_NON_LOCA
            self.DATA_LOCA_NON_GPS = DATA_LOCA_NON_GPS
            self.DATA_LOCA_GPS = DATA_LOCA_GPS
        else:
            raise settings.SettingsError('Current cloud (0x%X) not supported!' % current_settings['sys']['cloud'])

        _thread.start_new_thread(downlink_process, (self,))
        _thread.start_new_thread(uplink_process, (self,))

    def read_history(self):
        '''
        Return the content of the history file as a dict, or an empty dict
        when the file is missing, unreadable as JSON, or not a JSON object.

        {
            "non_loca": [
                {
                    'switch': True,
                    'energy': 100
                },
                {
                    'switch': True,
                    'energy': 100
                }
            ],
            "loca_non_gps": [
                (117.1138, 31.82279, 550),
                (117.1138, 31.82279, 550)
            ],
            "loca_gps": [
                ['$GPRMCx,x,x,x', '$GPGGAx,x,x,x'],
                ['$GPRMCx,x,x,x', '$GPGGAx,x,x,x']
            ]
        }
        '''
        if not ql_fs.path_exists(self._history):
            return {}
        with open(self._history, 'r') as f:
            try:
                res = ujson.load(f)
            except Exception:
                return {}
        if isinstance(res, dict):
            return res
        return {}

    def add_history(self, data_type, data):
        """Append one entry to the history file.

        :param data_type: one of ``DATA_NON_LOCA``, ``DATA_LOCA_NON_GPS`` or
            ``DATA_LOCA_GPS``.
        :param data: the payload that could not be posted.
        :return: ``True`` if the history file was rewritten, ``False`` if the
            write failed or ``data_type`` is not one of the known types.
        """
        if data_type == self.DATA_NON_LOCA:
            key = 'non_loca'
        elif data_type == self.DATA_LOCA_NON_GPS:
            key = 'loca_non_gps'
        elif data_type == self.DATA_LOCA_GPS:
            key = 'loca_gps'
        else:
            # An unknown type used to leave ``key`` unbound and crash below.
            return False

        try:
            with open(self._history, 'r') as f:
                res = ujson.load(f)
        except Exception:
            # Missing or unreadable file: start a new history.
            res = {}
        if not isinstance(res, dict):
            res = {}

        res.setdefault(key, []).append(data)
        return self.refresh_history(res)

    def refresh_history(self, hist_dict):
        """Overwrite the history file with ``hist_dict``.

        :return: ``True`` on success, ``False`` if writing raised.
        """
        try:
            with open(self._history, 'w') as f:
                ujson.dump(hist_dict, f)
                return True
        except Exception:
            return False

    def clean_history(self):
        """Delete the history file."""
        uos.remove(self._history)

    def _post_data(self, data):
        """Post one ``(data_type, payload)`` pair to the cloud right away.

        :return: ``True`` if the cloud accepted the data; ``False`` if it did
            not, in which case the payload is stored in the history file.
        :raises RemoteError: if ``data[0]`` is not a known data type.
        """
        if data[0] == self.DATA_NON_LOCA or data[0] == self.DATA_LOCA_NON_GPS or data[0] == self.DATA_LOCA_GPS:
            if not self.cloud.post_data(data[0], data[1]):
                self.add_history(data[0], data[1])
                return False
            else:
                return True
        else:
            raise RemoteError('Post data format is wrong. data: %s' % data)

    def post_data(self, data_type, data):
        '''
        Post data to the cloud, directly when ``block_io`` is True, otherwise
        through ``uplink_queue`` (in which case True is always returned).

        Data format to post:

        --- non_loca ---
        {
            'switch': True,
            'energy': 100
        }

        --- loca_non_gps ---
        (117.1138, 31.82279, 550)

        --- loca_gps ---
        ['$GPRMCx,x,x,x', '$GPGGAx,x,x,x']
        '''
        if self.block_io is True:
            return self._post_data((data_type, data))
        else:
            self.uplink_queue.put((data_type, data))
            return True

    def set_block_io(self, val):
        """Select direct posting (``True``) or queued posting (anything else)."""
        self.block_io = val

    def check_ota(self):
        """Report the device info to the cloud as part of the OTA check.

        :raises settings.SettingsError: if ``sw_ota`` is not enabled or the
            configured cloud is not quecIot.
        """
        current_settings = settings.settings.get()
        if current_settings['sys']['cloud'] == settings.default_values_sys._cloud.quecIot:
            if current_settings['app']['sw_ota'] is True:
                self.cloud.dev_info_report()
            else:
                raise settings.SettingsError('OTA upgrade is disabled!')
        else:
            raise settings.SettingsError('Current cloud (0x%X) not supported!' % current_settings['sys']['cloud'])

    def cloud_ota_action(self, val=1):
        """Pass an OTA action value to the cloud client.

        :param val: value handed to ``cloud.ota_action``; ``0`` additionally
            resets the stored ``ota_status`` to 0 and saves the settings.
        :raises settings.SettingsError: if the configured cloud is not quecIot.
        """
        current_settings = settings.settings.get()
        if current_settings['sys']['cloud'] == settings.default_values_sys._cloud.quecIot:
            self.cloud.ota_action(val)
            if val == 0:
                settings.settings.set('ota_status', 0)
                settings.settings.save()
        else:
            raise settings.SettingsError('Current cloud (0x%X) not supported!' % current_settings['sys']['cloud'])