2022-01-13 11:41:19 +08:00

879 lines
24 KiB
Python

import _thread
import dataCall
import net
import sim
import checkNet
import osTimer
import ujson
import uos as os
import utime
from event_message import Event, EventManager
from misc import Power
# Package coordinates of this framework; returned to callers by version().
__GROUP_ID = "qpy.quectel.com"
__ARTIFACT_ID = "qpy-framework"
__VERSION = "1.0.0.RELEASE"
# Sender key used when a receiver or a publisher does not name a sender.
ANY = "anonymous"


class Signal(object):
    """In-process publish/subscribe hub whose receivers are grouped by sender.

    ``receivers`` maps a sender key to the list of callables registered for
    it. Receivers are invoked as ``receiver(sender, **kwargs)``.
    """

    def __init__(self, doc):
        """Create a signal; ``doc`` is stored as a free-form description."""
        self.receivers = {
            ANY: list()
        }
        self.doc = doc

    def connect(self, receiver, sender=ANY):
        """Register ``receiver`` for messages published under ``sender``."""
        if sender not in self.receivers:
            self.receivers[sender] = list()
        self.receivers[sender].append(receiver)

    def connect_via(self, sender=ANY):
        """Return a decorator that connects the decorated function to ``sender``."""
        def decorator(fn):
            self.connect(fn, sender)
            return fn
        return decorator

    def receivers_for(self, sender):
        """Return the receiver list of ``sender`` (an empty list if unknown)."""
        return self.receivers.get(sender, [])

    def send(self, *senders, **kwargs):
        """Publish ``kwargs`` to the receivers of every sender in ``senders``.

        With no sender given the message goes to the ``ANY`` receivers.
        """
        if not senders:
            # A one-element tuple: list(ANY) would split the string into
            # single characters and no receiver would ever be reached.
            senders = (ANY,)
        for sender in senders:
            self.__publish(sender, **kwargs)

    def __publish(self, sender, **kwargs):
        """Call each receiver of ``sender``; a failing receiver is reported and skipped."""
        # Iterate over a snapshot so a receiver may connect/disconnect while
        # being notified without disturbing the loop.
        for receiver in list(self.receivers_for(sender)):
            try:
                receiver(sender, **kwargs)
            except Exception as e:
                print("send to {} kwargs {} error, reason {}".format(sender, kwargs, e))

    def disconnect(self, receiver, sender=ANY):
        """Remove ``receiver`` from ``sender``.

        Raises ValueError when the receiver is not registered for that sender.
        """
        receivers = self.receivers_for(sender)
        receivers.remove(receiver)
class NamedSignal(Signal):
    """A Signal that additionally remembers the name it was created under."""

    def __init__(self, name, doc=None):
        """Store ``name`` and initialise the underlying Signal with ``doc``."""
        self.name = name
        super().__init__(doc)
class Namespace(dict):
    """Dictionary of signals indexed by name."""

    def signal(self, name, doc=None):
        """Return the entry stored under ``name``, creating a NamedSignal on first use."""
        if name in self:
            return self[name]
        created = NamedSignal(name, doc)
        self[name] = created
        return created
# Module-level shortcut bound to one shared Namespace: signal(name) returns the
# entry registered under that name, creating a NamedSignal when it is missing.
signal = Namespace().signal
class ModelDTO(object):
    """Base for transfer objects that can render themselves as a dict.

    Calling an instance is shorthand for calling ``model_to_dict``.
    """

    def model_to_dict(self, *args, **kwargs):
        """Hook for subclasses; the base implementation returns None."""
        return None

    def __call__(self, *args, **kwargs):
        """Forward all arguments to ``model_to_dict`` and return its result."""
        converted = self.model_to_dict(*args, **kwargs)
        return converted
class QMessageModel(ModelDTO):
    """Read-only message envelope passed through a service's event.

    Holds the message id, type, payload, sender, the name of the originating
    event and an optional lock holder used by synchronous sends.
    """

    def __init__(self, ms_id=None, msg_type=0xFF, message=None, sender=None, from_event=None, lock_msg=None):
        """Store the envelope fields; a missing sender becomes "anonymous"."""
        if sender is None:
            sender = "anonymous"
        self.__message_id = ms_id
        self.__msg_type = msg_type
        self.__message = message
        self.__sender = sender
        self.__from_event = from_event
        self.__lock_msg = lock_msg

    @property
    def msg_type(self):
        """Message type given at construction."""
        return self.__msg_type

    @property
    def lock_msg(self):
        """Lock holder given at construction, or None."""
        return self.__lock_msg

    @property
    def message_id(self):
        """Message id given at construction."""
        return self.__message_id

    @property
    def sender(self):
        """Sender key of the message."""
        return self.__sender

    @property
    def message(self):
        """Message payload."""
        return self.__message

    @property
    def from_event(self):
        """Name of the event the message was created for."""
        return self.__from_event

    def model_to_dict(self):
        """Return the envelope as a dict; the lock holder is not included."""
        return {
            "message_id": self.message_id,
            "msg_type": self.msg_type,
            "message": self.message,
            "sender": self.sender,
            "from_event": self.from_event,
        }
