demo.dtu/code/dtu_transaction.py

355 lines
13 KiB
Python
Raw Normal View History

2022-09-01 20:31:51 +08:00
# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@file :dtu_transaction.py
@author :elian.wang@quectel.com
@brief :Dtu transaction related interfaces
@version :0.1
@date :2022-08-04 11:33:23
@copyright :Copyright (c) 2022
"""
import log
import usys
import ujson
import utime
import _thread
from usr.modules.common import Singleton
from usr.modules.logging import getLogger
from usr.modules.serial import Serial
from usr.modules.remote import RemotePublish
from usr.modules.history import History
from usr.settings import settings
from usr.settings import PROJECT_NAME, PROJECT_VERSION, DEVICE_FIRMWARE_NAME, DEVICE_FIRMWARE_VERSION
log = getLogger(__name__)
class DownlinkTransaction(Singleton):
"""Data downlink:Receive data from the cloud and send it to serial
"""
def __init__(self):
self.__serial = None
def add_module(self, module, callback=None):
if isinstance(module, Serial):
self.__serial = module
return True
return False
def __get_sub_topic_id(self, topic):
"""Locate the topic id from the cloud setting
Args:
topic (str): topic in mqtt protocol
Returns:
str: topic id
"""
cloud_name = settings.current_settings["system_config"]["cloud"]
cloud_config = settings.current_settings.get(cloud_name + "_config")
if cloud_config == None:
raise Exception("Cloud config parameter error")
for k, v in cloud_config.get("subscribe").items():
if topic == v:
return k
def downlink_main(self, *args, **kwargs):
"""Parsing cloud data, send to serial port
Args:
args (tuple): Not use
kwargs (dict): The data received by the cloud,contains topic and data
"""
# Get mqtt protocol message id
cloud_type = settings.current_settings["system_config"]["cloud"]
if cloud_type in ["aliyun", "txyun", "hwyun", "mqtt_private_cloud"]:
msg_id = self.__get_sub_topic_id(kwargs.get("topic"))
if msg_id == None:
raise Exception("Not found correct topic id")
elif cloud_type == "quecthing":
msg_id = kwargs.get("pkgid")
elif cloud_type == "tcp_private_cloud":
msg_id = None
else:
raise Exception("This cloud is not support now")
if isinstance(kwargs["data"], bytes):
data = kwargs["data"].decode()
elif isinstance(kwargs["data"], dict):
data = ujson.dumps(kwargs["data"])
elif isinstance(kwargs["data"], str):
data = kwargs["data"]
else:
data = str(kwargs["data"])
if cloud_type == "tcp_private_cloud":
packed_data = data
else:
packed_data = "%s,%s,%s".encode('utf-8') % (str(msg_id), str(len(data)), data)
# Send packed data through serial
self.__serial.write(packed_data)
class OtaTransaction(Singleton):
"""Device firmware OTA and project file OTA transaction
"""
def __init__(self):
self.__remote_pub = None
def __remote_ota_check(self):
if not self.__remote_pub:
raise TypeError("self.__remote_pub is not registered.")
return self.__remote_pub.cloud_ota_check()
def __remote_ota_action(self, action, module):
if not self.__remote_pub:
raise TypeError("self.__remote_pub is not registered.")
return self.__remote_pub.cloud_ota_action(action, module)
def __remote_device_report(self):
if not self.__remote_pub:
raise TypeError("self.__remote_pub is not registered.")
return self.__remote_pub.cloud_device_report()
def add_module(self, module, callback=None):
if isinstance(module, RemotePublish):
self.__remote_pub = module
return True
return False
def ota_check(self):
"""After powering on, release module information and check for update
"""
print("ota_check")
try:
if settings.current_settings["system_config"]["base_function"]["fota"]:
self.__remote_ota_check()
self.__remote_device_report()
utime.sleep(1)
except Exception as e:
log.error("periodic_ota_check fault", e)
def event_ota_plain(self, *args, **kwargs):
"""Determine the parameters and perform the OTA plan
Args:
args (tuple): Ota parameters sent from the cloud
kwargs (dict): None
"""
log.debug("ota_plain args: %s, kwargs: %s" % (str(args), str(kwargs)))
current_settings = settings.get()
if current_settings["system_config"]["cloud"] == "quecthing":
if args and args[0]:
if args[0][0] == "ota_cfg":
module = args[0][1].get("componentNo")
target_version = args[0][1].get("targetVersion")
if module == DEVICE_FIRMWARE_NAME and current_settings["system_config"]["base_function"]["fota"] == True:
source_version = DEVICE_FIRMWARE_VERSION
elif module == PROJECT_NAME and current_settings["system_config"]["base_function"]["sota"] == True:
source_version = PROJECT_VERSION
else:
return
if target_version != source_version:
self.__remote_ota_action(action=1, module=module)
elif current_settings["system_config"]["cloud"] == "aliyun":
if args and args[0]:
if args[0][0] == "ota_cfg":
module = args[0][1].get("module")
target_version = args[0][1].get("version")
if module == DEVICE_FIRMWARE_NAME and current_settings["system_config"]["base_function"]["fota"] == 1:
source_version = DEVICE_FIRMWARE_VERSION
elif module == PROJECT_NAME and current_settings["system_config"]["base_function"]["sota"] == 1:
source_version = PROJECT_VERSION
else:
return
if target_version != source_version:
self.__remote_ota_action(action=1, module=module)
else:
log.error("Current Cloud Not Supported OTA!")
class UplinkTransaction(Singleton):
"""Data uplink: read data from the serial and send it to cloud
"""
def __init__(self):
self.__remote_pub = None
self.__serial = None
self.__history = None
self.__parse_data = ""
self.__send_to_cloud_data = []
def __remote_post_data(self, data=None, topic_id=None):
if not self.__remote_pub:
raise TypeError("self.__remote_pub is not registered.")
return self.__remote_pub.post_data(data, topic_id)
def __get_pub_topic_id_list(self):
"""Get the publish topic id list in setting
Returns:
str: publist topic id list
"""
cloud_name = settings.current_settings["system_config"]["cloud"]
cloud_config = settings.current_settings.get(cloud_name + "_config")
if cloud_config == None:
raise Exception("Cloud config parameter error")
elif cloud_name == "quecthing":
return ["0"]
else:
return cloud_config.get("subscribe").keys()
def __parse(self):
"""Recursive parse uart data
"""
params_list = self.__parse_data.split(",", 2)
if len(params_list) < 3: # The received data is not a complete frame
return
topic_id = params_list[0]
data_len = params_list[1]
msg_data = params_list[2]
pub_topic_id_list = self.__get_pub_topic_id_list()
if int(data_len) > len(msg_data): # The received data is not a complete frame
return
elif int(data_len) < len(msg_data): # The received data may contain part of the data of another frame
if topic_id in pub_topic_id_list:
self.__send_to_cloud_data.append((topic_id, msg_data[:int(data_len)]))
# Frame format: <topic_id>,<data_len>,<data>
frame_len = len(topic_id)+len(data_len)+int(data_len)+2
self.__parse_data = self.__parse_data[frame_len:]
self.__parse()
else:
self.__parse_data = "" # Read data format eroor,restart read
return
else:
self.__parse_data = ""
if topic_id in pub_topic_id_list:
self.__send_to_cloud_data.append((topic_id, msg_data))
else:
return
def __mqtt_protocol_uart_data_parse(self, data):
"""When cloud is mqtt protocol, parse uart data.
Args:
data (str): Data read from uart
Returns:
list: topic_id and data tuple list
"""
self.__parse_data += data
self.__send_to_cloud_data = []
self.__parse()
def __send_mqtt_data(self, data):
for send_data in data:
self.__remote_post_data(data=send_data[1], topic_id=send_data[0])
utime.sleep_ms(10)
def __uplink_data(self, data):
"""Parsing uart data, send data to cloud
Args:
data (bytes): data read from uart
"""
if settings.current_settings["system_config"]["cloud"] == "tcp_private_cloud":
self.__remote_post_data(data=data)
else:
try:
self.__mqtt_protocol_uart_data_parse(data)
if len(self.__send_to_cloud_data) != 0:
_thread.start_new_thread(self.__send_mqtt_data, (self.__send_to_cloud_data,))
except Exception as e:
log.error(e)
def __post_history_data(self, data):
"""Post history data to cloud
Args:
data (str): history data
Returns:
True: Successfully post
False:Failure to post
"""
log.info("post_history_data")
try:
if settings.current_settings["system_config"]["cloud"] == "tcp_private_cloud_config":
return self.__remote_post_data(data=data)
else:
# In this case, topic id is not used specified, topic id default 0.
return self.__remote_post_data(data=data, topic_id=0)
except Exception as e:
log.error(e)
return False
def add_module(self, module, callback=None):
if isinstance(module, RemotePublish):
self.__remote_pub = module
return True
elif isinstance(module, Serial):
self.__serial = module
return True
elif isinstance(module, History):
self.__history = module
return True
return False
def uplink_main(self):
"""Read serial data, parse and upload to the cloud
"""
while 1:
# Read uart data
read_byte = self.__serial.read(nbytes=1024, timeout=100)
if read_byte:
try:
self.__uplink_data(read_byte)
except Exception as e:
usys.print_exception(e)
log.error("Parse uart data error: %s" % e)
def report_history(self):
"""Report history data to cloud
Returns:
boolen: True: Successfully post
False:Failure to post
"""
if not self.__history:
raise TypeError("self.__history is not registered.")
res = True
hist = self.__history.read()
print("hist[data]:", hist["data"])
if hist["data"]:
pt_count = 0
for i, data in enumerate(hist["data"]):
pt_count += 1
if not self.__post_history_data(data):
res = False
break
hist["data"] = hist["data"][pt_count:]
if hist["data"]:
# Flush data in hist-dictionary to tracker_data.hist file.
self.__history.write(hist["data"])
return res