Update the demo

This commit is contained in:
rivern yuan 2021-09-02 19:36:25 +08:00
parent 46c4af2024
commit c54caebff1
14 changed files with 375 additions and 5 deletions

View File

@ -2,11 +2,11 @@
**API文档 - 里面有API文档** **API文档 - 里面有API文档**
**helios service文档 \- 框架文档** **helios service文档 - 框架文档**
**Code \- 代码** **Code - 代码**
@ -14,7 +14,7 @@
**API documentation - It has API documentation** **API documentation - It has API documentation**
**Helios Service document \- Framework document** **Helios Service document - Framework document**
**Code \- code** **Code - code**

4
demo/README.md Normal file
View File

@ -0,0 +1,4 @@
# HeliosService demo
各个服务对应的demo名称为-->服务名_demo.py

56
demo/cloud_demo.py Normal file
View File

@ -0,0 +1,56 @@
"""
使用云服务之前, 需要传入配置文件, 并启动云服务, 详见进阶篇中的操作使用云服务
"""
from usr.bin.guard import GuardContext
import utime
# 刷新容器
guard_context = GuardContext()
guard_context.refresh()
# 获取云服务
cloud_ser = guard_context.get_server("cloud")
"==============================================第一种方法==================================="
"""
这是非同步的, 所以需要sleep 2
"""
# # 加载优先级低的服务 并拉起
# guard_context.reload()
#
# # 等待下载, 因为是异步的这里可以等待2s
# utime.sleep(2)
# if cloud_ser.get_app_code() == "3001":
# # 重启设备
# pass
"=============================================第二种========================================"
"""
此为订阅此升级事件
"""
def func(*args, **kwargs):
"""
:param args: ('anonymous',) 发送人, 默认是'anonymous'无需关心
:param kwargs: 格式如下
{
'message':{
'message_id':2, # 消息ID
'sender':'anonymous', # 发送人
'message':'xxx', # 来的消息,
'from_event':'CLOUD', # 从哪个事件或服务来的
'msg_type':255 # 消息类型, 默认是255
}
}
:return:
"""
if kwargs['message']['message'] == '3001':
# 重启模块, 调用power接口
pass
# 订阅服务
cloud_ser.subscribe(func)
# reload容器
guard_context.reload()

32
demo/exception_demo.py Normal file
View File

@ -0,0 +1,32 @@
from usr.bin.guard import GuardContext
# 刷新容器
guard_context = GuardContext()
guard_context.refresh()
def func(*args, **kwargs):
"""
:param args: ('anonymous',) 发送人, 默认是'anonymous'无需关心
:param kwargs: 格式如下
{
'message':{
'message_id':2, # 消息ID
'sender':'anonymous', # 发送人
'message':'xxx', # 来的消息,
'from_event':'EXCEPTION', # 从那个事件或服务来的
'msg_type':255 # 消息类型, 默认是255
}
}
:return:
"""
print("func + {}".format(kwargs['message']['message']))
# 获取服务
exception_ser = guard_context.get_server("exception")
# 订阅服务
exception_ser.subscribe(func)
# publish消息
exception_ser.publish("this is a exception topic")

29
demo/extint_demo.py Normal file
View File

@ -0,0 +1,29 @@
from machine import Pin, ExtInt
from usr.bin.third_party.ql_interrupter import ExIntProcess, BusExIntInterrupter
import sys_bus
"======================第一种======================="
"""
可以自定义 topic
"""
extint = BusExIntInterrupter("TOPIC1", Pin.GPIO17, trige_mode=ExtInt.IRQ_RISING_FALLING, pull_mode=ExtInt.PULL_DISABLE)
def sub_cb(topic, msg):
print(topic, msg)
sys_bus.subscribe("TOPIC1", sub_cb)
"=======================第二种====================="
"""
我们固定死 topic, 按默认规则订阅即可
"""
# extint = ExIntProcess(Pin.GPIO17, trige_mode=ExtInt.IRQ_RISING_FALLING, pull_mode=ExtInt.PULL_DISABLE)
# def sub_cb(topic, msg):
# print(topic, msg)
# sys_bus.subscribe("GPIO17_EXINT", sub_cb)

