import _thread import dataCall import net import sim import checkNet import osTimer import ujson import uos as os import utime from event_message import Event, EventManager from misc import Power __GROUP_ID = "qpy.quectel.com" __ARTIFACT_ID = "qpy-framework" __VERSION = "1.0.0.RELEASE" ANY = "anonymous" class Signal(object): def __init__(self, doc): self.receivers = { ANY: list() } self.doc = doc def connect(self, receiver, sender=ANY): if sender not in self.receivers: self.receivers[sender] = list() self.receivers[sender].append(receiver) def connect_via(self, sender=ANY): def decorator(fn): self.connect(fn, sender) return fn return decorator def receivers_for(self, sender): return self.receivers.get(sender, []) def send(self, *senders, **kwargs): if not len(senders): senders = list(ANY) for sender in senders: self.__publish(sender, **kwargs) def __publish(self, sender, **kwargs): for receiver in self.receivers_for(sender): try: receiver(sender, **kwargs) except Exception as e: print("send to {} kwargs error, reason {}".format(sender, kwargs)) def disconnect(self, receiver, sender=ANY): receivers = self.receivers_for(sender) receivers.remove(receiver) class NamedSignal(Signal): def __init__(self, name, doc=None): super().__init__(doc) self.name = name class Namespace(dict): def signal(self, name, doc=None): try: return self[name] except KeyError: return self.setdefault(name, NamedSignal(name, doc)) signal = Namespace().signal class ModelDTO(object): def model_to_dict(self, *args, **kwargs): pass def __call__(self, *args, **kwargs): return self.model_to_dict(*args, **kwargs) class QMessageModel(ModelDTO): def __init__(self, ms_id=None, msg_type=0xFF, message=None, sender=None, from_event=None, lock_msg=None): self.__msg_type = msg_type self.__message_id = ms_id self.__message = message self.__sender = "anonymous" if sender is None else sender self.__from_event = from_event self.__lock_msg = lock_msg @property def msg_type(self): return self.__msg_type @property def lock_msg(self): return self.__lock_msg @property def message_id(self): return self.__message_id @property def sender(self): return self.__sender @property def message(self): return self.__message @property def from_event(self): return self.__from_event def model_to_dict(self): return dict(message_id=self.message_id, msg_type=self.msg_type, message=self.message, sender=self.sender, from_event=self.from_event) class LockMsgModel(ModelDTO): """传输对象""" def __init__(self, lock): self.__lock = lock self.__msg = dict(message=None) @property def msg(self): return self.__msg @msg.setter def msg(self, msg): self.__msg = msg @property def lock(self): return self.__lock def model_to_dict(self, *args, **kwargs): return dict(lock=self.__lock, msg=self.__msg) class AbstractService(object): def __init__(self, sign): self.__name = sign self.__signal = signal(sign) self.__event = Event(sign) self.__em = EventManager() self.__message_id = 0 self.__service_status = 0 self.__em.register_event(self.__event) self.__mode = 1 def set_mode(self, mode): self.__mode = mode @property def mode(self): return self.__mode @property def name(self): return self.__name @property def message_id(self): return self.__message_id + 1 @property def signal(self): return self.__signal def ms_id_increase(self): if self.__message_id > 99999: self.__message_id = 0 else: self.__message_id += 1 return self.message_id def _get_message(self, sender=None, msg_type=0xFF, message=None, callback=None, lock_msg=None): if callback is not None and not callable(callback): return None ms_id = self.ms_id_increase() qm = QMessageModel(ms_id=ms_id, msg_type=msg_type, message=message, sender=sender, from_event=self.__event.name, lock_msg=lock_msg) return qm def _callback(self, **kwargs): em = kwargs.get("event_message", False) if em.msg: msg = em.msg try: self.signal.send(em.msg.sender, message=msg()) except Exception as e: success = 0 else: success = 1 resp_data = dict(message=msg(), success=success) if em.callback is not None: try: em.callback(**resp_data) except Exception as e: pass if msg.lock_msg is not None: msg.lock_msg.msg = resp_data msg.lock_msg.lock.release() def _clear(self, *args, **kwargs): self.__event.clear() def _add_default_handler(self): self.add_handler(self._callback) def register_event(self): pass def prepare_before_start(self): pass def prepare_before_stop(self): pass def start_crontab(self, *args, **kwargs): pass def add_handler(self, handler): self.__event.add_handler(handler) return handler def add_handler_via(self): def decorator(fn): self.add_handler(fn) return fn return decorator def send_msg_async(self, msg_type=0xFF, message=None, callback=None, sender=None): if self.status(): qm = self._get_message(sender=sender, msg_type=msg_type, message=message, callback=callback) self.__event.post(qm, callback=callback) else: return 0 def send_msg_sync(self, msg_type=0xFF, message=None, callback=None, sender=None): if self.status(): lock = _thread.allocate_lock() lock_msg = LockMsgModel(lock) qm = self._get_message(sender=sender, msg_type=msg_type, message=message, callback=callback, lock_msg=lock_msg) self.__event.post(qm, callback=callback) lock_msg.lock.acquire() msg = lock_msg.msg del lock_msg return msg else: return 0 def send_msg(self, msg_type=0xFF, message=None, callback=None, sender=None, mode=None): mode_pattern = self.mode if mode is not None: mode_pattern = mode if mode_pattern: return self.send_msg_async(msg_type=msg_type, message=message, callback=callback, sender=sender) else: return self.send_msg_sync(msg_type=msg_type, message=message, callback=callback, sender=sender) def _component_start(self): if not self.__service_status: self.register_event() self._add_default_handler() self.prepare_before_start() self.start_crontab() def start(self): self._component_start() self.__service_status = self.__em.start() return self.__service_status def status(self): return self.__em.status def stop(self): self.prepare_before_stop() self.__service_status = self.__em.stop() return self.__service_status def close(self): self._clear() def subscribe(self, cb, sender=ANY): self.signal.connect(cb, sender=sender) def unsubscribe(self, cb, sender=ANY): self.signal.disconnect(cb, sender=sender) def publish(self, msg, sender=ANY, msg_type=0XFF): self.send_msg(message=msg, msg_type=msg_type, sender=sender) class TimeResolver(object): def __init__(self): self.output_format = "{ascdate} {asctime} {ascweek}" self.weekday_list = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] def resolver(self, rt=None): if rt is None: rt = utime.localtime() d_f = "{0:02}" return self.output_format.format( ascdate=str(rt[0]) + "-" + d_f.format(rt[1]) + "-" + d_f.format(rt[2]) , asctime=d_f.format(rt[3]) + ":" + d_f.format(rt[4]) + ":" + d_f.format(rt[5]), ascweek=self.weekday_list[rt[6]]) class JsonParser(object): DEFAULT_FILE_NAME = "config.json" @classmethod def composite_url(cls, url): if not url.endswith("/"): url += "/" return url + cls.DEFAULT_FILE_NAME @classmethod def parse(cls, url): rep_d = dict( status=1, data=dict() ) try: url = cls.composite_url(url) with open(url, "r") as f: rep_d["data"] = ujson.load(f) except Exception as e: rep_d["status"] = 0 return rep_d else: return rep_d def Singleton(cls): _instance = {} def _singleton(*args, **kargs): if cls not in _instance: _instance[cls] = cls(*args, **kargs) return _instance[cls] return _singleton class ServiceMonitor(object): THRESHOLD = 3 def __init__(self, service): self.service = service self.__failed_count = 0 self.__level = 0 self.__timer = None self.__ping_count = 0 self.__exception_handlers = {} self.service.signal.connect(self.ping_handler, sender=0xFE) def set_level(self, level): if self.__level != level: self.__level = level def __timer_handle(self, *args): """定时器调度""" if self.__ping_count: self.__failed_count += 1 self.__ping_count = self.__ping_count + 1 self.service.signal.send(0xFE) if self.__failed_count: self.__failed_handle() def set_exception_handlers(self, handler): if handler is not None: self.__exception_handlers = handler def __failed_handle(self): """失败处理""" try: for k, v in self.__exception_handlers.items(): if k == "reboot": if v.get("failCount") == self.__failed_count: # 关机 Power.powerDown() elif k == "stop": if v.get("failCount") == self.__failed_count: # 停止服务 self.stop() else: continue except Exception as e: print(e) def ping_handler(self, *args, **kwargs): self.__ping_count = 0 def start(self): if self.__level != self.THRESHOLD: self.service.start() if self.__timer is None: self.__timer = osTimer() self.__timer.start(15000, 1, self.__timer_handle) return self.status() def status(self): return self.service.status() def clear(self): self.__failed_count = 0 self.__level = 0 if self.__timer is not None: self.__timer.stop() self.__timer.delete() self.__timer = None self.__ping_count = 0 def stop(self): if self.__level != self.THRESHOLD: self.service.stop() self.clear() return self.status def __call__(self, *args, **kwargs): return self.service LOGGER = "LOG" class LOG_LV: DEBUG = "DEBUG" INFO = "INFO" WARNING = "WARNING" ERROR = "ERROR" CRITICAL = "CRITICAL" class AbstractLogOutputUtil(object): def open(self): pass def output(self, message, **kwargs): pass def close(self): pass def __call__(self, message, **kwargs): self.output(message, **kwargs) @Singleton class PrintLogOutputUtil(AbstractLogOutputUtil): def output(self, message, **kwargs): print(message.log_format) class AbstractFormatUtil(object): def __init__(self): self.time = "" self.tag = "" self.level = "" self.content = "" self.log_format = "" class LogFormatUtil(AbstractFormatUtil): @classmethod def format(cls, *args, **kwargs): self = LogFormatUtil() self.time = args[0] self.tag = args[1] self.level = args[2] self.content = args[3] self.log_format = "{} {} [{}] - {}\n".format(self.time, self.tag, self.level, self.content) return self class LogServiceMonitor(ServiceMonitor): @staticmethod def create_monitor(config=None): log_service = LogService() if config is not None: level = config.get('level') log_service.set_level(level) lsm = LogServiceMonitor(log_service) if config is not None: lsm.set_exception_handlers(config.get('exceptionHandlers', None)) return lsm class NetServiceMonitor(ServiceMonitor): @staticmethod def create_monitor(config=None): net_service = NetService() nsm = NetServiceMonitor(net_service) if config is not None: nsm.set_exception_handlers(config.get('exceptionHandlers', None)) return nsm NET = "NET" ENABLE_ = "enable" NET_ = "NET" DATACALL_ = "DATACALL" @Singleton class NetService(AbstractService): def __init__(self): super().__init__(NET) self.__data_call = dataCall self.__net = net self.__sim = sim self.__net_connect_status = False self.__data_call_status = False self.server_status = { DATACALL_: {ENABLE_: 1}, NET_: {ENABLE_: 0} } self.check_net = checkNet.CheckNetwork("QuecPython_Helios_Framework", "this latest version") @property def sim(self): return self.__sim @property def data_call(self): return self.__data_call @property def net(self): return self.__net def set_enable(self, sr, enable): """set start""" self.server_status[sr][ENABLE_] = enable def set_apn(self, profileIdx, ipType, apn, username, password, authType): return self.__data_call.setApn(profileIdx, ipType, apn, username, password, authType) def ev_dc(self, args): if self.server_status[DATACALL_][ENABLE_]: profile_idx = args[0] datacall_status = args[1] msg_dict = \ { "sim_status": None, "net_status": None, "datacall_status": datacall_status, "profile_id": profile_idx, "ip_type": None, "IPv4": None, "IPv4_DNS1": None, "IPv4_DNS2": None, "IPv6": None, "IPv6_DNS1": None, "IPv6_DNS2": None, } sim_status = self.__sim.getStatus() net_status = self.__net.getState() datacall_info = self.__data_call.getInfo(profile_idx, 2) msg_dict.update({"sim_status": sim_status}) if net_status != -1: if net_status[0][0] == 0 or net_status[1][0] == 0: msg_dict.update({"net_status": 0}) else: msg_dict.update({"net_status": net_status[1][0]}) else: msg_dict.update({"net_status": -1}) if datacall_info != -1: if datacall_info[2][0] == 1 or datacall_info[3][0] == 1: msg_dict.update({"datacall_status": 1}) else: msg_dict.update({"datacall_status": 0}) msg_dict.update({"ip_type": datacall_info[1]}) msg_dict.update({"IPv4": datacall_info[2][2]}) msg_dict.update({"IPv4_DNS1": datacall_info[2][3]}) msg_dict.update({"IPv4_DNS2": datacall_info[2][4]}) msg_dict.update({"IPv6": datacall_info[3][2]}) msg_dict.update({"IPv6_DNS1": datacall_info[3][3]}) msg_dict.update({"IPv6_DNS2": datacall_info[3][4]}) self.send_msg(msg_type=1, message=msg_dict) def ev_nc(self, args): if self.server_status[NET_][ENABLE_]: net_status = args[1] self.send_msg(msg_type=0, message={"network_register_status": net_status}) def wait_connect(self, timeout): self.check_net.poweron_print_once() return self.check_net.wait_network_connected(timeout) def get_net_status(self): self.__data_call_status = self.__data_call.getInfo(1, 0)[2][0] return self.__data_call_status def register_event(self): self.__data_call.setCallback(self.ev_dc) @Singleton class LogService(AbstractService): def __init__(self): super().__init__(LOGGER) self.__reporter = [PrintLogOutputUtil(), ] self.__tr = TimeResolver() self.format_util = LogFormatUtil() self.__level_map = { LOG_LV.DEBUG: 0, LOG_LV.INFO: 1, LOG_LV.WARNING: 2, LOG_LV.ERROR: 3, LOG_LV.CRITICAL: 4 } self.low_level = 0 def __set_report(self, report): if isinstance(report, AbstractLogOutputUtil): self.__reporter.append(report) def set_output(self, out_obj): if isinstance(out_obj, AbstractLogOutputUtil): self.__set_report(out_obj) else: self.log_send(self.name, LOG_LV.ERROR, '"{}" is not extend AbstractLogOutputUtil'.format(out_obj)) raise Exception('"{}" is not extend AbstractLogOutputUtil'.format(out_obj)) def set_level(self, level): if level in self.__level_map: self.low_level = self.__level_map[level] else: self.low_level = 0 def log_send(self, sign, level, msg, mode=1): """send log deal""" if self.mode is not None: mode = self.mode if self.__level_map[level] >= self.low_level: if mode: self.send_msg_async(message=self.format_msg(sign, level, msg)) else: self.send_msg_sync(message=self.format_msg(sign, level, msg)) def format_msg(self, sign, level, msg): return self.format_util.format(self.__tr.resolver(), sign, level, msg) def output_msg(self, *args, **kwargs): msg = "message" em = kwargs.get(msg, False) if em: for repoter in self.__reporter: repoter.output(em[msg]) def prepare_before_stop(self): for repoter in self.__reporter: repoter.close() def prepare_before_start(self): self.signal.connect(self.output_msg, sender="anonymous") for repoter in self.__reporter: repoter.open() class LogAdapter(object): def __init__(self, name, enable=1): self.log_service = LogService() self.name = name self.enable = enable self.mode = 1 self.tag = None def get_tag(self): if self.tag is None: return self.name else: return self.tag def critical(self, msg): if self.enable: self.log_service.log_send(self.name, LOG_LV.CRITICAL, msg, self.mode) def debug(self, msg): if self.enable: self.log_service.log_send(self.name, LOG_LV.DEBUG, msg, self.mode) def info(self, msg): if self.enable: self.log_service.log_send(self.name, LOG_LV.INFO, msg, self.mode) def warning(self, msg): if self.enable: self.log_service.log_send(self.name, LOG_LV.WARNING, msg, self.mode) def error(self, msg): if self.enable: self.log_service.log_send(self.name, LOG_LV.ERROR, msg, self.mode) def version(): return {"GROUP_ID": __GROUP_ID, "artifact_id": __ARTIFACT_ID, "VERSION": __VERSION} @Singleton class Guard(object): def __init__(self): self.monitor_service = set() self.timer = None self.__status = 0 def register_monitor(self, monitor): self.monitor_service.add(monitor) def start(self): for monitor in self.monitor_service: monitor.start() self.__status = 1 def reload_level(self): for monitor in self.monitor_service: monitor.set_level(0) monitor.start() self.__status = 1 def stop(self): for monitor in self.monitor_service: monitor.stop() self.__status = 0 def status(self): return self.__status def upgrade(self): pass def monitor_map(self): return {m.service.name: m.service for m in self.monitor_service} @Singleton class GuardContext(object): def __init__(self): self.__guard = Guard() self.system_config = {} self.service_config = {} self.config_map = { "/usr/etc/system_config": self.system_config, "/usr/etc/app_config": self.service_config } self.config_parser = JsonParser() self.monitor_map = dict() self.error_handler = print def servers(self): return self.monitor_map.copy() def get_server(self, name): return self.monitor_map[name]() def stop_server(self, name): return self.monitor_map[name].stop() @staticmethod def get_logger(name): return LogAdapter(name) @staticmethod def path_exist(check_path): try: os.stat(check_path) except Exception as e: return 0 else: return 1 def load_config_definitions(self): root_path = "/usr/etc/" stat = self.path_exist(root_path) if not stat: return self.load_configs(root_path) def load_configs(self, root_path): stat = self.path_exist(root_path) if not stat: self.error_handler("[ WARN ] {} path is not exist".format(root_path)) return True for par_path in os.listdir(root_path): abs_path = root_path + par_path for sub_path in os.listdir(abs_path): truth_path = abs_path + "/" + sub_path rep_d = self.config_parser.parse(truth_path) if rep_d["status"] == 0: self.error_handler("[ WARN ] read {} status {}".format(truth_path, 0)) else: self.config_map[abs_path][sub_path] = rep_d["data"] def create_monitors(self): # 创建monitors monitor_map = dict() monitor_map.update(self.create_system_monitors()) monitor_map.update(self.create_app_monitors()) self.monitor_map = monitor_map def start(self): self.__guard.start() def register_monitors(self): for k, v in self.monitor_map.items(): self.__guard.register_monitor(v) def refresh(self): self.load_config_definitions() self.create_monitors() self.register_monitors() self.start() def reload(self): self.__guard.reload_level() def create_system_monitors(self): monitor_map = dict() try: LOG = "log" log_monitor = LogServiceMonitor.create_monitor(self.system_config.get(LOG, None)) monitor_map[LOG] = log_monitor print("[ OK ] create sys monitor log service") except Exception as e: self.error_handler("[ FAILED ] load log monitor error reason:{}".format(e)) try: NET = "net" net_monitor = NetServiceMonitor.create_monitor(self.system_config.get(NET, None)) monitor_map[NET] = net_monitor print("[ OK ] create sys monitor net service") except Exception as e: # 异常重定向 self.error_handler("[ FAILED ] load net monitor error reason:{}".format(e)) return monitor_map def create_app_monitors(self): monitor_map = dict() return monitor_map