Skip to content

Commit

Permalink
feat: initial chainsync/blockfetch server support
Browse files Browse the repository at this point in the history
  • Loading branch information
agaffney committed Apr 11, 2024
1 parent f5ce6b0 commit 5975944
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 5 deletions.
50 changes: 50 additions & 0 deletions blockfetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package node

import (
oblockfetch "github.com/blinklabs-io/gouroboros/protocol/blockfetch"
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
)

func (n *Node) blockfetchServerRequestRange(ctx oblockfetch.CallbackContext, start ocommon.Point, end ocommon.Point) error {
// TODO: check if we have requested block range available and send NoBlocks if not
// Start async process to send requested block range
go func() {
if err := ctx.Server.StartBatch(); err != nil {
return
}
for _, block := range n.chainsyncState.RecentBlocks() {
if block.Point.SlotNumber < start.Slot {
continue
}
if block.Point.SlotNumber > end.Slot {
break
}
blockBytes := block.Cbor[:]
err := ctx.Server.Block(
block.Type,
blockBytes,
)
if err != nil {
return
}
}
if err := ctx.Server.BatchDone(); err != nil {
return
}
}()
return nil
}
119 changes: 119 additions & 0 deletions chainsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package node

import (
"github.com/blinklabs-io/node/chainsync"

ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync"
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
)

func (n *Node) chainsyncServerFindIntersect(ctx ochainsync.CallbackContext, points []ocommon.Point) (ocommon.Point, ochainsync.Tip, error) {
var retPoint ocommon.Point
var retTip ochainsync.Tip
// Find intersection
var intersectPoint chainsync.ChainsyncPoint
for _, block := range n.chainsyncState.RecentBlocks() {
// Convert chainsync.ChainsyncPoint to ochainsync.Tip for easier comparison with ocommon.Point
blockPoint := block.Point.ToTip().Point
for _, point := range points {
if point.Slot != blockPoint.Slot {
continue
}
// Compare as string since we can't directly compare byte slices
if string(point.Hash) != string(blockPoint.Hash) {
continue
}
intersectPoint = block.Point
break
}
}

// Populate return tip
retTip = n.chainsyncState.Tip().ToTip()

if intersectPoint.SlotNumber == 0 {
return retPoint, retTip, ochainsync.IntersectNotFoundError
}

// Add our client to the chainsync state
_ = n.chainsyncState.AddClient(ctx.ConnectionId, intersectPoint)

// Populate return point
retPoint = intersectPoint.ToTip().Point

return retPoint, retTip, nil
}

func (n *Node) chainsyncServerRequestNext(ctx ochainsync.CallbackContext) error {
// Create/retrieve chainsync state for connection
clientState := n.chainsyncState.AddClient(ctx.ConnectionId, n.chainsyncState.Tip())
if clientState.NeedsInitialRollback {
err := ctx.Server.RollBackward(
clientState.Cursor.ToTip().Point,
n.chainsyncState.Tip().ToTip(),
)
if err != nil {
return err
}
clientState.NeedsInitialRollback = false
return nil
}
for {
sentAwaitReply := false
select {
case block := <-clientState.BlockChan:
// Ignore blocks older than what we've already sent
if clientState.Cursor.SlotNumber >= block.Point.SlotNumber {
continue
}
return n.chainsyncServerSendNext(ctx, block)
default:
err := ctx.Server.AwaitReply()
if err != nil {
return err
}
// Wait for next block and send
go func() {
block := <-clientState.BlockChan
_ = n.chainsyncServerSendNext(ctx, block)
}()
sentAwaitReply = true
}
if sentAwaitReply {
break
}
}
return nil
}

func (n *Node) chainsyncServerSendNext(ctx ochainsync.CallbackContext, block chainsync.ChainsyncBlock) error {
var err error
if block.Rollback {
err = ctx.Server.RollBackward(
block.Point.ToTip().Point,
n.chainsyncState.Tip().ToTip(),
)
} else {
blockBytes := block.Cbor[:]
err = ctx.Server.RollForward(
block.Type,
blockBytes,
n.chainsyncState.Tip().ToTip(),
)
}
return err
}
192 changes: 192 additions & 0 deletions chainsync/chainsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chainsync

import (
"encoding/hex"
"fmt"
"sync"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/connection"
ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync"
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
)

const (
maxRecentBlocks = 20 // Number of recent blocks to cache
)

type ChainsyncPoint struct {
SlotNumber uint64
BlockHash string
BlockNumber uint64
}

func (c ChainsyncPoint) String() string {
return fmt.Sprintf(
"< slot_number = %d, block_number = %d, block_hash = %s >",
c.SlotNumber,
c.BlockNumber,
c.BlockHash,
)
}

func (c ChainsyncPoint) ToTip() ochainsync.Tip {
hashBytes, _ := hex.DecodeString(c.BlockHash)
return ochainsync.Tip{
BlockNumber: c.BlockNumber,
Point: ocommon.Point{
Slot: c.SlotNumber,
Hash: hashBytes[:],
},
}
}

