Skip to content

Commit

Permalink
feat: chainsync client (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
agaffney authored Apr 13, 2024
1 parent 2a854c9 commit ff3ebf2
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 14 deletions.
84 changes: 79 additions & 5 deletions chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
package node

import (
"encoding/hex"
"fmt"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/node/chainsync"

"github.com/blinklabs-io/gouroboros/ledger"
ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync"
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
)
Expand All @@ -30,14 +35,29 @@ func (n *Node) chainsyncServerConnOpts() []ochainsync.ChainSyncOptionFunc {

func (n *Node) chainsyncClientConnOpts() []ochainsync.ChainSyncOptionFunc {
return []ochainsync.ChainSyncOptionFunc{
// TODO
/*
ochainsync.WithRollForwardFunc(n.chainsyncClientRollForward),
ochainsync.WithRollBackwardFunc(n.chainsyncClientRollBackward),
*/
ochainsync.WithRollForwardFunc(n.chainsyncClientRollForward),
ochainsync.WithRollBackwardFunc(n.chainsyncClientRollBackward),
}
}

func (n *Node) chainsyncClientStart(connId ouroboros.ConnectionId) error {
conn := n.connManager.GetConnectionById(connId)
if conn == nil {
return fmt.Errorf("failed to lookup connection ID: %s", connId.String())
}
oConn := conn.Conn
// TODO: use our recent blocks to build intersect points
tip, err := oConn.ChainSync().Client.GetCurrentTip()
if err != nil {
return err
}
intersectPoints := []ocommon.Point{tip.Point}
if err := oConn.ChainSync().Client.Sync(intersectPoints); err != nil {
return err
}
return nil
}

func (n *Node) chainsyncServerFindIntersect(ctx ochainsync.CallbackContext, points []ocommon.Point) (ocommon.Point, ochainsync.Tip, error) {
var retPoint ocommon.Point
var retTip ochainsync.Tip
Expand Down Expand Up @@ -134,3 +154,57 @@ func (n *Node) chainsyncServerSendNext(ctx ochainsync.CallbackContext, block cha
}
return err
}

func (n *Node) chainsyncClientRollBackward(
ctx ochainsync.CallbackContext,
point ocommon.Point,
tip ochainsync.Tip,
) error {
n.chainsyncState.Rollback(
point.Slot,
hex.EncodeToString(point.Hash),
)
return nil
}

func (n *Node) chainsyncClientRollForward(
ctx ochainsync.CallbackContext,
blockType uint,
blockData interface{},
tip ochainsync.Tip,
) error {
var blk ledger.Block
switch v := blockData.(type) {
case ledger.Block:
blk = v
case ledger.BlockHeader:
conn := n.connManager.GetConnectionById(ctx.ConnectionId)
if conn == nil {
return fmt.Errorf("failed to lookup connection ID: %s", ctx.ConnectionId.String())
}
oConn := conn.Conn
blockSlot := v.SlotNumber()
blockHash, _ := hex.DecodeString(v.Hash())
tmpBlock, err := oConn.BlockFetch().Client.GetBlock(ocommon.Point{Slot: blockSlot, Hash: blockHash})
if err != nil {
return err
}
blk = tmpBlock
default:
return fmt.Errorf("unexpected block data type: %T", v)
}
n.chainsyncState.AddBlock(
chainsync.ChainsyncBlock{
Point: chainsync.ChainsyncPoint{
SlotNumber: blk.SlotNumber(),
BlockHash: blk.Hash(),
// TODO: figure out something for Byron. this won't work, since the
// block number isn't stored in the block itself
BlockNumber: blk.BlockNumber(),
},
Cbor: blk.Cbor(),
Type: blockType,
},
)
return nil
}
18 changes: 18 additions & 0 deletions chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type State struct {
clients map[ouroboros.ConnectionId]*ChainsyncClientState
recentBlocks []ChainsyncBlock // TODO: replace with hook(s) for block storage/retrieval
subs map[ouroboros.ConnectionId]chan ChainsyncBlock
clientConnId *ouroboros.ConnectionId // TODO: replace with handling of multiple chainsync clients
}

func NewState() *State {
Expand Down Expand Up @@ -126,6 +127,23 @@ func (s *State) RemoveClient(connId connection.ConnectionId) {
}
}

// TODO: replace with handling of multiple chainsync clients
func (s *State) GetClientConnId() *ouroboros.ConnectionId {
return s.clientConnId
}

// TODO: replace with handling of multiple chainsync clients
func (s *State) SetClientConnId(connId ouroboros.ConnectionId) {
s.clientConnId = &connId
}

// TODO: replace with handling of multiple chainsync clients
func (s *State) RemoveClientConnId(connId ouroboros.ConnectionId) {
if *s.clientConnId == connId {
s.clientConnId = nil
}
}

func (s *State) sub(key ouroboros.ConnectionId) chan ChainsyncBlock {
s.Lock()
defer s.Unlock()
Expand Down
16 changes: 10 additions & 6 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ func (n *Node) startListener(l ListenerConfig) error {
}
if l.UseNtC {
// Node-to-client
defaultConnOpts = append(
defaultConnOpts,
// TODO: add localtxsubmission
// TODO: add localstatequery
// TODO: add localtxmonitor
)
// TODO: uncomment once we take care of any of the below TODO items
// This is disabled to stop lint errors about the append being an effective no-op
/*
defaultConnOpts = append(
defaultConnOpts,
// TODO: add localtxsubmission
// TODO: add localstatequery
// TODO: add localtxmonitor
)
*/
} else {
// Node-to-node
defaultConnOpts = append(
Expand Down
11 changes: 9 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type Node struct {
chainsyncState *chainsync.State
outboundConns map[ouroboros.ConnectionId]outboundPeer
outboundConnsMutex sync.Mutex
// TODO
}

func New(cfg Config) (*Node, error) {
Expand Down Expand Up @@ -85,5 +84,13 @@ func (n *Node) connectionManagerConnClosed(connId ouroboros.ConnectionId, err er
n.connManager.RemoveConnection(connId)
// Remove any chainsync client state
n.chainsyncState.RemoveClient(connId)
// TODO: additional cleanup
// Outbound connections
n.outboundConnsMutex.Lock()
if peer, ok := n.outboundConns[connId]; ok {
// Release chainsync client
n.chainsyncState.RemoveClientConnId(connId)
// Reconnect outbound connection
go n.reconnectOutboundConnection(peer)
}
n.outboundConnsMutex.Unlock()
}
12 changes: 11 additions & 1 deletion outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,17 @@ func (n *Node) createOutboundConnection(peer outboundPeer) error {
n.outboundConnsMutex.Lock()
n.outboundConns[oConn.Id()] = peer
n.outboundConnsMutex.Unlock()
// TODO: start protocol clients (txsubmission/chainsync)
// TODO: replace this with handling for multiple chainsync clients
// Start chainsync client if we don't have another
n.chainsyncState.Lock()
defer n.chainsyncState.Unlock()
chainsyncClientConnId := n.chainsyncState.GetClientConnId()
if chainsyncClientConnId == nil {
if err := n.chainsyncClientStart(oConn.Id()); err != nil {
return err
}
}
// TODO: start txsubmission client
return nil
}

Expand Down

0 comments on commit ff3ebf2

Please sign in to comment.