demo.tracker-v2/code/test_tracker.py
2022-04-14 15:24:06 +08:00

514 lines
18 KiB
Python

# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import modem
import utime
import osTimer
from usr.logging import Logger
from usr.tracker import Collector, tracker
from usr.battery import Battery
from usr.history import History
from usr.location import Location, _loc_method
from usr.quecthing import QuecThing, QuecObjectModel
from usr.aliyunIot import AliYunIot, AliObjectModel
from usr.mpower import LowEnergyRTC
from usr.common import Observable, Observer
from usr.remote import RemoteSubcribe, RemotePublish
from usr.settings import Settings, PROJECT_NAME, PROJECT_VERSION, \
DEVICE_FIRMWARE_NAME, DEVICE_FIRMWARE_VERSION, LocConfig
log = Logger(__name__)
def test_led():
pass
def test_logger():
res = {"all": 0, "success": 0, "failed": 0}
log = Logger("test_logger")
log.debug("debug Level Log.")
log.info("info Level Log.")
log.warn("warn Level Log.")
log.error("error Level Log.")
log.critical("critical Level Log.")
assert log.get_debug() is True, "[test_logger] FAILED: log.get_debug() is not True."
print("[test_logger] SUCCESS: log.get_debug() is True.")
res["success"] += 1
assert log.set_debug(True) is True, "[test_logger] FAILED: log.set_debug(True)."
print("[test_logger] SUCCESS: log.set_debug(True).")
res["success"] += 1
assert log.set_debug(False) is True, "[test_logger] FAILED: log.set_debug(False)."
print("[test_logger] SUCCESS: log.set_debug(False).")
res["success"] += 1
assert log.get_level() == "debug", "[test_logger] FAILED: log.get_level() is not debug."
print("[test_logger] SUCCESS: log.get_level() is debug.")
res["success"] += 1
for level in ("debug", "info", "warn", "error", "critical"):
assert log.set_level(level) is True and log.get_level() == level, "[test_logger] FAILED: log.set_level(%s)." % level
print("[test_logger] SUCCESS: log.set_level(%s)." % level)
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_logger] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
def test_settings():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
assert settings.init() is True, "[test_settings] FAILED: Settings.init()."
print("[test_settings] SUCCESS: Settings.init().")
res["success"] += 1
current_settings = settings.get()
assert current_settings and isinstance(current_settings, dict)
print("[test_settings] SUCCESS: Settings.get().")
res["success"] += 1
for key, val in current_settings.get("user_cfg", {}).items():
val = "18888888888" if key == "phone_num" else val
if key == "work_mode_timeline":
set_res = False
else:
set_res = True
assert settings.set(key, val) is set_res, "[test_settings] FAILED: APP Settings.set(%s, %s)." % (key, val)
print("[test_settings] SUCCESS: APP Settings.set(%s, %s)." % (key, val))
res["success"] += 1
cloud_params = current_settings["cloud"]
assert settings.set("cloud", cloud_params) is True, "[test_settings] FAILED: SYS Settings.set(%s, %s)." % ("cloud", str(cloud_params))
print("[test_settings] SUCCESS: SYS Settings.set(%s, %s)." % ("cloud", str(cloud_params)))
res["success"] += 1
assert settings.save() is True, "[test_settings] FAILED: Settings.save()."
print("[test_settings] SUCCESS: Settings.save().")
res["success"] += 1
assert settings.reset() is True, "[test_settings] FAILED: Settings.reset()."
print("[test_settings] SUCCESS: Settings.reset().")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_settings] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
run_time = 0
def get_voltage_cb(args):
global run_time
run_time += 5
def test_battery():
res = {"all": 0, "success": 0, "failed": 0}
battery = Battery()
temp = 30
msg = "[test_battery] %s: battery.set_temp(30)."
assert battery.set_temp(temp) and battery.__temp == temp, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
timer = osTimer()
timer.start(5, 1, get_voltage_cb)
voltage = battery.get_voltage()
timer.stop()
global run_time
print("[test_battery] battery.get_voltage() run_time: %sms" % run_time)
msg = "[test_battery] %s: battery.get_voltage() %s."
assert isinstance(voltage, int) and voltage > 0, msg % ("FAILED", voltage)
print(msg % ("SUCCESS", voltage))
res["success"] += 1
energy = battery.get_energy()
assert isinstance(energy, int) and energy >= 0, "[test_battery] FAILED: battery.get_energy() %s." % energy
print("[test_battery] SUCCESS: battery.get_energy() is %s." % energy)
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_battery] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
class TestHistObservable(Observable):
def produce_hist_data(self, local_time):
hist_data = [{"local_time": local_time}]
self.notifyObservers(self, *hist_data)
def test_history():
res = {"all": 0, "success": 0, "failed": 0}
history = History()
test_hist_obs = TestHistObservable()
test_hist_obs.addObserver(history)
hist_data = [{"test": "test"}]
assert history.write(hist_data), "[test_history] FAILED: history.write()."
print("[test_history] SUCCESS: history.write(%s)." % str(hist_data))
res["success"] += 1
hist = history.read()
assert hist.get("data") is not None and isinstance(hist["data"], list), "[test_history] FAILED: history.read() %s." % hist
print("[test_history] SUCCESS: history.read() is %s." % hist)
res["success"] += 1
local_time = utime.mktime(utime.localtime())
test_hist_obs.produce_hist_data(local_time)
hist = history.read()
obs_res = False
for i in hist.get("data", []):
if i.get("local_time") == local_time:
obs_res = True
break
assert obs_res, "[test_history] FAILED: history.update() %s." % str(hist)
print("[test_history] SUCCESS: history.update() %s." % str(hist))
res["success"] += 1
assert history.clean(), "[test_history] FAILED: history.clean()."
print("[test_history] SUCCESS: history.clean().")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_history] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
def test_location():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
current_settings = settings.get()
gps_mode = 0x2
locator_init_params = current_settings["LocConfig"]["locator_init_params"]
locator = Location(gps_mode, locator_init_params)
for loc_method in range(1, 8):
loc_data = locator.read(loc_method)
if loc_method & 0x1:
assert loc_data.get(0x1) not in ("", (), None), "[test_location] FAILED: locator.read(%s) loc_data: %s." % (loc_method, loc_data)
if loc_method & 0x2:
assert loc_data.get(0x2) not in ("", (), None), "[test_location] FAILED: locator.read(%s) loc_data: %s." % (loc_method, loc_data)
if loc_method & 0x4:
assert loc_data.get(0x4) not in ("", (), None), "[test_location] FAILED: locator.read(%s) loc_data: %s." % (loc_method, loc_data)
print("[test_location] SUCCESS: locator.read(%s) loc_data: %s." % (loc_method, loc_data))
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_location] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
def test_quecthing():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
current_settings = settings.get()
cloud_init_params = current_settings["cloud"]
cloud = QuecThing(
cloud_init_params["PK"],
cloud_init_params["PS"],
cloud_init_params["DK"],
cloud_init_params["DS"],
cloud_init_params["SERVER"],
mcu_name=PROJECT_NAME,
mcu_version=PROJECT_VERSION
)
remote_sub = RemoteSubcribe()
cloud.addObserver(remote_sub)
collector = Collector()
quec_om = QuecObjectModel()
collector.add_module(quec_om)
collector.init_cloud_object_module(cloud_init_params["object_model"])
gps_mode = LocConfig._gps_mode.external
locator_init_params = current_settings["LocConfig"]["locator_init_params"]
locator = Location(gps_mode, locator_init_params)
msg = "[test_quecthing] %s: cloud.set_object_model(%s)."
assert cloud.set_object_model(collector.cloud_om), msg % ("FAILED", collector.cloud_om)
print(msg % ("SUCCESS", collector.cloud_om))
res["success"] += 1
msg = "[test_quecthing] %s: cloud.init()."
assert cloud.init(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_quecthing] %s: collector.__get_quec_loc_data(%s, %s) %s."
loc_method = _loc_method.gps
loc_data = locator.read(loc_method)
quec_loc_data = collector.__get_quec_loc_data(loc_method, loc_data.get(loc_method))
assert quec_loc_data != "", msg % ("FAILED", loc_method, loc_data, quec_loc_data)
print(msg % ("SUCCESS", loc_method, loc_data, quec_loc_data))
res["success"] += 1
msg = "[test_quecthing] %s: cloud.post_data(%s)."
assert cloud.post_data(quec_loc_data), msg % ("FAILED", str(quec_loc_data))
print(msg % ("SUCCESS", str(quec_loc_data)))
res["success"] += 1
msg = "[test_quecthing] %s: cloud.ota_request()."
assert cloud.ota_request(), msg % ("FAILED",)
print(msg % ("SUCCESS",))
res["success"] += 1
# # PASS: No OTA Plain, ota_action Return False
# msg = "[test_quecthing] %s: cloud.ota_action()."
# assert cloud.ota_action() is True, msg % ("FAILED",)
# print(msg % ("SUCCESS",))
msg = "[test_quecthing] %s: cloud.close()."
assert cloud.close(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_quecthing] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
def test_aliyuniot():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
current_settings = settings.get()
cloud_init_params = current_settings["cloud"]
client_id = cloud_init_params["DK"] if cloud_init_params["DK"] else modem.getDevImei()
cloud = AliYunIot(
cloud_init_params["PK"],
cloud_init_params["PS"],
cloud_init_params["DK"],
cloud_init_params["DS"],
cloud_init_params["SERVER"],
client_id,
burning_method=cloud_init_params["burning_method"],
mcu_name=PROJECT_NAME,
mcu_version=PROJECT_VERSION,
firmware_name=DEVICE_FIRMWARE_NAME,
firmware_version=DEVICE_FIRMWARE_VERSION
)
remote_sub = RemoteSubcribe()
cloud.addObserver(remote_sub)
collector = Collector()
ali_om = AliObjectModel()
collector.add_module(ali_om)
collector.init_cloud_object_module(cloud_init_params["object_model"])
gps_mode = LocConfig._gps_mode.external
locator_init_params = current_settings["LocConfig"]["locator_init_params"]
locator = Location(gps_mode, locator_init_params)
msg = "[test_aliyuniot] %s: cloud.set_object_model(%s)."
assert cloud.set_object_model(collector.cloud_om), msg % ("FAILED", collector.cloud_om)
print(msg % ("SUCCESS", collector.cloud_om))
res["success"] += 1
msg = "[test_aliyuniot] %s: cloud.init()."
assert cloud.init(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_aliyuniot] %s: collector.__get_ali_loc_data(%s, %s) %s."
loc_method = _loc_method.gps
loc_data = locator.read(loc_method)
ali_loc_data = collector.__get_ali_loc_data(loc_method, loc_data.get(loc_method))
assert ali_loc_data != "", msg % ("FAILED", loc_method, loc_data, ali_loc_data)
print(msg % ("SUCCESS", loc_method, loc_data, ali_loc_data))
res["success"] += 1
msg = "[test_aliyuniot] %s: cloud.post_data(%s)."
assert cloud.post_data(ali_loc_data), msg % ("FAILED", str(ali_loc_data))
print(msg % ("SUCCESS", str(ali_loc_data)))
res["success"] += 1
msg = "[test_aliyuniot] %s: cloud.ota_request()."
assert cloud.ota_request(), msg % ("FAILED",)
print(msg % ("SUCCESS",))
res["success"] += 1
msg = "[test_aliyuniot] %s: cloud.device_report()."
assert cloud.device_report(), msg % ("FAILED",)
print(msg % ("SUCCESS",))
res["success"] += 1
# # PASS: No OTA Plain, ota_action Return False
# msg = "[test_aliyuniot] %s: cloud.ota_action()."
# assert cloud.ota_action() is True, msg % ("FAILED",)
# print(msg % ("SUCCESS",))
msg = "[test_aliyuniot] %s: cloud.close()."
assert cloud.close() and cloud.__ali.getAliyunSta() != 0, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_aliyuniot] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
def test_remote():
res = {"all": 0, "success": 0, "failed": 0}
settings = Settings()
current_settings = settings.get()
cloud_init_params = current_settings["cloud"]
cloud = QuecThing(
cloud_init_params["PK"],
cloud_init_params["PS"],
cloud_init_params["DK"],
cloud_init_params["DS"],
cloud_init_params["SERVER"],
mcu_name=PROJECT_NAME,
mcu_version=PROJECT_VERSION
)
remote_sub = RemoteSubcribe()
cloud.addObserver(remote_sub)
remote_pub = RemotePublish()
msg = "[test_remote] %s: cloud.cloud_init()."
assert cloud.cloud_init(), msg % "FAILED"
print(msg % "SUCCESS")
msg = "[test_remote] %s: remote_pub.set_cloud(cloud)."
assert remote_pub.set_cloud(cloud), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_remote] %s: remote_pub.get_cloud()."
assert isinstance(remote_pub.get_cloud(), QuecThing), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_remote] %s: remote_pub.cloud_ota_check()."
assert remote_pub.cloud_ota_check(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
# # PASS: No OTA Plain, ota_action Return False
# msg = "[test_remote] %s: remote_pub.ota_request()."
# assert remote_pub.ota_request(), msg % "FAILED"
# print(msg % "SUCCESS")
gps_mode = 0x2
locator_init_params = current_settings["user_cfg"]["locator_init_params"]
locator = Location(gps_mode, locator_init_params)
loc_method = 0x1
loc_data = locator.read(loc_method)
quec_loc_data = cloud.get_loc_data(loc_method, loc_data.get(loc_method))
msg = "[test_remote] %s: remote_pub.post_data(%s)."
assert remote_pub.post_data(quec_loc_data), msg % ("FAILED", str(quec_loc_data))
print(msg % ("SUCCESS", str(quec_loc_data)))
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_remote] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
class TestRTCObserver(Observer):
def update(self, observable, *args, **kwargs):
log.debug("observable: %s" % observable)
log.debug("args: %s" % str(args))
log.debug("kwargs: %s" % str(kwargs))
observable.start_rtc()
return True
def test_low_energy_rtc():
res = {"all": 0, "success": 0, "failed": 0}
low_energy_rtc = LowEnergyRTC()
test_rtc_obs = TestRTCObserver()
low_energy_rtc.addObserver(test_rtc_obs)
period = 5
msg = "[test_low_energy_rtc] %s: low_energy_rtc.set_period(%s)."
assert low_energy_rtc.set_period(period), msg % ("FAILED", period)
print(msg % ("SUCCESS", period))
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.get_period()."
assert low_energy_rtc.get_period() == period, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
low_energy_method = "PM"
msg = "[test_low_energy_rtc] %s: low_energy_rtc.set_low_energy_method(%s)."
assert low_energy_rtc.set_low_energy_method(low_energy_method), msg % ("FAILED", low_energy_method)
print(msg % ("SUCCESS", low_energy_method))
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.get_low_energy_method()."
assert low_energy_rtc.get_low_energy_method() == low_energy_method, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.low_energy_init()."
assert low_energy_rtc.low_energy_init(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.get_lpm_fd()."
assert low_energy_rtc.get_lpm_fd() is not None, msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
msg = "[test_low_energy_rtc] %s: low_energy_rtc.start_rtc()."
assert low_energy_rtc.start_rtc(), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
utime.sleep(period * 3 + 1)
msg = "[test_low_energy_rtc] %s: low_energy_rtc.enable_rtc(0)."
assert low_energy_rtc.enable_rtc(0), msg % "FAILED"
print(msg % "SUCCESS")
res["success"] += 1
res["all"] = res["success"] + res["failed"]
print("[test_low_energy_rtc] ALL: %s SUCCESS: %s, FAILED: %s." % (res["all"], res["success"], res["failed"]))
def test_tracker():
tracker()
def main():
# test_logger()
# test_settings()
# test_battery()
# test_history()
# test_location()
# test_quecthing()
# test_aliyuniot()
# test_remote()
# test_low_energy_rtc()
test_tracker()
if __name__ == "__main__":
main()