Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


websocket - asyncio: FastAPI with aio-pika, consumer ignores await

I am trying to hook my websocket endpoint up to RabbitMQ (aio-pika). The goal is to have a listener in that endpoint and, on any new message from the queue, pass the message to the browser client over websockets.

I tested the consumer in a standalone script with an asyncio loop. It works, as I followed the aio-pika documentation (source: https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/2-work-queues.html, worker.py).

However, when I use it in FastAPI in a websocket endpoint, I can't make it work. Somehow the listener:

await queue.consume(on_message)

is completely ignored.
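One possible explanation, sketched below: in aio-pika, `queue.consume()` registers the callback and returns a consumer tag right away, so awaiting it does not block until messages arrive. The standalone script keeps its event loop running, while a websocket endpoint coroutine simply returns once its last statement finishes. The sketch uses only the standard library; `FakeQueue`, `handler_returns_early`, `handler_stays_alive` and `demo` are hypothetical names made up for illustration and are not part of aio-pika.

```python
import asyncio


class FakeQueue:
    """Hypothetical stand-in for a broker queue; not part of aio-pika."""

    def __init__(self):
        self._callbacks = []

    async def consume(self, callback):
        # Register the callback and return a consumer tag right away,
        # without waiting for any message to arrive.
        self._callbacks.append(callback)
        return f"ctag-{len(self._callbacks)}"

    async def publish(self, body):
        # Deliver a message to every registered callback.
        for callback in self._callbacks:
            await callback(body)


async def handler_returns_early(queue, received):
    async def on_message(body):
        received.append(body)

    # Returns as soon as the callback is registered; no message was seen yet.
    return await queue.consume(on_message)


async def handler_stays_alive(queue, received, stop):
    async def on_message(body):
        received.append(body)

    await queue.consume(on_message)
    await stop.wait()  # keep the coroutine alive until told to stop


async def demo():
    queue, received, stop = FakeQueue(), [], asyncio.Event()
    early_tag = await handler_returns_early(queue, [])
    task = asyncio.create_task(handler_stays_alive(queue, received, stop))
    await asyncio.sleep(0)  # let the task register its callback
    await queue.publish(b"task finished")
    stop.set()
    await task
    return early_tag, received
```

Calling `asyncio.run(demo())` shows that the first handler gets its tag back before anything is published, while the second one is still waiting when the message is delivered.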

This is my attempt (I put it all in one function so it's more readable):

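import asyncio

from aio_pika import ExchangeType, IncomingMessage, connect
from fastapi import WebSocket, WebSocketDisconnect

# `app` (the FastAPI instance) and `manager` (the websocket connection
# manager) are defined elsewhere in the application.

@app.websocket("/ws")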
async def websocket_endpoint(websocket: WebSocket):
    print("Entering websockets")
    await manager.connect(websocket)
    print("got connection")

    # Parameters
    queue_name = "task_events"
    routing_key = "user_id.task"

    con = "amqp://rabbitmq:rabbitmq@rabbit:5672/"
    connection = await connect(con)
    channel = await connection.channel()

    await channel.set_qos(prefetch_count=1)

    exchange = await channel.declare_exchange(
        "topic_logs",
        ExchangeType.TOPIC,
    )

    # Declaring queue
    queue = await channel.declare_queue(queue_name)

    # Binding the queue to the exchange
    await queue.bind(exchange, routing_key)

    async def on_message(message: IncomingMessage):
        async with message.process():
            # Here the message will be passed over websockets to the browser client
            print("sent", message.body)

    try:
        # Attempt 1: not working as expected.
        # The await returns immediately, and the websocket handler finishes
        # because there is no loop keeping it alive.
        await queue.consume(on_message)

        # Attempt 2: this alternative at least receives some messages.
        # With this loop I get some messages when I trigger a backend task that
        # publishes new messages to the queue. It seems the messages are somehow
        # stuck: a new task releases all the stuck messages, but not the new one.
        while True:
            await queue.consume(on_message)
            await asyncio.sleep(1)

    except WebSocketDisconnect:
        manager.disconnect(websocket)

I am quite new to async in Python. I am not sure where the problem is, and I have not managed to implement an async consuming loop modeled on worker.py from the aio-pika tutorial.
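A minimal sketch of one way to keep the endpoint alive, assuming the queue is consumed through aio-pika's async iterator (`queue.iterator()`) instead of a callback, and that each message is forwarded with the websocket's `send_text()`. The helper name `relay_queue_to_websocket` is hypothetical, and the `Fake*` classes are in-memory stand-ins added only so the sketch runs without a broker; none of them come from aio-pika or FastAPI.

```python
import asyncio
from contextlib import asynccontextmanager


async def relay_queue_to_websocket(queue, websocket):
    """Forward every queue message to the websocket.

    `queue` is expected to behave like an aio-pika queue and `websocket`
    like a FastAPI WebSocket; only the calls used below are required.
    """
    # The async iterator waits for the next message, so the endpoint
    # coroutine stays alive instead of returning immediately.
    async with queue.iterator() as messages:
        async for message in messages:
            async with message.process():  # acknowledge on success
                await websocket.send_text(message.body.decode())


# Hypothetical in-memory stand-ins, only so the sketch runs without a broker.

class FakeMessage:
    def __init__(self, body):
        self.body = body
        self.acked = False

    @asynccontextmanager
    async def process(self):
        yield self
        self.acked = True  # reached only if the body raised no exception


class FakeQueue:
    def __init__(self, messages):
        self._messages = messages

    @asynccontextmanager
    async def iterator(self):
        async def generate():
            for message in self._messages:
                yield message

        yield generate()


class FakeWebSocket:
    def __init__(self):
        self.sent = []

    async def send_text(self, text):
        self.sent.append(text)


async def demo():
    messages = [FakeMessage(b"task 1 done"), FakeMessage(b"task 2 done")]
    websocket = FakeWebSocket()
    await relay_queue_to_websocket(FakeQueue(messages), websocket)
    return websocket.sent, [m.acked for m in messages]
```

With the real objects from the attempt above, the body of the `try` block would presumably shrink to `await relay_queue_to_websocket(queue, websocket)`. Detecting a client disconnect while the coroutine is waiting on the queue is a separate concern that this sketch does not cover.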

Question from: https://stackoverflow.com/questions/65940177/asyncio-fastapi-with-aio-pika-consumer-ignores-await

