Skip to content

Commit

Permalink
Merge branch 'main' into ocr3config/returnActiveCandidate
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAustinWang authored Nov 1, 2024
2 parents 69118c4 + e4eab06 commit 7c16083
Show file tree
Hide file tree
Showing 27 changed files with 1,112 additions and 198 deletions.
6 changes: 4 additions & 2 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ packages:
github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn:
interfaces:
Controller:
PeerGroup:
PeerGroupFactory:
Stream:
github.com/smartcontractkit/chainlink-ccip/execute/internal/gas:
interfaces:
Expand All @@ -28,6 +26,10 @@ packages:
CCIPReader:
PriceReader:
RMNHome:
github.com/smartcontractkit/chainlink-ccip/pkg/peergroup:
interfaces:
PeerGroupFactory:
PeerGroup:
github.com/smartcontractkit/chainlink-ccip/pkg/contractreader:
interfaces:
Extended:
Expand Down
4 changes: 3 additions & 1 deletion commit/chainfee/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@ func (p *processor) Observation(
chainFeeUpdates := FeeUpdatesFromTimestampedBig(timestampedPriceUpdates)

fChain := p.ObserveFChain()
now := time.Now().UTC()

p.lggr.Infow("observed fee components",
"supportedChains", supportedChains.ToSlice(),
"feeComponents", feeComponents,
"nativeTokenPrices", nativeTokenPrices,
"chainFeeUpdates", chainFeeUpdates,
"fChain", fChain,
"timestampNow", now,
)

return Observation{
FChain: fChain,
FeeComponents: feeComponents,
NativeTokenPrices: nativeTokenPrices,
ChainFeeUpdates: chainFeeUpdates,
TimestampNow: time.Now().UTC(),
TimestampNow: now,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions commit/chainfee/validate_observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func (p *processor) ValidateObservation(
}

func validateFChain(fChain map[cciptypes.ChainSelector]int) error {
for _, f := range fChain {
if f < 0 {
return fmt.Errorf("fChain %d is negative", f)
for chainSelector, f := range fChain {
if f <= 0 {
return fmt.Errorf("fChain for chain %d is not positive: %d", chainSelector, f)
}
}
return nil
Expand Down
14 changes: 7 additions & 7 deletions commit/merkleroot/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pkg/consts"
readerpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

func (w *Processor) ObservationQuorum(
Expand Down Expand Up @@ -86,21 +88,18 @@ func (w *Processor) initializeRMNController(ctx context.Context, prevOutcome Out
return fmt.Errorf("failed to get RMN nodes info: %w", err)
}

peerIDs := make([]string, 0, len(rmnNodesInfo))
for _, node := range rmnNodesInfo {
w.lggr.Infow("Adding RMN node to peerIDs", "node", node.PeerID.String())
peerIDs = append(peerIDs, node.PeerID.String())
}
oraclePeerIDs := make([]ragep2ptypes.PeerID, 0, len(w.oracleIDToP2pID))
for _, p2pID := range w.oracleIDToP2pID {
w.lggr.Infow("Adding oracle node to peerIDs", "p2pID", p2pID.String())
peerIDs = append(peerIDs, p2pID.String())
oraclePeerIDs = append(oraclePeerIDs, p2pID)
}

if err := w.rmnController.InitConnection(
ctx,
cciptypes.Bytes32(w.reportingCfg.ConfigDigest),
prevOutcome.RMNRemoteCfg.ConfigDigest,
peerIDs,
oraclePeerIDs,
rmnNodesInfo,
); err != nil {
return fmt.Errorf("failed to init connection to RMN: %w", err)
}
Expand Down Expand Up @@ -369,6 +368,7 @@ func (o ObserverImpl) ObserveMerkleRoots(
rootsMu := &sync.Mutex{}
wg := sync.WaitGroup{}
for _, chainRange := range ranges {
chainRange := chainRange
if supportedChains.Contains(chainRange.ChainSel) {
wg.Add(1)
go func() {
Expand Down
5 changes: 4 additions & 1 deletion commit/merkleroot/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/ragep2p/types"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand Down Expand Up @@ -623,13 +624,15 @@ func Test_Processor_initializeRMNController(t *testing.T) {
{ID: 1, PeerID: types.PeerID{1, 2, 3}},
{ID: 10, PeerID: types.PeerID{1, 2, 31}},
}
oracleIDs := []ragep2ptypes.PeerID{}
rmnHomeReader.EXPECT().GetRMNNodesInfo(cfg.ConfigDigest).Return(rmnNodes, nil)

rmnController.EXPECT().InitConnection(
ctx,
cciptypes.Bytes32(p.reportingCfg.ConfigDigest),
cfg.ConfigDigest,
[]string{rmnNodes[0].PeerID.String(), rmnNodes[1].PeerID.String()},
oracleIDs,
rmnNodes,
).Return(nil)

err = p.initializeRMNController(ctx, Outcome{RMNRemoteCfg: cfg})
Expand Down
9 changes: 6 additions & 3 deletions commit/merkleroot/rmn/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/protobuf/proto"

chainsel "github.com/smartcontractkit/chain-selectors"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

Expand Down Expand Up @@ -59,7 +60,8 @@ type Controller interface {
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
peerIDs []string, // union of oraclePeerIDs and rmnNodePeerIDs (oracles required for peer discovery)
oraclePeerIDs []ragep2ptypes.PeerID,
rmnNodes []rmntypes.HomeNodeInfo,
) error

// Close closes the connection to the generic peer group endpoint and all the underlying streams.
Expand Down Expand Up @@ -242,9 +244,10 @@ func (c *controller) InitConnection(
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
peerIDs []string,
oraclePeerIDs []ragep2ptypes.PeerID,
rmnNodes []rmntypes.HomeNodeInfo,
) error {
return c.peerClient.InitConnection(ctx, commitConfigDigest, rmnHomeConfigDigest, peerIDs)
return c.peerClient.InitConnection(ctx, commitConfigDigest, rmnHomeConfigDigest, oraclePeerIDs, rmnNodes)
}

func (c *controller) Close() error {
Expand Down
8 changes: 7 additions & 1 deletion commit/merkleroot/rmn/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/protobuf/proto"

chainsel "github.com/smartcontractkit/chain-selectors"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"

readerpkg_mock "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/reader"

Expand Down Expand Up @@ -692,7 +693,12 @@ func (m *mockPeerClient) resetReceivedRequests() {
m.receivedRequests = make(map[rmntypes.NodeID][]*rmnpb.Request)
}

func (m *mockPeerClient) InitConnection(_ context.Context, _ cciptypes.Bytes32, _ cciptypes.Bytes32, _ []string) error {
func (m *mockPeerClient) InitConnection(
_ context.Context,
_ cciptypes.Bytes32,
_ cciptypes.Bytes32,
_ []ragep2ptypes.PeerID,
_ []rmntypes.HomeNodeInfo) error {
return nil
}

Expand Down
71 changes: 25 additions & 46 deletions commit/merkleroot/rmn/peerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ package rmn

import (
"context"
"crypto/sha256"
"encoding/binary"
"fmt"
"strings"
"sync"
"time"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/networking"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

rmntypes "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/types"
"github.com/smartcontractkit/chainlink-ccip/pkg/peergroup"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

var ErrNoConn = fmt.Errorf("no connection, please call InitConnection before further interaction")
Expand All @@ -31,7 +31,8 @@ type PeerClient interface {
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
peerIDs []string, // union of oraclePeerIDs and rmnNodePeerIDs (oracles required for peer discovery)
oraclePeerIDs []ragep2ptypes.PeerID,
rmnNodes []rmntypes.HomeNodeInfo,
) error

Close() error
Expand All @@ -54,9 +55,9 @@ type PeerResponse struct {

type peerClient struct {
lggr logger.Logger
peerGroupFactory PeerGroupFactory
peerGroupCreator *peergroup.Creator
respChan chan PeerResponse
peerGroup PeerGroup // nil initially, until InitConnection is called
peerGroup peergroup.PeerGroup
genericEndpointConfigDigest cciptypes.Bytes32
rageP2PStreams map[rmntypes.NodeID]Stream
bootstrappers []commontypes.BootstrapperLocator
Expand All @@ -67,13 +68,13 @@ type peerClient struct {

func NewPeerClient(
lggr logger.Logger,
peerGroupFactory PeerGroupFactory,
peerGroupFactory peergroup.PeerGroupFactory,
bootstrappers []commontypes.BootstrapperLocator,
ocrRoundInterval time.Duration,
) PeerClient {
return &peerClient{
lggr: lggr,
peerGroupFactory: peerGroupFactory,
peerGroupCreator: peergroup.NewCreator(lggr, peerGroupFactory, bootstrappers),
respChan: make(chan PeerResponse),
peerGroup: nil,
rageP2PStreams: make(map[rmntypes.NodeID]Stream),
Expand All @@ -87,7 +88,9 @@ func NewPeerClient(
func (r *peerClient) InitConnection(
_ context.Context,
commitConfigDigest, rmnHomeConfigDigest cciptypes.Bytes32,
peerIDs []string,
oraclePeerIDs []ragep2ptypes.PeerID,
rmnNodes []rmntypes.HomeNodeInfo,

) error {
if err := r.Close(); err != nil {
return fmt.Errorf("close existing peer group: %w", err)
Expand All @@ -96,25 +99,20 @@ func (r *peerClient) InitConnection(
r.mu.Lock()
defer r.mu.Unlock()

h := sha256.Sum256(append(commitConfigDigest[:], rmnHomeConfigDigest[:]...))
r.genericEndpointConfigDigest = writePrefix(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo, h)
r.lggr.Infow("Creating new peer group",
"genericEndpointConfigDigest", r.genericEndpointConfigDigest.String(),
"peerIDs", peerIDs,
"bootstrappers", r.bootstrappers,
)

peerGroup, err := r.peerGroupFactory.NewPeerGroup(
[32]byte(r.genericEndpointConfigDigest),
peerIDs,
r.bootstrappers,
)

result, err := r.peerGroupCreator.Create(peergroup.CreateOpts{
CommitConfigDigest: commitConfigDigest,
RMNHomeConfigDigest: rmnHomeConfigDigest,
// Note: For RMN peer client, we receive the combined peer IDs directly
// and don't need to separate oracle/RMN peers
OraclePeerIDs: oraclePeerIDs,
RMNNodes: rmnNodes,
})
if err != nil {
return fmt.Errorf("new peer group: %w", err)
return fmt.Errorf("create peer group: %w", err)
}

r.peerGroup = peerGroup
r.peerGroup = result.PeerGroup
r.genericEndpointConfigDigest = result.ConfigDigest
return nil
}

Expand All @@ -131,6 +129,8 @@ func (r *peerClient) Close() error {
return fmt.Errorf("close peer group: %w", err)
}

r.peerGroup = nil
r.rageP2PStreams = make(map[rmntypes.NodeID]Stream)
return nil
}

Expand Down Expand Up @@ -197,28 +197,7 @@ func (r *peerClient) Recv() <-chan PeerResponse {
return r.respChan
}

// writePrefix writes the prefix to the first 2 bytes of the hash.
func writePrefix(prefix ocr2types.ConfigDigestPrefix, hash cciptypes.Bytes32) cciptypes.Bytes32 {
var prefixBytes [2]byte
binary.BigEndian.PutUint16(prefixBytes[:], uint16(prefix))

hCopy := hash
hCopy[0] = prefixBytes[0]
hCopy[1] = prefixBytes[1]

return hCopy
}

// Redeclare interfaces for mocking purposes.

type PeerGroupFactory interface {
networking.PeerGroupFactory
}

type PeerGroup interface {
networking.PeerGroup
}

type Stream interface {
networking.Stream
}
19 changes: 0 additions & 19 deletions commit/merkleroot/rmn/peerclient_test.go

This file was deleted.

6 changes: 3 additions & 3 deletions commit/tokenprice/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ func (p *processor) Observation(

feedTokenPrices := p.ObserveFeedTokenPrices(ctx)
feeQuoterUpdates := p.ObserveFeeQuoterTokenUpdates(ctx)
ts := time.Now().UTC()
now := time.Now().UTC()
p.lggr.Infow(
"observed token prices",
"feed prices", feedTokenPrices,
"fee quoter updates", feeQuoterUpdates,
"timestamp", ts,
"timestampNow", now,
)

return Observation{
FeedTokenPrices: feedTokenPrices,
FeeQuoterTokenUpdates: feeQuoterUpdates,
FChain: fChain,
Timestamp: ts,
Timestamp: now,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions commit/tokenprice/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ func (p *processor) Outcome(

consensusObservation, err := p.getConsensusObservation(aos)
if err != nil {
return Outcome{}, err
return Outcome{}, fmt.Errorf("get consensus observation: %w", err)
}

tokenPriceOutcome := p.selectTokensForUpdate(consensusObservation)
p.lggr.Infow(
"outcome token prices",
"token prices", tokenPriceOutcome,
"tokenPrices", tokenPriceOutcome,
)
return Outcome{
TokenPrices: tokenPriceOutcome,
Expand All @@ -103,7 +103,7 @@ func validateObservedTokenPrices(tokenPrices []cciptypes.TokenPrice) error {
tokensWithPrice.Add(t.TokenID)

if t.Price.IsEmpty() {
return fmt.Errorf("token price must not be empty")
return fmt.Errorf("token price of token %v must not be empty", t.TokenID)
}
}
return nil
Expand Down
Loading

0 comments on commit 7c16083

Please sign in to comment.