demo.tracker-v2/code/test_tracker.py

508 lines
18 KiB
Python
Raw Normal View History

2022-03-24 16:01:12 +08:00
# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import utime
2022-04-13 15:34:07 +08:00
import osTimer
2022-04-12 09:13:20 +08:00
from usr.logging import Logger
2022-04-13 15:34:07 +08:00
from usr.tracker import Tracker, tracker_main
2022-04-12 09:13:20 +08:00
from usr.battery import Battery
from usr.history import History
from usr.location import Location
2022-04-13 11:55:36 +08:00
from usr.quecthing import QuecThing, QuecObjectModel
from usr.aliyunIot import AliYunIot, AliObjectModel
2022-04-12 09:13:20 +08:00
from usr.mpower import LowEnergyRTC
from usr.common import Observable, Observer
from usr.remote import RemoteSubcribe, RemotePublish
from usr.settings import Settings, PROJECT_NAME, PROJECT_VERSION, \
2022-04-13 11:55:36 +08:00
DEVICE_FIRMWARE_NAME, DEVICE_FIRMWARE_VERSION, \
2022-04-13 18:24:49 +08:00
quec_object_model, ali_object_model, default_values_sys, default_values_app
2022-03-17 13:25:59 +08:00
2022-04-12 09:13:20 +08:00
log = Logger(__name__)
2022-03-17 13:25:59 +08:00
2022-04-12 09:13:20 +08:00
def test_led():
pass
2022-04-12 09:13:20 +08:00
def test_logger():
res = {"all": 0, "success": 0, "failed": 0}
2022-04-12 09:13:20 +08:00
log = Logger("test_logger")
log.debug("debug Level Log.")
log.info("info Level Log.")
log.warn("warn Level Log.")
log.error("error Level Log.")
log.critical("critical Level Log.")
2022-04-12 09:13:20 +08:00
assert log.get_debug() is True, "[test_logger] FAILED: log.get_debug() is not True."
print("[test_logger] SUCCESS: log.get_debug() is True.")
res["success"] += 1
2022-04-12 09:13:20 +08:00
assert log.set_debug(True) is True, "[test_logger] FAILED: log.set_debug(True)."
print("[test_logger] SUCCESS: log.set_debug(True).")
res["success"] += 1
assert log.set_debug(False) is True, "[test_logger] FAILED: log.set_debug(False)."
print("[test_logger] SUCCESS: log.set_debug(False).")
res["success"] += 1
2022-04-12 09:13:20 +08:00
assert log.get_level() == "debug", "[test_logger] FAILED: log.get_level() is not debug."
print("[test_logger] SUCCESS: log.get_level() is debug.")
res["success"] += 1
2022-04-12 09:13:20 +08:00
for level in ("debug", "info", "warn", "error", "critical"):
assert log.set_level(level) is True and log.get_level() == level, "[test_logger] FAILED: log.set_level(%s)." % level
print("[test_logger] SUCCESS: log.set_level(%s)." % level)
res["success"] += 1
2022-04-12 09:13:20 +08:00
res["all"] = res["success"] + res["failed"]
print("[test_logger] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
2022-04-12 09:13:20 +08:00
def test_settings():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
assert settings.init() is True, "[test_settings] FAILED: Settings.init()."
print("[test_settings] SUCCESS: Settings.init().")
res["success"] += 1
current_settings = settings.get()
assert current_settings and isinstance(current_settings, dict)
print("[test_settings] SUCCESS: Settings.get().")
res["success"] += 1
for key, val in current_settings.get("app", {}).items():
val = "18888888888" if key == "phone_num" else val
assert settings.set(key, val) is True, "[test_settings] FAILED: APP Settings.set(%s, %s)." % (key, val)
print("[test_settings] SUCCESS: APP Settings.set(%s, %s)." % (key, val))
res["success"] += 1
for key, val in current_settings.get("sys", {}).items():
if key in ("sw_log", "ota_status", "cloud_init_params", "user_ota_action"):
assert settings.set(key, val) is True, "[test_settings] FAILED: SYS Settings.set(%s, %s)." % (key, val)
print("[test_settings] SUCCESS: SYS Settings.set(%s, %s)." % (key, val))
res["success"] += 1
assert settings.save() is True, "[test_settings] FAILED: Settings.save()."
print("[test_settings] SUCCESS: Settings.save().")
res["success"] += 1
assert settings.reset() is True, "[test_settings] FAILED: Settings.reset()."
print("[test_settings] SUCCESS: Settings.reset().")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_settings] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
2022-04-13 15:34:07 +08:00
run_time = 0
def get_voltage_cb(args):
global run_time
run_time += 5
2022-04-12 09:13:20 +08:00
def test_battery():
res = {"all": 0, "success": 0, "failed": 0}
battery = Battery()
2022-04-13 15:34:07 +08:00
temp = 30
msg = "[test_battery] %s: battery.set_temp(30)."
assert battery.set_temp(temp) and battery.__temp == temp, msg % "FAILED"
print(msg % "SUCCESS")
2022-04-12 09:13:20 +08:00
res["success"] += 1
2022-04-13 15:34:07 +08:00
timer = osTimer()
timer.start(5, 1, get_voltage_cb)
2022-04-12 09:13:20 +08:00
voltage = battery.get_voltage()
2022-04-13 15:34:07 +08:00
timer.stop()
global run_time
2022-04-13 18:24:49 +08:00
print("[test_battery] battery.get_voltage() run_time: %sms" % run_time)
2022-04-13 15:34:07 +08:00
msg = "[test_battery] %s: battery.get_voltage() %s."
assert isinstance(voltage, int) and voltage > 0, msg % ("FAILED", voltage)
print(msg % ("SUCCESS", voltage))
2022-04-12 09:13:20 +08:00
res["success"] += 1
energy = battery.get_energy()
assert isinstance(energy, int) and energy >= 0, "[test_battery] FAILED: battery.get_energy() %s." % energy
print("[test_battery] SUCCESS: battery.get_energy() is %s." % energy)
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_battery] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
class TestHistObservable(Observable):
def produce_hist_data(self, local_time):
hist_data = [{"local_time": local_time}]
self.notifyObservers(self, *hist_data)
def test_history():
res = {"all": 0, "success": 0, "failed": 0}
history = History()
test_hist_obs = TestHistObservable()
test_hist_obs.addObserver(history)
hist_data = [{"test": "test"}]
assert history.write(hist_data), "[test_history] FAILED: history.write()."
print("[test_history] SUCCESS: history.write(%s)." % str(hist_data))
res["success"] += 1
hist = history.read()
assert hist.get("data") is not None and isinstance(hist["data"], list), "[test_history] FAILED: history.read() %s." % hist
print("[test_history] SUCCESS: history.read() is %s." % hist)
res["success"] += 1
local_time = utime.mktime(utime.localtime())
test_hist_obs.produce_hist_data(local_time)
hist = history.read()
obs_res = False
for i in hist.get("data", []):
if i.get("local_time") == local_time:
obs_res = True
break
2022-04-12 09:13:20 +08:00
assert obs_res, "[test_history] FAILED: history.update() %s." % str(hist)
print("[test_history] SUCCESS: history.update() %s." % str(hist))
res["success"] += 1
2022-03-21 11:21:01 +08:00
2022-04-12 09:13:20 +08:00
assert history.clean(), "[test_history] FAILED: history.clean()."
print("[test_history] SUCCESS: history.clean().")
res["success"] += 1
2022-03-21 11:21:01 +08:00
2022-04-12 09:13:20 +08:00
res["all"] = res["success"] + res["failed"]
print("[test_history] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
2022-03-21 11:21:01 +08:00
2022-04-12 09:13:20 +08:00
def test_location():
res = {"all": 0, "success": 0, "failed": 0}
2022-03-24 13:30:00 +08:00
2022-04-12 09:13:20 +08:00
settings = Settings()
current_settings = settings.get()
gps_mode = 0x2
locator_init_params = current_settings["sys"]["locator_init_params"]
2022-03-24 13:30:00 +08:00
2022-04-12 09:13:20 +08:00
locator = Location(gps_mode, locator_init_params)
for loc_method in range(1, 8):
loc_data = locator.read(loc_method)
2022-04-12 09:13:20 +08:00
if loc_method & 0x1:
assert loc_data.get(0x1) not in ("", (), None), "[test_location] FAILED: locator.read(%s) loc_data: %s." % (loc_method, loc_data)
if loc_method & 0x2:
assert loc_data.get(0x2) not in ("", (), None), "[test_location] FAILED: locator.read(%s) loc_data: %s." % (loc_method, loc_data)
if loc_method & 0x4:
assert loc_data.get(0x4) not in ("", (), None), "[test_location] FAILED: locator.read(%s) loc_data: %s." % (loc_method, loc_data)
print("[test_location] SUCCESS: locator.read(%s) loc_data: %s." % (loc_method, loc_data))
res["success"] += 1
2022-03-24 13:30:00 +08:00
2022-04-12 09:13:20 +08:00
res["all"] = res["success"] + res["failed"]
2022-03-24 13:30:00 +08:00
2022-04-12 09:13:20 +08:00
print("[test_location] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
2022-03-24 13:30:00 +08:00
2022-04-12 09:13:20 +08:00
def test_quecthing():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
current_settings = settings.get()
2022-04-13 11:55:36 +08:00
cloud_init_params = default_values_sys._quecIot
2022-04-12 09:13:20 +08:00
cloud = QuecThing(
cloud_init_params["PK"],
cloud_init_params["PS"],
cloud_init_params["DK"],
cloud_init_params["DS"],
cloud_init_params["SERVER"],
mcu_name=PROJECT_NAME,
mcu_version=PROJECT_VERSION
2022-03-24 13:30:00 +08:00
)
2022-04-12 09:13:20 +08:00
remote_sub = RemoteSubcribe()
cloud.addObserver(remote_sub)
2022-04-13 11:55:36 +08:00
tracker = Tracker()
quec_om = QuecObjectModel()
tracker.set_cloud_om(quec_om)
tracker.init_cloud_object_module(quec_object_model)
2022-04-13 18:24:49 +08:00
gps_mode = default_values_sys._gps_mode.external
2022-04-12 09:13:20 +08:00
locator_init_params = current_settings["sys"]["locator_init_params"]
locator = Location(gps_mode, locator_init_params)
msg = "[test_quecthing] %s: cloud.set_object_model(%s)."
2022-04-13 11:55:36 +08:00
assert cloud.set_object_model(tracker.cloud_om), msg % ("FAILED", tracker.cloud_om)
print(msg % ("SUCCESS", tracker.cloud_om))
res["success"] += 1
msg = "[test_quecthing] %s: cloud.init()."
assert cloud.init(), msg % "FAILED"
2022-04-12 09:13:20 +08:00
print(msg % "SUCCESS")
res["success"] += 1
2022-04-13 11:55:36 +08:00
msg = "[test_quecthing] %s: tracker.__get_quec_loc_data(%s, %s) %s."
2022-04-13 18:24:49 +08:00
loc_method = default_values_app.loc_method.gps
2022-04-12 09:13:20 +08:00
loc_data = locator.read(loc_method)
2022-04-13 11:55:36 +08:00
quec_loc_data = tracker.__get_quec_loc_data(loc_method, loc_data.get(loc_method))
2022-04-12 09:13:20 +08:00
assert quec_loc_data != "", msg % ("FAILED", loc_method, loc_data, quec_loc_data)
print(msg % ("SUCCESS", loc_method, loc_data, quec_loc_data))
res["success"] += 1
msg = "[test_quecthing] %s: cloud.post_data(%s)."
assert cloud.post_data(quec_loc_data), msg % ("FAILED", str(quec_loc_data))
print(msg % ("SUCCESS", str(quec_loc_data)))
res["success"] += 1
msg = "[test_quecthing] %s: cloud.ota_request()."
assert cloud.ota_request(), msg % ("FAILED",)
print(msg % ("SUCCESS",))
res["success"] += 1
# # PASS: No OTA Plain, ota_action Return False
# msg = "[test_quecthing] %s: cloud.ota_action()."
# assert cloud.ota_action() is True, msg % ("FAILED",)
# print(msg % ("SUCCESS",))
msg = "[test_quecthing] %s: cloud.close()."
assert cloud.close(), msg % "FAILED"
2022-04-12 09:13:20 +08:00
print(msg % "SUCCESS")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_quecthing] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
2022-03-31 13:41:08 +08:00
2022-04-13 11:55:36 +08:00
def test_aliyuniot():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
current_settings = settings.get()
cloud_init_params = default_values_sys._AliYun
cloud = AliYunIot(
cloud_init_params["PK"],
cloud_init_params["PS"],
cloud_init_params["DK"],
cloud_init_params["DS"],
cloud_init_params["SERVER"],
burning_method=1,
mcu_name=PROJECT_NAME,
mcu_version=PROJECT_VERSION,
firmware_name=DEVICE_FIRMWARE_NAME,
firmware_version=DEVICE_FIRMWARE_VERSION
)
remote_sub = RemoteSubcribe()
cloud.addObserver(remote_sub)
tracker = Tracker()
ali_om = AliObjectModel()
tracker.set_cloud_om(ali_om)
tracker.init_cloud_object_module(ali_object_model)
2022-04-13 18:24:49 +08:00
gps_mode = default_values_sys._gps_mode.external
2022-04-13 11:55:36 +08:00
locator_init_params = current_settings["sys"]["locator_init_params"]
locator = Location(gps_mode, locator_init_params)
msg = "[test_aliyuniot] %s: cloud.set_object_model(%s)."
assert cloud.set_object_model(tracker.cloud_om), msg % ("FAILED", tracker.cloud_om)
print(msg % ("SUCCESS", tracker.cloud_om))
res["success"] += 1
msg = "[test_aliyuniot] %s: cloud.init()."
assert cloud.init(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_aliyuniot] %s: tracker.__get_ali_loc_data(%s, %s) %s."
2022-04-13 18:24:49 +08:00
loc_method = default_values_app.loc_method.gps
2022-04-13 11:55:36 +08:00
loc_data = locator.read(loc_method)
ali_loc_data = tracker.__get_ali_loc_data(loc_method, loc_data.get(loc_method))
assert ali_loc_data != "", msg % ("FAILED", loc_method, loc_data, ali_loc_data)
print(msg % ("SUCCESS", loc_method, loc_data, ali_loc_data))
res["success"] += 1
msg = "[test_aliyuniot] %s: cloud.post_data(%s)."
assert cloud.post_data(ali_loc_data), msg % ("FAILED", str(ali_loc_data))
print(msg % ("SUCCESS", str(ali_loc_data)))
res["success"] += 1
msg = "[test_aliyuniot] %s: cloud.ota_request()."
assert cloud.ota_request(), msg % ("FAILED",)
print(msg % ("SUCCESS",))
res["success"] += 1
msg = "[test_aliyuniot] %s: cloud.device_report()."
assert cloud.device_report(), msg % ("FAILED",)
print(msg % ("SUCCESS",))
res["success"] += 1
# # PASS: No OTA Plain, ota_action Return False
# msg = "[test_aliyuniot] %s: cloud.ota_action()."
# assert cloud.ota_action() is True, msg % ("FAILED",)
# print(msg % ("SUCCESS",))
msg = "[test_aliyuniot] %s: cloud.close()."
assert cloud.close() and cloud.__ali.getAliyunSta() != 0, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_aliyuniot] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
2022-04-12 09:13:20 +08:00
def test_remote():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
current_settings = settings.get()
cloud_init_params = current_settings["sys"]["cloud_init_params"]
cloud = QuecThing(
cloud_init_params["PK"],
cloud_init_params["PS"],
cloud_init_params["DK"],
cloud_init_params["DS"],
cloud_init_params["SERVER"],
mcu_name=PROJECT_NAME,
mcu_version=PROJECT_VERSION
)
remote_sub = RemoteSubcribe()
cloud.addObserver(remote_sub)
remote_pub = RemotePublish()
msg = "[test_remote] %s: cloud.cloud_init()."
assert cloud.cloud_init(), msg % "FAILED"
print(msg % "SUCCESS")
msg = "[test_remote] %s: remote_pub.set_cloud(cloud)."
assert remote_pub.set_cloud(cloud), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_remote] %s: remote_pub.get_cloud()."
assert isinstance(remote_pub.get_cloud(), QuecThing), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_remote] %s: remote_pub.cloud_ota_check()."
assert remote_pub.cloud_ota_check(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
# # PASS: No OTA Plain, ota_action Return False
# msg = "[test_remote] %s: remote_pub.ota_request()."
# assert remote_pub.ota_request(), msg % "FAILED"
# print(msg % "SUCCESS")
gps_mode = 0x2
locator_init_params = current_settings["sys"]["locator_init_params"]
locator = Location(gps_mode, locator_init_params)
loc_method = 0x1
loc_data = locator.read(loc_method)
quec_loc_data = cloud.get_loc_data(loc_method, loc_data.get(loc_method))
msg = "[test_remote] %s: remote_pub.post_data(%s)."
assert remote_pub.post_data(quec_loc_data), msg % ("FAILED", str(quec_loc_data))
print(msg % ("SUCCESS", str(quec_loc_data)))
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_remote] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
class TestRTCObserver(Observer):
def update(self, observable, *args, **kwargs):
log.debug("observable: %s" % observable)
log.debug("args: %s" % str(args))
log.debug("kwargs: %s" % str(kwargs))
observable.start_rtc()
return True
def test_low_energy_rtc():
res = {"all": 0, "success": 0, "failed": 0}
low_energy_rtc = LowEnergyRTC()
test_rtc_obs = TestRTCObserver()
low_energy_rtc.addObserver(test_rtc_obs)
period = 5
msg = "[test_low_energy_rtc] %s: low_energy_rtc.set_period(%s)."
assert low_energy_rtc.set_period(period), msg % ("FAILED", period)
print(msg % ("SUCCESS", period))
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.get_period()."
assert low_energy_rtc.get_period() == period, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
low_energy_method = "PM"
msg = "[test_low_energy_rtc] %s: low_energy_rtc.set_low_energy_method(%s)."
assert low_energy_rtc.set_low_energy_method(low_energy_method), msg % ("FAILED", low_energy_method)
print(msg % ("SUCCESS", low_energy_method))
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.get_low_energy_method()."
assert low_energy_rtc.get_low_energy_method() == low_energy_method, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.low_energy_init()."
assert low_energy_rtc.low_energy_init(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.get_lpm_fd()."
assert low_energy_rtc.get_lpm_fd() is not None, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.start_rtc()."
assert low_energy_rtc.start_rtc(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
utime.sleep(period * 3 + 1)
msg = "[test_low_energy_rtc] %s: low_energy_rtc.enable_rtc(0)."
assert low_energy_rtc.enable_rtc(0), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_low_energy_rtc] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
2022-04-07 14:56:12 +08:00
2022-04-12 09:13:20 +08:00
def test_tracker():
2022-04-13 15:34:07 +08:00
tracker_main()
2022-04-07 14:56:12 +08:00
2022-04-12 09:13:20 +08:00
def main():
# test_logger()
# test_settings()
# test_battery()
# test_history()
# test_location()
# test_quecthing()
2022-04-13 15:34:07 +08:00
# test_aliyuniot()
# test_remote()
2022-04-13 11:55:36 +08:00
# test_low_energy_rtc()
2022-04-13 15:34:07 +08:00
test_tracker()
2022-04-07 14:56:12 +08:00
2022-04-12 09:13:20 +08:00
if __name__ == "__main__":
main()