From eb00198742cd054b0560459532afda4cc8959369 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Thu, 5 Mar 2026 17:52:51 +0000 Subject: [PATCH 1/2] [PRIV-412] Add duplicate check for items in the pending queue --- core/services/ocr2/plugins/vault/plugin.go | 18 +- .../ocr2/plugins/vault/plugin_test.go | 155 ++++++++++++++++++ 2 files changed, 172 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/vault/plugin.go b/core/services/ocr2/plugins/vault/plugin.go index 71ca238cf9c..4a939f9b4f6 100644 --- a/core/services/ocr2/plugins/vault/plugin.go +++ b/core/services/ocr2/plugins/vault/plugin.go @@ -1034,16 +1034,24 @@ func (r *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, } } + seen := map[string]bool{} for _, i := range obs.PendingQueueItems { bh, err := r.unmarshalBlob(i) if err != nil { return fmt.Errorf("could not unmarshal blob handle from observation pending queue item: %w", err) } - _, err = blobFetcher.FetchBlob(ctx, bh) + blob, err := blobFetcher.FetchBlob(ctx, bh) if err != nil { return fmt.Errorf("could not fetch blob for observation pending queue item: %w", err) } + + sha := fmt.Sprintf("%x", sha256.Sum256(blob)) + if seen[sha] { + return errors.New("duplicate item found in pending queue item observation") + } + seen[sha] = true + } return nil @@ -1312,6 +1320,7 @@ func (r *ReportingPlugin) stateTransitionPendingQueue(ctx context.Context, store oidsToIDs := map[uint8][]string{} // for debugging only shaToItem := map[string]*vaultcommon.StoredPendingQueueItem{} for oid, o := range obs { + shaSeenForOracle := map[string]bool{} for _, pqi := range o.PendingQueueItems { bh, err := r.unmarshalBlob(pqi) if err != nil { @@ -1340,6 +1349,13 @@ func (r *ReportingPlugin) stateTransitionPendingQueue(ctx context.Context, store continue } + if shaSeenForOracle[sha] { + r.lggr.Warnw("duplicate sha found for oracle, skipping...") + continue + } + + shaSeenForOracle[sha] = true + shaToItem[sha] = i if _, ok := idToShaToCount[i.Id]; !ok { diff --git a/core/services/ocr2/plugins/vault/plugin_test.go b/core/services/ocr2/plugins/vault/plugin_test.go index f32825db6c6..66c89373555 100644 --- a/core/services/ocr2/plugins/vault/plugin_test.go +++ b/core/services/ocr2/plugins/vault/plugin_test.go @@ -2317,6 +2317,64 @@ func TestPlugin_ValidateObservations_IncludesAllItemsInPendingQueue(t *testing.T require.NoError(t, err) } +func TestPlugin_ValidateObservations_DisallowsDuplicateBlobHandles(t *testing.T) { + lggr, _ := logger.TestLoggerObserved(t, zapcore.DebugLevel) + store := requests.NewStore[*vaulttypes.Request]() + _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) + require.NoError(t, err) + r := &ReportingPlugin{ + lggr: lggr, + onchainCfg: ocr3types.ReportingPluginConfig{ + N: 4, + F: 1, + }, + store: store, + cfg: makeReportingPluginConfig( + t, + 10, + pk, + shares[0], + 1, + 1024, + 100, + 100, + 100, + ), + unmarshalBlob: mockUnmarshalBlob, + marshalBlob: mockMarshalBlob, + } + + seqNr := uint64(1) + kv := &kv{ + m: make(map[string]response), + } + + obs := &vaultcommon.Observations{ + PendingQueueItems: [][]byte{ + {0: 1}, + {0: 2}, + }, + } + obsb, err := proto.Marshal(obs) + require.NoError(t, err) + + bf := &blobber{ + blobs: [][]byte{ + {0: 1}, + {0: 1}, + }, + } + err = r.ValidateObservation( + t.Context(), + seqNr, + types.AttributedQuery{}, + types.AttributedObservation{Observer: 0, Observation: types.Observation(obsb)}, + kv, + bf, + ) + require.ErrorContains(t, err, "duplicate item found in pending queue item observation") +} + func TestPlugin_StateTransition_ShasDontMatch(t *testing.T) { lggr, observed := logger.TestLoggerObserved(t, zapcore.DebugLevel) store := requests.NewStore[*vaulttypes.Request]() @@ -4817,6 +4875,103 @@ func TestPlugin_StateTransition_StoresPendingQueue_LimitedToBatchSize(t *testing assert.ElementsMatch(t, []string{"request-id"}, ids) } +func TestPlugin_StateTransition_StoresPendingQueue_DoesntDoubleCountObservationsFromOneNode(t *testing.T) { + lggr := logger.TestLogger(t) + store := requests.NewStore[*vaulttypes.Request]() + _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) + require.NoError(t, err) + r := &ReportingPlugin{ + lggr: lggr, + store: store, + onchainCfg: ocr3types.ReportingPluginConfig{ + N: 4, + F: 1, + }, + cfg: makeReportingPluginConfig( + t, + 1, + pk, + shares[0], + 1, + 1024, + 30, + 30, + 30, + ), + unmarshalBlob: mockUnmarshalBlob, + } + + seqNr := uint64(1) + rdr := &kv{ + m: make(map[string]response), + } + + req1 := &vaultcommon.ListSecretIdentifiersRequest{ + Owner: "owner", + Namespace: "main", + RequestId: "request-id", + } + areq1, err := anypb.New(req1) + require.NoError(t, err) + + o1 := &vaultcommon.Observations{ + PendingQueueItems: [][]byte{ + {}, // maps to item 0 in the blobs + {}, // maps to item 1 in the blobs + {}, // maps to item 2 in the blobs + }, + } + o1b, err := proto.Marshal(o1) + require.NoError(t, err) + + bf := &blobber{ + blobs: [][]byte{ + protoMarshal(t, &vaultcommon.StoredPendingQueueItem{ + Id: "request-id", + Item: areq1, + }), + protoMarshal(t, &vaultcommon.StoredPendingQueueItem{ + Id: "request-id", + Item: areq1, + }), + protoMarshal(t, &vaultcommon.StoredPendingQueueItem{ + Id: "request-id", + Item: areq1, + }), + }, + } + + reportPrecursor, err := r.StateTransition( + t.Context(), + seqNr, + types.AttributedQuery{}, + []types.AttributedObservation{ + {Observer: 0, Observation: o1b}, + }, + rdr, + bf, + ) + require.NoError(t, err) + + os := &vaultcommon.Outcomes{} + err = proto.Unmarshal(reportPrecursor, os) + require.NoError(t, err) + + assert.Empty(t, os.Outcomes) + + pq, err := NewReadStore(rdr).GetPendingQueue() + require.NoError(t, err) + assert.Len(t, pq, 0) + + ids := []string{} + for _, item := range pq { + ids = append(ids, item.Id) + } + + // 1 oracle submitted duplicates, so skipping + assert.ElementsMatch(t, []string{}, ids) +} + func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *testing.T) { lggr, observed := logger.TestLoggerObserved(t, zapcore.DebugLevel) store := requests.NewStore[*vaulttypes.Request]() From ef7c36f0a92659b1450bc0bc6811b015f9b21ef6 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Fri, 6 Mar 2026 11:20:50 +0000 Subject: [PATCH 2/2] Fix linting --- core/services/ocr2/plugins/vault/plugin_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/vault/plugin_test.go b/core/services/ocr2/plugins/vault/plugin_test.go index 66c89373555..78731be9b86 100644 --- a/core/services/ocr2/plugins/vault/plugin_test.go +++ b/core/services/ocr2/plugins/vault/plugin_test.go @@ -4961,7 +4961,7 @@ func TestPlugin_StateTransition_StoresPendingQueue_DoesntDoubleCountObservations pq, err := NewReadStore(rdr).GetPendingQueue() require.NoError(t, err) - assert.Len(t, pq, 0) + assert.Empty(t, pq, 0) ids := []string{} for _, item := range pq {