Delay required after PUB bind before messages can be sent #4746

Open
JC3 opened this issue Sep 30, 2024 · 1 comment

@JC3

JC3 commented Sep 30, 2024

libzmq: 4.3.5 via PyZMQ 26.2.0 (Python 3.10)
OS: macOS 13.6.6, Ubuntu 22.04

It appears that a delay is needed after creating and binding a PUB socket before the first message can be published successfully. In testing, roughly 200 ms seems to be the minimum delay required. The transport makes no apparent difference (tried TCP and IPC). The following graph shows the send_multipart success rate vs. the delay after bind, for a range of delays (30 trials each).

[Graph: send_multipart success rate vs. delay after bind, 30 trials per delay]

What is going on here? Is there a more deterministic way to check if the PUB socket is ready to send rather than adding arbitrary delays?

Test Code (Python)

import zmq
import time
import multiprocessing as mp
import struct

TEST_DELAYS = range(0,251,10)
TEST_TRIALS = 30
ADDRESS = "ipc:///tmp/delay_test.ipc" # "tcp://127.0.0.1:43212"


def _sub_main (keep_running):

    ctx = zmq.Context()
    
    sub = ctx.socket(zmq.SUB)
    sub.connect(ADDRESS)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.setsockopt(zmq.RCVTIMEO, 500)

    results = {}
    
    while keep_running.value:
        try:
            (delay,) = struct.unpack("<i", sub.recv_multipart()[0])
        except zmq.Again:
            continue
        results[delay] = results.get(delay, 0) + 1
        print((delay, results[delay]))

    sub.close()
    ctx.destroy()

    print("---------")
    for delay_ms, success_rate in results.items():
        print(f"{delay_ms},{success_rate/TEST_TRIALS:.3f}")


def run_test (delay):

    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind(ADDRESS)
    if delay > 0:
        time.sleep(delay/1000.0)
    pub.send_multipart([struct.pack("<i", delay)])
    pub.close()
    ctx.destroy()        

def main ():

    keep_running = mp.Value('i', 1)
    receiver = mp.Process(target=_sub_main, args=(keep_running,))
    receiver.start()

    for delay in TEST_DELAYS:
        for k in range(TEST_TRIALS):
            run_test(delay)

    keep_running.value = 0
    receiver.join()

    
if __name__ == "__main__":
    main()
@Asmod4n
Contributor

Asmod4n commented Dec 1, 2024

That's correct and documented behavior.

A sub socket only sees messages after it has successfully connected to a pub socket.
To synchronize state, you'd also have to create another type of socket that tells you what state the sub socket is currently in, i.e. you'd need some form of bookkeeping for your messages in some form of stable storage.

The "client" can then tell the "server" which messages it has already received, and the server can then send what's missing.
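For the single-subscriber case in the test code, a lighter option is to let the publisher itself see the subscription arrive. This is a swapped-in technique, not something proposed in this thread: an XPUB socket behaves like PUB but also delivers each incoming subscription as a readable frame (a b"\x01" byte followed by the topic prefix). A minimal sketch, assuming pyzmq; the function names, the loopback address, and the 5 second timeouts are illustrative choices, not part of the original report.

```python
import threading
import zmq


def subscriber(ctx, endpoint, received):
    """Connect a SUB socket, subscribe to everything, and record one message."""
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.RCVTIMEO, 5000)  # give up after 5 s (arbitrary choice)
    sub.connect(endpoint)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    try:
        received.append(sub.recv())
    except zmq.Again:
        pass  # nothing arrived within the timeout
    finally:
        sub.close(linger=0)


def publish_when_subscribed(ctx, payload, received):
    """Bind an XPUB socket and send payload only after a subscription arrives."""
    xpub = ctx.socket(zmq.XPUB)
    port = xpub.bind_to_random_port("tcp://127.0.0.1")
    worker = threading.Thread(
        target=subscriber,
        args=(ctx, f"tcp://127.0.0.1:{port}", received),
    )
    worker.start()

    # XPUB hands subscription requests to the application:
    # b"\x01" + prefix means subscribe, b"\x00" + prefix means unsubscribe.
    if xpub.poll(5000):
        frame = xpub.recv()
        if frame[:1] == b"\x01":
            xpub.send(payload)

    worker.join()
    xpub.close(linger=0)
```

Calling `publish_when_subscribed(zmq.Context(), b"hello", results)` with an empty list `results` should leave the payload in that list without any fixed sleep. With several subscribers, the publisher would have to count subscription frames, which is where the bookkeeping described above comes back in.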

Or you could use zmq_socket_monitor to learn when the sub socket has connected, and only then let the pub socket start sending messages.
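The monitor approach might look like the sketch below in pyzmq, which wraps zmq_socket_monitor as `Socket.get_monitor_socket()`. It assumes libzmq 4.3 or later so that `EVENT_HANDSHAKE_SUCCEEDED` is available; the helper names, the loopback address, and the timeout are illustrative. Note that the monitor must be attached before the bind, otherwise early events can be missed.

```python
import zmq
from zmq.utils.monitor import recv_monitor_message


def bind_with_monitor(ctx):
    """Create a PUB socket, attach a monitor, then bind to a free local port."""
    pub = ctx.socket(zmq.PUB)
    # Attach the monitor before binding so no connection event is missed.
    monitor = pub.get_monitor_socket(zmq.EVENT_HANDSHAKE_SUCCEEDED)
    port = pub.bind_to_random_port("tcp://127.0.0.1")
    return pub, monitor, f"tcp://127.0.0.1:{port}"


def wait_for_handshake(monitor, timeout_ms=5000):
    """Return True once a peer has completed the ZMTP handshake, else False."""
    if monitor.poll(timeout_ms):
        event = recv_monitor_message(monitor)
        return event["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED
    return False


def demo():
    ctx = zmq.Context()
    pub, monitor, endpoint = bind_with_monitor(ctx)

    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.connect(endpoint)  # asynchronous: returns before the peer is reachable

    connected = wait_for_handshake(monitor)

    pub.disable_monitor()
    monitor.close(linger=0)
    sub.close(linger=0)
    pub.close(linger=0)
    ctx.term()
    return connected
```

One caveat: the handshake event says the connection is up, not that the PUB socket has already processed the subscription message, so a message sent immediately afterwards may still be dropped. The event narrows the window but is not a delivery guarantee.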