class LockMsgModel(ModelDTO):
    """Transfer object pairing a lock with the reply message it guards."""

    def __init__(self, lock):
        """Keep ``lock`` and start with an empty reply (``{"message": None}``)."""
        self.__msg = {"message": None}
        self.__lock = lock

    @property
    def msg(self):
        """Current reply message."""
        return self.__msg

    @msg.setter
    def msg(self, msg):
        self.__msg = msg

    @property
    def lock(self):
        """The lock supplied at construction."""
        return self.__lock

    def model_to_dict(self, *args, **kwargs):
        """Return ``{"lock": ..., "msg": ...}``."""
        return {"lock": self.__lock, "msg": self.__msg}
class AbstractService(object):
    """Base class of the framework services.

    A service owns one Event (registered with its own EventManager) and one
    named Signal. Messages are wrapped in a QMessageModel and posted to the
    event; the default handler ``_callback`` forwards them to the signal's
    subscribers, invokes the optional per-message callback and, for
    synchronous sends, hands the reply back through a LockMsgModel.
    """

    def __init__(self, sign):
        """Create the service named ``sign`` with its signal, event and manager."""
        self.__name = sign
        self.__signal = signal(sign)
        self.__event = Event(sign)
        self.__em = EventManager()
        self.__message_id = 0
        self.__service_status = 0
        self.__em.register_event(self.__event)
        # Truthy: send_msg() posts asynchronously; falsy: synchronously.
        self.__mode = 1

    def set_mode(self, mode):
        """Set the default send mode used by ``send_msg``."""
        self.__mode = mode

    @property
    def mode(self):
        """Default send mode (truthy = async, falsy = sync)."""
        return self.__mode

    @property
    def name(self):
        """Service name given at construction."""
        return self.__name

    @property
    def message_id(self):
        """The stored message counter plus one."""
        return self.__message_id + 1

    @property
    def signal(self):
        """The Signal messages of this service are published on."""
        return self.__signal

    def ms_id_increase(self):
        """Advance the message counter (wrapping to 0 past 99999) and return ``message_id``."""
        if self.__message_id > 99999:
            self.__message_id = 0
        else:
            self.__message_id += 1
        return self.message_id

    def _get_message(self, sender=None, msg_type=0xFF, message=None, callback=None, lock_msg=None):
        """Build the QMessageModel for one send.

        Returns None when ``callback`` is given but is not callable.
        """
        if callback is not None and not callable(callback):
            return None
        ms_id = self.ms_id_increase()
        qm = QMessageModel(ms_id=ms_id, msg_type=msg_type, message=message, sender=sender,
                           from_event=self.__event.name, lock_msg=lock_msg)
        return qm

    def _callback(self, **kwargs):
        """Default event handler: publish the message and deliver the reply.

        Expects the event message under the ``event_message`` keyword. The
        reply is ``{"message": <message dict>, "success": 0 or 1}``.
        """
        em = kwargs.get("event_message")
        # Tolerate a missing event message or one without a payload.
        msg = getattr(em, "msg", None)
        if not msg:
            return
        try:
            self.signal.send(msg.sender, message=msg())
        except Exception:
            success = 0
        else:
            success = 1
        resp_data = dict(message=msg(), success=success)
        if em.callback is not None:
            try:
                em.callback(**resp_data)
            except Exception as e:
                # Best effort: a failing user callback must not break the
                # handler, but it should not vanish silently either.
                print("service {} callback error, reason {}".format(self.__name, e))
        if msg.lock_msg is not None:
            # Hand the reply to the thread blocked in send_msg_sync().
            msg.lock_msg.msg = resp_data
            msg.lock_msg.lock.release()

    def _clear(self, *args, **kwargs):
        """Clear the service's event."""
        self.__event.clear()

    def _add_default_handler(self):
        """Install ``_callback`` as a handler of the service's event."""
        self.add_handler(self._callback)

    def register_event(self):
        """Hook: register external callbacks; called once before the first start."""
        pass

    def prepare_before_start(self):
        """Hook: run before the event manager is started for the first time."""
        pass

    def prepare_before_stop(self):
        """Hook: run before the event manager is stopped."""
        pass

    def start_crontab(self, *args, **kwargs):
        """Hook: start periodic work; called at the end of component start-up."""
        pass

    def add_handler(self, handler):
        """Add ``handler`` to the service's event and return it."""
        self.__event.add_handler(handler)
        return handler

    def add_handler_via(self):
        """Return a decorator that registers the decorated function as a handler."""
        def decorator(fn):
            self.add_handler(fn)
            return fn
        return decorator

    def send_msg_async(self, msg_type=0xFF, message=None, callback=None, sender=None):
        """Post a message without waiting for it to be handled.

        Returns 0 when the service is not running, otherwise None.
        """
        if self.status():
            qm = self._get_message(sender=sender, msg_type=msg_type, message=message, callback=callback)
            self.__event.post(qm, callback=callback)
        else:
            return 0

    def send_msg_sync(self, msg_type=0xFF, message=None, callback=None, sender=None):
        """Post a message and block until ``_callback`` has produced the reply.

        Returns the reply dict, or 0 when the service is not running or the
        message could not be built (non-callable ``callback``).
        NOTE(review): this blocks the caller; presumably it must not be
        called from the thread that runs the event handlers -- confirm.
        """
        if not self.status():
            return 0
        lock = _thread.allocate_lock()
        lock_msg = LockMsgModel(lock)
        qm = self._get_message(sender=sender, msg_type=msg_type, message=message, callback=callback,
                               lock_msg=lock_msg)
        if qm is None:
            # Nothing would ever release the lock for an unbuilt message.
            return 0
        # Take the lock before posting so the second acquire() below really
        # waits until _callback() releases it with the reply in place.
        lock.acquire()
        self.__event.post(qm, callback=callback)
        lock.acquire()
        msg = lock_msg.msg
        lock.release()
        del lock_msg
        return msg

    def send_msg(self, msg_type=0xFF, message=None, callback=None, sender=None, mode=None):
        """Send a message using ``mode`` if given, else the service default mode."""
        mode_pattern = self.mode
        if mode is not None:
            mode_pattern = mode
        if mode_pattern:
            return self.send_msg_async(msg_type=msg_type, message=message, callback=callback, sender=sender)
        else:
            return self.send_msg_sync(msg_type=msg_type, message=message, callback=callback, sender=sender)

    def _component_start(self):
        """Run the one-time start-up hooks while the stored service status is falsy."""
        if not self.__service_status:
            self.register_event()
            self._add_default_handler()
            self.prepare_before_start()
            self.start_crontab()

    def start(self):
        """Start the service; returns whatever the event manager's start() returns."""
        self._component_start()
        self.__service_status = self.__em.start()
        return self.__service_status

    def status(self):
        """Return the event manager's status attribute."""
        return self.__em.status

    def stop(self):
        """Run the stop hook and stop the event manager; returns its stop() result."""
        self.prepare_before_stop()
        self.__service_status = self.__em.stop()
        return self.__service_status

    def close(self):
        """Clear the service's event."""
        self._clear()

    def subscribe(self, cb, sender=ANY):
        """Connect ``cb`` to the service signal for ``sender``."""
        self.signal.connect(cb, sender=sender)

    def unsubscribe(self, cb, sender=ANY):
        """Disconnect ``cb`` from the service signal for ``sender``."""
        self.signal.disconnect(cb, sender=sender)

    def publish(self, msg, sender=ANY, msg_type=0XFF):
        """Send ``msg`` through ``send_msg`` using the service default mode."""
        self.send_msg(message=msg, msg_type=msg_type, sender=sender)
