Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

[RabbitMQ][AMQP] Failing to get and read single message with amqp_basic_get and amqp_read_message

I want to set up a consumer with AMQP that reads from a specific queue. Some googling suggested that this can be done with amqp_basic_get, and according to the documentation the actual message is then retrieved with amqp_read_message. I also found this example, which I tried to follow when implementing the basic_get. Nevertheless, I am failing to get and read a message from a specific queue.

My scenario is like this: I have two programs that communicate by publishing to and consuming from the RabbitMQ server. In each, a connection is declared with two channels, one meant for consuming and one for publishing. The flow of information is as follows: program A gets the current time and publishes it to RabbitMQ. Upon receiving this message, program B gets its own time, packages its time and the received time into a message, and publishes that to RabbitMQ. Program A should consume this message. However, I cannot succeed in reading from namedQueue.

Program A (in C++, using amqp.c) is implemented as follows:

        // ... after creating the connection
        //Create channels
        amqp_channel_open_ok_t *res = amqp_channel_open(conn, channelIDPub);
        assert(res != NULL);
        amqp_channel_open_ok_t *res2 = amqp_channel_open(conn, channelIDSub);
        assert(res2 != NULL);
    
        //Declare exchange
        exchange = "exchangeName";
        exchangetype = "direct";
        amqp_exchange_declare(conn, channelIDPub, amqp_cstring_bytes(exchange.c_str()),
                              amqp_cstring_bytes(exchangetype.c_str()), 0, 0, 0, 0,
                              amqp_empty_table);
        // ...
        throw_on_amqp_error(amqp_get_rpc_reply(conn), printText.c_str());
    
        //Bind the exchange to the queue
        const char* qname = "namedQueue";
        amqp_bytes_t queue = amqp_bytes_malloc_dup(amqp_cstring_bytes(qname));
        amqp_queue_declare_ok_t *r = amqp_queue_declare(
                conn, channelIDSub, queue, 0, 0, 0, 0, amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
        
        if (queue.bytes == NULL) {
            fprintf(stderr, "Out of memory while copying queue name");
            return;
        }
    
        amqp_queue_bind(conn, channelIDSub, queue, amqp_cstring_bytes(exchange.c_str()),
                        amqp_cstring_bytes(queueBindingKey.c_str()), amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
    
        amqp_basic_consume(conn, channelIDSub, queue, amqp_empty_bytes, 0, 0, 1,
                           amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
    
                                           // ...
        // In order to get a message from rabbitmq
        amqp_rpc_reply_t res, res2;
        amqp_message_t message;
        amqp_boolean_t no_ack = false;
    
        amqp_maybe_release_buffers(conn);
        printf("were here, with queue name %s, on channel %d\n", queueName, channelIDSub);
    
        amqp_time_t deadline;
        struct timeval timeout = { 1 , 0 };//same timeout used in consume(json)
        int time_rc = amqp_time_from_now(&deadline, &timeout);
        assert(time_rc == AMQP_STATUS_OK);
    
        do {
            res = amqp_basic_get(conn, channelIDSub, amqp_cstring_bytes("namedQueue"), no_ack);
        } while (res.reply_type == AMQP_RESPONSE_NORMAL &&
                res.reply.id == AMQP_BASIC_GET_EMPTY_METHOD 
                && amqp_time_has_past(deadline) == AMQP_STATUS_OK);
    
        if (AMQP_RESPONSE_NORMAL != res.reply_type || AMQP_BASIC_GET_OK_METHOD != res.reply.id)
        {
            printf("amqp_basic_get error codes amqp_response_normal %d, amqp_basic_get_ok_method %d\n", res.reply_type, res.reply.id);
            return false;
        }
    
        res2 = amqp_read_message(conn,channelID,&message,0);
        printf("error %s\n", amqp_error_string2(res2.library_error));
        printf("5:reply type %d\n", res2.reply_type);
    
        if (AMQP_RESPONSE_NORMAL != res2.reply_type) {
            printf("6:reply type %d\n", res2.reply_type);
            return false;
        }
    
        payload = std::string(reinterpret_cast< char const * >(message.body.bytes), message.body.len);
    
        printf("then were here\n %s", payload.c_str());
        amqp_destroy_message(&message);

Program B (in Python) is as follows:

    #!/usr/bin/env python3
    import pika
    import json
    from datetime import datetime, timezone
    import time
    import threading
    
    cosimTime = 0.0
    newData = False
    lock = threading.Lock()
    thread_stop = False
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    connectionPublish = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channelConsume = connection.channel()
    channelPublish = connectionPublish.channel()
    
    print("Declaring exchange")
    channelConsume.exchange_declare(exchange='exchangeName', exchange_type='direct')
    channelPublish.exchange_declare(exchange='exchangeName', exchange_type='direct')
    
    print("Creating queue")
    result = channelConsume.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    result2 = channelPublish.queue_declare(queue='namedQueue', exclusive=False, auto_delete=False)
    
    channelConsume.queue_bind(exchange='exchangeName', queue=queue_name,
                   routing_key='fromB')
    
    channelPublish.queue_bind(exchange='exchangeName', queue="namedQueue",
                   routing_key='toB')
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callbackConsume(ch, method, properties, body):
        global newData, cosimTime
        print("\nReceived [x] %r" % body)
        #cosimTime = datetime.datetime.strptime(body, "%Y-%m-%dT%H:%M:%S.%f%z")
        with lock:
            newData = True
            cosimTime = body.decode()
            cosimTime = json.loads(cosimTime)
            #print(cosimTime)
    
    def publishRtime():
        global newData
        while not thread_stop:
            if newData:
            #if True:
                with lock:
                    newData = False
                    msg = {}
                    
                    msg['rtime'] = datetime.now(timezone.utc).astimezone().isoformat(timespec='milliseconds')
    
                    msg['cosimtime'] = cosimTime["simAtTime"]
                    print("\nSending [y] %s" % str(msg))
                    channelPublish.basic_publish(exchange='exchangeName',
                                        routing_key='toB',
                                        body=json.dumps(msg))
                    #time.sleep(1)
    
    channelConsume.basic_consume(
        queue=queue_name, on_message_callback=callbackConsume, auto_ack=True)
    
    try:
        thread = threading.Thread(target = publishRtime)
        thread.start()
        channelConsume.start_consuming()
    except KeyboardInterrupt:
        print("Exiting...")
        channelConsume.stop_consuming()
        thread_stop = True
        connection.close()

What program A outputs is:

amqp_basic_get error codes amqp_response_normal 1, amqp_basic_get_ok_method 3932232

which is the code for AMQP_BASIC_GET_EMPTY_METHOD.
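
The printed reply id can be checked by hand. In rabbitmq-c the method constants pack the AMQP class id into the upper 16 bits and the method id into the lower 16 bits, so 3932232 is 0x003C0048, i.e. class 60 (basic), method 72 (get-empty). A small sketch of that arithmetic, where the helper name `split_method_id` is made up for illustration and is not part of any library:

```python
def split_method_id(packed_id):
    """Split a packed AMQP method number into (class_id, method_id).

    Assumes the layout used by the rabbitmq-c constants:
    class id in the upper 16 bits, method id in the lower 16 bits.
    """
    return packed_id >> 16, packed_id & 0xFFFF


if __name__ == "__main__":
    # Value printed by program A above.
    print(split_method_id(3932232))  # (60, 72)
```

By the same layout, a successful basic.get would report 0x003C0047 (class 60, method 71), which is the value of AMQP_BASIC_GET_OK_METHOD.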

Program B gets the data, and publishes continuously.

If I slightly modify B so that it continuously publishes a fixed string, amqp_basic_get seems to return successfully, but amqp_read_message then fails with the code AMQP_RESPONSE_LIBRARY_EXCEPTION.

Any idea how to get this to work, and what I am missing in the setup?

Question from: https://stackoverflow.com/questions/65888912/rabbitmqamqp-failing-to-get-and-read-single-message-with-amqp-basic-get-and

1 Reply

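The issue was in the queue declaration: the auto_delete parameter passed to queue_declare did not match on both sides. RabbitMQ only accepts a redeclaration of an existing queue when its properties are identical to the original ones; otherwise it closes the channel with a PRECONDITION_FAILED error.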
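
One way to avoid such a mismatch is to keep the queue properties in a single place and reuse them for every declaration. Below is a minimal sketch for the Python side. It assumes `channel` is a pika channel such as `channelPublish` from the question; the names `NAMED_QUEUE_PROPS` and `declare_named_queue` are hypothetical. The C side would have to pass the same values to `amqp_queue_declare`, whose boolean arguments come in the order passive, durable, exclusive, auto_delete.

```python
# Shared queue properties (hypothetical names, for illustration only).
# The values here are an assumption: they mirror the three zeros passed for
# durable, exclusive and auto_delete in the question's amqp_queue_declare call.
NAMED_QUEUE_PROPS = {
    "queue": "namedQueue",
    "durable": False,
    "exclusive": False,
    "auto_delete": False,
}


def declare_named_queue(channel):
    """Declare namedQueue on a pika-style channel with the shared properties."""
    return channel.queue_declare(**NAMED_QUEUE_PROPS)
```

In program B, `declare_named_queue(channelPublish)` would then replace the inline `queue_declare` call for namedQueue.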

