Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion core/services/ocr2/plugins/vault/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,16 +1034,24 @@ func (r *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64,
}
}

seen := map[string]bool{}
for _, i := range obs.PendingQueueItems {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a check on the total size of this obs.PendingQueueItems?
Don't want a bad oracle to send a too large number of items here.

bh, err := r.unmarshalBlob(i)
if err != nil {
return fmt.Errorf("could not unmarshal blob handle from observation pending queue item: %w", err)
}

_, err = blobFetcher.FetchBlob(ctx, bh)
blob, err := blobFetcher.FetchBlob(ctx, bh)
if err != nil {
Comment on lines +1037 to 1045
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate detection in ValidateObservation happens after unmarshalling and fetching the blob, so a repeated blob-handle entry can still trigger repeated FetchBlob calls and hashing work. Consider first deduping based on the raw handle bytes (obs.PendingQueueItems entries) before calling FetchBlob, and (if you need content-based dedupe) computing the sha from the decoded StoredPendingQueueItem via the existing shaForProto helper for canonicalization.

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional nice to have this

return fmt.Errorf("could not fetch blob for observation pending queue item: %w", err)
}

sha := fmt.Sprintf("%x", sha256.Sum256(blob))
if seen[sha] {
return errors.New("duplicate item found in pending queue item observation")
}
seen[sha] = true

}