class TimeResolver(object):
    """Render a time tuple as "<date> <time> <weekday name>"."""

    def __init__(self):
        """Set the weekday names (index 0 = Monday) and the output template."""
        self.weekday_list = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
        self.output_format = "{ascdate} {asctime} {ascweek}"

    def resolver(self, rt=None):
        """Format ``rt`` (indexable: year, month, day, hour, minute, second, weekday).

        When ``rt`` is None the value of ``utime.localtime()`` is used.
        """
        stamp = utime.localtime() if rt is None else rt
        pad = "{0:02}".format
        date_text = "-".join([str(stamp[0]), pad(stamp[1]), pad(stamp[2])])
        clock_text = ":".join([pad(stamp[3]), pad(stamp[4]), pad(stamp[5])])
        week_text = self.weekday_list[stamp[6]]
        return self.output_format.format(ascdate=date_text, asctime=clock_text, ascweek=week_text)
class JsonParser(object):
    """Load the ``config.json`` file found inside a directory."""

    DEFAULT_FILE_NAME = "config.json"

    @classmethod
    def composite_url(cls, url):
        """Return ``url`` joined with the default file name using one "/"."""
        base = url if url.endswith("/") else url + "/"
        return base + cls.DEFAULT_FILE_NAME

    @classmethod
    def parse(cls, url):
        """Read the config file under ``url``.

        Returns ``{"status": 1, "data": <parsed json>}`` on success and
        ``{"status": 0, "data": {}}`` when anything goes wrong.
        """
        result = {"status": 1, "data": {}}
        try:
            with open(cls.composite_url(url), "r") as fp:
                result["data"] = ujson.load(fp)
        except Exception:
            result["status"] = 0
        return result
