diff --git a/blockfetch.go b/blockfetch.go new file mode 100644 index 0000000..6b788f2 --- /dev/null +++ b/blockfetch.go @@ -0,0 +1,50 @@ +// Copyright 2024 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package node + +import ( + oblockfetch "github.com/blinklabs-io/gouroboros/protocol/blockfetch" + ocommon "github.com/blinklabs-io/gouroboros/protocol/common" +) + +func (n *Node) blockfetchServerRequestRange(ctx oblockfetch.CallbackContext, start ocommon.Point, end ocommon.Point) error { + // TODO: check if we have requested block range available and send NoBlocks if not + // Start async process to send requested block range + go func() { + if err := ctx.Server.StartBatch(); err != nil { + return + } + for _, block := range n.chainsyncState.RecentBlocks() { + if block.Point.SlotNumber < start.Slot { + continue + } + if block.Point.SlotNumber > end.Slot { + break + } + blockBytes := block.Cbor[:] + err := ctx.Server.Block( + block.Type, + blockBytes, + ) + if err != nil { + return + } + } + if err := ctx.Server.BatchDone(); err != nil { + return + } + }() + return nil +} diff --git a/chainsync.go b/chainsync.go new file mode 100644 index 0000000..08e8cdf --- /dev/null +++ b/chainsync.go @@ -0,0 +1,119 @@ +// Copyright 2024 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package node + +import ( + "github.com/blinklabs-io/node/chainsync" + + ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync" + ocommon "github.com/blinklabs-io/gouroboros/protocol/common" +) + +func (n *Node) chainsyncServerFindIntersect(ctx ochainsync.CallbackContext, points []ocommon.Point) (ocommon.Point, ochainsync.Tip, error) { + var retPoint ocommon.Point + var retTip ochainsync.Tip + // Find intersection + var intersectPoint chainsync.ChainsyncPoint + for _, block := range n.chainsyncState.RecentBlocks() { + // Convert chainsync.ChainsyncPoint to ochainsync.Tip for easier comparison with ocommon.Point + blockPoint := block.Point.ToTip().Point + for _, point := range points { + if point.Slot != blockPoint.Slot { + continue + } + // Compare as string since we can't directly compare byte slices + if string(point.Hash) != string(blockPoint.Hash) { + continue + } + intersectPoint = block.Point + break + } + } + + // Populate return tip + retTip = n.chainsyncState.Tip().ToTip() + + if intersectPoint.SlotNumber == 0 { + return retPoint, retTip, ochainsync.IntersectNotFoundError + } + + // Add our client to the chainsync state + _ = n.chainsyncState.AddClient(ctx.ConnectionId, intersectPoint) + + // Populate return point + retPoint = intersectPoint.ToTip().Point + + return retPoint, retTip, nil +} + +func (n *Node) chainsyncServerRequestNext(ctx ochainsync.CallbackContext) error { + // Create/retrieve chainsync state for connection + clientState := n.chainsyncState.AddClient(ctx.ConnectionId, n.chainsyncState.Tip()) + if clientState.NeedsInitialRollback { + err := ctx.Server.RollBackward( + clientState.Cursor.ToTip().Point, + n.chainsyncState.Tip().ToTip(), + ) + if err != nil { + return err + } + clientState.NeedsInitialRollback = false + return nil + } + for { + sentAwaitReply := false + select { + case block := <-clientState.BlockChan: + // Ignore blocks older than what we've already sent + if clientState.Cursor.SlotNumber >= block.Point.SlotNumber { + continue + } + return n.chainsyncServerSendNext(ctx, block) + default: + err := ctx.Server.AwaitReply() + if err != nil { + return err + } + // Wait for next block and send + go func() { + block := <-clientState.BlockChan + _ = n.chainsyncServerSendNext(ctx, block) + }() + sentAwaitReply = true + } + if sentAwaitReply { + break + } + } + return nil +} + +func (n *Node) chainsyncServerSendNext(ctx ochainsync.CallbackContext, block chainsync.ChainsyncBlock) error { + var err error + if block.Rollback { + err = ctx.Server.RollBackward( + block.Point.ToTip().Point, + n.chainsyncState.Tip().ToTip(), + ) + } else { + blockBytes := block.Cbor[:] + err = ctx.Server.RollForward( + block.Type, + blockBytes, + n.chainsyncState.Tip().ToTip(), + ) + } + return err +} diff --git a/chainsync/chainsync.go b/chainsync/chainsync.go new file mode 100644 index 0000000..b856f6f --- /dev/null +++ b/chainsync/chainsync.go @@ -0,0 +1,192 @@ +// Copyright 2024 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chainsync + +import ( + "encoding/hex" + "fmt" + "sync" + + ouroboros "github.com/blinklabs-io/gouroboros" + "github.com/blinklabs-io/gouroboros/connection" + ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync" + ocommon "github.com/blinklabs-io/gouroboros/protocol/common" +) + +const ( + maxRecentBlocks = 20 // Number of recent blocks to cache +) + +type ChainsyncPoint struct { + SlotNumber uint64 + BlockHash string + BlockNumber uint64 +} + +func (c ChainsyncPoint) String() string { + return fmt.Sprintf( + "< slot_number = %d, block_number = %d, block_hash = %s >", + c.SlotNumber, + c.BlockNumber, + c.BlockHash, + ) +} + +func (c ChainsyncPoint) ToTip() ochainsync.Tip { + hashBytes, _ := hex.DecodeString(c.BlockHash) + return ochainsync.Tip{ + BlockNumber: c.BlockNumber, + Point: ocommon.Point{ + Slot: c.SlotNumber, + Hash: hashBytes[:], + }, + } +} + +type ChainsyncBlock struct { + Point ChainsyncPoint + Cbor []byte + Type uint + Rollback bool +} + +func (c ChainsyncBlock) String() string { + return fmt.Sprintf( + "%s (%d bytes)", + c.Point.String(), + len(c.Cbor), + ) +} + +type ChainsyncClientState struct { + Cursor ChainsyncPoint + BlockChan chan ChainsyncBlock + NeedsInitialRollback bool +} + +type State struct { + sync.Mutex + tip ChainsyncPoint + clients map[ouroboros.ConnectionId]*ChainsyncClientState + recentBlocks []ChainsyncBlock // TODO: replace with hook(s) for block storage/retrieval + subs map[ouroboros.ConnectionId]chan ChainsyncBlock +} + +func NewState() *State { + return &State{ + clients: make(map[ouroboros.ConnectionId]*ChainsyncClientState), + } +} + +func (s *State) Tip() ChainsyncPoint { + return s.tip +} + +func (s *State) RecentBlocks() []ChainsyncBlock { + // TODO: replace with hook to get recent blocks + return s.recentBlocks[:] +} + +func (s *State) AddClient(connId connection.ConnectionId, cursor ChainsyncPoint) *ChainsyncClientState { + s.Lock() + defer s.Unlock() + // Create initial chainsync state for connection + if _, ok := s.clients[connId]; !ok { + s.clients[connId] = &ChainsyncClientState{ + Cursor: cursor, + BlockChan: s.sub(connId), + NeedsInitialRollback: true, + } + } + return s.clients[connId] +} + +func (s *State) RemoveClient(connId connection.ConnectionId) { + s.Lock() + defer s.Unlock() + if clientState, ok := s.clients[connId]; ok { + // Unsub from chainsync updates + if clientState.BlockChan != nil { + s.unsub(connId) + } + // Remove client state entry + delete(s.clients, connId) + } +} + +func (s *State) sub(key ouroboros.ConnectionId) chan ChainsyncBlock { + s.Lock() + defer s.Unlock() + tmpChan := make(chan ChainsyncBlock, maxRecentBlocks) + if s.subs == nil { + s.subs = make(map[ouroboros.ConnectionId]chan ChainsyncBlock) + } + s.subs[key] = tmpChan + // Send all current blocks + for _, block := range s.recentBlocks { + tmpChan <- block + } + return tmpChan +} + +func (s *State) unsub(key ouroboros.ConnectionId) { + s.Lock() + defer s.Unlock() + if _, ok := s.subs[key]; ok { + close(s.subs[key]) + delete(s.subs, key) + } +} + +func (s *State) AddBlock(block ChainsyncBlock) { + s.Lock() + defer s.Unlock() + // TODO: add hooks for storing new blocks + s.recentBlocks = append( + s.recentBlocks, + block, + ) + // Prune older blocks + if len(s.recentBlocks) > maxRecentBlocks { + s.recentBlocks = s.recentBlocks[len(s.recentBlocks)-maxRecentBlocks:] + } + // Publish new block to subscribers + for _, pubChan := range s.subs { + pubChan <- block + } +} + +func (s *State) Rollback(slot uint64, hash string) { + s.Lock() + defer s.Unlock() + // TODO: add hook for getting recent blocks + // Remove recent blocks newer than the rollback block + for idx, block := range s.recentBlocks { + if block.Point.SlotNumber > slot { + s.recentBlocks = s.recentBlocks[:idx] + break + } + } + // Publish rollback to subscribers + for _, pubChan := range s.subs { + pubChan <- ChainsyncBlock{ + Rollback: true, + Point: ChainsyncPoint{ + SlotNumber: slot, + BlockHash: hash, + }, + } + } +} diff --git a/listener.go b/listener.go index 028c81b..5a79103 100644 --- a/listener.go +++ b/listener.go @@ -19,6 +19,8 @@ import ( "net" ouroboros "github.com/blinklabs-io/gouroboros" + "github.com/blinklabs-io/gouroboros/protocol/blockfetch" + "github.com/blinklabs-io/gouroboros/protocol/chainsync" "github.com/blinklabs-io/gouroboros/protocol/peersharing" "github.com/blinklabs-io/gouroboros/protocol/txsubmission" ) @@ -60,8 +62,19 @@ func (n *Node) startListener(l ListenerConfig) { txsubmission.WithInitFunc(n.txsubmissionServerInit), ), ), - // TODO: add chain-sync - // TODO: add block-fetch + // ChainSync + ouroboros.WithChainSyncConfig( + chainsync.NewConfig( + chainsync.WithFindIntersectFunc(n.chainsyncServerFindIntersect), + chainsync.WithRequestNextFunc(n.chainsyncServerRequestNext), + ), + ), + // BlockFetch + ouroboros.WithBlockFetchConfig( + blockfetch.NewConfig( + blockfetch.WithRequestRangeFunc(n.blockfetchServerRequestRange), + ), + ), ) } for { diff --git a/node.go b/node.go index 6a96e04..58b5c4a 100644 --- a/node.go +++ b/node.go @@ -17,18 +17,22 @@ package node import ( "fmt" + "github.com/blinklabs-io/node/chainsync" + ouroboros "github.com/blinklabs-io/gouroboros" ) type Node struct { - config Config - connManager *ouroboros.ConnectionManager + config Config + connManager *ouroboros.ConnectionManager + chainsyncState *chainsync.State // TODO } func New(cfg Config) (*Node, error) { n := &Node{ - config: cfg, + config: cfg, + chainsyncState: chainsync.NewState(), } if err := n.configPopulateNetworkMagic(); err != nil { return nil, fmt.Errorf("invalid configuration: %s", err) @@ -68,5 +72,7 @@ func (n *Node) connectionManagerConnClosed(connId ouroboros.ConnectionId, err er } // Remove connection n.connManager.RemoveConnection(connId) + // Remove any chainsync client state + n.chainsyncState.RemoveClient(connId) // TODO: additional cleanup }