To accomplish this, you need a function that takes two async sequences and merges them, producing results from whichever one has them available first. With such a function at hand, `run` could look like this:
async def run(cmd):
    """Run *cmd* in a shell and print its output lines as they arrive.

    The child's stdout and stderr are read concurrently through ``merge``
    and every item is printed, decoded and stripped, after a timestamp.

    Args:
        cmd: Command line handed to ``asyncio.create_subprocess_shell``.
    """
    # NOTE(review): cmd is interpreted by a shell -- never pass untrusted text.
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in merge(p.stdout, p.stderr):
        print(datetime.now(), f.decode().strip())
    # Both pipes have reached EOF, so waiting cannot deadlock on a full pipe.
    # Reap the child instead of leaving it un-awaited.
    await p.wait()
A function like `merge` does not (yet) exist in the standard library, but the external `aiostream` library provides one. You can also write your own using an async generator and `asyncio.wait()`:
async def merge(*iterables):
    """Merge several async iterables into one async stream.

    Items are yielded as soon as any source produces one, so the output
    follows completion order rather than argument order.  The generator
    ends once every source has raised ``StopAsyncIteration``.

    Args:
        *iterables: Objects providing ``__aiter__``.  The iterators they
            return must be hashable; an iterator that appears more than
            once is consumed only once.

    Yields:
        The next available item from whichever source produced it.

    Raises:
        Exception: Anything other than ``StopAsyncIteration`` raised by a
            source is propagated to the consumer.
    """
    # Iterators that currently have no ``__anext__`` call in flight.
    # dict.fromkeys de-duplicates while keeping argument order.
    idle = list(dict.fromkeys(it.__aiter__() for it in iterables))
    # In-flight task -> the iterator it is reading from.
    pending = {}
    try:
        while idle or pending:
            # Ask each idle iterator for its next item.  This happens only
            # here, after the consumer has come back for more.
            for it in idle:
                pending[asyncio.ensure_future(it.__anext__())] = it
            idle.clear()

            done, _ = await asyncio.wait(
                pending.keys(), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                it = pending.pop(task)
                try:
                    item = task.result()
                except StopAsyncIteration:
                    # This source is exhausted; do not reschedule it.
                    continue
                idle.append(it)
                yield item
    finally:
        # Reached on normal exhaustion, on early close/cancellation of the
        # consumer, and when a source raises.  Cancel whatever is still in
        # flight so no orphaned tasks stay on the event loop.
        for task in pending:
            task.cancel()
The above `run` will still differ from your desired output in one detail: it will not distinguish between output and error lines. But this can be easily accomplished by decorating the lines with an indicator:
async def decorate_with(it, prefix):
    """Tag every item of an async iterable with a fixed marker.

    Args:
        it: The async iterable to read from.
        prefix: Value placed first in each yielded pair.

    Yields:
        ``(prefix, item)`` tuples, one per item of *it*, in source order.
    """
    async for element in it:
        tagged = (prefix, element)
        yield tagged
async def run(cmd):
    """Run *cmd* in a shell, printing stdout and stderr lines as they arrive.

    Both streams are wrapped with ``decorate_with`` so each item carries a
    flag saying which stream it came from, then interleaved through
    ``merge``.  Stdout items are printed after a timestamp; stderr items
    additionally get an ``"E:"`` marker.

    Args:
        cmd: Command line handed to ``asyncio.create_subprocess_shell``.
    """
    # NOTE(review): cmd is interpreted by a shell -- never pass untrusted text.
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    tagged_streams = merge(decorate_with(p.stdout, True),
                           decorate_with(p.stderr, False))
    async for is_out, line in tagged_streams:
        text = line.decode().strip()
        if is_out:
            print(datetime.now(), text)
        else:
            print(datetime.now(), "E:", text)
    # Both pipes have reached EOF, so waiting cannot deadlock on a full pipe.
    # Reap the child instead of leaving it un-awaited.
    await p.wait()
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…