def Singleton(cls):
    """Class decorator: the first call builds the instance, later calls return it.

    Arguments of calls after the first one are ignored.
    """
    created = {}

    def _get_instance(*args, **kargs):
        try:
            return created[cls]
        except KeyError:
            instance = created[cls] = cls(*args, **kargs)
            return instance

    return _get_instance
class ServiceMonitor(object):
    """Watchdog around one service.

    A periodic timer publishes a ping (sender ``0xFE``) on the service's
    signal; ``ping_handler`` is subscribed to that sender and clears the ping
    counter. A ping that is still outstanding when the timer fires again
    counts as a failure, and the configured exception handlers are consulted.
    """

    # Level at which start()/stop() no longer touch the wrapped service.
    THRESHOLD = 3

    def __init__(self, service):
        """Wrap ``service`` and subscribe the ping acknowledgement handler."""
        self.service = service
        self.__failed_count = 0
        self.__level = 0
        self.__timer = None
        self.__ping_count = 0
        self.__exception_handlers = {}
        self.service.signal.connect(self.ping_handler, sender=0xFE)

    def set_level(self, level):
        """Set the monitor level (compared against ``THRESHOLD``)."""
        if self.__level != level:
            self.__level = level

    def __timer_handle(self, *args):
        """Timer callback: account for an unanswered ping, then ping again."""
        if self.__ping_count:
            # The previous ping was never acknowledged.
            self.__failed_count += 1
        self.__ping_count = self.__ping_count + 1
        self.service.signal.send(0xFE)
        if self.__failed_count:
            self.__failed_handle()

    def set_exception_handlers(self, handler):
        """Install the handler mapping (e.g. keys "reboot" / "stop"); None is ignored."""
        if handler is not None:
            self.__exception_handlers = handler

    def __failed_handle(self):
        """Apply the handlers whose ``failCount`` equals the current failure count."""
        try:
            for k, v in self.__exception_handlers.items():
                if k == "reboot":
                    if v.get("failCount") == self.__failed_count:
                        # Power the module down.
                        # NOTE(review): the key says "reboot" but this powers
                        # down rather than restarts -- confirm the intent.
                        Power.powerDown()
                elif k == "stop":
                    if v.get("failCount") == self.__failed_count:
                        # Stop the monitored service.
                        self.stop()
                else:
                    continue
        except Exception as e:
            print(e)

    def ping_handler(self, *args, **kwargs):
        """Acknowledge a ping by clearing the outstanding-ping counter."""
        self.__ping_count = 0

    def start(self):
        """Start the service (unless at ``THRESHOLD``) and the ping timer.

        Returns the service status.
        """
        if self.__level != self.THRESHOLD:
            self.service.start()
        if self.__timer is None:
            self.__timer = osTimer()
        # Arguments as in the original: 15000, mode 1, callback.
        # Presumably a 15 s periodic timer -- confirm against the osTimer API.
        self.__timer.start(15000, 1, self.__timer_handle)
        return self.status()

    def status(self):
        """Return the wrapped service's status."""
        return self.service.status()

    def clear(self):
        """Reset the counters and level, and stop and delete the timer if any."""
        self.__failed_count = 0
        self.__level = 0
        if self.__timer is not None:
            self.__timer.stop()
            self.__timer.delete()
            self.__timer = None
        self.__ping_count = 0

    def stop(self):
        """Stop the service (unless at ``THRESHOLD``), reset the monitor state.

        Returns the service status, mirroring ``start``.
        """
        if self.__level != self.THRESHOLD:
            self.service.stop()
        self.clear()
        # Call status(): returning the bound method would always be truthy.
        return self.status()

    def __call__(self, *args, **kwargs):
        """Return the wrapped service."""
        return self.service
