import _thread
import utime
from machine import UART
from queue import Queue
class Example_uart(object):
    """UART demo: sends test strings and prints whatever the port receives.

    A callback registered on the UART pushes a value from each event into a
    queue; a background thread pops those values and uses each one as the
    byte count for the next ``uart.read`` call.
    """

    def __init__(self, no=UART.UART2, bate=115200, data_bits=8, parity=0, stop_bits=1, flow_control=0):
        """Open the UART, start the reader thread, then register the callback.

        All arguments are forwarded positionally, in the order given, to
        ``UART(...)``.
        """
        self.uart = UART(no, bate, data_bits, parity, stop_bits, flow_control)
        # Hand-off channel between ``callback`` and ``handler_thread``.
        # NOTE(review): 5 is presumably the queue capacity; confirm against
        # the queue module in use.
        self._queue = Queue(5)
        # The reader thread is started before the callback is registered.
        _thread.start_new_thread(self.handler_thread, ())
        self.uart.set_callback(self.callback)

    def callback(self, para):
        """Handle a UART event by queueing ``para[2]`` when ``para[0]`` is 0.

        ``para`` is an indexable event record supplied by the UART driver.
        The queued value is later passed to ``uartRead`` as the read length.
        NOTE(review): ``para[0] == 0`` presumably signals success; confirm
        against the ``UART.set_callback`` documentation.
        """
        print("call para:{}".format(para))
        if para[0] != 0:
            # Any other first element: nothing is queued.
            return
        self._queue.put(para[2])

    def uartWrite(self, msg):
        """Print ``msg`` to the console and write it to the UART."""
        print("write msg:{}".format(msg))
        self.uart.write(msg)

    def uartRead(self, len):
        """Read ``len`` from the UART, decode it, print it and return the text.

        ``len`` is passed unchanged to ``uart.read``. The result is decoded
        with ``decode()`` using its default encoding.
        """
        text = self.uart.read(len).decode()
        print("UartRead msg: {}".format(text))
        return text

    def uartWrite_test(self):
        """Send ten numbered greeting messages, pausing one second after each."""
        for count in range(10):
            self.uartWrite("Hello count={}".format(count))
            utime.sleep(1)

    def handler_thread(self):
        """Run forever: take each queued length and read that much from the UART."""
        while True:
            # The loop waits on the queue; ``callback`` is the only producer.
            self.uartRead(self._queue.get())
if __name__ == "__main__":
    # Script entry point: build the demo with its default UART settings
    # (this also starts the reader thread and registers the callback) ...
    uart_test = Example_uart()
    # ... then send the ten test messages, one per second.
    uart_test.uartWrite_test()