type ChainsyncBlock struct {
Point ChainsyncPoint
Cbor []byte
Type uint
Rollback bool
}

func (c ChainsyncBlock) String() string {
return fmt.Sprintf(
"%s (%d bytes)",
c.Point.String(),
len(c.Cbor),
)
}

type ChainsyncClientState struct {
Cursor ChainsyncPoint
BlockChan chan ChainsyncBlock
NeedsInitialRollback bool
}

type State struct {
sync.Mutex
tip ChainsyncPoint
clients map[ouroboros.ConnectionId]*ChainsyncClientState
recentBlocks []ChainsyncBlock // TODO: replace with hook(s) for block storage/retrieval
subs map[ouroboros.ConnectionId]chan ChainsyncBlock
}

func NewState() *State {
return &State{
clients: make(map[ouroboros.ConnectionId]*ChainsyncClientState),
}
}

func (s *State) Tip() ChainsyncPoint {
return s.tip
}

func (s *State) RecentBlocks() []ChainsyncBlock {
// TODO: replace with hook to get recent blocks
return s.recentBlocks[:]
}

func (s *State) AddClient(connId connection.ConnectionId, cursor ChainsyncPoint) *ChainsyncClientState {
s.Lock()
defer s.Unlock()
// Create initial chainsync state for connection
if _, ok := s.clients[connId]; !ok {
s.clients[connId] = &ChainsyncClientState{
Cursor: cursor,
BlockChan: s.sub(connId),
NeedsInitialRollback: true,
}
}
return s.clients[connId]
}

func (s *State) RemoveClient(connId connection.ConnectionId) {
s.Lock()
defer s.Unlock()
if clientState, ok := s.clients[connId]; ok {
// Unsub from chainsync updates
if clientState.BlockChan != nil {
s.unsub(connId)
}
// Remove client state entry
delete(s.clients, connId)
}
}

func (s *State) sub(key ouroboros.ConnectionId) chan ChainsyncBlock {
s.Lock()
defer s.Unlock()
tmpChan := make(chan ChainsyncBlock, maxRecentBlocks)
if s.subs == nil {
s.subs = make(map[ouroboros.ConnectionId]chan ChainsyncBlock)
}
s.subs[key] = tmpChan
// Send all current blocks
for _, block := range s.recentBlocks {
tmpChan <- block
}
return tmpChan
}

func (s *State) unsub(key ouroboros.ConnectionId) {
s.Lock()
defer s.Unlock()
if _, ok := s.subs[key]; ok {
close(s.subs[key])
delete(s.subs, key)
}
}

func (s *State) AddBlock(block ChainsyncBlock) {
s.Lock()
defer s.Unlock()
// TODO: add hooks for storing new blocks
s.recentBlocks = append(
s.recentBlocks,
block,
)
// Prune older blocks
if len(s.recentBlocks) > maxRecentBlocks {
s.recentBlocks = s.recentBlocks[len(s.recentBlocks)-maxRecentBlocks:]
}
// Publish new block to subscribers
for _, pubChan := range s.subs {
pubChan <- block
}
}

func (s *State) Rollback(slot uint64, hash string) {
s.Lock()
defer s.Unlock()
// TODO: add hook for getting recent blocks
// Remove recent blocks newer than the rollback block
for idx, block := range s.recentBlocks {
if block.Point.SlotNumber > slot {
s.recentBlocks = s.recentBlocks[:idx]
break
}
}
// Publish rollback to subscribers
for _, pubChan := range s.subs {
pubChan <- ChainsyncBlock{
Rollback: true,
Point: ChainsyncPoint{
SlotNumber: slot,
BlockHash: hash,
},
}
}
}
17 changes: 15 additions & 2 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"net"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/protocol/blockfetch"
"github.com/blinklabs-io/gouroboros/protocol/chainsync"
"github.com/blinklabs-io/gouroboros/protocol/peersharing"
"github.com/blinklabs-io/gouroboros/protocol/txsubmission"
)
Expand Down Expand Up @@ -60,8 +62,19 @@ func (n *Node) startListener(l ListenerConfig) {
txsubmission.WithInitFunc(n.txsubmissionServerInit),
),
),
// TODO: add chain-sync
// TODO: add block-fetch
// ChainSync
ouroboros.WithChainSyncConfig(
chainsync.NewConfig(
chainsync.WithFindIntersectFunc(n.chainsyncServerFindIntersect),
chainsync.WithRequestNextFunc(n.chainsyncServerRequestNext),
),
),
// BlockFetch
ouroboros.WithBlockFetchConfig(
blockfetch.NewConfig(
blockfetch.WithRequestRangeFunc(n.blockfetchServerRequestRange),
),
),
)
}
for {
Expand Down
Loading

0 comments on commit 5975944

Please sign in to comment.