# Name under which LogService registers itself with AbstractService.
LOGGER = "LOG"
class LOG_LV:
    """Names of the supported log levels, from least to most severe."""

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
class AbstractLogOutputUtil(object):
    """Interface of a log sink: open it, write messages to it, close it.

    Calling an instance is shorthand for ``output``.
    """

    def open(self):
        """Hook: prepare the sink; does nothing by default."""
        return None

    def output(self, message, **kwargs):
        """Hook: write one message; does nothing by default."""
        return None

    def close(self):
        """Hook: release the sink; does nothing by default."""
        return None

    def __call__(self, message, **kwargs):
        """Forward to ``output``; the result of ``output`` is not returned."""
        self.output(message, **kwargs)
@Singleton
class PrintLogOutputUtil(AbstractLogOutputUtil):
    """Log sink that prints the formatted text of each message."""

    def output(self, message, **kwargs):
        """Print ``message.log_format``."""
        text = message.log_format
        print(text)
class AbstractFormatUtil(object):
    """Holder of the parts of one log record and of its rendered text."""

    def __init__(self):
        """Start with every field set to the empty string."""
        self.time = self.tag = self.level = self.content = self.log_format = ""
class LogFormatUtil(AbstractFormatUtil):
    """Log record rendered as "<time> <tag> [<level>] - <content>" plus a newline."""

    @classmethod
    def format(cls, *args, **kwargs):
        """Build a record from the positional arguments time, tag, level, content.

        Returns a new LogFormatUtil whose ``log_format`` holds the rendered line.
        """
        record = LogFormatUtil()
        record.time, record.tag, record.level, record.content = args[0], args[1], args[2], args[3]
        record.log_format = "{} {} [{}] - {}\n".format(record.time, record.tag, record.level, record.content)
        return record
class LogServiceMonitor(ServiceMonitor):
    """ServiceMonitor specialised for the log service."""

    @staticmethod
    def create_monitor(config=None):
        """Build a monitor around LogService.

        When ``config`` is given, its 'level' entry sets the service log level
        and its 'exceptionHandlers' entry is installed on the monitor.
        """
        has_config = config is not None
        service = LogService()
        if has_config:
            service.set_level(config.get('level'))
        monitor = LogServiceMonitor(service)
        if has_config:
            monitor.set_exception_handlers(config.get('exceptionHandlers', None))
        return monitor
class NetServiceMonitor(ServiceMonitor):
    """ServiceMonitor specialised for the network service."""

    @staticmethod
    def create_monitor(config=None):
        """Build a monitor around NetService.

        When ``config`` is given, its 'exceptionHandlers' entry is installed
        on the monitor.
        """
        monitor = NetServiceMonitor(NetService())
        if config is None:
            return monitor
        monitor.set_exception_handlers(config.get('exceptionHandlers', None))
        return monitor
