demo.tracker-v2/code/test_tracker.py

141 lines
4.0 KiB
Python
Raw Normal View History

import ure
import utime
import usr.settings as settings
from machine import UART
from queue import Queue
from usr.logging import getLogger
from usr.quecthing import QuecThing
from usr.remote import Remote
# from usr.location import Location
from usr.tracker import Tracker
log = getLogger(__name__)
def test_quecthing():
log.info('[x] start test_quecthing')
current_settings = settings.settings.get()
cloud_init_params = current_settings['sys']['cloud_init_params']
downlink_queue = Queue(maxsize=64)
cloud = QuecThing(cloud_init_params['PK'], cloud_init_params['PS'], cloud_init_params['DK'], cloud_init_params['DS'], downlink_queue)
cloud.post_data(0x0, 'put test msg.')
log.info('[x] end test_quecthing')
def test_settings():
log.info('[x] start test_settings')
current_settings = settings.settings.get()
log.info("current_settings", current_settings)
settings.Settings().reset()
current_settings = settings.settings.get()
log.info("current_settings", current_settings)
log.info('[x] end test_settings')
def test_uart():
log.info('[x] start test_uart')
current_settings = settings.settings.get()
gps_cfg = current_settings['sys']['locator_init_params']
uart1 = UART(UART.UART1, gps_cfg['buadrate'], gps_cfg['databits'], gps_cfg['parity'], gps_cfg['stopbits'], gps_cfg['flowctl'])
log.info("uart1.write('Test UART1 Msg.')")
uart1.write('Test UART1 Msg.')
utime.sleep(1)
ms = uart1.any()
log.info("uart1.any()", ms)
if ms:
msg = uart1.read(ms)
log.info("uart1 read msg", msg)
uart1.close()
def test_ure():
gps_data = '$GNVTG,218.45,T,,M,0.03,N,0.06,K,A*2C'
a = ure.search(r',N,[0-9]+\.[0-9]+,K,', gps_data)
if a:
print(a.group[0])
def test_remote():
log.info('[x] start test_remote')
log.info('settings.current_settings: %s' % settings.settings.get())
Remote()
log.info('[x] end test_remote')
def test_tracker():
log.info('[x] start test_tracker')
tracker = None
def remote_read_cb(*data):
if data:
if data[0] == 'object_model':
for item in data[1]:
if item[0] == 'loc_mode':
tracker.tracker_command_queue.put(item[0])
def loc_read_cb(data):
if data:
loc_method = data[0]
loc_data = data[1]
log.info("loc_method:", loc_method)
log.info("loc_data:", loc_data)
if loc_method == settings.default_values_app._loc_method.gps:
data_type = tracker.remote.DATA_LOCA_GPS
else:
data_type = tracker.remote.DATA_LOCA_NON_GPS
tracker.remote.post_data(data_type, loc_data)
def alert_read_cb(*data):
if data:
data_type = tracker.remote.DATA_NON_LOCA
alert_data = {data[0]: data[1]}
tracker.remote.post_data(data_type, alert_data)
kw = {}
tracker = Tracker(remote_read_cb, loc_read_cb, alert_read_cb, **kw)
# log.info('[.] sleep 10')
# utime.sleep(10)
# log.debug('[.] set loc_mode 0x0.')
# settings.settings.set('loc_mode', 0x0)
# settings.settings.save()
# log.debug('[.] tracker_command_queue put loc_mode.')
# tracker.tracker_command_queue.put('loc_mode')
# log.info('[.] sleep 10')
# utime.sleep(10)
# log.debug('[.] set loc_mode 0x1.')
# settings.settings.set('loc_mode', 0x1)
# settings.settings.save()
# log.debug('[.] tracker_command_queue put loc_mode.')
# tracker.tracker_command_queue.put('loc_mode')
# log.info('[.] sleep 5')
# utime.sleep(5)
# log.info('[.] locator trigger')
# tracker.locator.trigger()
# log.info('[.] sleep 3')
# utime.sleep(3)
# log.info('[.] alert post_alert')
# tracker.alert.post_alert(40000, {'drive_behavior_code': 40001, 'local_time': utime.mktime(utime.localtime())})
log.info('[x] end test_tracker')
def main():
# test_quecthing()
# test_settings()
# test_uart()
# test_remote()
test_tracker()
if __name__ == '__main__':
main()