Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
552 views
in Technique[技术] by (71.8m points)

python - How could I set hwm in the push/pull pattern of zmq?

I have found a similar question, ZeroMQ: HWM on PUSH does not work, but it couldn't solve my problem.

I want to control the number of messages that the push socket queues, but it doesn't work and still queues 1000 messages.
So I want to know how to set the hwm of the push socket. Thanks in advance.

My environment is: libzmq 4.0.4, pyzmq 14.1.0, python 3.3

Here's my code:

server.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random

import zmq


class TestPush(object):

    def __init__(self):
        self.ctx = zmq.Context()

        random.seed()

    def run(self):
        task_snd = self.ctx.socket(zmq.PUSH)
        task_snd.setsockopt(zmq.SNDHWM, 10)
        task_snd.bind('tcp://*:53000')        

        while True:
            workload = str(random.randint(1, 100))
            task_snd.send(workload.encode('utf-8'))
            print('Send {0}'.format(workload))


if __name__ == '__main__':
    test_push = TestPush()
    test_push.run()

client.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import random

import zmq


class TestPull(object):

    def __init__(self):
        self.ctx = zmq.Context()

    def run(self):
        task_rcv = self.ctx.socket(zmq.PULL)
        task_rcv.setsockopt(zmq.RCVHWM, 1)
        task_rcv.connect('tcp://localhost:53000')

        while True:
            msg = task_rcv.recv()
            print('Receive msg: {0}'.format(msg))

            time.sleep(random.randint(2, 3))


if __name__ == '__main__':
    test_pull = TestPull()
    test_pull.run()
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I faced similar problem with ZeroMQ when I tried to set HWM (High Water Mark) on push and pull socket. Even, it was not working with pub and sub socket. I would like to explain what happened and how it got resolved.

I made 2 scripts, first as a sender with push socket and another as a receiver with pull socket. Set HWM as 10 on both the sockets. And inside receiver script, I put a delay of 5 seconds after each message received. Then I ran the sender script with loop of 100 messages (after keeping the receiver as running to receive).

What I had expected:

The receiver queue and then the sender's queue will reach the hwm. After that, the sender will stop sending more messages.

But what happened:

The sender sends all the 100 messages and exited. But the receiver has kept processing message one-after for long time till it receives all the messages.

After research I found out the reason:

There is something called as Kernel Socket Buffer which seats between both the sender socket and the receiver sockets. Whenever, a process opens a socket, the kernel allots memory space to the tcp sockets, which is by default 128KB. The kernel socket buffer is applicable to the sender and receiver socket (so total buffer will be 128KB + 128KB). My message size was in bytes (an int with some characters). Thus, the total message buffering will be as follows:

Total buffer message = sender socket hwm + kernel socket buffer for the sender socket (128KB) + receiver socket hwm + kernel socket buffer for the sender socket (128KB)

Solution:

Now, I changed my message length to little more than a 1KB. Then perform the test again and found approx 260 messages sent (as expected), after this the sender stops in interval till the receiver receives some message and starts again.


Additional Info

In order for push socket to keep sending messages even when receiver is not able receive, we can use NOBLOCK option in send routine, but then the number of lost messages by receiver will increase very high. So, better option is to use Poll or timeout and then call send routine with NOBLOCK option.

Please note that you may use SNDBUFF/RCVBUFF of zeromq but OS may not obey it (as in my case it was not working).


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...