from usr import uasyncio as asyncio
async def barking(n):
print('Start barking')
for _ in range(6):
await asyncio.sleep(1)
print('Done barking.')
return 2 * n
async def foo(n):
print('Start timeout coro foo()')
while True:
await asyncio.sleep(1)
n += 1
return n
async def bar(n):
print('Start cancellable bar()')
while True:
await asyncio.sleep(1)
n += 1
return n
async def do_cancel(task):
await asyncio.sleep(5)
print('About to cancel bar')
task.cancel()
async def main():
tasks = [asyncio.create_task(bar(70))]
tasks.append(barking(21))
tasks.append(asyncio.wait_for(foo(10), 7))
asyncio.create_task(do_cancel(tasks[0]))
res = None
try:
# return_exceptions=True, 默认为False,即在任何协程引发异常时立即中止并引发异常。 如果设置为True,则将异常包装在结果中返回。
res = await asyncio.gather(*tasks, return_exceptions=True)
except asyncio.TimeoutError: # These only happen if return_exceptions is False
print('Timeout') # With the default times, cancellation occurs first
except asyncio.CancelledError:
print('Cancelled')
print('Result: ', res)
asyncio.run(main())