Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


sockets - How to limit the ZMQ (ZeroMQ / PyZMQ) queue buffer size in Python?

I'm using the pyzmq library with the pub/sub pattern. I have a fast ZMQ publisher and a slower ZMQ subscriber (in the code below, the publisher calls .bind() and the subscriber calls .connect()). After a few minutes, the subscriber is receiving old data from the publisher, because messages pile up in ZMQ's buffers.


My Question:

Is there a way to limit the size of the ZMQ queue buffer?

[NOTE]:

  • I don't want to use ZMQ PUSH/PULL.
  • I've read this post, but its approach only clears the buffer: clear ZMQ buffer
  • I also tried the high-water-mark options, but they didn't work:
socket.setsockopt(zmq.RCVHWM, 10)  # not working
socket.setsockopt(zmq.SNDHWM, 10)  # not working

Publisher:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10)  # not working

while True:
    data = time.time()
    print("%d" % data)
    socket.send_string("%d" % data)  # send_string encodes str to bytes (required on Python 3)
    time.sleep(1)

Subscriber:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty prefix: subscribe to everything (bytes on Python 3)
socket.setsockopt(zmq.RCVHWM, 10)  # not working

while True:
    time.sleep(2)  # simulate a slow subscriber
    data = socket.recv()
    print(data)

Even with the send and receive high-water marks set to 10, more than 10 messages are still queued.


1 Reply


I found a way to get "last message only" behavior on a ZMQ SUB socket, using the CONFLATE option.

Note that the CONFLATE option must be set before the socket connects:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)

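socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty prefix: subscribe to everything (bytes on Python 3)
socket.setsockopt(zmq.CONFLATE, 1)  # keep only the last message
socket.connect("tcp://localhost:%s" % port)  # must come after the options above

while True:
    time.sleep(2)  # dummy delay to simulate a slow subscriber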
    data = socket.recv()
    print(data)

In other words, the subscriber no longer keeps a queue of buffered messages; only the most recent one is retained.
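
To try this without running two separate scripts, here is a minimal single-process sketch. The helper name conflate_demo, the loopback address, the random port, and the sleep durations are assumptions made for illustration and are not part of the answer above. It publishes a burst of messages while the subscriber is idle, then collects whatever the subscriber still holds. One caveat from the libzmq documentation: CONFLATE does not support multipart messages.

```python
import time
import zmq


def conflate_demo(n_messages=50):
    """Publish a burst, then return what a CONFLATE subscriber still holds."""
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    sub = context.socket(zmq.SUB)
    try:
        # Illustrative setup: let pyzmq pick a free local port.
        port = pub.bind_to_random_port("tcp://127.0.0.1")

        # Options first, connect last, as in the answer above.
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.setsockopt(zmq.CONFLATE, 1)
        sub.connect("tcp://127.0.0.1:%d" % port)
        time.sleep(0.5)  # give the subscription time to reach the publisher

        for i in range(n_messages):
            pub.send_string(str(i))
        time.sleep(0.5)  # let the burst arrive while the subscriber is idle

        received = []
        while sub.poll(timeout=100):  # timeout in milliseconds
            received.append(sub.recv_string())
        return received
    finally:
        pub.close(linger=0)
        sub.close(linger=0)
        context.term()
```

Calling conflate_demo() would normally return a very short list ending with the newest message, rather than all 50, although exact timing can vary on a loaded machine.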


[NOTE]:

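In addition, the zmq.SNDBUF and zmq.RCVBUF options set the size, in bytes, of the underlying kernel send and receive buffers, which is another way to limit how much data gets buffered. (More complete explanation and an example)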
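
As a rough sketch of how these options might be combined on the subscriber side: RCVBUF sizes the kernel buffer in bytes, while RCVHWM caps ZeroMQ's own receive queue in messages. The helper name make_bounded_subscriber and its default sizes are hypothetical, chosen only for illustration. All options are applied before connect(), matching the ordering used for CONFLATE above.

```python
import zmq


def make_bounded_subscriber(context, endpoint, max_messages=10, kernel_bytes=4096):
    """Create a SUB socket with its buffering limits applied before connecting."""
    sub = context.socket(zmq.SUB)
    sub.setsockopt(zmq.RCVHWM, max_messages)  # ZeroMQ receive queue, in messages
    sub.setsockopt(zmq.RCVBUF, kernel_bytes)  # kernel receive buffer, in bytes
    sub.setsockopt(zmq.SUBSCRIBE, b"")        # empty prefix: receive everything
    sub.connect(endpoint)                     # connect only after the options are set
    return sub
```

Even with both limits in place, more than max_messages messages can be in flight overall, since the publisher's own queue and the kernel buffers on both ends also hold data. A publisher would use the matching zmq.SNDHWM and zmq.SNDBUF options in the same way, set before bind().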


