Consider using aioserial.
Here's an example:
import asyncio
import concurrent.futures
import queue
import aioserial
async def readline_and_put_to_queue(
        aioserial_instance: aioserial.AioSerial,
        q: queue.Queue):
    """Forward every line read from a serial port into a queue.

    The coroutine loops forever: it only stops when it is cancelled or
    when a read raises, in which case the exception propagates.

    Args:
        aioserial_instance: Port to read from. Only its
            ``readline_async()`` coroutine method is used.
        q: Destination queue; each awaited line is enqueued unchanged.
    """
    while True:
        # NOTE: ``queue.Queue.put`` is a synchronous call. It returns
        # immediately on an unbounded queue, but would block the event
        # loop if the caller passed a bounded queue that is full.
        q.put(await aioserial_instance.readline_async())
async def process_queue(q: queue.Queue):
    """Print every item taken from *q* as text, forever.

    ``q.get`` blocks, so it is run on a dedicated single-worker thread
    pool and awaited; the event loop stays free while the queue is empty.
    Each item is decoded (undecodable bytes are dropped), written to
    stdout without an added newline, flushed, and then marked done.

    Args:
        q: Queue of ``bytes`` items to print.
    """
    # The running loop does not change for the lifetime of this coroutine,
    # so look it up once instead of on every iteration.
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        while True:
            line: bytes = await loop.run_in_executor(executor, q.get)
            print(line.decode(errors='ignore'), end='', flush=True)
            q.task_done()
# Single queue shared by both serial readers and the printer.
q: queue.Queue = queue.Queue()
aioserial_ttyUSB0: aioserial.AioSerial = aioserial.AioSerial(
    port='/dev/ttyUSB0')
aioserial_ttyUSB1: aioserial.AioSerial = aioserial.AioSerial(
    port='/dev/ttyUSB1', baudrate=115200)


async def _main() -> None:
    """Run both serial readers and the queue printer concurrently.

    ``asyncio.wait`` no longer accepts bare coroutine objects (deprecated
    in Python 3.8, a ``TypeError`` from 3.11), so each coroutine is wrapped
    in a task first. As before, this waits until all three have finished
    and does not re-raise an exception from any one of them.
    """
    workers = (
        readline_and_put_to_queue(aioserial_ttyUSB0, q),
        readline_and_put_to_queue(aioserial_ttyUSB1, q),
        process_queue(q),
    )
    await asyncio.wait([asyncio.create_task(w) for w in workers])


asyncio.run(_main())
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…