mirror of
https://gitee.com/qpy-solutions/dtu.git
synced 2025-05-19 19:18:24 +08:00
238 lines
11 KiB
Python
238 lines
11 KiB
Python
# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
@file :dtu.py
|
|
@author :elian.wang@quectel.com
|
|
@brief :dtu main function
|
|
@version :0.1
|
|
@date :2022-05-18 09:12:37
|
|
@copyright :Copyright (c) 2022
|
|
"""
|
|
|
|
import _thread
|
|
import modem
|
|
import osTimer
|
|
from usr.modules.common import Singleton
|
|
from usr.modules.aliyunIot import AliYunIot
|
|
from usr.modules.quecthing import QuecThing
|
|
from usr.modules.mqttIot import MqttIot
|
|
from usr.modules.huawei_cloud import HuaweiIot
|
|
from usr.modules.txyunIot import TXYunIot
|
|
from usr.modules.socketIot import Socket
|
|
|
|
from usr.settings import settings
|
|
from usr.modules.serial import Serial
|
|
from usr.modules.history import History
|
|
from usr.modules.logging import getLogger
|
|
from usr.dtu_transaction import DownlinkTransaction, OtaTransaction, UplinkTransaction, GuiToolsInteraction
|
|
from usr.modules.remote import RemotePublish, RemoteSubscribe
|
|
from usr.settings import PROJECT_NAME, PROJECT_VERSION, DEVICE_FIRMWARE_NAME, DEVICE_FIRMWARE_VERSION
|
|
|
|
log = getLogger(__name__)
|
|
|
|
|
|
class Dtu(Singleton):
|
|
"""Dtu main function call
|
|
"""
|
|
def __init__(self):
|
|
self.__ota_timer = osTimer()
|
|
self.__ota_transaction = None
|
|
|
|
def __cloud_init(self, protocol):
|
|
"""Cloud initialization and connection
|
|
|
|
Args:
|
|
protocol (str): cloud type name
|
|
|
|
Returns:
|
|
object: cloud object
|
|
"""
|
|
if protocol == "aliyun":
|
|
cloud_config = settings.current_settings.get("aliyun_config")
|
|
client_id = cloud_config["client_id"] if cloud_config.get("client_id") else modem.getDevImei()
|
|
cloud = AliYunIot(cloud_config.get("PK"),
|
|
cloud_config.get("PS"),
|
|
cloud_config.get("DK"),
|
|
cloud_config.get("DS"),
|
|
cloud_config.get("server"),
|
|
int(cloud_config.get("qos", 0)),
|
|
client_id,
|
|
cloud_config.get("publish"),
|
|
cloud_config.get("subscribe"),
|
|
cloud_config.get("burning_method"),
|
|
cloud_config.get("keep_alive"),
|
|
mcu_name=PROJECT_NAME,
|
|
mcu_version=PROJECT_VERSION,
|
|
firmware_name=DEVICE_FIRMWARE_NAME,
|
|
firmware_version=DEVICE_FIRMWARE_VERSION
|
|
)
|
|
cloud.init(enforce=True)
|
|
return cloud
|
|
elif protocol == ("quecthing"):
|
|
cloud_config = settings.current_settings.get("quecthing_config")
|
|
cloud = QuecThing(cloud_config.get("PK"),
|
|
cloud_config.get("PS"),
|
|
cloud_config.get("DK"),
|
|
cloud_config.get("DS"),
|
|
cloud_config.get("server")+":"+cloud_config.get("port"),
|
|
int(cloud_config.get("qos", 0)),
|
|
cloud_config.get("keep_alive"),
|
|
mcu_name=PROJECT_NAME,
|
|
mcu_version=PROJECT_VERSION)
|
|
cloud.init(enforce=True)
|
|
return cloud
|
|
elif protocol == "txyun":
|
|
cloud_config = settings.current_settings.get("txyun_config")
|
|
client_id = cloud_config["client_id"] if cloud_config.get("client_id") else modem.getDevImei()
|
|
cloud = TXYunIot(cloud_config.get("PK"),
|
|
cloud_config.get("PS"),
|
|
cloud_config.get("DK"),
|
|
cloud_config.get("DS"),
|
|
cloud_config.get("clean_session", False),
|
|
client_id,
|
|
cloud_config.get("publish"),
|
|
cloud_config.get("subscribe"),
|
|
cloud_config.get("burning_method"),
|
|
cloud_config.get("keep_alive"),
|
|
mcu_name=PROJECT_NAME,
|
|
mcu_version=PROJECT_VERSION,
|
|
firmware_name=DEVICE_FIRMWARE_NAME,
|
|
firmware_version=DEVICE_FIRMWARE_VERSION
|
|
)
|
|
cloud.init(enforce=True)
|
|
return cloud
|
|
elif protocol == "hwyun":
|
|
cloud_config = settings.current_settings.get("hwyun_config")
|
|
client_id = cloud_config["client_id"] if cloud_config.get("client_id") else modem.getDevImei()
|
|
cloud = HuaweiIot(cloud_config.get("PK", None),
|
|
cloud_config.get("PS", None),
|
|
cloud_config.get("DK", None),
|
|
cloud_config.get("DS", None),
|
|
cloud_config.get("server", None),
|
|
int(cloud_config.get("qos", 0)),
|
|
int(cloud_config.get("port", 1883)),
|
|
cloud_config.get("clean_session"),
|
|
client_id,
|
|
cloud_config.get("publish"),
|
|
cloud_config.get("subscribe"),
|
|
cloud_config.get("keep_alive"),
|
|
mcu_name=PROJECT_NAME,
|
|
mcu_version=PROJECT_VERSION,
|
|
firmware_name=DEVICE_FIRMWARE_NAME,
|
|
firmware_version=DEVICE_FIRMWARE_VERSION
|
|
)
|
|
cloud.init(enforce=True)
|
|
return cloud
|
|
elif protocol.startswith("mqtt"):
|
|
cloud_config = settings.current_settings.get("mqtt_private_cloud_config")
|
|
client_id = cloud_config["client_id"] if cloud_config.get("client_id") else modem.getDevImei()
|
|
cloud = MqttIot(cloud_config.get("server", None),
|
|
int(cloud_config.get("qos", 0)),
|
|
int(cloud_config.get("port", 1883)),
|
|
cloud_config.get("clean_session"),
|
|
client_id,
|
|
cloud_config.get("password"),
|
|
cloud_config.get("publish"),
|
|
cloud_config.get("subscribe"),
|
|
cloud_config.get("keep_alive")
|
|
)
|
|
cloud.init(enforce=True)
|
|
return cloud
|
|
elif protocol.startswith("tcp"):
|
|
cloud_config = settings.current_settings.get("tcp_private_cloud_config")
|
|
cloud = Socket(ip_type = cloud_config.get("ip_type"),
|
|
keep_alive = cloud_config.get("keep_alive"),
|
|
domain = cloud_config.get("server"),
|
|
port = int(cloud_config.get("port")),
|
|
)
|
|
cloud.init(enforce=True)
|
|
return cloud
|
|
|
|
def __periodic_ota_check(self, args):
|
|
"""Periodically check whether cloud have an upgrade plan"""
|
|
self.__ota_transaction.ota_check()
|
|
|
|
def start(self):
|
|
"""Dtu init flow
|
|
"""
|
|
log.info("PROJECT_NAME: %s, PROJECT_VERSION: %s" % (PROJECT_NAME, PROJECT_VERSION))
|
|
log.info("DEVICE_FIRMWARE_NAME: %s, DEVICE_FIRMWARE_VERSION: %s" % (DEVICE_FIRMWARE_NAME, DEVICE_FIRMWARE_VERSION))
|
|
|
|
uart_setting = settings.current_settings["uart_config"]
|
|
|
|
# Serial initialization
|
|
serial = Serial(int(uart_setting.get("port")),
|
|
int(uart_setting.get("baudrate")),
|
|
int(uart_setting.get("databits")),
|
|
int(uart_setting.get("parity")),
|
|
int(uart_setting.get("stopbits")),
|
|
int(uart_setting.get("flowctl")),
|
|
uart_setting.get("rs485_direction_pin"))
|
|
|
|
# Cloud initialization
|
|
cloud = self.__cloud_init(settings.current_settings["system_config"]["cloud"])
|
|
# GuiToolsInteraction initialization
|
|
gui_tool_inter = GuiToolsInteraction()
|
|
# UplinkTransaction initialization
|
|
up_transaction = UplinkTransaction()
|
|
up_transaction.add_module(serial)
|
|
up_transaction.add_module(gui_tool_inter)
|
|
# DownlinkTransaction initialization
|
|
down_transaction = DownlinkTransaction()
|
|
down_transaction.add_module(serial)
|
|
# OtaTransaction initialization
|
|
ota_transaction = OtaTransaction()
|
|
|
|
# RemoteSubscribe initialization
|
|
remote_sub = RemoteSubscribe()
|
|
remote_sub.add_executor(down_transaction, 1)
|
|
remote_sub.add_executor(ota_transaction, 2)
|
|
cloud.addObserver(remote_sub)
|
|
|
|
# RemotePublish initialization
|
|
remote_pub = RemotePublish()
|
|
remote_pub.add_cloud(cloud)
|
|
up_transaction.add_module(remote_pub)
|
|
ota_transaction.add_module(remote_pub)
|
|
|
|
# History initialization
|
|
if settings.current_settings["system_config"]["base_function"]["offline_storage"]:
|
|
history = History()
|
|
remote_pub.addObserver(history)
|
|
up_transaction.add_module(history)
|
|
# Send history data to the cloud after being powered on
|
|
up_transaction.report_history()
|
|
|
|
# Send module release information to cloud. After receiving this information,
|
|
# the cloud server checks whether to upgrade modules
|
|
ota_transaction.ota_check()
|
|
# Periodically check whether cloud have an upgrade plan
|
|
self.__ota_transaction = ota_transaction
|
|
self.__ota_timer.start(1000 * 600, 1, self.__periodic_ota_check)
|
|
# Start uplink transaction
|
|
try:
|
|
_thread.start_new_thread(up_transaction.uplink_main, ())
|
|
except:
|
|
raise self.Error(self.error_map[self.ErrCode.ESYS])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
dtu = Dtu()
|
|
dtu.start()
|
|
|