diff --git a/commit/merkleroot/rmn/controller.go b/commit/merkleroot/rmn/controller.go index 5cdd2e33..53dc4df2 100644 --- a/commit/merkleroot/rmn/controller.go +++ b/commit/merkleroot/rmn/controller.go @@ -145,37 +145,22 @@ func (c *controller) ComputeReportSignatures( updateRequests []*rmnpb.FixedDestLaneUpdateRequest, rmnRemoteCfg rmntypes.RemoteConfig, ) (*ReportSignatures, error) { - rmnNodeInfo := make(map[rmntypes.NodeID]rmntypes.HomeNodeInfo) - rmnNodes, err := c.rmnHomeReader.GetRMNNodesInfo(rmnRemoteCfg.ConfigDigest) if err != nil { return nil, fmt.Errorf("get rmn nodes info: %w", err) } - c.lggr.Infow("got RMN nodes info", "nodes", rmnNodes) - c.lggr.Infow("requested updates", "updates", updateRequests) - - // Group the lane update requests by their source chain and mark the RMN nodes that can sign each update - // based on whether it supports the source chain or not. - updatesPerChain := make(map[uint64]updateRequestWithMeta) - for _, updateReq := range updateRequests { - if _, exists := updatesPerChain[updateReq.LaneSource.SourceChainSelector]; exists { - return nil, errors.New("controller implementation assumes each lane update is for a different chain") - } - - updatesPerChain[updateReq.LaneSource.SourceChainSelector] = updateRequestWithMeta{ - Data: updateReq, - RmnNodes: mapset.NewSet[rmntypes.NodeID](), - } + rmnNodeInfo := make(map[rmntypes.NodeID]rmntypes.HomeNodeInfo, len(rmnNodes)) + for _, node := range rmnNodes { + rmnNodeInfo[node.ID] = node + } - for _, node := range rmnNodes { - rmnNodeInfo[node.ID] = node + c.lggr.Infow("got RMN nodes info", "nodes", rmnNodeInfo) + c.lggr.Infow("requested updates", "updates", updateRequests) - // If RMN node supports the chain add it to the list of RMN nodes that can sign the update. - if node.SupportedSourceChains.Contains(cciptypes.ChainSelector(updateReq.LaneSource.SourceChainSelector)) { - updatesPerChain[updateReq.LaneSource.SourceChainSelector].RmnNodes.Add(node.ID) - } - } + updatesPerChain, err := populateUpdatesPerChain(updateRequests, rmnNodes) + if err != nil { + return nil, fmt.Errorf("process update requests: %w", err) } homeFMap, err := c.rmnHomeReader.GetF(rmnRemoteCfg.ConfigDigest) @@ -240,6 +225,35 @@ func (c *controller) ComputeReportSignatures( return rmnReportSignatures, nil } +// populateUpdatesPerChain processes a list of update requests, groups the lane updates by their source chain +// and populates the items with metadata for each update request, including a set of RMN nodes supporting that request. +func populateUpdatesPerChain( + updateRequests []*rmnpb.FixedDestLaneUpdateRequest, + rmnNodes []rmntypes.HomeNodeInfo, +) (map[uint64]updateRequestWithMeta, error) { + updatesPerChain := make(map[uint64]updateRequestWithMeta) + + for _, updateReq := range updateRequests { + if _, exists := updatesPerChain[updateReq.LaneSource.SourceChainSelector]; exists { + return nil, errors.New("controller implementation assumes each lane update is for a different chain") + } + + updatesPerChain[updateReq.LaneSource.SourceChainSelector] = updateRequestWithMeta{ + Data: updateReq, + RmnNodes: mapset.NewSet[rmntypes.NodeID](), + } + + for _, node := range rmnNodes { + // If RMN node supports the chain add it to the list of RMN nodes that can sign the update. + if node.SupportedSourceChains.Contains(cciptypes.ChainSelector(updateReq.LaneSource.SourceChainSelector)) { + updatesPerChain[updateReq.LaneSource.SourceChainSelector].RmnNodes.Add(node.ID) + } + } + } + + return updatesPerChain, nil +} + func (c *controller) InitConnection( ctx context.Context, commitConfigDigest cciptypes.Bytes32, diff --git a/commit/merkleroot/rmn/controller_test.go b/commit/merkleroot/rmn/controller_test.go index 6a387c0d..c904b7eb 100644 --- a/commit/merkleroot/rmn/controller_test.go +++ b/commit/merkleroot/rmn/controller_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ed25519" "crypto/sha256" + "errors" "fmt" "sort" "strconv" @@ -287,6 +288,122 @@ func Test_selectRoots(t *testing.T) { } } +func Test_populateUpdatesPerChain(t *testing.T) { + testCases := []struct { + name string + updateRequests []*rmnpb.FixedDestLaneUpdateRequest + rmnNodes []rmntypes.HomeNodeInfo + rmnNodeInfo map[rmntypes.NodeID]rmntypes.HomeNodeInfo + expectedResult map[uint64]updateRequestWithMeta + expectedError error + }{ + { + name: "single update request, single supported node", + updateRequests: []*rmnpb.FixedDestLaneUpdateRequest{ + {LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}}, + }, + rmnNodes: []rmntypes.HomeNodeInfo{ + { + ID: 1, + SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1)), + }, + }, + expectedResult: map[uint64]updateRequestWithMeta{ + 1: { + Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}}, + RmnNodes: mapset.NewSet[rmntypes.NodeID](rmntypes.NodeID(1)), + }, + }, + expectedError: nil, + }, + { + name: "duplicate sourceChainSelector error", + updateRequests: []*rmnpb.FixedDestLaneUpdateRequest{ + {LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}}, + {LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}}, + }, + rmnNodes: []rmntypes.HomeNodeInfo{ + { + ID: 1, + SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1)), + }, + }, + rmnNodeInfo: map[rmntypes.NodeID]rmntypes.HomeNodeInfo{}, + expectedError: errors.New("controller implementation assumes each lane update is for a different chain"), + }, + { + name: "Single Update Request, No Supported Nodes", + updateRequests: []*rmnpb.FixedDestLaneUpdateRequest{ + {LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}}, + }, + rmnNodes: []rmntypes.HomeNodeInfo{ + { + ID: 2, + SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(2)), + }, + }, + expectedResult: map[uint64]updateRequestWithMeta{ + 1: { + Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}}, + RmnNodes: mapset.NewSet[rmntypes.NodeID](), + }, + }, + expectedError: nil, + }, + { + name: "multiple update requests, multiple nodes", + updateRequests: []*rmnpb.FixedDestLaneUpdateRequest{ + {LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}}, + {LaneSource: &rmnpb.LaneSource{SourceChainSelector: 2}}, + {LaneSource: &rmnpb.LaneSource{SourceChainSelector: 3}}, + }, + rmnNodes: []rmntypes.HomeNodeInfo{ + {ID: 1, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1))}, + {ID: 2, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1))}, + {ID: 3, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(2))}, + {ID: 4, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1))}, + }, + expectedResult: map[uint64]updateRequestWithMeta{ + 1: { + Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}}, + RmnNodes: mapset.NewSet[rmntypes.NodeID](rmntypes.NodeID(1), rmntypes.NodeID(2), rmntypes.NodeID(4)), + }, + 2: { + Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 2}}, + RmnNodes: mapset.NewSet[rmntypes.NodeID](rmntypes.NodeID(3)), + }, + 3: { + Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 3}}, + RmnNodes: mapset.NewSet[rmntypes.NodeID](), + }, + }, + expectedError: nil, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + // Run the function + result, err := populateUpdatesPerChain(tt.updateRequests, tt.rmnNodes) + + // Check for expected error + if tt.expectedError != nil { + assert.EqualError(t, err, tt.expectedError.Error()) + } else { + assert.NoError(t, err) + } + + // Check for expected result + assert.Equal(t, tt.expectedResult, result) + + // Check if rmnNodeInfo was populated correctly + for id, info := range tt.rmnNodeInfo { + assert.Equal(t, info, tt.rmnNodeInfo[id]) + } + }) + } +} + func TestClient_ComputeReportSignatures(t *testing.T) { newTestSetup := func(t *testing.T) testSetup { lggr := logger.Test(t) diff --git a/commit/merkleroot/rmn/peerclient.go b/commit/merkleroot/rmn/peerclient.go index 8e4da38d..57f21cb9 100644 --- a/commit/merkleroot/rmn/peerclient.go +++ b/commit/merkleroot/rmn/peerclient.go @@ -160,10 +160,7 @@ func (r *peerClient) getOrCreateRageP2PStream(rmnNode rmntypes.HomeNodeInfo) (St } rmnPeerID := rmnNode.PeerID.String() - - // todo: versioning for stream names e.g. for 'v1_7' - streamName := fmt.Sprintf("ccip-rmn/v1_6/%s", - strings.TrimPrefix(r.genericEndpointConfigDigest.String(), "0x")) + streamName := rmnNode.StreamNamePrefix + strings.TrimPrefix(r.genericEndpointConfigDigest.String(), "0x") r.lggr.Infow("creating new stream", "streamName", streamName, diff --git a/commit/merkleroot/rmn/types/config.go b/commit/merkleroot/rmn/types/config.go index a9d4daa8..de27b889 100644 --- a/commit/merkleroot/rmn/types/config.go +++ b/commit/merkleroot/rmn/types/config.go @@ -26,6 +26,7 @@ type HomeNodeInfo struct { PeerID ragep2ptypes.PeerID // The peer ID of the node SupportedSourceChains mapset.Set[cciptypes.ChainSelector] // Set of supported source chains by the node OffchainPublicKey *ed25519.PublicKey // The public key is used to verify observations + StreamNamePrefix string // RageP2P stream name prefix e.g. "ccip-rmn/v1_6/" } // RemoteConfig contains the configuration fetched from the RMNRemote contract. diff --git a/internal/libs/slicelib/generic.go b/internal/libs/slicelib/generic.go index 157c9044..e815e0ea 100644 --- a/internal/libs/slicelib/generic.go +++ b/internal/libs/slicelib/generic.go @@ -1,19 +1,5 @@ package slicelib -// GroupBy groups a slice based on a specific item property. The returned groups slice is deterministic. -func GroupBy[T any, K comparable](items []T, prop func(T) K) ([]K, map[K][]T) { - groups := make([]K, 0) - grouped := make(map[K][]T) - for _, item := range items { - k := prop(item) - if _, exists := grouped[k]; !exists { - groups = append(groups, k) - } - grouped[k] = append(grouped[k], item) - } - return groups, grouped -} - // CountUnique counts the unique items of the provided slice. func CountUnique[T comparable](items []T) int { m := make(map[T]struct{}) diff --git a/internal/libs/slicelib/generic_test.go b/internal/libs/slicelib/generic_test.go index 83b55e86..2a111aba 100644 --- a/internal/libs/slicelib/generic_test.go +++ b/internal/libs/slicelib/generic_test.go @@ -6,68 +6,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestGroupBy(t *testing.T) { - type person struct { - id string - name string - age int - } - - testCases := []struct { - name string - items []person - expGroupNames []string - expGroups map[string][]person - }{ - { - name: "empty slice", - items: []person{}, - expGroupNames: []string{}, - expGroups: map[string][]person{}, - }, - { - name: "no duplicate", - items: []person{ - {id: "2", name: "Bob", age: 25}, - {id: "1", name: "Alice", age: 23}, - {id: "3", name: "Charlie", age: 22}, - {id: "4", name: "Dim", age: 13}, - }, - expGroupNames: []string{"2", "1", "3", "4"}, // should be deterministic - expGroups: map[string][]person{ - "1": {{id: "1", name: "Alice", age: 23}}, - "2": {{id: "2", name: "Bob", age: 25}}, - "3": {{id: "3", name: "Charlie", age: 22}}, - "4": {{id: "4", name: "Dim", age: 13}}, - }, - }, - { - name: "with duplicate", - items: []person{ - {id: "1", name: "Alice", age: 23}, - {id: "1", name: "Bob", age: 25}, - {id: "3", name: "Charlie", age: 22}, - }, - expGroupNames: []string{"1", "3"}, - expGroups: map[string][]person{ - "1": {{id: "1", name: "Alice", age: 23}, {id: "1", name: "Bob", age: 25}}, - "3": {{id: "3", name: "Charlie", age: 22}}, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - keys, groups := GroupBy(tc.items, func(p person) string { return p.id }) - assert.Equal(t, tc.expGroupNames, keys) - assert.Equal(t, len(tc.expGroups), len(groups)) - for _, k := range keys { - assert.Equal(t, tc.expGroups[k], groups[k]) - } - }) - } -} - func TestCountUnique(t *testing.T) { testCases := []struct { name string diff --git a/pkg/reader/rmn_home.go b/pkg/reader/rmn_home.go index f2e3a44b..bc9df5f3 100644 --- a/pkg/reader/rmn_home.go +++ b/pkg/reader/rmn_home.go @@ -290,6 +290,7 @@ func convertOnChainConfigToRMNHomeChainConfig( PeerID: ragep2ptypes.PeerID(node.PeerID), OffchainPublicKey: &pubKey, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](), + StreamNamePrefix: "ccip-rmn/v1_6/", // todo: when contract is updated, this should be fetched from the contract } }