Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: guard operating on nil resource fields #14

Merged
merged 1 commit into from
Apr 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion blockfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ func (n *Node) blockfetchClientConnOpts() []oblockfetch.BlockFetchOptionFunc {
}
}

func (n *Node) blockfetchServerRequestRange(ctx oblockfetch.CallbackContext, start ocommon.Point, end ocommon.Point) error {
func (n *Node) blockfetchServerRequestRange(
ctx oblockfetch.CallbackContext,
start ocommon.Point,
end ocommon.Point,
) error {
// TODO: check if we have requested block range available and send NoBlocks if not
// Start async process to send requested block range
go func() {
Expand Down
19 changes: 15 additions & 4 deletions chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func (n *Node) chainsyncClientStart(connId ouroboros.ConnectionId) error {
return nil
}

func (n *Node) chainsyncServerFindIntersect(ctx ochainsync.CallbackContext, points []ocommon.Point) (ocommon.Point, ochainsync.Tip, error) {
func (n *Node) chainsyncServerFindIntersect(
ctx ochainsync.CallbackContext,
points []ocommon.Point,
) (ocommon.Point, ochainsync.Tip, error) {
var retPoint ocommon.Point
var retTip ochainsync.Tip
// Find intersection
Expand Down Expand Up @@ -95,9 +98,14 @@ func (n *Node) chainsyncServerFindIntersect(ctx ochainsync.CallbackContext, poin
return retPoint, retTip, nil
}

func (n *Node) chainsyncServerRequestNext(ctx ochainsync.CallbackContext) error {
func (n *Node) chainsyncServerRequestNext(
ctx ochainsync.CallbackContext,
) error {
// Create/retrieve chainsync state for connection
clientState := n.chainsyncState.AddClient(ctx.ConnectionId, n.chainsyncState.Tip())
clientState := n.chainsyncState.AddClient(
ctx.ConnectionId,
n.chainsyncState.Tip(),
)
if clientState.NeedsInitialRollback {
err := ctx.Server.RollBackward(
clientState.Cursor.ToTip().Point,
Expand Down Expand Up @@ -137,7 +145,10 @@ func (n *Node) chainsyncServerRequestNext(ctx ochainsync.CallbackContext) error
return nil
}

func (n *Node) chainsyncServerSendNext(ctx ochainsync.CallbackContext, block chainsync.ChainsyncBlock) error {
func (n *Node) chainsyncServerSendNext(
ctx ochainsync.CallbackContext,
block chainsync.ChainsyncBlock,
) error {
var err error
if block.Rollback {
err = ctx.Server.RollBackward(
Expand Down
5 changes: 4 additions & 1 deletion chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func (s *State) RecentBlocks() []ChainsyncBlock {
return s.recentBlocks[:]
}

func (s *State) AddClient(connId connection.ConnectionId, cursor ChainsyncPoint) *ChainsyncClientState {
func (s *State) AddClient(
connId connection.ConnectionId,
cursor ChainsyncPoint,
) *ChainsyncClientState {
s.Lock()
defer s.Unlock()
// Create initial chainsync state for connection
Expand Down
6 changes: 4 additions & 2 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ func main() {
}

// Global flags
rootCmd.PersistentFlags().BoolVarP(&globalFlags.debug, "debug", "D", false, "enable debug logging")
rootCmd.PersistentFlags().BoolVarP(&globalFlags.version, "version", "", false, "show version and exit")
rootCmd.PersistentFlags().
BoolVarP(&globalFlags.debug, "debug", "D", false, "enable debug logging")
rootCmd.PersistentFlags().
BoolVarP(&globalFlags.version, "version", "", false, "show version and exit")

// Execute cobra command
if err := rootCmd.Execute(); err != nil {
Expand Down
13 changes: 10 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func (n *Node) configPopulateNetworkMagic() error {

func (n *Node) configValidate() error {
if n.config.networkMagic == 0 {
return fmt.Errorf("invalid network magic value: %d", n.config.networkMagic)
return fmt.Errorf(
"invalid network magic value: %d",
n.config.networkMagic,
)
}
if len(n.config.listeners) == 0 {
return fmt.Errorf("no listeners defined")
Expand All @@ -60,7 +63,9 @@ func (n *Node) configValidate() error {
if listener.ListenNetwork != "" && listener.ListenAddress != "" {
continue
}
return fmt.Errorf("listener must provide net.Listener or listen network/address values")
return fmt.Errorf(
"listener must provide net.Listener or listen network/address values",
)
}
return nil
}
Expand Down Expand Up @@ -126,7 +131,9 @@ func WithOutboundSourcePort(port int) ConfigOptionFunc {
}

// WithTopologyConfig specifies an ouroboros.TopologyConfig to use for outbound peers
func WithTopologyConfig(topologyConfig *ouroboros.TopologyConfig) ConfigOptionFunc {
func WithTopologyConfig(
topologyConfig *ouroboros.TopologyConfig,
) ConfigOptionFunc {
return func(c *Config) {
c.topologyConfig = topologyConfig
}
Expand Down
7 changes: 6 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ import (
func socketControl(network, address string, c syscall.RawConn) error {
var innerErr error
err := c.Control(func(fd uintptr) {
err := unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
err := unix.SetsockoptInt(
int(fd),
unix.SOL_SOCKET,
unix.SO_REUSEADDR,
1,
)
if err != nil {
innerErr = err
return
Expand Down
14 changes: 11 additions & 3 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func (n *Node) startListener(l ListenerConfig) error {
if l.ReuseAddress {
listenConfig.Control = socketControl
}
listener, err := listenConfig.Listen(context.Background(), l.ListenNetwork, l.ListenAddress)
listener, err := listenConfig.Listen(
context.Background(),
l.ListenNetwork,
l.ListenAddress,
)
if err != nil {
return fmt.Errorf("failed to open listening socket: %s", err)
}
Expand Down Expand Up @@ -108,15 +112,19 @@ func (n *Node) startListener(l ListenerConfig) error {
n.config.logger.Error(fmt.Sprintf("accept failed: %s", err))
continue
}
n.config.logger.Info(fmt.Sprintf("accepted connection from %s", conn.RemoteAddr()))
n.config.logger.Info(
fmt.Sprintf("accepted connection from %s", conn.RemoteAddr()),
)
// Setup Ouroboros connection
connOpts := append(
defaultConnOpts,
ouroboros.WithConnection(conn),
)
oConn, err := ouroboros.NewConnection(connOpts...)
if err != nil {
n.config.logger.Error(fmt.Sprintf("failed to setup connection: %s", err))
n.config.logger.Error(
fmt.Sprintf("failed to setup connection: %s", err),
)
continue
}
// Add to connection manager
Expand Down
36 changes: 26 additions & 10 deletions mempool/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func newConsumer() *MempoolConsumer {
}

func (m *MempoolConsumer) NextTx(blocking bool) *MempoolTransaction {
if m == nil {
return nil
}
var ret *MempoolTransaction
if blocking {
// Wait until a transaction is available
Expand Down Expand Up @@ -59,28 +62,41 @@ func (m *MempoolConsumer) NextTx(blocking bool) *MempoolTransaction {
}

func (m *MempoolConsumer) GetTxFromCache(hash string) *MempoolTransaction {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
return m.cache[hash]
if m != nil {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
return m.cache[hash]
}
var ret *MempoolTransaction
return ret
}

func (m *MempoolConsumer) ClearCache() {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
m.cache = make(map[string]*MempoolTransaction)
if m != nil {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
m.cache = make(map[string]*MempoolTransaction)
}
}

func (m *MempoolConsumer) RemoveTxFromCache(hash string) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
delete(m.cache, hash)
if m != nil {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
delete(m.cache, hash)
}
}

func (m *MempoolConsumer) stop() {
close(m.txChan)
if m != nil {
close(m.txChan)
}
}

func (m *MempoolConsumer) pushTx(tx *MempoolTransaction, wait bool) bool {
if m == nil {
return false
}
if wait {
// Block on write to channel
m.txChan <- tx
Expand Down
10 changes: 8 additions & 2 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ func (m *Mempool) removeExpired() {
if tx.LastSeen.Before(expiredBefore) {
m.removeTransaction(tx.Hash)
m.logger.Debug(
fmt.Sprintf("removed expired transaction %s from mempool", tx.Hash),
fmt.Sprintf(
"removed expired transaction %s from mempool",
tx.Hash,
),
)
}
}
Expand All @@ -148,7 +151,10 @@ func (m *Mempool) AddTransaction(tx MempoolTransaction) error {
if existingTx != nil {
tx.LastSeen = time.Now()
m.logger.Debug(
fmt.Sprintf("updated last seen for transaction %s in mempool", tx.Hash),
fmt.Sprintf(
"updated last seen for transaction %s in mempool",
tx.Hash,
),
)
return nil
}
Expand Down
13 changes: 11 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,18 @@ func (n *Node) Run() error {
select {}
}

func (n *Node) connectionManagerConnClosed(connId ouroboros.ConnectionId, err error) {
func (n *Node) connectionManagerConnClosed(
connId ouroboros.ConnectionId,
err error,
) {
if err != nil {
n.config.logger.Error(fmt.Sprintf("unexpected connection failure: %s: %s", connId.String(), err))
n.config.logger.Error(
fmt.Sprintf(
"unexpected connection failure: %s: %s",
connId.String(),
err,
),
)
} else {
n.config.logger.Info(fmt.Sprintf("connection closed: %s", connId.String()))
}
Expand Down
38 changes: 31 additions & 7 deletions outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,25 @@ type outboundPeer struct {
func (n *Node) startOutboundConnections() {
var tmpHosts []string
for _, host := range n.config.topologyConfig.Producers {
tmpHosts = append(tmpHosts, net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))))
tmpHosts = append(
tmpHosts,
net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))),
)
}
for _, localRoot := range n.config.topologyConfig.LocalRoots {
for _, host := range localRoot.AccessPoints {
tmpHosts = append(tmpHosts, net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))))
tmpHosts = append(
tmpHosts,
net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))),
)
}
}
for _, publicRoot := range n.config.topologyConfig.PublicRoots {
for _, host := range publicRoot.AccessPoints {
tmpHosts = append(tmpHosts, net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))))
tmpHosts = append(
tmpHosts,
net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))),
)
}
}
// Start outbound connections
Expand All @@ -60,7 +69,11 @@ func (n *Node) startOutboundConnections() {
go func(peer outboundPeer) {
if err := n.createOutboundConnection(peer); err != nil {
n.config.logger.Error(
fmt.Sprintf("failed to establish connection to %s: %s", peer.Address, err),
fmt.Sprintf(
"failed to establish connection to %s: %s",
peer.Address,
err,
),
)
go n.reconnectOutboundConnection(peer)
}
Expand All @@ -77,7 +90,10 @@ func (n *Node) createOutboundConnection(peer outboundPeer) error {
if n.config.outboundSourcePort > 0 {
// Setup connection to use our listening port as the source port
// This is required for peer sharing to be useful
clientAddr, _ = net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", n.config.outboundSourcePort))
clientAddr, _ = net.ResolveTCPAddr(
"tcp",
fmt.Sprintf(":%d", n.config.outboundSourcePort),
)
dialer.LocalAddr = clientAddr
dialer.Control = socketControl
}
Expand Down Expand Up @@ -167,12 +183,20 @@ func (n *Node) reconnectOutboundConnection(peer outboundPeer) {
peer.ReconnectDelay = peer.ReconnectDelay * reconnectBackoffFactor
}
n.config.logger.Info(
fmt.Sprintf("delaying %s before reconnecting to %s", peer.ReconnectDelay, peer.Address),
fmt.Sprintf(
"delaying %s before reconnecting to %s",
peer.ReconnectDelay,
peer.Address,
),
)
time.Sleep(peer.ReconnectDelay)
if err := n.createOutboundConnection(peer); err != nil {
n.config.logger.Error(
fmt.Sprintf("failed to establish connection to %s: %s", peer.Address, err),
fmt.Sprintf(
"failed to establish connection to %s: %s",
peer.Address,
err,
),
)
continue
}
Expand Down
5 changes: 4 additions & 1 deletion peersharing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func (n *Node) peersharingClientConnOpts() []opeersharing.PeerSharingOptionFunc
}
}

func (n *Node) peersharingShareRequest(ctx opeersharing.CallbackContext, amount int) ([]opeersharing.PeerAddress, error) {
func (n *Node) peersharingShareRequest(
ctx opeersharing.CallbackContext,
amount int,
) ([]opeersharing.PeerAddress, error) {
// TODO: add hooks for getting peers to share
return []opeersharing.PeerAddress{}, nil
}
Loading