11
demo/gpio_demo.py Normal file
View File

@ -0,0 +1,11 @@
from machine import Pin, ExtInt
from usr.bin.third_party.ql_interrupter import PinInterrupter
# 初始化gpio
pin_inter = PinInterrupter(Pin.GPIO6, trige_mode=Pin.OUT, pull_mode=Pin.PULL_DISABLE, mode=0)
# 闪烁一次在1秒的持续时间
pin_inter.blinker(1)
# 闪烁3次2秒的持续时间
pin_inter.freq_blinker(3, 5)

46
demo/log_demo.py Normal file
View File

@ -0,0 +1,46 @@
from usr.bin.guard import GuardContext
# 刷新容器
guard_context = GuardContext()
guard_context.refresh()
def func(*args, **kwargs):
"""
:param args: ('anonymous',) 发送人, 默认是'anonymous'无需关心
:param kwargs: 格式如下
{
'message':{
'message_id':2, # 消息ID
'sender':'anonymous', # 发送人
'message':'xxx', # 可以获取日志消息,
'from_event':'LOG', # 从那个事件或服务来的
'msg_type':255 # 消息类型, 默认是255
}
}
:return:
"""
print("func + {}".format(kwargs['message']['message']))
# 获取服务
log_ser = guard_context.get_server("log")
# 订阅服务
log_ser.set_level("DEBUG")
# 获取日志客户端
app_log = guard_context.get_logger("app")
# 打印日志
app_log.debug("this is a debug log")
app_log.info("this is a debug log")
app_log.warning("this is a debug log")
app_log.error("this is a debug log")
app_log.critical("this is a debug log")
# 可以订阅日志
log_ser.subscribe(func)
app_log.debug("this is a debug log")
app_log.info("this is a debug log")
app_log.warning("this is a debug log")
app_log.error("this is a debug log")
app_log.critical("this is a debug log")

42
demo/media_demo.py Normal file
View File

@ -0,0 +1,42 @@
from usr.bin.guard import GuardContext
# 刷新容器
guard_context = GuardContext()
guard_context.refresh()
def func(*args, **kwargs):
"""
:param args: ('anonymous',) 发送人, 默认是'anonymous'无需关心
:param kwargs: 格式如下
{
'message':{
'message_id':2, # 消息ID
'sender':'anonymous', # 发送人
'message': { # 来的消息
'play_data': 'tts', # 播放音频或者tts消息
'breakin': 0, # 是否允许被打断
'mode': 2, # 只有tts存在字段, audio不存在此字段, 模式
'priority': 4 # 优先级4>3>2>1>0
},
'from_event':'MEDIA', # 从那个事件或服务来的
'msg_type':1 # 消息类型, 1是来自tts的消息, 0是来自audio的消息
}
}
:return:
"""
print(kwargs['message'])
# 获取媒体服务
media_ser = guard_context.get_server("media")
# 订阅媒体服务
media_ser.subscribe(func)
# 设置模式
media_ser.set_mode(0)
# 设置是否自动播报, 默认是True, 如果为False, 采用订阅手动播报, 一旦设置, audio_play和tts_play将不再生效, 用户可以从订阅消息通拿到并自动播报
media_ser.enable = True
# publish消息
media_ser.audio_play(play_data="usr/a.mp3")
media_ser.tts_play(play_data="这是一条tts消息")

56
demo/net_demo.py Normal file
View File

