Your queues must be created inside the loop. You created them outside the loop that asyncio.run() creates, so they use events.get_event_loop() instead (up to Python 3.9, asyncio.Queue binds to a loop when it is constructed). asyncio.run() creates a new loop, and futures created for the queue in one loop can't then be used in the other.
Create your queues in your top-level run() coroutine, and either pass them to the coroutines that need them, or use contextvars.ContextVar objects if you must use globals.
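To see why a queue created at module level can end up tied to a different loop, here is a minimal sketch showing that each asyncio.run() call builds its own loop. The helper name current_loop is made up for illustration.

```python
import asyncio


async def current_loop():
    # Hypothetical helper: report the loop this coroutine is running on.
    return asyncio.get_running_loop()


first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

print(first is second)    # False: each asyncio.run() call made a new loop
print(first.is_closed())  # True: asyncio.run() closes its loop on exit
```

Anything bound to a loop before asyncio.run() is called belongs to neither of these loops.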
You also need to clean up how you handle task cancelling inside your tasks. A task is cancelled by raising an asyncio.CancelledError exception in the task. You don't have to handle it, but if you catch it to do clean-up work, you must re-raise it.
Your task code catches all exceptions without re-raising, including CancelledError, so you block proper cancellations.
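As a minimal sketch of the catch, clean up, re-raise pattern (the names job and log are placeholders, not taken from your code):

```python
import asyncio


async def job(log):
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("cleaning up")  # do clean-up work here
        raise                      # then let the cancellation continue


async def main():
    log = []
    task = asyncio.create_task(job(log))
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return log, task.cancelled()


log, was_cancelled = asyncio.run(main())
print(log, was_cancelled)  # ['cleaning up'] True
```

Because the exception is re-raised, the task ends up in the cancelled state rather than looking like it finished normally.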
Instead, what does happen during cancellation is that you call queue.task_done(); don't do that, at least not when your task is being cancelled. You should only call task_done() when you actually are handling a queue task, but your code calls task_done() when an exception occurs while waiting for a queue task to appear.
If you need to use try...finally: in_queue.task_done(), put this around the block of code that handles an item received from the queue, and keep the await in_queue.get() outside of that try block. You don't want to mark tasks done you didn't actually receive.
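A small sketch of what goes wrong when task_done() calls outnumber get() calls. The queue contents and variable names here are made up for illustration.

```python
import asyncio


async def main():
    queue = asyncio.Queue()
    await queue.put("item")

    item = await queue.get()  # wait for an item outside the try block
    try:
        handled = item.upper()  # stand-in for the real item handling
    finally:
        queue.task_done()       # matches the one get() above

    try:
        queue.task_done()       # no matching get(): one call too many
    except ValueError as e:
        return handled, repr(e)


handled, error = asyncio.run(main())
print(handled)  # ITEM
print(error)    # ValueError('task_done() called too many times')
```

The queue counts unfinished items, so a stray task_done() either raises like this or lets join() return before the real work is finished.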
Finally, when you print exceptions, you want to print their repr(); for historical reasons, the str() conversion of exceptions produces their .args value, which is not very helpful for CancelledError exceptions, which have an empty .args. Use {e!r} in formatted strings, so you can see what exception you are catching:
worker-0 exception CancelledError()
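A quick way to compare the two conversions yourself, as a minimal sketch:

```python
import asyncio

exc = asyncio.CancelledError()

print(f"str:  '{exc}'")  # str:  ''
print(f"repr: {exc!r}")  # repr: CancelledError()
```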
So, corrected code, with the saver() task enabled, the queues created inside of run(), and task exception handling cleaned up, would be:
import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)


num_workers = 1


async def run():
    # create the queues inside the running loop
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'in_queue' and produces to 'out_queue':
    tasks = []
    for i in range(num_workers):
        tasks.append(asyncio.create_task(
            worker(in_queue, out_queue, name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver(out_queue)))

    await in_queue.join()
    await out_queue.join()

    for task in tasks:
        task.cancel()

    # wait for the cancelled tasks to finish their clean-up
    await asyncio.gather(*tasks, return_exceptions=True)

    print('done')


async def worker(in_queue, out_queue, name):
    print(f"{name} started")
    try:
        while True:
            # wait for an item outside the inner try block
            num = await in_queue.get()
            try:
                print(f'{name} got {num}')
                await asyncio.sleep(0)
                await out_queue.put(num)
            except Exception as e:
                print(f"{name} exception {e!r}")
                raise
            finally:
                # only mark items done that were actually received
                in_queue.task_done()
    except asyncio.CancelledError:
        print(f"{name} is being cancelled")
        raise
    finally:
        print(f"{name} ended")


async def saver(out_queue):
    print("saver started")
    try:
        while True:
            num = await out_queue.get()
            try:
                print(f'saver got {num}')
                await asyncio.sleep(0)
                print("saver ended")
            except Exception as e:
                print(f"saver exception {e!r}")
                raise
            finally:
                out_queue.task_done()
    except asyncio.CancelledError:
        print("saver is being cancelled")
        raise
    finally:
        print("saver ended")


asyncio.run(run(), debug=True)
print('Done!')
This prints
worker-0 started
worker-0 got 0
saver started
saver got 0
saver ended
done
worker-0 is being cancelled
worker-0 ended
saver is being cancelled
saver ended
Done!
If you want to use globals to share queue objects, then use ContextVar objects. You still create the queues in run(), but if you were to start multiple loops then the contextvars module integration will take care of keeping the queues separate:
from contextvars import ContextVar
# ...

in_queue = ContextVar('in_queue')
out_queue = ContextVar('out_queue')


async def run():
    in_, out = asyncio.Queue(), asyncio.Queue()
    in_queue.set(in_)
    out_queue.set(out)

    for request in range(1):
        await in_.put(request)

    # ...

    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver()))

    await in_.join()
    await out.join()

    # ...


async def worker(name):
    print(f"{name} started")
    in_ = in_queue.get()
    out = out_queue.get()
    try:
        while True:
            num = await in_.get()
            try:
                # ...
                await out.put(num)
                # ...
            finally:
                in_.task_done()
    # ...


async def saver():
    print("saver started")
    out = out_queue.get()
    try:
        while True:
            num = await out.get()
            try:
                # ...
            finally:
                out.task_done()
    # ...
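If it helps to see the ContextVar approach end to end, here is a compact, runnable sketch with a single queue. The names queue_var, consumer and main are made up for illustration and are not part of your code. It assumes the variable is set before the tasks are created, since a task copies the current context when it is created.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical name, for illustration only.
queue_var = ContextVar("queue_var")


async def consumer(results):
    queue = queue_var.get()  # the task sees the context it was created in
    while True:
        item = await queue.get()
        try:
            results.append(item * 2)
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    queue_var.set(queue)  # set before creating tasks, so they inherit it
    results = []
    task = asyncio.create_task(consumer(results))
    for n in range(3):
        await queue.put(n)
    await queue.join()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return results


# Two separate loops, each with its own queue behind the same global name.
first = asyncio.run(main())
second = asyncio.run(main())
print(first, second)  # [0, 2, 4] [0, 2, 4]
```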