# Name under which NetService registers itself with AbstractService.
NET = "NET"
# Keys of NetService.server_status: each channel (NET_, DATACALL_) maps to a
# dict holding its ENABLE_ flag.
ENABLE_ = "enable"
NET_ = "NET"
DATACALL_ = "DATACALL"
@Singleton
class NetService(AbstractService):
    """Service that turns data-call / network events into published messages.

    ``server_status`` holds one enable flag per channel; data-call reporting
    is enabled by default, network-registration reporting is not.
    """

    def __init__(self):
        super().__init__(NET)
        self.__data_call = dataCall
        self.__net = net
        self.__sim = sim
        self.__net_connect_status = False
        self.__data_call_status = False
        self.server_status = {
            DATACALL_: {ENABLE_: 1},
            NET_: {ENABLE_: 0}
        }
        self.check_net = checkNet.CheckNetwork("QuecPython_Helios_Framework", "this latest version")

    @property
    def sim(self):
        """The ``sim`` module used by the service."""
        return self.__sim

    @property
    def data_call(self):
        """The ``dataCall`` module used by the service."""
        return self.__data_call

    @property
    def net(self):
        """The ``net`` module used by the service."""
        return self.__net

    def set_enable(self, sr, enable):
        """Set the enable flag of channel ``sr`` (``NET_`` or ``DATACALL_``)."""
        self.server_status[sr][ENABLE_] = enable

    def set_apn(self, profileIdx, ipType, apn, username, password, authType):
        """Forward to ``dataCall.setApn`` and return its result."""
        return self.__data_call.setApn(profileIdx, ipType, apn, username, password, authType)

    def ev_dc(self, args):
        """Data-call callback: publish a status snapshot as message type 1.

        ``args[0]`` is the profile index and ``args[1]`` the reported
        data-call status. Nothing is published while the DATACALL channel is
        disabled.
        """
        if not self.server_status[DATACALL_][ENABLE_]:
            return
        profile_idx = args[0]
        datacall_status = args[1]
        sim_status = self.__sim.getStatus()
        net_status = self.__net.getState()
        datacall_info = self.__data_call.getInfo(profile_idx, 2)

        msg_dict = {
            "sim_status": sim_status,
            "net_status": None,
            "datacall_status": datacall_status,
            "profile_id": profile_idx,
            "ip_type": None,
            "IPv4": None,
            "IPv4_DNS1": None,
            "IPv4_DNS2": None,
            "IPv6": None,
            "IPv6_DNS1": None,
            "IPv6_DNS2": None,
        }

        # -1 is the failure value of the query.
        if net_status == -1:
            msg_dict["net_status"] = -1
        elif net_status[0][0] == 0 or net_status[1][0] == 0:
            msg_dict["net_status"] = 0
        else:
            msg_dict["net_status"] = net_status[1][0]

        if datacall_info != -1:
            # Entries 2 and 3 are read as the IPv4 and IPv6 records; the
            # call is up when either record reports state 1.
            ipv4 = datacall_info[2]
            ipv6 = datacall_info[3]
            msg_dict["datacall_status"] = 1 if (ipv4[0] == 1 or ipv6[0] == 1) else 0
            msg_dict["ip_type"] = datacall_info[1]
            msg_dict["IPv4"] = ipv4[2]
            msg_dict["IPv4_DNS1"] = ipv4[3]
            msg_dict["IPv4_DNS2"] = ipv4[4]
            msg_dict["IPv6"] = ipv6[2]
            msg_dict["IPv6_DNS1"] = ipv6[3]
            msg_dict["IPv6_DNS2"] = ipv6[4]
        self.send_msg(msg_type=1, message=msg_dict)

    def ev_nc(self, args):
        """Network callback: publish ``args[1]`` as the registration status (type 0).

        Nothing is published while the NET channel is disabled.
        NOTE(review): nothing in this class registers this callback -- confirm
        where it is expected to be hooked up.
        """
        if self.server_status[NET_][ENABLE_]:
            net_status = args[1]
            self.send_msg(msg_type=0, message={"network_register_status": net_status})

    def wait_connect(self, timeout):
        """Print the power-on banner once and wait for the network via checkNet."""
        self.check_net.poweron_print_once()
        return self.check_net.wait_network_connected(timeout)

    def get_net_status(self):
        """Return the data-call state of profile 1 (query arguments ``1, 0``).

        Returns 0 when the query fails, i.e. ``getInfo`` yields -1 -- the same
        failure value ``ev_dc`` checks for -- instead of raising on indexing.
        """
        info = self.__data_call.getInfo(1, 0)
        if info == -1:
            self.__data_call_status = 0
        else:
            self.__data_call_status = info[2][0]
        return self.__data_call_status

    def register_event(self):
        """Register ``ev_dc`` as the dataCall callback."""
        self.__data_call.setCallback(self.ev_dc)
