No messages getting through XPUB/XSUB proxy #108

Open
stitchinthyme opened this issue Jun 24, 2021 · 7 comments
Labels
bug Something isn't working

Comments

@stitchinthyme

stitchinthyme commented Jun 24, 2021

I wrote the following small test server, which creates XPUB and XSUB sockets, and a proxy to send all messages received on the XSUB port to the XPUB port:

package main

import (
    "context"
    "log"
    "time"
    zmq "github.com/go-zeromq/zmq4"
)

func main() {
    pub_conn_str := "tcp://*:9002"
    sub_conn_str := "tcp://*:9003"
    pub_sock := zmq.NewXPub(context.Background())
    sub_sock := zmq.NewXSub(context.Background())
    if err := pub_sock.Listen(pub_conn_str); err != nil {
        log.Fatalf("Error binding pub to %s: %v\n", pub_conn_str, err)
    }
    if err := sub_sock.Listen(sub_conn_str); err != nil {
        log.Fatalf("Error binding sub to %s: %v\n", sub_conn_str, err)
    }
    log.Println("Sleeping 5s...")
    time.Sleep(5*time.Second)
    log.Println("Starting proxy")
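    p := zmq.NewProxy(context.Background(), sub_sock, pub_sock, nil) // forward anything received on sub_sock to pub_sock
    if err := p.Run(); err != nil { // Run blocks until the proxy stops
        log.Fatalf("Proxy error: %v\n", err)
    }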
}

And a test client which connects to both ports, sends a series of 10 messages to the pub port (which is the server's sub port), and listens for them on the sub port (the server's pub port):

package main

import (
    "context"
    "log"
    "strconv"
    "sync"
    "time"
    zmq "github.com/go-zeromq/zmq4"
)

func main() {
    var wg sync.WaitGroup
    sub_conn_str := "tcp://localhost:9002"  // server's pub socket is our sub socket
    pub_conn_str := "tcp://localhost:9003"  //  ...and vice versa
    pub_sock := zmq.NewPub(context.Background())
    sub_sock := zmq.NewSub(context.Background())
    if err := pub_sock.Dial(pub_conn_str); err != nil {
        log.Fatalf("Error connecting pub to %s: %v\n", pub_conn_str, err)
    }
    if err := sub_sock.Dial(sub_conn_str); err != nil {
        log.Fatalf("Error connecting sub to %s: %v\n", sub_conn_str, err)
    }
    if err := sub_sock.SetOption(zmq.OptionSubscribe, ""); err != nil { // subscribe to all messages
        log.Fatalf("Subscribe error: %v\n", err) // Fatalf exits, so no return is needed
    }
    log.Printf("Sleeping 5s...")
    time.Sleep(5*time.Second)

    wg.Add(1)
    go func() {  // receiver goroutine - wait for incoming messages
        defer wg.Done()
        log.Println("Receiver starting")
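        for {
            msg, err := sub_sock.Recv()
            if err != nil {
                log.Printf("Recv error: %v\n", err)
                return // stop instead of spinning on a persistent error
            }
            if len(msg.Frames) < 2 { // guard against indexing a short message
                log.Printf("Unexpected message with %d frame(s)\n", len(msg.Frames))
                continue
            }
            log.Printf("Received msg [%s] %s\n", msg.Frames[0], msg.Frames[1])
        }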
    }()

    // Just send numbers 0-9, 1s apart
    for i := 0; i < 10; i++ {
        msg := zmq.NewMsgFrom([]byte("TEST"), []byte(strconv.FormatInt(int64(i), 16)))
        log.Printf("Sending message %d\n", i)
        if err := pub_sock.Send(msg); err != nil {
            log.Printf("Send error: %v\n", err)
        }
        time.Sleep(1*time.Second)
    }
    wg.Wait()
}

When I run the server in one window and the client in another, the server binds the ports properly and the client seems to be connecting and sending the messages, but the receiver in the client never gets them and never prints anything. I'm expecting to see a series of "Sending message [0-9]" and "Received msg [TEST] [0-9]". I see the "Sending message" prints, but not the "Received" ones.

I'm using Go 1.13 on Ubuntu 18.04.

One more note: I have almost exactly this same code using https://github.com/pebbe/zmq4 (with adjustments for API differences like Bind() vs. Listen(), etc.) and it works fine.

@junchuanwang

I actually think the bug is in XSUB: its Recv() call blocks until an EOF is sent.

@stitchinthyme
Author

Actually, it seems to be because there is no subscribe logic in the SUB/XSUB sockets. SUB does keep a map of subscribed topics, but it doesn't actually use it to screen incoming messages. Instead, that logic is in the PUB thread: it won't send a message unless the PUB socket is subscribed to it. That is counter to how ZMQ is supposed to work: you subscribe to topics with SUB/XSUB, and the publisher shouldn't have to know or care what you're subscribed to.
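
To make the point about screening concrete, here is a minimal sketch of subscriber-side filtering, assuming the usual ZMQ rule that a subscription is a byte prefix matched against the first frame. The `subscriptions` type and `matches` method are hypothetical names for illustration only; they are not part of go-zeromq/zmq4 and this is not the fix from the PR.

```go
package main

import (
	"bytes"
	"fmt"
)

// subscriptions is a hypothetical stand-in for the topic map a SUB socket keeps.
type subscriptions map[string]struct{}

// matches reports whether the first frame starts with any subscribed prefix.
// An empty prefix matches every message, like subscribing with "".
func (s subscriptions) matches(frames [][]byte) bool {
	if len(frames) == 0 {
		return false
	}
	for prefix := range s {
		if bytes.HasPrefix(frames[0], []byte(prefix)) {
			return true
		}
	}
	return false
}

func main() {
	subs := subscriptions{"TEST": {}}
	incoming := [][][]byte{
		{[]byte("TEST"), []byte("0")},
		{[]byte("OTHER"), []byte("1")},
		{[]byte("TESTING"), []byte("2")},
	}
	for _, frames := range incoming {
		if subs.matches(frames) {
			fmt.Printf("deliver [%s] %s\n", frames[0], frames[1])
		} else {
			fmt.Printf("drop    [%s]\n", frames[0])
		}
	}
}
```

With a check like this on the receiving side, the publisher can send everything it gets and each subscriber decides for itself what to keep.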

@sbinet
Contributor

sbinet commented Aug 28, 2021

it's probably because I initially wrote this with a heavy inspiration from nanomsg/mango that has subscription logic (IIRC).

PRs welcomed.

@sbinet sbinet added the bug Something isn't working label Aug 28, 2021
@stitchinthyme
Author

I made a fix, but it hadn't finished building before it was time to knock off work for the day, so I'll test it fully on Monday.

@woodyiorl

> I made a fix, but it hadn't finished building before it was time to knock off work for the day, so I'll test it fully on Monday.

Any update, sir? @stitchinthyme

stitchinthyme pushed a commit to stitchinthyme/zmq4 that referenced this issue Sep 13, 2021
@stitchinthyme
Author

> Any update, sir? @stitchinthyme

Just submitted the PR, which works with my test programs above.

P.S. Not a 'sir'. :-)

@junchuanwang

@stitchinthyme My bad, lol! thank you!