@ -0,0 +1,56 @@
from usr.bin.guard import GuardContext
# 刷新容器
guard_context = GuardContext()
guard_context.refresh()
def func(*args, **kwargs):
print(args, kwargs)
"""
:param args: ('anonymous',) 发送人, 默认是'anonymous'无需关心
:param kwargs: 格式如下
{
'message':{
'message_id':2, # 消息ID
'sender':'anonymous', # 发送人
'message': { # 来的消息
"sim_status": 1, # sim卡状态 1为正常0为异常
"net_status": 1, # 网络状态 1为正常0为异常
"datacall_status": 1, # 注网状态 1为正常0为异常, 一般只需关注拨号状态即可
"profile_id": 1, # pid
"ip_type": 0, # 0是ipv4, 1是ipv6
"IPv4": "xxxx",
"IPv4_DNS1": "xxxx",
"IPv4_DNS2": "xxxx",
"IPv6": "xxxx",
"IPv6_DNS1": "xxxx",
"IPv6_DNS2": "xxxx",
},
'from_event':'NET', # 从那个事件或服务来的
'msg_type':1 # 1代表来自有data call信息
}
}
:return:
"""
print(kwargs['message']['message'])
# 获取媒体服务
net_ser = guard_context.get_server("net")
# 获取并打印媒体服务
_net = net_ser.net
print("_net = {}".format(_net))
# 获取并打印sim卡原生对象
_sim = net_ser.sim
print("_sim = {}".format(_sim))
# 获取并打印dataCall原生对象
_data_call = net_ser.data_call
print("_data_call = {}".format(_data_call))
# 等待网络连接
net_ser.wait_connect(5)
# 获取网络状态, 返回值, 1网络正常, 0网络异常
net_ser.get_net_status()
# 订阅网络服务
net_ser.subscribe(func)

20
demo/pm_demo.py Normal file
View File

@ -0,0 +1,20 @@
from usr.bin.guard import GuardContext
# 刷新容器
guard_context = GuardContext()
guard_context.refresh()
# 获取pm低功耗服务
pm_ser = guard_context.get_server("pm")
# 投票不睡眠
pm_ser.lock()
pm_ser.lock()
pm_ser.lock()
# 释放投票进入休眠
pm_ser.unlock()
pm_ser.unlock()
pm_ser.unlock()
# 获取票数
pm_ser.count()
# 设置是否自动休眠, 默认是自动休眠, 1是自动休眠0则不是
pm_ser.auto_sleep(1)

15
demo/queue_demo.py Normal file
View File

@ -0,0 +1,15 @@
import _thread
from queue import Queue
q = Queue(20)
def get():
while True:
msg = q.get()
print("msg = {}".format(msg))
_thread.start_new_thread(get, ())
q.put("this is a msg")

18
demo/sys_bus_demo.py Normal file
View File

@ -0,0 +1,18 @@
import sys_bus
def sub_cb(topic, msg):
"""
订阅函数
:param topic: 订阅的topic
:param msg: 来自topic的消息
:return:
"""
print("sub_cb === {}".format(topic, msg))
# 订阅topic
sys_bus.subscribe("test", sub_cb)
# 发布消息到topic中
sys_bus.publish("test", "this is one sys_bus")

14
demo/timer_demo.py Normal file
View File

@ -0,0 +1,14 @@
from usr.bin.third_party.ql_interrupter import TimerInterrupter
def func(*args, **kwargs):
print("func args = {} kwargs = {}".format(args, kwargs))
# 初始化 传入回调函数, 1代表, 一直执行
# timer = TimerInterrupter(3000, func, 1)
# 此为只执行1次
timer = TimerInterrupter(3000, func, 0)
# 启动timer
timer.start()

27
demo/watchdog_demo.py Normal file
View File

@ -0,0 +1,27 @@
import sys_bus
from machine import Pin, ExtInt
from usr.bin.third_party.ql_interrupter import WatchDog
def topic1_cb(topic, msg):
print("topic1_cb topic = {} msg = {}".format(topic, msg))
def topic2_cb(topic, msg):
print("topic2_cb topic = {} msg = {}".format(topic, msg))
# 订阅喂狗后硬件狗拉GPIO 告知模块喂狗成功
sys_bus.subscribe("WDT_KICK_TOPIC", topic1_cb)
# 订阅喂狗后的回调
sys_bus.subscribe("WDT_KICK_TOPIC_FEED", topic2_cb)
wd = WatchDog(Pin.GPIO17, 1, 10000)
wd.start()