return nil
Expand Down Expand Up @@ -1312,6 +1320,7 @@ func (r *ReportingPlugin) stateTransitionPendingQueue(ctx context.Context, store
oidsToIDs := map[uint8][]string{} // for debugging only
shaToItem := map[string]*vaultcommon.StoredPendingQueueItem{}
for oid, o := range obs {
shaSeenForOracle := map[string]bool{}
for _, pqi := range o.PendingQueueItems {
bh, err := r.unmarshalBlob(pqi)
if err != nil {
Expand Down Expand Up @@ -1340,6 +1349,13 @@ func (r *ReportingPlugin) stateTransitionPendingQueue(ctx context.Context, store
continue
}

if shaSeenForOracle[sha] {
r.lggr.Warnw("duplicate sha found for oracle, skipping...")
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Warnw call for duplicate pending-queue items doesn’t include any context (e.g., oracle id, request id, sha), which makes it hard to debug which observer is misbehaving and what was duplicated. Recommend adding structured fields like "oracleID", "sha", and/or the decoded item Id (and consider lowering to Debug if it can be triggered frequently).

Suggested change
r.lggr.Warnw("duplicate sha found for oracle, skipping...")
r.lggr.Warnw("duplicate sha found for oracle, skipping...",
"oracleID", oid,
"sha", sha,
"itemID", i.Id,
)

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good find.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps also log the blob for which we saw this. So we can troubleshoot and see the raw request which was being duplicated.

continue
}

shaSeenForOracle[sha] = true

shaToItem[sha] = i

if _, ok := idToShaToCount[i.Id]; !ok {
Expand Down
155 changes: 155 additions & 0 deletions core/services/ocr2/plugins/vault/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2317,6 +2317,64 @@ func TestPlugin_ValidateObservations_IncludesAllItemsInPendingQueue(t *testing.T
require.NoError(t, err)
}

// TestPlugin_ValidateObservations_DisallowsDuplicateBlobHandles verifies that
// ValidateObservation rejects an observation whose pending-queue items resolve
// to blobs with identical content (same sha256), returning the duplicate-item
// error even when the raw handles themselves differ.
func TestPlugin_ValidateObservations_DisallowsDuplicateBlobHandles(t *testing.T) {
	lggr, _ := logger.TestLoggerObserved(t, zapcore.DebugLevel)
	reqStore := requests.NewStore[*vaulttypes.Request]()
	_, pk, shares, err := tdh2easy.GenerateKeys(1, 3)
	require.NoError(t, err)

	plugin := &ReportingPlugin{
		lggr: lggr,
		onchainCfg: ocr3types.ReportingPluginConfig{
			N: 4,
			F: 1,
		},
		store:         reqStore,
		cfg:           makeReportingPluginConfig(t, 10, pk, shares[0], 1, 1024, 100, 100, 100),
		unmarshalBlob: mockUnmarshalBlob,
		marshalBlob:   mockMarshalBlob,
	}

	// Two distinct handles in the observation...
	obs := &vaultcommon.Observations{
		PendingQueueItems: [][]byte{
			{1},
			{2},
		},
	}
	obsBytes, err := proto.Marshal(obs)
	require.NoError(t, err)

	// ...but the fetcher returns identical blob content for both, so the
	// content-based dedupe check must trip.
	fetcher := &blobber{
		blobs: [][]byte{
			{1},
			{1},
		},
	}

	kvStore := &kv{
		m: make(map[string]response),
	}

	err = plugin.ValidateObservation(
		t.Context(),
		uint64(1),
		types.AttributedQuery{},
		types.AttributedObservation{Observer: 0, Observation: types.Observation(obsBytes)},
		kvStore,
		fetcher,
	)
	require.ErrorContains(t, err, "duplicate item found in pending queue item observation")
}

func TestPlugin_StateTransition_ShasDontMatch(t *testing.T) {
lggr, observed := logger.TestLoggerObserved(t, zapcore.DebugLevel)
store := requests.NewStore[*vaulttypes.Request]()
Expand Down Expand Up @@ -4817,6 +4875,103 @@ func TestPlugin_StateTransition_StoresPendingQueue_LimitedToBatchSize(t *testing
assert.ElementsMatch(t, []string{"request-id"}, ids)
}

// TestPlugin_StateTransition_StoresPendingQueue_DoesntDoubleCountObservationsFromOneNode
// verifies that duplicate pending-queue items observed from a single oracle are
// deduplicated per oracle during the state transition: one node repeating the
// same item three times must not count toward quorum three times, so with only
// one distinct observer the item never reaches quorum and the pending queue
// stays empty.
func TestPlugin_StateTransition_StoresPendingQueue_DoesntDoubleCountObservationsFromOneNode(t *testing.T) {
	lggr := logger.TestLogger(t)
	store := requests.NewStore[*vaulttypes.Request]()
	_, pk, shares, err := tdh2easy.GenerateKeys(1, 3)
	require.NoError(t, err)
	r := &ReportingPlugin{
		lggr:  lggr,
		store: store,
		onchainCfg: ocr3types.ReportingPluginConfig{
			N: 4,
			F: 1,
		},
		cfg: makeReportingPluginConfig(
			t,
			1,
			pk,
			shares[0],
			1,
			1024,
			30,
			30,
			30,
		),
		unmarshalBlob: mockUnmarshalBlob,
	}

	seqNr := uint64(1)
	rdr := &kv{
		m: make(map[string]response),
	}

	req1 := &vaultcommon.ListSecretIdentifiersRequest{
		Owner:     "owner",
		Namespace: "main",
		RequestId: "request-id",
	}
	areq1, err := anypb.New(req1)
	require.NoError(t, err)

	// A single oracle observes three pending-queue items; each handle maps
	// (via the blobber below) to an identical StoredPendingQueueItem.
	o1 := &vaultcommon.Observations{
		PendingQueueItems: [][]byte{
			{}, // maps to item 0 in the blobs
			{}, // maps to item 1 in the blobs
			{}, // maps to item 2 in the blobs
		},
	}
	o1b, err := proto.Marshal(o1)
	require.NoError(t, err)

	bf := &blobber{
		blobs: [][]byte{
			protoMarshal(t, &vaultcommon.StoredPendingQueueItem{
				Id:   "request-id",
				Item: areq1,
			}),
			protoMarshal(t, &vaultcommon.StoredPendingQueueItem{
				Id:   "request-id",
				Item: areq1,
			}),
			protoMarshal(t, &vaultcommon.StoredPendingQueueItem{
				Id:   "request-id",
				Item: areq1,
			}),
		},
	}

	reportPrecursor, err := r.StateTransition(
		t.Context(),
		seqNr,
		types.AttributedQuery{},
		[]types.AttributedObservation{
			{Observer: 0, Observation: o1b},
		},
		rdr,
		bf,
	)
	require.NoError(t, err)

	os := &vaultcommon.Outcomes{}
	err = proto.Unmarshal(reportPrecursor, os)
	require.NoError(t, err)

	assert.Empty(t, os.Outcomes)

	pq, err := NewReadStore(rdr).GetPendingQueue()
	require.NoError(t, err)
	// Note: previously written as assert.Empty(t, pq, 0) — the stray 0 was
	// interpreted as a msgAndArgs format argument, a leftover from assert.Len.
	assert.Empty(t, pq)

	ids := []string{}
	for _, item := range pq {
		ids = append(ids, item.Id)
	}

	// 1 oracle submitted duplicates, so the item was skipped entirely.
	assert.ElementsMatch(t, []string{}, ids)
}

func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *testing.T) {
lggr, observed := logger.TestLoggerObserved(t, zapcore.DebugLevel)
store := requests.NewStore[*vaulttypes.Request]()
Expand Down
Loading