@Singleton
class LogService(AbstractService):
    """Service that formats log records and hands them to the output sinks."""

    def __init__(self):
        super().__init__(LOGGER)
        self.low_level = 0
        self.__level_map = {
            LOG_LV.DEBUG: 0,
            LOG_LV.INFO: 1,
            LOG_LV.WARNING: 2,
            LOG_LV.ERROR: 3,
            LOG_LV.CRITICAL: 4
        }
        self.__reporter = [PrintLogOutputUtil(), ]
        self.__tr = TimeResolver()
        self.format_util = LogFormatUtil()

    def __set_report(self, report):
        """Append ``report`` to the sinks if it is an AbstractLogOutputUtil."""
        if not isinstance(report, AbstractLogOutputUtil):
            return
        self.__reporter.append(report)

    def set_output(self, out_obj):
        """Add an output sink.

        Logs an error and raises Exception when ``out_obj`` is not an
        AbstractLogOutputUtil.
        """
        if not isinstance(out_obj, AbstractLogOutputUtil):
            self.log_send(self.name, LOG_LV.ERROR, '"{}" is not extend AbstractLogOutputUtil'.format(out_obj))
            raise Exception('"{}" is not extend AbstractLogOutputUtil'.format(out_obj))
        self.__set_report(out_obj)

    def set_level(self, level):
        """Set the minimum level by name; unknown names fall back to 0 (DEBUG)."""
        self.low_level = self.__level_map.get(level, 0)

    def log_send(self, sign, level, msg, mode=1):
        """Format and send one record if ``level`` reaches the minimum level.

        The service mode, when it is not None, takes precedence over ``mode``.
        """
        chosen_mode = mode if self.mode is None else self.mode
        if self.__level_map[level] < self.low_level:
            return
        if chosen_mode:
            self.send_msg_async(message=self.format_msg(sign, level, msg))
        else:
            self.send_msg_sync(message=self.format_msg(sign, level, msg))

    def format_msg(self, sign, level, msg):
        """Return the formatted record stamped with the current time."""
        timestamp = self.__tr.resolver()
        return self.format_util.format(timestamp, sign, level, msg)

    def output_msg(self, *args, **kwargs):
        """Signal receiver: pass the record inside ``kwargs["message"]`` to every sink."""
        key = "message"
        envelope = kwargs.get(key, False)
        if not envelope:
            return
        for reporter in self.__reporter:
            reporter.output(envelope[key])

    def prepare_before_stop(self):
        """Close every sink."""
        for reporter in self.__reporter:
            reporter.close()

    def prepare_before_start(self):
        """Subscribe ``output_msg`` to the "anonymous" sender and open every sink."""
        self.signal.connect(self.output_msg, sender="anonymous")
        for reporter in self.__reporter:
            reporter.open()
class LogAdapter(object):
    """Named front end to LogService with one method per log level."""

    def __init__(self, name, enable=1):
        """Bind to the LogService; ``enable`` switches all output on or off."""
        self.log_service = LogService()
        self.name = name
        self.enable = enable
        self.mode = 1
        self.tag = None

    def get_tag(self):
        """Return ``tag`` when set, otherwise ``name``."""
        return self.name if self.tag is None else self.tag

    def __emit(self, level, msg):
        """Send ``msg`` at ``level`` under this adapter's name when enabled."""
        if not self.enable:
            return
        self.log_service.log_send(self.name, level, msg, self.mode)

    def critical(self, msg):
        """Log ``msg`` at CRITICAL level."""
        self.__emit(LOG_LV.CRITICAL, msg)

    def debug(self, msg):
        """Log ``msg`` at DEBUG level."""
        self.__emit(LOG_LV.DEBUG, msg)

    def info(self, msg):
        """Log ``msg`` at INFO level."""
        self.__emit(LOG_LV.INFO, msg)

    def warning(self, msg):
        """Log ``msg`` at WARNING level."""
        self.__emit(LOG_LV.WARNING, msg)

    def error(self, msg):
        """Log ``msg`` at ERROR level."""
        self.__emit(LOG_LV.ERROR, msg)
def version():
    """Return the framework coordinates (group id, artifact id, version) as a dict."""
    info = {}
    info["GROUP_ID"] = __GROUP_ID
    info["artifact_id"] = __ARTIFACT_ID
    info["VERSION"] = __VERSION
    return info
@Singleton
class Guard(object):
    """Registry that starts and stops a set of service monitors together."""

    def __init__(self):
        self.__status = 0
        self.timer = None
        self.monitor_service = set()

    def register_monitor(self, monitor):
        """Add ``monitor`` to the managed set."""
        self.monitor_service.add(monitor)

    def start(self):
        """Start every registered monitor, then mark the guard as running."""
        for item in self.monitor_service:
            item.start()
        self.__status = 1

    def reload_level(self):
        """Reset each monitor's level to 0 and start it again."""
        for item in self.monitor_service:
            item.set_level(0)
            item.start()
        self.__status = 1

    def stop(self):
        """Stop every registered monitor, then mark the guard as stopped."""
        for item in self.monitor_service:
            item.stop()
        self.__status = 0

    def status(self):
        """Return 1 while running, 0 otherwise."""
        return self.__status

    def upgrade(self):
        """Placeholder; does nothing."""
        return None

    def monitor_map(self):
        """Return a dict mapping each monitored service's name to the service."""
        services = {}
        for item in self.monitor_service:
            services[item.service.name] = item.service
        return services
