Skip to content

Commit

Permalink
feat: outgoing connections (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
agaffney authored Apr 13, 2024
1 parent 1e401c1 commit 2a854c9
Show file tree
Hide file tree
Showing 12 changed files with 423 additions and 56 deletions.
15 changes: 15 additions & 0 deletions blockfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ import (
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
)

func (n *Node) blockfetchServerConnOpts() []oblockfetch.BlockFetchOptionFunc {
return []oblockfetch.BlockFetchOptionFunc{
oblockfetch.WithRequestRangeFunc(n.blockfetchServerRequestRange),
}
}

func (n *Node) blockfetchClientConnOpts() []oblockfetch.BlockFetchOptionFunc {
return []oblockfetch.BlockFetchOptionFunc{
// TODO
/*
oblockfetch.WithBlockFunc(n.blockfetchClientBlock),
*/
}
}

func (n *Node) blockfetchServerRequestRange(ctx oblockfetch.CallbackContext, start ocommon.Point, end ocommon.Point) error {
// TODO: check if we have requested block range available and send NoBlocks if not
// Start async process to send requested block range
Expand Down
17 changes: 17 additions & 0 deletions chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@ import (
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
)

func (n *Node) chainsyncServerConnOpts() []ochainsync.ChainSyncOptionFunc {
return []ochainsync.ChainSyncOptionFunc{
ochainsync.WithFindIntersectFunc(n.chainsyncServerFindIntersect),
ochainsync.WithRequestNextFunc(n.chainsyncServerRequestNext),
}
}

func (n *Node) chainsyncClientConnOpts() []ochainsync.ChainSyncOptionFunc {
return []ochainsync.ChainSyncOptionFunc{
// TODO
/*
ochainsync.WithRollForwardFunc(n.chainsyncClientRollForward),
ochainsync.WithRollBackwardFunc(n.chainsyncClientRollBackward),
*/
}
}

func (n *Node) chainsyncServerFindIntersect(ctx ochainsync.CallbackContext, points []ocommon.Point) (ocommon.Point, ochainsync.Tip, error) {
var retPoint ocommon.Point
var retTip ochainsync.Tip
Expand Down
36 changes: 30 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
)

type Config struct {
logger *slog.Logger
listeners []ListenerConfig
network string
networkMagic uint32
peerSharing bool
// TODO
logger *slog.Logger
listeners []ListenerConfig
network string
networkMagic uint32
peerSharing bool
outboundSourcePort int
topologyConfig *ouroboros.TopologyConfig
}

// configPopulateNetworkMagic uses the named network (if specified) to determine the network magic value (if not specified)
Expand All @@ -52,6 +53,15 @@ func (n *Node) configValidate() error {
if len(n.config.listeners) == 0 {
return fmt.Errorf("no listeners defined")
}
for _, listener := range n.config.listeners {
if listener.Listener != nil {
continue
}
if listener.ListenNetwork != "" && listener.ListenAddress != "" {
continue
}
return fmt.Errorf("listener must provide net.Listener or listen network/address values")
}
return nil
}

Expand Down Expand Up @@ -107,3 +117,17 @@ func WithPeerSharing(peerSharing bool) ConfigOptionFunc {
c.peerSharing = peerSharing
}
}

// WithOutboundSourcePort specifies the source port to use for outbound connections. This defaults to dynamic source ports
func WithOutboundSourcePort(port int) ConfigOptionFunc {
return func(c *Config) {
c.outboundSourcePort = port
}
}

// WithTopologyConfig specifies an ouroboros.TopologyConfig to use for outbound peers
func WithTopologyConfig(topologyConfig *ouroboros.TopologyConfig) ConfigOptionFunc {
return func(c *Config) {
c.topologyConfig = topologyConfig
}
}
54 changes: 54 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package node

import (
"syscall"

"golang.org/x/sys/unix"
)

// socketControl is a helper function for setting socket options on listener and outbound sockets
func socketControl(network, address string, c syscall.RawConn) error {
var innerErr error
err := c.Control(func(fd uintptr) {
err := unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
if err != nil {
innerErr = err
return
}
err = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
if err != nil {
innerErr = err
return
}
})
if innerErr != nil {
return innerErr
}
if err != nil {
return err
}
return nil
}

// mergeConnOpts merges one or more sets of connection opts into a single set
func mergeConnOpts[T any](optSets ...[]T) []T {
var tmpOpts []T
for _, optSet := range optSets {
tmpOpts = append(tmpOpts, optSet...)
}
return tmpOpts
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ go 1.21.5
require (
github.com/blinklabs-io/gouroboros v0.78.0
github.com/spf13/cobra v1.8.0
golang.org/x/sys v0.18.0
)

require (
github.com/fxamacker/cbor/v2 v2.6.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jinzhu/copier v0.4.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/utxorpc/go-codegen v0.4.4 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ github.com/blinklabs-io/ouroboros-mock v0.3.0/go.mod h1:0dzTNEk/Kvqa7qYHDy7/Nn3O
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/fxamacker/cbor/v2 v2.6.0 h1:sU6J2usfADwWlYDAFhZBQ6TnLFBHxgesMrQfQgk1tWA=
github.com/fxamacker/cbor/v2 v2.6.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
Expand All @@ -26,8 +26,6 @@ golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
16 changes: 16 additions & 0 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"log/slog"
"net"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/node"
)

Expand All @@ -39,6 +40,21 @@ func Run(logger *slog.Logger) error {
Listener: l,
},
),
// TODO: replace with parsing topology file
node.WithTopologyConfig(
&ouroboros.TopologyConfig{
PublicRoots: []ouroboros.TopologyConfigP2PPublicRoot{
{
AccessPoints: []ouroboros.TopologyConfigP2PAccessPoint{
{
Address: "preview-node.play.dev.cardano.org",
Port: 3001,
},
},
},
},
},
),
),
)
if err != nil {
Expand Down
92 changes: 57 additions & 35 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package node

import (
"context"
"fmt"
"net"

Expand All @@ -26,16 +27,32 @@ import (
)

type ListenerConfig struct {
UseNtC bool
Listener net.Listener
// TODO
UseNtC bool
Listener net.Listener
ListenNetwork string
ListenAddress string
ReuseAddress bool
}

func (n *Node) startListener(l ListenerConfig) {
func (n *Node) startListener(l ListenerConfig) error {
// Create listener if none is provided
if l.Listener == nil {
listenConfig := net.ListenConfig{}
if l.ReuseAddress {
listenConfig.Control = socketControl
}
listener, err := listenConfig.Listen(context.Background(), l.ListenNetwork, l.ListenAddress)
if err != nil {
return fmt.Errorf("failed to open listening socket: %s", err)
}
l.Listener = listener
}
// Build connection options
defaultConnOpts := []ouroboros.ConnectionOptionFunc{
ouroboros.WithNetworkMagic(n.config.networkMagic),
ouroboros.WithNodeToNode(!l.UseNtC),
ouroboros.WithServer(true),
ouroboros.WithPeerSharing(n.config.peerSharing),
}
if l.UseNtC {
// Node-to-client
Expand All @@ -49,54 +66,59 @@ func (n *Node) startListener(l ListenerConfig) {
// Node-to-node
defaultConnOpts = append(
defaultConnOpts,
// Peer sharing
ouroboros.WithPeerSharing(n.config.peerSharing),
ouroboros.WithPeerSharingConfig(
peersharing.NewConfig(
peersharing.WithShareRequestFunc(n.peersharingShareRequest),
mergeConnOpts(
n.peersharingServerConnOpts(),
)...,
),
),
// TxSubmission
ouroboros.WithTxSubmissionConfig(
txsubmission.NewConfig(
txsubmission.WithInitFunc(n.txsubmissionServerInit),
mergeConnOpts(
n.txsubmissionServerConnOpts(),
)...,
),
),
// ChainSync
ouroboros.WithChainSyncConfig(
chainsync.NewConfig(
chainsync.WithFindIntersectFunc(n.chainsyncServerFindIntersect),
chainsync.WithRequestNextFunc(n.chainsyncServerRequestNext),
mergeConnOpts(
n.chainsyncServerConnOpts(),
)...,
),
),
// BlockFetch
ouroboros.WithBlockFetchConfig(
blockfetch.NewConfig(
blockfetch.WithRequestRangeFunc(n.blockfetchServerRequestRange),
mergeConnOpts(
n.blockfetchServerConnOpts(),
)...,
),
),
)
}
for {
// Accept connection
conn, err := l.Listener.Accept()
if err != nil {
n.config.logger.Error(fmt.Sprintf("accept failed: %s", err))
continue
go func() {
for {
// Accept connection
conn, err := l.Listener.Accept()
if err != nil {
n.config.logger.Error(fmt.Sprintf("accept failed: %s", err))
continue
}
n.config.logger.Info(fmt.Sprintf("accepted connection from %s", conn.RemoteAddr()))
// Setup Ouroboros connection
connOpts := append(
defaultConnOpts,
ouroboros.WithConnection(conn),
)
oConn, err := ouroboros.NewConnection(connOpts...)
if err != nil {
n.config.logger.Error(fmt.Sprintf("failed to setup connection: %s", err))
continue
}
// Add to connection manager
// TODO: add tags for connection for later tracking
n.connManager.AddConnection(oConn)
}
n.config.logger.Info(fmt.Sprintf("accepted connection from %s", conn.RemoteAddr()))
// Setup Ouroboros connection
connOpts := append(
defaultConnOpts,
ouroboros.WithConnection(conn),
)
oConn, err := ouroboros.NewConnection(connOpts...)
if err != nil {
n.config.logger.Error(fmt.Sprintf("failed to setup connection: %s", err))
continue
}
// Add to connection manager
// TODO: add tags for connection for later tracking
n.connManager.AddConnection(oConn)
}
}()
return nil
}
Loading

0 comments on commit 2a854c9

Please sign in to comment.