From 7997a5fde4b62b2b2b0cf4b55af5a98e67225dbb Mon Sep 17 00:00:00 2001 From: Jarrod Baumann Date: Wed, 21 Jun 2023 16:48:47 +0000 Subject: [PATCH] initial passive example --- cmd/gobmp/gobmp.go | 22 +++--- go.mod | 4 +- go.sum | 4 + pkg/gobmpsrv/gobmpsrv.go | 15 +++- pkg/nats/nats-publisher_test.go | 126 ++++++++++++++++++++++++++++++++ 5 files changed, 159 insertions(+), 12 deletions(-) create mode 100644 pkg/nats/nats-publisher_test.go diff --git a/cmd/gobmp/gobmp.go b/cmd/gobmp/gobmp.go index 6644c485..8c711543 100644 --- a/cmd/gobmp/gobmp.go +++ b/cmd/gobmp/gobmp.go @@ -22,15 +22,16 @@ import ( ) var ( - dstPort int - srcPort int - perfPort int - kafkaSrv string - natsSrv string - intercept string - splitAF string - dump string - file string + dstPort int + srcPort int + perfPort int + kafkaSrv string + natsSrv string + passiveRtr string + intercept string + splitAF string + dump string + file string ) func init() { @@ -39,6 +40,7 @@ func init() { flag.IntVar(&dstPort, "destination-port", 5050, "port openBMP is listening") flag.StringVar(&kafkaSrv, "kafka-server", "", "URL to access Kafka server") flag.StringVar(&natsSrv, "nats-server", "", "URL to access NATS server") + flag.StringVar(&passiveRtr, "passive-router", "", "Passive BMP router to connect outbound (:)") flag.StringVar(&intercept, "intercept", "false", "When intercept set \"true\", all incomming BMP messges will be copied to TCP port specified by destination-port, otherwise received BMP messages will be published to Kafka.") flag.StringVar(&splitAF, "split-af", "true", "When set \"true\" (default) ipv4 and ipv6 will be published in separate topics. if set \"false\" the same topic will be used for both address families.") flag.IntVar(&perfPort, "performance-port", 56767, "port used for performance debugging") @@ -98,7 +100,7 @@ func main() { glog.Errorf("failed to parse to bool the value of the intercept flag with error: %+v", err) os.Exit(1) } - bmpSrv, err := gobmpsrv.NewBMPServer(srcPort, dstPort, interceptFlag, publisher, splitAFFlag) + bmpSrv, err := gobmpsrv.NewBMPServer(srcPort, dstPort, interceptFlag, publisher, splitAFFlag, passiveRtr) if err != nil { glog.Errorf("failed to setup new gobmp server with error: %+v", err) os.Exit(1) diff --git a/go.mod b/go.mod index a256e129..d936332f 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,10 @@ require ( github.com/go-test/deep v1.0.8 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.5.3 // indirect - github.com/nats-io/nats-server/v2 v2.9.16 // indirect + github.com/nats-io/nats-server/v2 v2.9.16 github.com/nats-io/nats.go v1.25.0 + github.com/pkg/errors v0.9.1 // indirect github.com/sbezverk/tools v0.0.0-20220706091339-17ec2f713538 google.golang.org/protobuf v1.30.0 // indirect + gotest.tools v2.2.0+incompatible ) diff --git a/go.sum b/go.sum index d1f7e977..475d8d57 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= @@ -171,3 +173,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200601152816-913338de1bd2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= +gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= diff --git a/pkg/gobmpsrv/gobmpsrv.go b/pkg/gobmpsrv/gobmpsrv.go index 13721038..6dd262cc 100644 --- a/pkg/gobmpsrv/gobmpsrv.go +++ b/pkg/gobmpsrv/gobmpsrv.go @@ -26,6 +26,7 @@ type bmpServer struct { destinationPort int incoming net.Listener stop chan struct{} + passiveRouter string } func (srv *bmpServer) Start() { @@ -43,6 +44,17 @@ func (srv *bmpServer) Stop() { } func (srv *bmpServer) server() { + // Establish connection to passive router if specified + if srv.passiveRouter != "" { + conn, err := net.Dial("tcp", srv.passiveRouter) + if err != nil { + glog.Errorf("failed to connect to passive router with error: %+v", err) + return + } + glog.Infof("connected to passive router %+v, calling bmpWorker", conn.RemoteAddr()) + go srv.bmpWorker(conn) + } + for { client, err := srv.incoming.Accept() if err != nil { @@ -117,7 +129,7 @@ func (srv *bmpServer) bmpWorker(client net.Conn) { } // NewBMPServer instantiates a new instance of BMP Server -func NewBMPServer(sPort, dPort int, intercept bool, p pub.Publisher, splitAF bool) (BMPServer, error) { +func NewBMPServer(sPort, dPort int, intercept bool, p pub.Publisher, splitAF bool, passiveRouter string) (BMPServer, error) { incoming, err := net.Listen("tcp", fmt.Sprintf(":%d", sPort)) if err != nil { glog.Errorf("fail to setup listener on port %d with error: %+v", sPort, err) @@ -131,6 +143,7 @@ func NewBMPServer(sPort, dPort int, intercept bool, p pub.Publisher, splitAF boo publisher: p, incoming: incoming, splitAF: splitAF, + passiveRouter: passiveRouter, } return &bmp, nil diff --git a/pkg/nats/nats-publisher_test.go b/pkg/nats/nats-publisher_test.go new file mode 100644 index 00000000..2d582665 --- /dev/null +++ b/pkg/nats/nats-publisher_test.go @@ -0,0 +1,126 @@ +package nats + +import ( + "encoding/json" + "fmt" + "net" + "testing" + "time" + + natssrv "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/sbezverk/gobmp/pkg/gobmpsrv" + "github.com/sbezverk/gobmp/pkg/message" + "github.com/sbezverk/gobmp/pkg/pub" + "gotest.tools/assert" +) + +const ( + streamName = "gobmp" + natsPrefix = "gobmp" + bmpPort = 5000 +) + +var ( + p pub.Publisher + s nats.JetStreamContext + b gobmpsrv.BMPServer +) + +func TestMain(m *testing.M) { + // build in-memory NATS server + natsSrv, err := natssrv.NewServer(&natssrv.Options{ + Host: "127.0.0.1", + Debug: false, + Port: natssrv.RANDOM_PORT, + JetStream: true, + }) + if err != nil { + panic(err) + } + + defer natsSrv.Shutdown() + + // Start NATS server + if err := natssrv.Run(natsSrv); err != nil { + panic(err) + } + + // Create a NATS connection for subscribing + nc, err := nats.Connect(natsSrv.ClientURL()) + if err != nil { + panic(err) + } + defer nc.Close() + + s, err = nc.JetStream() + if err != nil { + panic(err) + } + + // Create the stream + _, err = s.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{streamName + ".>"}, + }) + if err != nil { + panic(err) + } + + // Create the Publisher + p, err = NewPublisher(natsSrv.ClientURL()) + if err != nil { + panic(err) + } + + // Start BMP + b, err = gobmpsrv.NewBMPServer(bmpPort, 0, false, p, false, "") + if err != nil { + panic(err) + } + + // Starting Interceptor server + b.Start() + + m.Run() +} + +// TestNATSProducer tests NATS producer +func TestNATSProducer(t *testing.T) { + input := []byte{3, 0, 0, 0, 32, 4, 0, 1, 0, 10, 32, 55, 46, 50, 46, 49, 46, 50, 51, 73, 0, 2, 0, 8, 120, 114, 118, 57, 107, 45, 114, 49, 3, 0, 0, 0, 234, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 80, 103, 0, 0, 19, 206, 57, 112, 1, 254, 94, 98, 129, 171, 0, 0, 215, 126, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 80, 128, 0, 179, 131, 152, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 91, 1, 4, 19, 206, 0, 90, 192, 168, 8, 8, 62, 2, 6, 1, 4, 0, 1, 0, 1, 2, 6, 1, 4, 0, 1, 0, 4, 2, 6, 1, 4, 0, 1, 0, 128, 2, 2, 128, 0, 2, 2, 2, 0, 2, 6, 65, 4, 0, 0, 19, 206, 2, 20, 5, 18, 0, 1, 0, 1, 0, 2, 0, 1, 0, 2, 0, 2, 0, 1, 0, 128, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 75, 1, 4, 19, 206, 0, 90, 57, 112, 1, 254, 46, 2, 44, 2, 0, 1, 4, 0, 1, 0, 1, 1, 4, 0, 2, 0, 1, 1, 4, 0, 1, 0, 4, 1, 4, 0, 2, 0, 4, 1, 4, 0, 1, 0, 128, 1, 4, 0, 2, 0, 128, 65, 4, 0, 0, 19, 206} + + // Create a connection to the BMP server + conn, err := net.Dial("tcp", fmt.Sprintf(":%d", bmpPort)) + if err != nil { + t.Fatalf("failed to connect to gobmp server with error: %+v", err) + } + defer conn.Close() + + // Send message to the BMP server + _, err = conn.Write(input) + if err != nil { + t.Fatalf("failed to send message to gobmp server with error: %+v", err) + } + + // Wait for message to be published + sub, err := s.SubscribeSync(streamName + ".>") + if err != nil { + t.Fatalf("failed to subscribe to stream with error: %+v", err) + } + + // Wait to receive the message from the publisher + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("failed to pull message from stream with error: %+v", err) + } + + // Unmarshal msg.Data (bytes) to message.PeerStateChange + var peerStateChange message.PeerStateChange + err = json.Unmarshal(msg.Data, &peerStateChange) + if err != nil { + t.Fatalf("failed to unmarshal message with error: %+v", err) + } + + assert.Equal(t, peerStateChange.Action, "add") + assert.Equal(t, peerStateChange.RouterHash, "4371c52d8d4a6a67a4c438964f61700b") +}