@Singleton
class GuardContext(object):
    """Application context: loads configuration, builds the service monitors
    and drives them through the Guard.
    """

    def __init__(self):
        self.__guard = Guard()
        self.system_config = {}
        self.service_config = {}
        # Directory -> dict receiving the configs found below that directory.
        self.config_map = {
            "/usr/etc/system_config": self.system_config,
            "/usr/etc/app_config": self.service_config
        }
        self.config_parser = JsonParser()
        self.monitor_map = dict()
        self.error_handler = print

    def servers(self):
        """Return a copy of the name -> monitor mapping."""
        return self.monitor_map.copy()

    def get_server(self, name):
        """Return the service wrapped by the monitor ``name`` (KeyError if unknown)."""
        return self.monitor_map[name]()

    def stop_server(self, name):
        """Stop the monitor ``name`` and return the result of its stop()."""
        return self.monitor_map[name].stop()

    @staticmethod
    def get_logger(name):
        """Return a LogAdapter for ``name``."""
        return LogAdapter(name)

    @staticmethod
    def path_exist(check_path):
        """Return 1 when ``os.stat`` succeeds on ``check_path``, else 0."""
        try:
            os.stat(check_path)
        except Exception:
            return 0
        else:
            return 1

    def load_config_definitions(self):
        """Load the configs below "/usr/etc/" when that directory exists."""
        root_path = "/usr/etc/"
        stat = self.path_exist(root_path)
        if not stat:
            return
        self.load_configs(root_path)

    def load_configs(self, root_path):
        """Parse every ``<root>/<dir>/<name>/config.json`` into ``config_map``.

        Only directories that are keys of ``config_map`` are read; other
        entries below ``root_path`` are reported and skipped. Returns True
        when ``root_path`` does not exist, otherwise None.
        """
        stat = self.path_exist(root_path)
        if not stat:
            self.error_handler("[ WARN ] {} path is not exist".format(root_path))
            return True
        if not root_path.endswith("/"):
            # The keys of config_map are built as root + "/" + name.
            root_path += "/"
        for par_path in os.listdir(root_path):
            abs_path = root_path + par_path
            target = self.config_map.get(abs_path)
            if target is None:
                # Unknown entry (stray file or directory): there is no place
                # to store its data, and listing a plain file would raise.
                self.error_handler("[ WARN ] {} is not a known config path".format(abs_path))
                continue
            for sub_path in os.listdir(abs_path):
                truth_path = abs_path + "/" + sub_path
                rep_d = self.config_parser.parse(truth_path)
                if rep_d["status"] == 0:
                    self.error_handler("[ WARN ] read {} status {}".format(truth_path, 0))
                else:
                    target[sub_path] = rep_d["data"]

    def create_monitors(self):
        """Build the system and application monitors and store them."""
        monitor_map = dict()
        monitor_map.update(self.create_system_monitors())
        monitor_map.update(self.create_app_monitors())
        self.monitor_map = monitor_map

    def start(self):
        """Start the guard (and with it every registered monitor)."""
        self.__guard.start()

    def register_monitors(self):
        """Register every stored monitor with the guard."""
        for monitor in self.monitor_map.values():
            self.__guard.register_monitor(monitor)

    def refresh(self):
        """Load configs, build and register the monitors, then start them."""
        self.load_config_definitions()
        self.create_monitors()
        self.register_monitors()
        self.start()

    def reload(self):
        """Reset the monitor levels and restart the monitors via the guard."""
        self.__guard.reload_level()

    def create_system_monitors(self):
        """Create the log and net monitors from ``system_config``.

        A monitor that fails to build is reported through ``error_handler``
        and left out of the returned mapping.
        """
        monitor_map = dict()
        try:
            LOG = "log"
            log_monitor = LogServiceMonitor.create_monitor(self.system_config.get(LOG, None))
            monitor_map[LOG] = log_monitor
            print("[   OK   ] create sys monitor log service")
        except Exception as e:
            self.error_handler("[ FAILED ] load log monitor error reason:{}".format(e))
        try:
            NET = "net"
            net_monitor = NetServiceMonitor.create_monitor(self.system_config.get(NET, None))
            monitor_map[NET] = net_monitor
            print("[   OK   ] create sys monitor net service")
        except Exception as e:
            # Route the failure to the configured error handler.
            self.error_handler("[ FAILED ] load net monitor error reason:{}".format(e))
        return monitor_map

    def create_app_monitors(self):
        """Return the application monitors; currently always an empty dict."""
        monitor_map = dict()
        return monitor_map