From 8979ef36357087e5ce6d001baf4fa5dde9c4a1c7 Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:17:07 +0200 Subject: [PATCH 1/9] refactor: first draft --- .../ocr2/plugins/triggerqueue/factory.go | 77 ++++++++++++++++++ .../ocr2/plugins/triggerqueue/plugin.go | 68 ++++++++++++++++ core/services/workflows/v2/config.go | 16 +++- core/services/workflows/v2/ocrqueue.go | 79 +++++++++++++++++++ 4 files changed, 239 insertions(+), 1 deletion(-) create mode 100644 core/services/ocr2/plugins/triggerqueue/factory.go create mode 100644 core/services/ocr2/plugins/triggerqueue/plugin.go create mode 100644 core/services/workflows/v2/ocrqueue.go diff --git a/core/services/ocr2/plugins/triggerqueue/factory.go b/core/services/ocr2/plugins/triggerqueue/factory.go new file mode 100644 index 00000000000..a6fb069b763 --- /dev/null +++ b/core/services/ocr2/plugins/triggerqueue/factory.go @@ -0,0 +1,77 @@ +package triggerqueue + +import ( + "context" + "errors" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +const ( + defaultMaxQueryBytes = 100 + defaultMaxObservationBytes = 500 * 1024 + defaultMaxReportsPlusPrecursorBytes = 500 * 1024 + defaultMaxReportBytes = 500 * 1024 + defaultMaxReportCount = 20 + defaultMaxKeyValueModifiedKeysPlusValuesBytes = 1024 * 1024 + defaultMaxKeyValueModifiedKeys = 500 + defaultMaxBlobPayloadBytes = 25 * 1024 + defaultMaxPerOracleUnexpiredBlobCumulativePayloadBytes = 30 * 1024 * 1024 + defaultMaxPerOracleUnexpiredBlobCount = 1000 +) + +// Factory creates OCR 3.1 ReportingPlugins for the trigger queue. Draft: NewReportingPlugin returns a plugin that errors on all calls. +type Factory struct { + lggr logger.Logger + services.StateMachine +} + +// NewFactory creates a new trigger queue plugin factory. +func NewFactory(lggr logger.Logger) (*Factory, error) { + if lggr == nil { + return nil, errors.New("logger is required") + } + return &Factory{ + lggr: lggr.Named("TriggerQueuePluginFactory"), + }, nil +} + +// NewReportingPlugin creates a new OCR 3.1 ReportingPlugin. Draft: returns plugin that errors on all OCR calls. +func (f *Factory) NewReportingPlugin(_ context.Context, config ocr3types.ReportingPluginConfig, fetcher ocr3_1types.BlobBroadcastFetcher) (ocr3_1types.ReportingPlugin[[]byte], ocr3_1types.ReportingPluginInfo, error) { + plugin := NewReportingPlugin(f.lggr) + _, _ = config, fetcher + info := ocr3_1types.ReportingPluginInfo1{ + Name: "TriggerQueuePlugin", + Limits: ocr3_1types.ReportingPluginLimits{ + MaxQueryBytes: defaultMaxQueryBytes, + MaxObservationBytes: defaultMaxObservationBytes, + MaxReportsPlusPrecursorBytes: defaultMaxReportsPlusPrecursorBytes, + MaxReportBytes: defaultMaxReportBytes, + MaxReportCount: defaultMaxReportCount, + MaxKeyValueModifiedKeysPlusValuesBytes: defaultMaxKeyValueModifiedKeysPlusValuesBytes, + MaxKeyValueModifiedKeys: defaultMaxKeyValueModifiedKeys, + MaxBlobPayloadBytes: defaultMaxBlobPayloadBytes, + MaxPerOracleUnexpiredBlobCumulativePayloadBytes: defaultMaxPerOracleUnexpiredBlobCumulativePayloadBytes, + MaxPerOracleUnexpiredBlobCount: defaultMaxPerOracleUnexpiredBlobCount, + }, + } + return plugin, info, nil +} + +func (f *Factory) Start(ctx context.Context) error { + return f.StartOnce("TriggerQueuePluginFactory", func() error { return nil }) +} + +func (f *Factory) Close() error { + return f.StopOnce("TriggerQueuePluginFactory", func() error { return nil }) +} + +func (f *Factory) Name() string { return f.lggr.Name() } + +func (f *Factory) HealthReport() map[string]error { + return map[string]error{f.Name(): f.Healthy()} +} diff --git a/core/services/ocr2/plugins/triggerqueue/plugin.go b/core/services/ocr2/plugins/triggerqueue/plugin.go new file mode 100644 index 00000000000..2264cc2ecc0 --- /dev/null +++ b/core/services/ocr2/plugins/triggerqueue/plugin.go @@ -0,0 +1,68 @@ +package triggerqueue + +import ( + "context" + "errors" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// errNotImplemented is returned by all plugin methods in this draft. +var errNotImplemented = errors.New("triggerqueue plugin: draft implementation, not yet implemented") + +var _ ocr3_1types.ReportingPlugin[[]byte] = (*ReportingPlugin)(nil) + +// ReportingPlugin implements OCR 3.1 ReportingPlugin for the trigger queue. +// Draft: all methods return errors. +type ReportingPlugin struct { + lggr logger.Logger +} + +// NewReportingPlugin creates a new ReportingPlugin. Draft: returns plugin that errors on all calls. +func NewReportingPlugin(lggr logger.Logger) *ReportingPlugin { + return &ReportingPlugin{lggr: lggr.Named("TriggerQueuePlugin")} +} + +func (p *ReportingPlugin) Query(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Query, error) { + return nil, errNotImplemented +} + +func (p *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Observation, error) { + return nil, errNotImplemented +} + +func (p *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, ao types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) error { + return errNotImplemented +} + +func (p *ReportingPlugin) ObservationQuorum(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) (bool, error) { + return false, errNotImplemented +} + +func (p *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { + return ocr3_1types.ReportsPlusPrecursor{}, errNotImplemented +} + +func (p *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlusPrecursor ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[[]byte], error) { + return nil, errNotImplemented +} + +func (p *ReportingPlugin) Committed(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader) error { + return errNotImplemented +} + +func (p *ReportingPlugin) ShouldAcceptAttestedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[[]byte]) (bool, error) { + return false, errNotImplemented +} + +func (p *ReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[[]byte]) (bool, error) { + return false, errNotImplemented +} + +func (p *ReportingPlugin) Close() error { + return nil +} diff --git a/core/services/workflows/v2/config.go b/core/services/workflows/v2/config.go index b2636c42c66..82d95546f5e 100644 --- a/core/services/workflows/v2/config.go +++ b/core/services/workflows/v2/config.go @@ -1,6 +1,7 @@ package v2 import ( + "context" "errors" "fmt" @@ -129,10 +130,23 @@ func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflo if err != nil { return } - l.TriggerEventQueue, err = limits.MakeQueueLimiter[enqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) + stdQueue, err := limits.MakeQueueLimiter[enqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) if err != nil { return } + ocrQueueEnabled, err := cresettings.Default.OCRTriggerEventQueueEnabled.GetOrDefault(context.Background(), lf.Settings) + if err != nil { + return + } + if ocrQueueEnabled { + newOCRQueue := NewOCRQueueWithInnerQueue(stdQueue) + l.TriggerEventQueue, err = newOCRQueue(OCRQueueDeps{Lf: lf, Cfg: &cfg}) + if err != nil { + return + } + } else { + l.TriggerEventQueue = stdQueue + } l.TriggerEventQueueTime, err = lf.MakeTimeLimiter(cfg.TriggerEventQueueTimeout) if err != nil { return diff --git a/core/services/workflows/v2/ocrqueue.go b/core/services/workflows/v2/ocrqueue.go new file mode 100644 index 00000000000..17c7f13fa1d --- /dev/null +++ b/core/services/workflows/v2/ocrqueue.go @@ -0,0 +1,79 @@ +package v2 + +import ( + "context" + "errors" + + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var errOCRQueueNotImplemented = errors.New("OCRQueue: draft, use NewOCRQueueWithInnerQueue for delegating implementation") + +// DONInfo holds DON membership info (members, F, bootstrap peers) for OCR. +// Placeholder for design; concrete type TBD when wiring from handler. +type DONInfo struct{} + +// OCR3_1PluginFactory creates OCR 3.1 ReportingPlugins (Blob + KV). +// Placeholder for design; use triggerqueue.Factory when implemented. +type OCR3_1PluginFactory interface{} + +// OCRQueueDeps holds the planned dependencies for NewOCRQueue from the design. +// See ocr-trigger-queue.md. Draft: DonInfo and PluginFactory may be nil when using +// NewOCRQueueWithInnerQueue. +type OCRQueueDeps struct { + Lf limits.Factory + Cfg *cresettings.Workflows + DonInfo *DONInfo + PluginFactory OCR3_1PluginFactory + Lggr logger.Logger +} + +// OCRQueue wraps a standard QueueLimiter and delegates all operations to it. +// Draft: when OCRTriggerEventQueueEnabled is true, NewLimiters uses NewOCRQueueWithInnerQueue +// to get a constructor that returns this wrapper. Functionally identical to the standard queue. +// Future: will run OCR internally and feed from report callbacks. +type OCRQueue struct { + inner limits.QueueLimiter[enqueuedTriggerEvent] +} + +// NewOCRQueue creates an OCRQueue from the planned dependencies. +// Draft: returns errOCRQueueNotImplemented. Use NewOCRQueueWithInnerQueue to obtain +// a constructor that delegates to an inner queue. +func NewOCRQueue(deps OCRQueueDeps) (limits.QueueLimiter[enqueuedTriggerEvent], error) { + return nil, errOCRQueueNotImplemented +} + +// NewOCRQueueWithInnerQueue returns a constructor with the same signature as NewOCRQueue. +// The returned function ignores deps and returns an OCRQueue that delegates to inner. +// Use this for the draft to match current behavior (delegate to standard queue). +func NewOCRQueueWithInnerQueue(inner limits.QueueLimiter[enqueuedTriggerEvent]) func(OCRQueueDeps) (limits.QueueLimiter[enqueuedTriggerEvent], error) { + return func(_ OCRQueueDeps) (limits.QueueLimiter[enqueuedTriggerEvent], error) { + return &OCRQueue{inner: inner}, nil + } +} + +func (q *OCRQueue) Limit(ctx context.Context) (int, error) { + return q.inner.Limit(ctx) +} + +func (q *OCRQueue) Len(ctx context.Context) (int, error) { + return q.inner.Len(ctx) +} + +func (q *OCRQueue) Put(ctx context.Context, event enqueuedTriggerEvent) error { + return q.inner.Put(ctx, event) +} + +func (q *OCRQueue) Get(ctx context.Context) (enqueuedTriggerEvent, error) { + return q.inner.Get(ctx) +} + +func (q *OCRQueue) Wait(ctx context.Context) (enqueuedTriggerEvent, error) { + return q.inner.Wait(ctx) +} + +func (q *OCRQueue) Close() error { + return q.inner.Close() +} From 08076567e52dfed9e8258279a41a7079bb654725 Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:48:17 +0200 Subject: [PATCH 2/9] refactor: more draft work --- core/services/chainlink/application.go | 1 + core/services/cre/cre.go | 30 +++++++++++++++++- core/services/ocr2/delegate.go | 13 ++++++++ core/services/workflows/v2/config.go | 42 ++++++++++++++++---------- core/services/workflows/v2/engine.go | 9 +++--- core/services/workflows/v2/ocrqueue.go | 30 ++++++++++-------- 6 files changed, 92 insertions(+), 33 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 9ee3b2bc8c9..9bb67159343 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -742,6 +742,7 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err if ocr2Delegate == nil { return nil, errors.New("ocr2.NewDelegate() returned nil") } + creServices.SetOCRTriggerQueueCreator(ocr2Delegate) delegates[job.OffchainReporting2] = ocr2Delegate delegates[job.Bootstrap] = ocrbootstrap.NewDelegateBootstrap( opts.DS, diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index baf87584cc8..ea59c66b742 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -58,8 +58,14 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" wftypes "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" + + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" ) +// ocrTriggerEventQueueEnabled gates use of the OCR-backed trigger queue. +// TODO: replace with cresettings.OCRTriggerEventQueueEnabled when added to chainlink-common. +const ocrTriggerEventQueueEnabled = false + // Keystore is the minimal interface needed from keystore for CRE type Keystore interface { CSA() keystore.CSA @@ -116,6 +122,15 @@ type Services struct { // callback to wire Delegates into CRE services (e.g. Launcher) when ready SetDelegatesDeps func(*standardcapabilities.Delegate) (commonsrv.Service, error) + + // ocrTriggerQueueCreator is set by application.go after creating the OCR delegate. + ocrTriggerQueueCreator v2.TriggerQueueCreator +} + +// SetOCRTriggerQueueCreator sets the creator for the OCR-backed trigger queue. +// Called by application.go after creating the OCR delegate. +func (s *Services) SetOCRTriggerQueueCreator(c v2.TriggerQueueCreator) { + s.ocrTriggerQueueCreator = c } func (s *Services) close() error { @@ -222,6 +237,7 @@ func (s *Services) newSubservices( cfg, relayerChainInterops, opts, + s.ocrTriggerQueueCreator, lggr, ds, opts.DonTimeStore, @@ -830,6 +846,7 @@ func newWorkflowRegistrySyncerV2( relayerChainInterops RelayerChainInterops, billingClient metering.BillingClient, opts Opts, + ocrTriggerQueueCreator v2.TriggerQueueCreator, lggr logger.Logger, ds sqlutil.DataSource, dontimeStore *dontime.Store, @@ -874,7 +891,16 @@ func newWorkflowRegistrySyncerV2( engineRegistry := syncerV2.NewEngineRegistry() - engineLimiters, err := v2.NewLimiters(lf, nil) + var triggerQueue limits.QueueLimiter[v2.EnqueuedTriggerEvent] + if ocrTriggerEventQueueEnabled && ocrTriggerQueueCreator != nil { + cfg := cresettings.Default.PerWorkflow + var tqErr error + triggerQueue, tqErr = ocrTriggerQueueCreator.NewTriggerQueueOCRQueue(context.Background(), lf, &cfg) + if tqErr != nil { + return nil, nil, fmt.Errorf("could not create OCR trigger queue: %w", tqErr) + } + } + engineLimiters, err := v2.NewLimitersWithTriggerQueue(lf, nil, triggerQueue) if err != nil { return nil, nil, fmt.Errorf("could not instantiate engine limiters: %w", err) } @@ -984,6 +1010,7 @@ func newWorkflowRegistrySyncer( cfg Config, relayerChainInterops RelayerChainInterops, opts Opts, + ocrTriggerQueueCreator v2.TriggerQueueCreator, lggr logger.Logger, ds sqlutil.DataSource, dontimeStore *dontime.Store, @@ -1032,6 +1059,7 @@ func newWorkflowRegistrySyncer( relayerChainInterops, billingClient, opts, + ocrTriggerQueueCreator, lggr, ds, dontimeStore, diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 831e257d6cc..ad2bdcede8d 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -40,6 +40,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes" "github.com/smartcontractkit/chainlink/v2/core/config/env" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" syncerV2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/v2" "github.com/smartcontractkit/smdkg/dkgocr/oracleargs" @@ -50,6 +51,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins" "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins/ocr3" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -336,6 +338,17 @@ func (d *Delegate) JobType() job.Type { return job.OffchainReporting2 } +// NewTriggerQueueOCRQueue creates the trigger queue for the workflow syncer. +// Uses the delegate's OCR infra (DB, peer wrapper, keyring, bootstrap peers). +// Draft: calls NewOCRQueue with Inner from NewStandardTriggerQueue; full OCR implementation is TODO. +func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, lf limits.Factory, cfg *cresettings.Workflows) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) { + inner, err := v2.NewStandardTriggerQueue(lf, cfg) + if err != nil { + return nil, err + } + return v2.NewOCRQueue(v2.OCRQueueDeps{Lf: lf, Cfg: cfg, Inner: inner}) +} + func (d *Delegate) BeforeJobCreated(_ job.Job) { // This is only called first time the job is created d.isNewlyCreatedJob = true diff --git a/core/services/workflows/v2/config.go b/core/services/workflows/v2/config.go index 82d95546f5e..62e894dd845 100644 --- a/core/services/workflows/v2/config.go +++ b/core/services/workflows/v2/config.go @@ -28,6 +28,18 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" ) +// TriggerQueueCreator creates an OCR-backed trigger queue using the delegate's OCR infra. +// Implemented by the OCR delegate; called by cre.go when OCRTriggerEventQueueEnabled is on. +type TriggerQueueCreator interface { + NewTriggerQueueOCRQueue(ctx context.Context, lf limits.Factory, cfg *cresettings.Workflows) (limits.QueueLimiter[EnqueuedTriggerEvent], error) +} + +// NewStandardTriggerQueue creates the default in-process trigger queue. +// Used by the delegate's NewTriggerQueueOCRQueue as a fallback until the full OCR queue is implemented. +func NewStandardTriggerQueue(lf limits.Factory, cfg *cresettings.Workflows) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { + return limits.MakeQueueLimiter[EnqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) +} + type EngineConfig struct { Lggr logger.Logger Module host.ModuleV2 @@ -76,7 +88,7 @@ type EngineLimiters struct { TriggerSubscriptionTime limits.TimeLimiter TriggerRegistrationsTime limits.TimeLimiter TriggerSubscription limits.BoundLimiter[int] - TriggerEventQueue limits.QueueLimiter[enqueuedTriggerEvent] + TriggerEventQueue limits.QueueLimiter[EnqueuedTriggerEvent] TriggerEventQueueTime limits.TimeLimiter ExecutionConcurrency limits.ResourcePoolLimiter[int] @@ -104,12 +116,19 @@ type EngineLimiters struct { // NewLimiters returns a new set of EngineLimiters based on the default configuration, and optionally modified by cfgFn. func NewLimiters(lf limits.Factory, cfgFn func(*cresettings.Workflows)) (*EngineLimiters, error) { + return NewLimitersWithTriggerQueue(lf, cfgFn, nil) +} + +// NewLimitersWithTriggerQueue is like NewLimiters but accepts an optional pre-built trigger queue. +// When triggerQueue is non-nil, it is used instead of creating a standard queue. Used when +// OCRTriggerEventQueueEnabled is on and the OCR delegate provides an OCR-backed queue. +func NewLimitersWithTriggerQueue(lf limits.Factory, cfgFn func(*cresettings.Workflows), triggerQueue limits.QueueLimiter[EnqueuedTriggerEvent]) (*EngineLimiters, error) { l := &EngineLimiters{} - err := l.init(lf, cfgFn) + err := l.init(lf, cfgFn, triggerQueue) return l, err } -func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflows)) (err error) { +func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflows), triggerQueue limits.QueueLimiter[EnqueuedTriggerEvent]) (err error) { cfg := cresettings.Default.PerWorkflow // make copy if cfgFn != nil { cfgFn(&cfg) @@ -130,22 +149,13 @@ func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflo if err != nil { return } - stdQueue, err := limits.MakeQueueLimiter[enqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) - if err != nil { - return - } - ocrQueueEnabled, err := cresettings.Default.OCRTriggerEventQueueEnabled.GetOrDefault(context.Background(), lf.Settings) - if err != nil { - return - } - if ocrQueueEnabled { - newOCRQueue := NewOCRQueueWithInnerQueue(stdQueue) - l.TriggerEventQueue, err = newOCRQueue(OCRQueueDeps{Lf: lf, Cfg: &cfg}) + if triggerQueue != nil { + l.TriggerEventQueue = triggerQueue + } else { + l.TriggerEventQueue, err = limits.MakeQueueLimiter[EnqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) if err != nil { return } - } else { - l.TriggerEventQueue = stdQueue } l.TriggerEventQueueTime, err = lf.MakeTimeLimiter(cfg.TriggerEventQueueTimeout) if err != nil { diff --git a/core/services/workflows/v2/engine.go b/core/services/workflows/v2/engine.go index 8cc9cfa992f..b763940bf4a 100644 --- a/core/services/workflows/v2/engine.go +++ b/core/services/workflows/v2/engine.go @@ -72,7 +72,7 @@ type Engine struct { // used to separate registration and unregistration phases triggersRegMu sync.Mutex - allTriggerEventsQueueCh limits.QueueLimiter[enqueuedTriggerEvent] + allTriggerEventsQueueCh limits.QueueLimiter[EnqueuedTriggerEvent] executionsSemaphore limits.ResourcePoolLimiter[int] capCallsSemaphore limits.ResourcePoolLimiter[int] @@ -90,7 +90,8 @@ type triggerCapability struct { method string } -type enqueuedTriggerEvent struct { +// EnqueuedTriggerEvent is exported for use by TriggerQueueCreator (OCR delegate). +type EnqueuedTriggerEvent struct { triggerCapID string triggerIndex int timestamp time.Time @@ -537,7 +538,7 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error { e.metrics.With(platform.KeyTriggerID, triggerID).IncrementWorkflowTriggerEventErrorCounter(ctx) continue } - if err := e.allTriggerEventsQueueCh.Put(ctx, enqueuedTriggerEvent{ + if err := e.allTriggerEventsQueueCh.Put(ctx, EnqueuedTriggerEvent{ triggerCapID: triggerID, triggerIndex: idx, timestamp: e.cfg.Clock.Now(), @@ -603,7 +604,7 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) { } // startExecution initiates a new workflow execution, blocking until completed -func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueuedTriggerEvent) { +func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent EnqueuedTriggerEvent) { fullExecutionID, err := events.GenerateExecutionIDWithTriggerIndex(e.cfg.WorkflowID, wrappedTriggerEvent.event.Event.ID, wrappedTriggerEvent.triggerIndex) if err != nil { e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.triggerCapID) diff --git a/core/services/workflows/v2/ocrqueue.go b/core/services/workflows/v2/ocrqueue.go index 17c7f13fa1d..435e0186320 100644 --- a/core/services/workflows/v2/ocrqueue.go +++ b/core/services/workflows/v2/ocrqueue.go @@ -21,35 +21,41 @@ type OCR3_1PluginFactory interface{} // OCRQueueDeps holds the planned dependencies for NewOCRQueue from the design. // See ocr-trigger-queue.md. Draft: DonInfo and PluginFactory may be nil when using -// NewOCRQueueWithInnerQueue. +// Inner as fallback. type OCRQueueDeps struct { Lf limits.Factory Cfg *cresettings.Workflows DonInfo *DONInfo PluginFactory OCR3_1PluginFactory Lggr logger.Logger + + // Inner is an optional fallback queue for the draft. When set, NewOCRQueue returns + // an OCRQueue that delegates to it. When nil, NewOCRQueue returns errOCRQueueNotImplemented. + Inner limits.QueueLimiter[EnqueuedTriggerEvent] } // OCRQueue wraps a standard QueueLimiter and delegates all operations to it. -// Draft: when OCRTriggerEventQueueEnabled is true, NewLimiters uses NewOCRQueueWithInnerQueue -// to get a constructor that returns this wrapper. Functionally identical to the standard queue. +// Draft: created via NewOCRQueue with deps.Inner set; functionally identical to the standard queue. // Future: will run OCR internally and feed from report callbacks. type OCRQueue struct { - inner limits.QueueLimiter[enqueuedTriggerEvent] + inner limits.QueueLimiter[EnqueuedTriggerEvent] } // NewOCRQueue creates an OCRQueue from the planned dependencies. -// Draft: returns errOCRQueueNotImplemented. Use NewOCRQueueWithInnerQueue to obtain -// a constructor that delegates to an inner queue. -func NewOCRQueue(deps OCRQueueDeps) (limits.QueueLimiter[enqueuedTriggerEvent], error) { +// Draft: when deps.Inner is set, returns an OCRQueue delegating to it. Otherwise returns +// errOCRQueueNotImplemented. The delegate calls this with Inner from NewStandardTriggerQueue. +func NewOCRQueue(deps OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { + if deps.Inner != nil { + return &OCRQueue{inner: deps.Inner}, nil + } return nil, errOCRQueueNotImplemented } // NewOCRQueueWithInnerQueue returns a constructor with the same signature as NewOCRQueue. // The returned function ignores deps and returns an OCRQueue that delegates to inner. // Use this for the draft to match current behavior (delegate to standard queue). -func NewOCRQueueWithInnerQueue(inner limits.QueueLimiter[enqueuedTriggerEvent]) func(OCRQueueDeps) (limits.QueueLimiter[enqueuedTriggerEvent], error) { - return func(_ OCRQueueDeps) (limits.QueueLimiter[enqueuedTriggerEvent], error) { +func NewOCRQueueWithInnerQueue(inner limits.QueueLimiter[EnqueuedTriggerEvent]) func(OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { + return func(_ OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { return &OCRQueue{inner: inner}, nil } } @@ -62,15 +68,15 @@ func (q *OCRQueue) Len(ctx context.Context) (int, error) { return q.inner.Len(ctx) } -func (q *OCRQueue) Put(ctx context.Context, event enqueuedTriggerEvent) error { +func (q *OCRQueue) Put(ctx context.Context, event EnqueuedTriggerEvent) error { return q.inner.Put(ctx, event) } -func (q *OCRQueue) Get(ctx context.Context) (enqueuedTriggerEvent, error) { +func (q *OCRQueue) Get(ctx context.Context) (EnqueuedTriggerEvent, error) { return q.inner.Get(ctx) } -func (q *OCRQueue) Wait(ctx context.Context) (enqueuedTriggerEvent, error) { +func (q *OCRQueue) Wait(ctx context.Context) (EnqueuedTriggerEvent, error) { return q.inner.Wait(ctx) } From a5321d2b9d9a96abf53421315cb4a67019dce8f1 Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Mar 2026 18:24:19 +0200 Subject: [PATCH 3/9] refactor: use don notifier --- core/services/cre/cre.go | 7 +++++- core/services/ocr2/delegate.go | 20 +++++++++++------ core/services/workflows/v2/config.go | 13 ++++++++++- core/services/workflows/v2/ocrqueue.go | 31 ++++++++++---------------- 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index ea59c66b742..ac7fcbbb0d8 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -894,8 +894,13 @@ func newWorkflowRegistrySyncerV2( var triggerQueue limits.QueueLimiter[v2.EnqueuedTriggerEvent] if ocrTriggerEventQueueEnabled && ocrTriggerQueueCreator != nil { cfg := cresettings.Default.PerWorkflow + deps := v2.TriggerQueueDeps{ + Lf: lf, + Cfg: &cfg, + DonSubscriber: workflowDonNotifier, + } var tqErr error - triggerQueue, tqErr = ocrTriggerQueueCreator.NewTriggerQueueOCRQueue(context.Background(), lf, &cfg) + triggerQueue, tqErr = ocrTriggerQueueCreator.NewTriggerQueueOCRQueue(context.Background(), deps) if tqErr != nil { return nil, nil, fmt.Errorf("could not create OCR trigger queue: %w", tqErr) } diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index ad2bdcede8d..d7d9f2c1e0b 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -40,8 +40,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes" "github.com/smartcontractkit/chainlink/v2/core/config/env" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" syncerV2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/v2" + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" "github.com/smartcontractkit/smdkg/dkgocr/oracleargs" @@ -51,7 +51,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins" "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins/ocr3" - "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -91,6 +90,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/autotelemetry21" ocr2keeper21core "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" ringconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ring/config" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/triggerqueue" vaultocrplugin "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/vault" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate" "github.com/smartcontractkit/chainlink/v2/core/services/ocr3_1/beholderwrapper" @@ -341,12 +341,20 @@ func (d *Delegate) JobType() job.Type { // NewTriggerQueueOCRQueue creates the trigger queue for the workflow syncer. // Uses the delegate's OCR infra (DB, peer wrapper, keyring, bootstrap peers). // Draft: calls NewOCRQueue with Inner from NewStandardTriggerQueue; full OCR implementation is TODO. -func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, lf limits.Factory, cfg *cresettings.Workflows) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) { - inner, err := v2.NewStandardTriggerQueue(lf, cfg) +// DonSubscriber and CapRegistry from deps enable DON sync when the full OCR queue is implemented. +func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQueueDeps) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) { + inner, err := v2.NewStandardTriggerQueue(deps.Lf, deps.Cfg) if err != nil { return nil, err } - return v2.NewOCRQueue(v2.OCRQueueDeps{Lf: lf, Cfg: cfg, Inner: inner}) + return v2.NewOCRQueue(v2.OCRQueueDeps{ + Lf: deps.Lf, + Cfg: deps.Cfg, + PluginFactory: triggerqueue.NewReportingPlugin(d.lggr), + Lggr: d.lggr, + DonSubscriber: deps.DonSubscriber, + Inner: inner, + }) } func (d *Delegate) BeforeJobCreated(_ job.Job) { @@ -2458,7 +2466,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug return nil, fmt.Errorf("chain not supported for CCIP execution: %s", spec.Relay) } dstRid, err := spec.RelayID() - if err != nil { return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)} } @@ -2519,7 +2526,6 @@ func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, plugi return nil, fmt.Errorf("chain not supported for CCIP execution: %s", spec.Relay) } dstRid, err := spec.RelayID() - if err != nil { return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)} } diff --git a/core/services/workflows/v2/config.go b/core/services/workflows/v2/config.go index 62e894dd845..f9af8ed9ef2 100644 --- a/core/services/workflows/v2/config.go +++ b/core/services/workflows/v2/config.go @@ -28,10 +28,21 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" ) +// TriggerQueueDeps holds dependencies for creating the OCR-backed trigger queue. +// DonSubscriber enables DON sync: the queue subscribes to DON updates and reconfigures +// the OCR oracle when members, F, or bootstrap peers change. +type TriggerQueueDeps struct { + Lf limits.Factory + Cfg *cresettings.Workflows + DonSubscriber capabilities.DonSubscriber +} + // TriggerQueueCreator creates an OCR-backed trigger queue using the delegate's OCR infra. // Implemented by the OCR delegate; called by cre.go when OCRTriggerEventQueueEnabled is on. +// DonSubscriber in deps enables DON sync: the queue stays current with dynamic DON info +// (members, F, bootstrap peers) delivered on the subscription channel. type TriggerQueueCreator interface { - NewTriggerQueueOCRQueue(ctx context.Context, lf limits.Factory, cfg *cresettings.Workflows) (limits.QueueLimiter[EnqueuedTriggerEvent], error) + NewTriggerQueueOCRQueue(ctx context.Context, deps TriggerQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) } // NewStandardTriggerQueue creates the default in-process trigger queue. diff --git a/core/services/workflows/v2/ocrqueue.go b/core/services/workflows/v2/ocrqueue.go index 435e0186320..02983b3c4db 100644 --- a/core/services/workflows/v2/ocrqueue.go +++ b/core/services/workflows/v2/ocrqueue.go @@ -6,54 +6,47 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/logger" ) var errOCRQueueNotImplemented = errors.New("OCRQueue: draft, use NewOCRQueueWithInnerQueue for delegating implementation") -// DONInfo holds DON membership info (members, F, bootstrap peers) for OCR. -// Placeholder for design; concrete type TBD when wiring from handler. -type DONInfo struct{} - // OCR3_1PluginFactory creates OCR 3.1 ReportingPlugins (Blob + KV). // Placeholder for design; use triggerqueue.Factory when implemented. -type OCR3_1PluginFactory interface{} +type OCR3_1PluginFactory any // OCRQueueDeps holds the planned dependencies for NewOCRQueue from the design. -// See ocr-trigger-queue.md. Draft: DonInfo and PluginFactory may be nil when using -// Inner as fallback. type OCRQueueDeps struct { Lf limits.Factory Cfg *cresettings.Workflows - DonInfo *DONInfo PluginFactory OCR3_1PluginFactory Lggr logger.Logger - // Inner is an optional fallback queue for the draft. When set, NewOCRQueue returns - // an OCRQueue that delegates to it. When nil, NewOCRQueue returns errOCRQueueNotImplemented. + // Subscribe to receive capabilities.DON (members, F, etc.) + // on each update; reconfigure OCR oracle accordingly. + // + // TODO: implement DON sync + DonSubscriber capabilities.DonSubscriber + + // Inner is the fallback implementation for draft/mock mode to get things running Inner limits.QueueLimiter[EnqueuedTriggerEvent] } // OCRQueue wraps a standard QueueLimiter and delegates all operations to it. -// Draft: created via NewOCRQueue with deps.Inner set; functionally identical to the standard queue. -// Future: will run OCR internally and feed from report callbacks. type OCRQueue struct { inner limits.QueueLimiter[EnqueuedTriggerEvent] } // NewOCRQueue creates an OCRQueue from the planned dependencies. -// Draft: when deps.Inner is set, returns an OCRQueue delegating to it. Otherwise returns -// errOCRQueueNotImplemented. The delegate calls this with Inner from NewStandardTriggerQueue. func NewOCRQueue(deps OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { - if deps.Inner != nil { - return &OCRQueue{inner: deps.Inner}, nil + if deps.Inner == nil { + return nil, errOCRQueueNotImplemented } - return nil, errOCRQueueNotImplemented + return &OCRQueue{inner: deps.Inner}, nil } // NewOCRQueueWithInnerQueue returns a constructor with the same signature as NewOCRQueue. -// The returned function ignores deps and returns an OCRQueue that delegates to inner. -// Use this for the draft to match current behavior (delegate to standard queue). func NewOCRQueueWithInnerQueue(inner limits.QueueLimiter[EnqueuedTriggerEvent]) func(OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { return func(_ OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { return &OCRQueue{inner: inner}, nil From 8b41b82b98325ca2d36dc005aa3b6ed3206cbf26 Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Mar 2026 18:54:08 +0200 Subject: [PATCH 4/9] refactor: adds todo wiring --- core/services/ocr2/delegate.go | 31 ++++++++- .../ocr2/plugins/triggerqueue/factory.go | 67 +++++-------------- core/services/workflows/v2/config.go | 8 ++- 3 files changed, 50 insertions(+), 56 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index d7d9f2c1e0b..81cba3e8299 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -338,15 +338,40 @@ func (d *Delegate) JobType() job.Type { return job.OffchainReporting2 } +// triggerQueueContractID is a synthetic ID for monitoring; trigger queue has no on-chain contract. +const triggerQueueContractID = "trigger_queue" + // NewTriggerQueueOCRQueue creates the trigger queue for the workflow syncer. -// Uses the delegate's OCR infra (DB, peer wrapper, keyring, bootstrap peers). -// Draft: calls NewOCRQueue with Inner from NewStandardTriggerQueue; full OCR implementation is TODO. -// DonSubscriber and CapRegistry from deps enable DON sync when the full OCR queue is implemented. +// Uses the delegate's OCR infra. Draft: builds OCR3_1OracleArgs with TODOs for unwired fields; +// returns queue delegating to standard in-process queue until full OCR is implemented. func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQueueDeps) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) { inner, err := v2.NewStandardTriggerQueue(deps.Lf, deps.Cfg) if err != nil { return nil, err } + + // Build OCR3_1OracleArgs for the trigger queue. TODO: wire all fields and call libocr2.NewOracle. + ocrLogger := ocrcommon.NewOCRWrapper(d.lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + d.lggr.Warnw("OCR trigger queue", "msg", msg) + }) + oracleArgs := libocr2.OCR3_1OracleArgs[[]byte]{ + BinaryNetworkEndpointFactory: d.peerWrapper.Peer3_1, + V2Bootstrappers: nil, // TODO: wire bootstrap peers from workflow DON (deps.DonSubscriber.WaitForDon or config) + ContractConfigTracker: nil, // TODO: in-process config; need static/dynamic config from DON (no on-chain contract) + ContractTransmitter: nil, // TODO: trigger queue is off-chain only; wire if libocr2 requires non-nil + Database: nil, // TODO: wire OCR DB (e.g. NewDB(d.ds, triggerQueuePluginID, 0, d.lggr)); need unique plugin ID + KeyValueDatabaseFactory: nil, // TODO: wire KV factory (e.g. kvdb.NewPebbleKeyValueDatabaseFactory(path)); path = KeyValueStoreRootDir/trigger_queue + LocalConfig: ocrtypes.LocalConfig{}, // TODO: wire LocalConfig (BlockDelta, etc.) + Logger: ocrLogger, + MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": triggerQueueContractID}, prometheus.DefaultRegisterer), + MonitoringEndpoint: nil, // TODO: wire d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, triggerQueueContractID, ...); need rid for trigger queue + OffchainConfigDigester: nil, // TODO: in-process digester; derive config digest from DON (members, F, offchain config) + OffchainKeyring: nil, // TODO: wire OCR key bundle (d.ks.Get or workflow DON key) + OnchainKeyring: nil, // TODO: wire onchain keyring adapter (trigger queue may use same as vault/dontime for in-process) + ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr), d.lggr, "triggerqueue"), + } + _ = oracleArgs // TODO: pass to libocr2.NewOracle(oracleArgs) when all fields wired; add oracle to OCRQueue lifecycle + return v2.NewOCRQueue(v2.OCRQueueDeps{ Lf: deps.Lf, Cfg: deps.Cfg, diff --git a/core/services/ocr2/plugins/triggerqueue/factory.go b/core/services/ocr2/plugins/triggerqueue/factory.go index a6fb069b763..99f73d1e892 100644 --- a/core/services/ocr2/plugins/triggerqueue/factory.go +++ b/core/services/ocr2/plugins/triggerqueue/factory.go @@ -2,76 +2,41 @@ package triggerqueue import ( "context" - "errors" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" - "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" ) -const ( - defaultMaxQueryBytes = 100 - defaultMaxObservationBytes = 500 * 1024 - defaultMaxReportsPlusPrecursorBytes = 500 * 1024 - defaultMaxReportBytes = 500 * 1024 - defaultMaxReportCount = 20 - defaultMaxKeyValueModifiedKeysPlusValuesBytes = 1024 * 1024 - defaultMaxKeyValueModifiedKeys = 500 - defaultMaxBlobPayloadBytes = 25 * 1024 - defaultMaxPerOracleUnexpiredBlobCumulativePayloadBytes = 30 * 1024 * 1024 - defaultMaxPerOracleUnexpiredBlobCount = 1000 -) +var _ ocr3_1types.ReportingPluginFactory[[]byte] = (*Factory)(nil) -// Factory creates OCR 3.1 ReportingPlugins for the trigger queue. Draft: NewReportingPlugin returns a plugin that errors on all calls. +// Factory creates ReportingPlugin instances for the trigger queue. type Factory struct { lggr logger.Logger - services.StateMachine } // NewFactory creates a new trigger queue plugin factory. -func NewFactory(lggr logger.Logger) (*Factory, error) { - if lggr == nil { - return nil, errors.New("logger is required") - } - return &Factory{ - lggr: lggr.Named("TriggerQueuePluginFactory"), - }, nil +func NewFactory(lggr logger.Logger) *Factory { + return &Factory{lggr: lggr.Named("TriggerQueueFactory")} } -// NewReportingPlugin creates a new OCR 3.1 ReportingPlugin. Draft: returns plugin that errors on all OCR calls. -func (f *Factory) NewReportingPlugin(_ context.Context, config ocr3types.ReportingPluginConfig, fetcher ocr3_1types.BlobBroadcastFetcher) (ocr3_1types.ReportingPlugin[[]byte], ocr3_1types.ReportingPluginInfo, error) { +func (f *Factory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig, fetcher ocr3_1types.BlobBroadcastFetcher) (ocr3_1types.ReportingPlugin[[]byte], ocr3_1types.ReportingPluginInfo, error) { plugin := NewReportingPlugin(f.lggr) - _, _ = config, fetcher info := ocr3_1types.ReportingPluginInfo1{ - Name: "TriggerQueuePlugin", + Name: "triggerqueue", Limits: ocr3_1types.ReportingPluginLimits{ - MaxQueryBytes: defaultMaxQueryBytes, - MaxObservationBytes: defaultMaxObservationBytes, - MaxReportsPlusPrecursorBytes: defaultMaxReportsPlusPrecursorBytes, - MaxReportBytes: defaultMaxReportBytes, - MaxReportCount: defaultMaxReportCount, - MaxKeyValueModifiedKeysPlusValuesBytes: defaultMaxKeyValueModifiedKeysPlusValuesBytes, - MaxKeyValueModifiedKeys: defaultMaxKeyValueModifiedKeys, - MaxBlobPayloadBytes: defaultMaxBlobPayloadBytes, - MaxPerOracleUnexpiredBlobCumulativePayloadBytes: defaultMaxPerOracleUnexpiredBlobCumulativePayloadBytes, - MaxPerOracleUnexpiredBlobCount: defaultMaxPerOracleUnexpiredBlobCount, + MaxQueryBytes: 100, + MaxObservationBytes: 500 * 1024, // 500KB per design doc + MaxReportsPlusPrecursorBytes: 500 * 1024, + MaxReportBytes: 500 * 1024, + MaxReportCount: 1, + MaxKeyValueModifiedKeys: 500, + MaxKeyValueModifiedKeysPlusValuesBytes: 1024 * 1024, // 1MB + MaxBlobPayloadBytes: 25 * 1024, // 25KB per design doc + MaxPerOracleUnexpiredBlobCumulativePayloadBytes: 30 * 1024 * 1024, + MaxPerOracleUnexpiredBlobCount: 1000, }, } return plugin, info, nil } - -func (f *Factory) Start(ctx context.Context) error { - return f.StartOnce("TriggerQueuePluginFactory", func() error { return nil }) -} - -func (f *Factory) Close() error { - return f.StopOnce("TriggerQueuePluginFactory", func() error { return nil }) -} - -func (f *Factory) Name() string { return f.lggr.Name() } - -func (f *Factory) HealthReport() map[string]error { - return map[string]error{f.Name(): f.Healthy()} -} diff --git a/core/services/workflows/v2/config.go b/core/services/workflows/v2/config.go index f9af8ed9ef2..5aec2bf8759 100644 --- a/core/services/workflows/v2/config.go +++ b/core/services/workflows/v2/config.go @@ -29,12 +29,16 @@ import ( ) // TriggerQueueDeps holds dependencies for creating the OCR-backed trigger queue. -// DonSubscriber enables DON sync: the queue subscribes to DON updates and reconfigures -// the OCR oracle when members, F, or bootstrap peers change. +// Extend as needed when wiring the full OCR 3.1 oracle (bootstrap peers, monitoring, etc.). type TriggerQueueDeps struct { Lf limits.Factory Cfg *cresettings.Workflows DonSubscriber capabilities.DonSubscriber + + // TODO: add fields as wiring progresses: + // - BootstrapPeers []commontypes.BootstrapperLocator (from DON or config) + // - MonitoringEndpointGen (delegate has it; may need synthetic contract ID) + // - OCR key bundle (workflow DON uses OCR keys) } // TriggerQueueCreator creates an OCR-backed trigger queue using the delegate's OCR infra. From c012510e10f0bb438d033c1003a77869e3c7ed7a Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Mar 2026 19:05:59 +0200 Subject: [PATCH 5/9] refactor: trigger event as interface --- core/services/workflows/v2/engine.go | 55 +++++++++++++++++----------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/core/services/workflows/v2/engine.go b/core/services/workflows/v2/engine.go index b763940bf4a..cbf815a108d 100644 --- a/core/services/workflows/v2/engine.go +++ b/core/services/workflows/v2/engine.go @@ -90,14 +90,32 @@ type triggerCapability struct { method string } -// EnqueuedTriggerEvent is exported for use by TriggerQueueCreator (OCR delegate). -type EnqueuedTriggerEvent struct { +// EnqueuedTriggerEvent is the type queued for workflow trigger execution. +// Implementations are opaque; consumers use the accessors. +type EnqueuedTriggerEvent interface { + TriggerCapID() string + TriggerIndex() int + Timestamp() time.Time + Event() capabilities.TriggerResponse +} + +type enqueuedTriggerEvent struct { triggerCapID string triggerIndex int timestamp time.Time event capabilities.TriggerResponse } +func (e *enqueuedTriggerEvent) TriggerCapID() string { return e.triggerCapID } +func (e *enqueuedTriggerEvent) TriggerIndex() int { return e.triggerIndex } +func (e *enqueuedTriggerEvent) Timestamp() time.Time { return e.timestamp } +func (e *enqueuedTriggerEvent) Event() capabilities.TriggerResponse { return e.event } + +// NewEnqueuedTriggerEvent constructs an EnqueuedTriggerEvent for the queue. +func NewEnqueuedTriggerEvent(triggerCapID string, triggerIndex int, timestamp time.Time, event capabilities.TriggerResponse) EnqueuedTriggerEvent { + return &enqueuedTriggerEvent{triggerCapID, triggerIndex, timestamp, event} +} + func TriggerRegistrationID(workflowID string, triggerIndex int) string { return fmt.Sprintf("trigger_reg_%s_%d", workflowID, triggerIndex) } @@ -538,12 +556,7 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error { e.metrics.With(platform.KeyTriggerID, triggerID).IncrementWorkflowTriggerEventErrorCounter(ctx) continue } - if err := e.allTriggerEventsQueueCh.Put(ctx, EnqueuedTriggerEvent{ - triggerCapID: triggerID, - triggerIndex: idx, - timestamp: e.cfg.Clock.Now(), - event: event, - }); err != nil { + if err := e.allTriggerEventsQueueCh.Put(ctx, NewEnqueuedTriggerEvent(triggerID, idx, e.cfg.Clock.Now(), event)); err != nil { var errFull limits.ErrorQueueFull if errors.As(err, &errFull) { // queue full, drop the event @@ -571,8 +584,8 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) { if err != nil { return } - eventAge := queueHead.timestamp.Sub(e.cfg.Clock.Now()) - eventID := queueHead.event.Event.ID + eventAge := queueHead.Timestamp().Sub(e.cfg.Clock.Now()) + eventID := queueHead.Event().Event.ID e.logger().Debugw("Popped a trigger event from the queue", "eventID", eventID, "eventAgeMs", eventAge.Milliseconds()) triggerEventMaxAge, err := e.cfg.LocalLimiters.TriggerEventQueueTime.Limit(ctx) if err != nil { @@ -580,7 +593,7 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) { continue } if eventAge > triggerEventMaxAge { - e.logger().Warnw("Trigger event is too old, skipping execution", "triggerID", queueHead.triggerCapID, "eventID", eventID, "eventAgeMs", eventAge.Milliseconds()) + e.logger().Warnw("Trigger event is too old, skipping execution", "triggerID", queueHead.TriggerCapID(), "eventID", eventID, "eventAgeMs", eventAge.Milliseconds()) continue } free, err := e.executionsSemaphore.Wait(ctx, 1) // block if too many concurrent workflow executions @@ -605,9 +618,9 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) { // startExecution initiates a new workflow execution, blocking until completed func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent EnqueuedTriggerEvent) { - fullExecutionID, err := events.GenerateExecutionIDWithTriggerIndex(e.cfg.WorkflowID, wrappedTriggerEvent.event.Event.ID, wrappedTriggerEvent.triggerIndex) + fullExecutionID, err := events.GenerateExecutionIDWithTriggerIndex(e.cfg.WorkflowID, wrappedTriggerEvent.Event().Event.ID, wrappedTriggerEvent.TriggerIndex()) if err != nil { - e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.triggerCapID) + e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.TriggerCapID()) return } @@ -645,7 +658,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent Enqueue } } - triggerEvent := wrappedTriggerEvent.event.Event + triggerEvent := wrappedTriggerEvent.Event().Event var executionID string if e.cfg.FeatureFlags.FeatureMultiTriggerExecutionIDs.Check(ctx, config.Timestamp(executionTimestamp)) == nil { @@ -654,7 +667,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent Enqueue } else { executionID, err = events.GenerateExecutionID(e.cfg.WorkflowID, triggerEvent.ID) if err != nil { - e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.triggerCapID) + e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.TriggerCapID()) return } e.metrics.IncrementExecutionIDLegacyCounter(ctx) @@ -664,9 +677,9 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent Enqueue _, addErr := e.cfg.ExecutionsStore.Add(ctx, nil, executionID, e.cfg.WorkflowID, store.StatusStarted) if addErr != nil { if errors.Is(addErr, store.ErrDuplicateExecution) { - lggr.Infow("Skipping duplicate execution", "executionID", executionID, "triggerID", wrappedTriggerEvent.triggerCapID, "triggerIndex", wrappedTriggerEvent.triggerIndex) - e.metrics.With(platform.KeyTriggerID, wrappedTriggerEvent.triggerCapID).IncrementWorkflowTriggerEventErrorCounter(ctx) - registrationID := TriggerRegistrationID(e.cfg.WorkflowID, wrappedTriggerEvent.triggerIndex) + lggr.Infow("Skipping duplicate execution", "executionID", executionID, "triggerID", wrappedTriggerEvent.TriggerCapID(), "triggerIndex", wrappedTriggerEvent.TriggerIndex()) + e.metrics.With(platform.KeyTriggerID, wrappedTriggerEvent.TriggerCapID()).IncrementWorkflowTriggerEventErrorCounter(ctx) + registrationID := TriggerRegistrationID(e.cfg.WorkflowID, wrappedTriggerEvent.TriggerIndex()) err = e.ackTriggerEvent(ctx, registrationID, &triggerEvent) if err != nil { e.lggr.Errorw("failed to re-ACK trigger event", "eventID", triggerEvent.ID, "err", err) @@ -713,7 +726,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent Enqueue return } defer execCancel() - executionLogger := logger.With(lggr, "executionID", executionID, "triggerID", wrappedTriggerEvent.triggerCapID, "triggerIndex", wrappedTriggerEvent.triggerIndex) + executionLogger := logger.With(lggr, "executionID", executionID, "triggerID", wrappedTriggerEvent.TriggerCapID(), "triggerIndex", wrappedTriggerEvent.TriggerIndex()) maxUserLogEventsPerExecution, err := e.cfg.LocalLimiters.LogEvent.Limit(ctx) if err != nil { @@ -726,7 +739,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent Enqueue e.emitUserLogs(execCtx, userLogChan, executionID, loggerLabels) }) - tid, err := safe.IntToUint64(wrappedTriggerEvent.triggerIndex) + tid, err := safe.IntToUint64(wrappedTriggerEvent.TriggerIndex()) if err != nil { executionLogger.Errorw("Failed to convert trigger index to uint64", "err", err) return @@ -736,7 +749,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent Enqueue executionLogger.Infow("Workflow execution starting ...") _ = events.EmitExecutionStartedEvent(ctx, loggerLabels, triggerEvent.ID, executionID) - registrationID := TriggerRegistrationID(e.cfg.WorkflowID, wrappedTriggerEvent.triggerIndex) + registrationID := TriggerRegistrationID(e.cfg.WorkflowID, wrappedTriggerEvent.TriggerIndex()) err = e.ackTriggerEvent(ctx, registrationID, &triggerEvent) if err != nil { e.lggr.Errorf("failed to ACK trigger event (eventID=%s): %v", triggerEvent.ID, err) From d76e05d6454db963b1cb67d03de0ba2861f96a4f Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Mar 2026 19:16:40 +0200 Subject: [PATCH 6/9] refactor: show how transmitter works with queue --- core/services/ocr2/delegate.go | 2 +- .../ocr2/plugins/triggerqueue/transmitter.go | 72 +++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 core/services/ocr2/plugins/triggerqueue/transmitter.go diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 81cba3e8299..8e1120f0fe1 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -358,7 +358,7 @@ func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQ BinaryNetworkEndpointFactory: d.peerWrapper.Peer3_1, V2Bootstrappers: nil, // TODO: wire bootstrap peers from workflow DON (deps.DonSubscriber.WaitForDon or config) ContractConfigTracker: nil, // TODO: in-process config; need static/dynamic config from DON (no on-chain contract) - ContractTransmitter: nil, // TODO: trigger queue is off-chain only; wire if libocr2 requires non-nil + ContractTransmitter: triggerqueue.NewTransmitter(inner, d.lggr), Database: nil, // TODO: wire OCR DB (e.g. NewDB(d.ds, triggerQueuePluginID, 0, d.lggr)); need unique plugin ID KeyValueDatabaseFactory: nil, // TODO: wire KV factory (e.g. kvdb.NewPebbleKeyValueDatabaseFactory(path)); path = KeyValueStoreRootDir/trigger_queue LocalConfig: ocrtypes.LocalConfig{}, // TODO: wire LocalConfig (BlockDelta, etc.) diff --git a/core/services/ocr2/plugins/triggerqueue/transmitter.go b/core/services/ocr2/plugins/triggerqueue/transmitter.go new file mode 100644 index 00000000000..45b2a3014ef --- /dev/null +++ b/core/services/ocr2/plugins/triggerqueue/transmitter.go @@ -0,0 +1,72 @@ +package triggerqueue + +import ( + "context" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + "github.com/smartcontractkit/chainlink/v2/core/logger" + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" +) + +var _ ocr3types.ContractTransmitter[[]byte] = (*Transmitter)(nil) + +// Transmitter receives OCR consensus reports and enqueues decoded trigger events. +// No on-chain transmit; reports are consumed locally to feed the internal queue. +type Transmitter struct { + queue limits.QueueLimiter[v2.EnqueuedTriggerEvent] + lggr logger.Logger +} + +// NewTransmitter creates a transmitter that decodes reports and Puts into the queue. +func NewTransmitter(queue limits.QueueLimiter[v2.EnqueuedTriggerEvent], lggr logger.Logger) *Transmitter { + return &Transmitter{queue: queue, lggr: lggr.Named("TriggerQueueTransmitter")} +} + +// decodedTriggerEvent is the result of decoding a report; maps to EnqueuedTriggerEvent fields. +type decodedTriggerEvent struct { + triggerCapID string + triggerIndex int + timestamp time.Time + event capabilities.TriggerResponse +} + +// decodeReport extracts the consensus slice of events from the OCR report. +// Overridable in tests to mock the decode. +var decodeReport = func(rwi ocr3types.ReportWithInfo[[]byte]) ([]decodedTriggerEvent, error) { + // TODO: implement real decode. Report format per design: ordered event IDs in rwi.Report; + // fetch payloads from KV (keyValueReader from plugin context; transmitter may need KV ref). + // Stub: no real decode. Real impl would: + // - Parse rwi.Report (proto: ordered event IDs) + // - For each event ID, fetch payload from KV via "Event::"+eventID + // - Unmarshal to TriggerResponse, build decodedTriggerEvent + _ = rwi + return nil, nil +} + +// Transmit is called by libOCR when consensus is reached. Decodes the report and +// enqueues each event into the internal queue. Engine Wait() will return these. +func (t *Transmitter) Transmit(ctx context.Context, cd types.ConfigDigest, seqNr uint64, rwi ocr3types.ReportWithInfo[[]byte], sigs []types.AttributedOnchainSignature) error { + events, err := decodeReport(rwi) + if err != nil { + t.lggr.Errorw("Failed to decode trigger queue report", "err", err, "seqNr", seqNr) + return err + } + for _, ev := range events { + enqueued := v2.NewEnqueuedTriggerEvent(ev.triggerCapID, ev.triggerIndex, ev.timestamp, ev.event) + if err := t.queue.Put(ctx, enqueued); err != nil { + t.lggr.Errorw("Failed to enqueue consensus event", "triggerCapID", ev.triggerCapID, "err", err) + return err + } + t.lggr.Debugw("Enqueued consensus event", "triggerCapID", ev.triggerCapID, "triggerIndex", ev.triggerIndex) + } + return nil +} + +func (t *Transmitter) FromAccount(_ context.Context) (types.Account, error) { + return types.Account(""), nil +} From cff5e98b83ccf671bd558cd1b56baea4c048d7fa Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Mar 2026 19:22:28 +0200 Subject: [PATCH 7/9] refactor: clean up delegate vs queue --- core/services/ocr2/delegate.go | 15 ++++----------- core/services/workflows/v2/ocrqueue.go | 25 ++++--------------------- 2 files changed, 8 insertions(+), 32 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 8e1120f0fe1..7c7d8fbbd89 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -342,8 +342,8 @@ func (d *Delegate) JobType() job.Type { const triggerQueueContractID = "trigger_queue" // NewTriggerQueueOCRQueue creates the trigger queue for the workflow syncer. -// Uses the delegate's OCR infra. Draft: builds OCR3_1OracleArgs with TODOs for unwired fields; -// returns queue delegating to standard in-process queue until full OCR is implemented. +// Delegate owns the oracle; builds OCR3_1OracleArgs with TODOs for unwired fields. +// Returns OCRQueue wrapping Inner; transmitter feeds consensus events into Inner. func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQueueDeps) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) { inner, err := v2.NewStandardTriggerQueue(deps.Lf, deps.Cfg) if err != nil { @@ -370,16 +370,9 @@ func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQ OnchainKeyring: nil, // TODO: wire onchain keyring adapter (trigger queue may use same as vault/dontime for in-process) ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr), d.lggr, "triggerqueue"), } - _ = oracleArgs // TODO: pass to libocr2.NewOracle(oracleArgs) when all fields wired; add oracle to OCRQueue lifecycle + _ = oracleArgs // TODO: pass to libocr2.NewOracle(oracleArgs) when all fields wired - return v2.NewOCRQueue(v2.OCRQueueDeps{ - Lf: deps.Lf, - Cfg: deps.Cfg, - PluginFactory: triggerqueue.NewReportingPlugin(d.lggr), - Lggr: d.lggr, - DonSubscriber: deps.DonSubscriber, - Inner: inner, - }) + return v2.NewOCRQueue(v2.OCRQueueDeps{Inner: inner}) } func (d *Delegate) BeforeJobCreated(_ job.Job) { diff --git a/core/services/workflows/v2/ocrqueue.go b/core/services/workflows/v2/ocrqueue.go index 02983b3c4db..0893cf8ecc7 100644 --- a/core/services/workflows/v2/ocrqueue.go +++ b/core/services/workflows/v2/ocrqueue.go @@ -4,36 +4,19 @@ import ( "context" "errors" - "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" - "github.com/smartcontractkit/chainlink/v2/core/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/logger" ) var errOCRQueueNotImplemented = errors.New("OCRQueue: draft, use NewOCRQueueWithInnerQueue for delegating implementation") -// OCR3_1PluginFactory creates OCR 3.1 ReportingPlugins (Blob + KV). -// Placeholder for design; use triggerqueue.Factory when implemented. -type OCR3_1PluginFactory any - -// OCRQueueDeps holds the planned dependencies for NewOCRQueue from the design. +// OCRQueueDeps holds dependencies for NewOCRQueue. +// Delegate owns the oracle; OCRQueue wraps the queue the transmitter feeds. type OCRQueueDeps struct { - Lf limits.Factory - Cfg *cresettings.Workflows - PluginFactory OCR3_1PluginFactory - Lggr logger.Logger - - // Subscribe to receive capabilities.DON (members, F, etc.) - // on each update; reconfigure OCR oracle accordingly. - // - // TODO: implement DON sync - DonSubscriber capabilities.DonSubscriber - - // Inner is the fallback implementation for draft/mock mode to get things running Inner limits.QueueLimiter[EnqueuedTriggerEvent] } -// OCRQueue wraps a standard QueueLimiter and delegates all operations to it. +// OCRQueue wraps a QueueLimiter and delegates all operations to it. +// Delegate owns the oracle; transmitter decodes reports and Puts into Inner. type OCRQueue struct { inner limits.QueueLimiter[EnqueuedTriggerEvent] } From 0ce59c5b22be12ee63014f4d44e013a3bc205a44 Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Mar 2026 19:30:48 +0200 Subject: [PATCH 8/9] refactor: wires in an observation buffer --- core/services/ocr2/delegate.go | 5 ++-- .../ocr2/plugins/triggerqueue/factory.go | 10 ++++--- .../ocr2/plugins/triggerqueue/plugin.go | 24 +++++++++++---- .../workflows/v2/observation_buffer.go | 29 +++++++++++++++++++ core/services/workflows/v2/ocrqueue.go | 22 +++++++++----- 5 files changed, 70 insertions(+), 20 deletions(-) create mode 100644 core/services/workflows/v2/observation_buffer.go diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 7c7d8fbbd89..52cfd21e230 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -349,6 +349,7 @@ func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQ if err != nil { return nil, err } + buffer := &v2.ObservationBuffer{} // Build OCR3_1OracleArgs for the trigger queue. TODO: wire all fields and call libocr2.NewOracle. ocrLogger := ocrcommon.NewOCRWrapper(d.lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { @@ -368,11 +369,11 @@ func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQ OffchainConfigDigester: nil, // TODO: in-process digester; derive config digest from DON (members, F, offchain config) OffchainKeyring: nil, // TODO: wire OCR key bundle (d.ks.Get or workflow DON key) OnchainKeyring: nil, // TODO: wire onchain keyring adapter (trigger queue may use same as vault/dontime for in-process) - ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr), d.lggr, "triggerqueue"), + ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr, buffer), d.lggr, "triggerqueue"), } _ = oracleArgs // TODO: pass to libocr2.NewOracle(oracleArgs) when all fields wired - return v2.NewOCRQueue(v2.OCRQueueDeps{Inner: inner}) + return v2.NewOCRQueue(v2.OCRQueueDeps{Inner: inner, Buffer: buffer}) } func (d *Delegate) BeforeJobCreated(_ job.Job) { diff --git a/core/services/ocr2/plugins/triggerqueue/factory.go b/core/services/ocr2/plugins/triggerqueue/factory.go index 99f73d1e892..c7133bcf233 100644 --- a/core/services/ocr2/plugins/triggerqueue/factory.go +++ b/core/services/ocr2/plugins/triggerqueue/factory.go @@ -6,6 +6,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" "github.com/smartcontractkit/chainlink/v2/core/logger" ) @@ -13,16 +14,17 @@ var _ ocr3_1types.ReportingPluginFactory[[]byte] = (*Factory)(nil) // Factory creates ReportingPlugin instances for the trigger queue. type Factory struct { - lggr logger.Logger + lggr logger.Logger + buffer *v2.ObservationBuffer } // NewFactory creates a new trigger queue plugin factory. -func NewFactory(lggr logger.Logger) *Factory { - return &Factory{lggr: lggr.Named("TriggerQueueFactory")} +func NewFactory(lggr logger.Logger, buffer *v2.ObservationBuffer) *Factory { + return &Factory{lggr: lggr.Named("TriggerQueueFactory"), buffer: buffer} } func (f *Factory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig, fetcher ocr3_1types.BlobBroadcastFetcher) (ocr3_1types.ReportingPlugin[[]byte], ocr3_1types.ReportingPluginInfo, error) { - plugin := NewReportingPlugin(f.lggr) + plugin := NewReportingPlugin(f.lggr, f.buffer) info := ocr3_1types.ReportingPluginInfo1{ Name: "triggerqueue", Limits: ocr3_1types.ReportingPluginLimits{ diff --git a/core/services/ocr2/plugins/triggerqueue/plugin.go b/core/services/ocr2/plugins/triggerqueue/plugin.go index 2264cc2ecc0..ed89e362738 100644 --- a/core/services/ocr2/plugins/triggerqueue/plugin.go +++ b/core/services/ocr2/plugins/triggerqueue/plugin.go @@ -2,12 +2,14 @@ package triggerqueue import ( "context" + "encoding/json" "errors" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" "github.com/smartcontractkit/chainlink/v2/core/logger" ) @@ -17,22 +19,32 @@ var errNotImplemented = errors.New("triggerqueue plugin: draft implementation, n var _ ocr3_1types.ReportingPlugin[[]byte] = (*ReportingPlugin)(nil) // ReportingPlugin implements OCR 3.1 ReportingPlugin for the trigger queue. -// Draft: all methods return errors. type ReportingPlugin struct { - lggr logger.Logger + lggr logger.Logger + buffer *v2.ObservationBuffer } -// NewReportingPlugin creates a new ReportingPlugin. Draft: returns plugin that errors on all calls. -func NewReportingPlugin(lggr logger.Logger) *ReportingPlugin { - return &ReportingPlugin{lggr: lggr.Named("TriggerQueuePlugin")} +// NewReportingPlugin creates a new ReportingPlugin. +func NewReportingPlugin(lggr logger.Logger, buffer *v2.ObservationBuffer) *ReportingPlugin { + return &ReportingPlugin{lggr: lggr.Named("TriggerQueuePlugin"), buffer: buffer} } func (p *ReportingPlugin) Query(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Query, error) { return nil, errNotImplemented } +// Observation reads from the buffer (filled by OCRQueue.Put) and produces an observation. +// Draft: returns minimal observation (event IDs as JSON); full impl would BroadcastBlob for payloads. func (p *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Observation, error) { - return nil, errNotImplemented + events := p.buffer.TakeForObservation() + if len(events) == 0 { + return []byte("[]"), nil + } + ids := make([]string, len(events)) + for i, ev := range events { + ids[i] = ev.Event().Event.ID + } + return json.Marshal(ids) } func (p *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, ao types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) error { diff --git a/core/services/workflows/v2/observation_buffer.go b/core/services/workflows/v2/observation_buffer.go new file mode 100644 index 00000000000..3f41ddcb184 --- /dev/null +++ b/core/services/workflows/v2/observation_buffer.go @@ -0,0 +1,29 @@ +package v2 + +import ( + "sync" +) + +// ObservationBuffer holds events from Engine.Put for the plugin's Observation to read. +// Draft: in-memory buffer; full impl would integrate with OCR 3.1 KV store. +type ObservationBuffer struct { + mu sync.Mutex + events []EnqueuedTriggerEvent +} + +// Add appends an event to the buffer. Called by OCRQueue.Put. +func (b *ObservationBuffer) Add(event EnqueuedTriggerEvent) { + b.mu.Lock() + defer b.mu.Unlock() + b.events = append(b.events, event) +} + +// TakeForObservation returns and clears buffered events for this round. +// Called by the plugin's Observation to produce its observation. +func (b *ObservationBuffer) TakeForObservation() []EnqueuedTriggerEvent { + b.mu.Lock() + defer b.mu.Unlock() + out := b.events + b.events = nil + return out +} diff --git a/core/services/workflows/v2/ocrqueue.go b/core/services/workflows/v2/ocrqueue.go index 0893cf8ecc7..142a8a11362 100644 --- a/core/services/workflows/v2/ocrqueue.go +++ b/core/services/workflows/v2/ocrqueue.go @@ -12,13 +12,15 @@ var errOCRQueueNotImplemented = errors.New("OCRQueue: draft, use NewOCRQueueWith // OCRQueueDeps holds dependencies for NewOCRQueue. // Delegate owns the oracle; OCRQueue wraps the queue the transmitter feeds. type OCRQueueDeps struct { - Inner limits.QueueLimiter[EnqueuedTriggerEvent] + Inner limits.QueueLimiter[EnqueuedTriggerEvent] + Buffer *ObservationBuffer // events from Put feed plugin's Observation } -// OCRQueue wraps a QueueLimiter and delegates all operations to it. -// Delegate owns the oracle; transmitter decodes reports and Puts into Inner. +// OCRQueue wraps a QueueLimiter. Put buffers to ObservationBuffer (feeds Observation). +// Get/Wait read from Inner (fed by transmitter when consensus reports arrive). type OCRQueue struct { - inner limits.QueueLimiter[EnqueuedTriggerEvent] + inner limits.QueueLimiter[EnqueuedTriggerEvent] + buffer *ObservationBuffer } // NewOCRQueue creates an OCRQueue from the planned dependencies. @@ -26,13 +28,16 @@ func NewOCRQueue(deps OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], if deps.Inner == nil { return nil, errOCRQueueNotImplemented } - return &OCRQueue{inner: deps.Inner}, nil + if deps.Buffer == nil { + return nil, errors.New("OCRQueue requires Buffer") + } + return &OCRQueue{inner: deps.Inner, buffer: deps.Buffer}, nil } // NewOCRQueueWithInnerQueue returns a constructor with the same signature as NewOCRQueue. -func NewOCRQueueWithInnerQueue(inner limits.QueueLimiter[EnqueuedTriggerEvent]) func(OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { +func NewOCRQueueWithInnerQueue(inner limits.QueueLimiter[EnqueuedTriggerEvent], buffer *ObservationBuffer) func(OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { return func(_ OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { - return &OCRQueue{inner: inner}, nil + return &OCRQueue{inner: inner, buffer: buffer}, nil } } @@ -45,7 +50,8 @@ func (q *OCRQueue) Len(ctx context.Context) (int, error) { } func (q *OCRQueue) Put(ctx context.Context, event EnqueuedTriggerEvent) error { - return q.inner.Put(ctx, event) + q.buffer.Add(event) + return nil } func (q *OCRQueue) Get(ctx context.Context) (EnqueuedTriggerEvent, error) { From e93361387382da7a93a69b7adba9409a59d7814e Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Tue, 10 Mar 2026 13:40:05 +0200 Subject: [PATCH 9/9] refactor: clean up --- core/services/chainlink/application.go | 2 +- core/services/cre/cre.go | 25 +++++++------ core/services/ocr2/delegate.go | 15 ++++---- .../ocr2/plugins/triggerqueue/transmitter.go | 14 ++++---- core/services/workflows/v2/config.go | 35 ++++++++----------- core/services/workflows/v2/ocrqueue.go | 3 +- 6 files changed, 45 insertions(+), 49 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 9bb67159343..9081c9465db 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -742,7 +742,7 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err if ocr2Delegate == nil { return nil, errors.New("ocr2.NewDelegate() returned nil") } - creServices.SetOCRTriggerQueueCreator(ocr2Delegate) + creServices.SetOCRTriggerQueueFactory(ocr2Delegate) delegates[job.OffchainReporting2] = ocr2Delegate delegates[job.Bootstrap] = ocrbootstrap.NewDelegateBootstrap( opts.DS, diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index ac7fcbbb0d8..3c1bf7cf72c 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -123,14 +123,15 @@ type Services struct { // callback to wire Delegates into CRE services (e.g. Launcher) when ready SetDelegatesDeps func(*standardcapabilities.Delegate) (commonsrv.Service, error) - // ocrTriggerQueueCreator is set by application.go after creating the OCR delegate. - ocrTriggerQueueCreator v2.TriggerQueueCreator + // triggerQueueFactory builds the trigger queue constructor, which is used + // to manage the queue of trigger events in the engine. + triggerQueueFactory v2.TriggerQueueFactory } -// SetOCRTriggerQueueCreator sets the creator for the OCR-backed trigger queue. +// SetOCRTriggerQueueFactory sets the creator for the OCR-backed trigger queue. // Called by application.go after creating the OCR delegate. -func (s *Services) SetOCRTriggerQueueCreator(c v2.TriggerQueueCreator) { - s.ocrTriggerQueueCreator = c +func (s *Services) SetOCRTriggerQueueFactory(c v2.TriggerQueueFactory) { + s.triggerQueueFactory = c } func (s *Services) close() error { @@ -237,7 +238,7 @@ func (s *Services) newSubservices( cfg, relayerChainInterops, opts, - s.ocrTriggerQueueCreator, + s.triggerQueueFactory, lggr, ds, opts.DonTimeStore, @@ -846,7 +847,7 @@ func newWorkflowRegistrySyncerV2( relayerChainInterops RelayerChainInterops, billingClient metering.BillingClient, opts Opts, - ocrTriggerQueueCreator v2.TriggerQueueCreator, + triggerQueueFactory v2.TriggerQueueFactory, lggr logger.Logger, ds sqlutil.DataSource, dontimeStore *dontime.Store, @@ -891,8 +892,10 @@ func newWorkflowRegistrySyncerV2( engineRegistry := syncerV2.NewEngineRegistry() + // Enable override of default queue behavior + // TODO: use cre settings package for feature flag var triggerQueue limits.QueueLimiter[v2.EnqueuedTriggerEvent] - if ocrTriggerEventQueueEnabled && ocrTriggerQueueCreator != nil { + if ocrTriggerEventQueueEnabled && triggerQueueFactory != nil { cfg := cresettings.Default.PerWorkflow deps := v2.TriggerQueueDeps{ Lf: lf, @@ -900,7 +903,7 @@ func newWorkflowRegistrySyncerV2( DonSubscriber: workflowDonNotifier, } var tqErr error - triggerQueue, tqErr = ocrTriggerQueueCreator.NewTriggerQueueOCRQueue(context.Background(), deps) + triggerQueue, tqErr = triggerQueueFactory.NewOCRTriggerQueue(context.Background(), deps) if tqErr != nil { return nil, nil, fmt.Errorf("could not create OCR trigger queue: %w", tqErr) } @@ -1015,7 +1018,7 @@ func newWorkflowRegistrySyncer( cfg Config, relayerChainInterops RelayerChainInterops, opts Opts, - ocrTriggerQueueCreator v2.TriggerQueueCreator, + triggerQueueFactory v2.TriggerQueueFactory, lggr logger.Logger, ds sqlutil.DataSource, dontimeStore *dontime.Store, @@ -1064,7 +1067,7 @@ func newWorkflowRegistrySyncer( relayerChainInterops, billingClient, opts, - ocrTriggerQueueCreator, + triggerQueueFactory, lggr, ds, dontimeStore, diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 52cfd21e230..784824ba6cf 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -341,10 +341,10 @@ func (d *Delegate) JobType() job.Type { // triggerQueueContractID is a synthetic ID for monitoring; trigger queue has no on-chain contract. const triggerQueueContractID = "trigger_queue" -// NewTriggerQueueOCRQueue creates the trigger queue for the workflow syncer. +// NewOCRTriggerQueue creates the trigger queue for the workflow syncer. // Delegate owns the oracle; builds OCR3_1OracleArgs with TODOs for unwired fields. // Returns OCRQueue wrapping Inner; transmitter feeds consensus events into Inner. -func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQueueDeps) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) { +func (d *Delegate) NewOCRTriggerQueue(ctx context.Context, deps v2.TriggerQueueDeps) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) { inner, err := v2.NewStandardTriggerQueue(deps.Lf, deps.Cfg) if err != nil { return nil, err @@ -360,16 +360,16 @@ func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQ V2Bootstrappers: nil, // TODO: wire bootstrap peers from workflow DON (deps.DonSubscriber.WaitForDon or config) ContractConfigTracker: nil, // TODO: in-process config; need static/dynamic config from DON (no on-chain contract) ContractTransmitter: triggerqueue.NewTransmitter(inner, d.lggr), - Database: nil, // TODO: wire OCR DB (e.g. NewDB(d.ds, triggerQueuePluginID, 0, d.lggr)); need unique plugin ID - KeyValueDatabaseFactory: nil, // TODO: wire KV factory (e.g. kvdb.NewPebbleKeyValueDatabaseFactory(path)); path = KeyValueStoreRootDir/trigger_queue + Database: nil, // TODO: wire OCR DB (e.g. NewDB(d.ds, triggerQueuePluginID, 0, d.lggr)); need unique plugin ID + KeyValueDatabaseFactory: nil, // TODO: wire KV factory (e.g. kvdb.NewPebbleKeyValueDatabaseFactory(path)); path = KeyValueStoreRootDir/trigger_queue LocalConfig: ocrtypes.LocalConfig{}, // TODO: wire LocalConfig (BlockDelta, etc.) Logger: ocrLogger, - MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": triggerQueueContractID}, prometheus.DefaultRegisterer), + MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": triggerQueueContractID}, prometheus.DefaultRegisterer), MonitoringEndpoint: nil, // TODO: wire d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, triggerQueueContractID, ...); need rid for trigger queue OffchainConfigDigester: nil, // TODO: in-process digester; derive config digest from DON (members, F, offchain config) OffchainKeyring: nil, // TODO: wire OCR key bundle (d.ks.Get or workflow DON key) OnchainKeyring: nil, // TODO: wire onchain keyring adapter (trigger queue may use same as vault/dontime for in-process) - ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr, buffer), d.lggr, "triggerqueue"), + ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr, buffer), d.lggr, "triggerqueue"), } _ = oracleArgs // TODO: pass to libocr2.NewOracle(oracleArgs) when all fields wired @@ -1086,9 +1086,6 @@ func (d *Delegate) newDonTimePlugin( return nil, err } - if err != nil { - return nil, err - } srvs = append(srvs, job.NewServiceAdapter(oracle)) return srvs, nil } diff --git a/core/services/ocr2/plugins/triggerqueue/transmitter.go b/core/services/ocr2/plugins/triggerqueue/transmitter.go index 45b2a3014ef..97f1065061d 100644 --- a/core/services/ocr2/plugins/triggerqueue/transmitter.go +++ b/core/services/ocr2/plugins/triggerqueue/transmitter.go @@ -27,7 +27,7 @@ func NewTransmitter(queue limits.QueueLimiter[v2.EnqueuedTriggerEvent], lggr log return &Transmitter{queue: queue, lggr: lggr.Named("TriggerQueueTransmitter")} } -// decodedTriggerEvent is the result of decoding a report; maps to EnqueuedTriggerEvent fields. +// decodedTriggerEvent is the result of decoding a report type decodedTriggerEvent struct { triggerCapID string triggerIndex int @@ -36,11 +36,10 @@ type decodedTriggerEvent struct { } // decodeReport extracts the consensus slice of events from the OCR report. -// Overridable in tests to mock the decode. +// +// TODO: make interface dependency to mock in tests var decodeReport = func(rwi ocr3types.ReportWithInfo[[]byte]) ([]decodedTriggerEvent, error) { - // TODO: implement real decode. Report format per design: ordered event IDs in rwi.Report; - // fetch payloads from KV (keyValueReader from plugin context; transmitter may need KV ref). - // Stub: no real decode. Real impl would: + // TODO: implement real decode. // - Parse rwi.Report (proto: ordered event IDs) // - For each event ID, fetch payload from KV via "Event::"+eventID // - Unmarshal to TriggerResponse, build decodedTriggerEvent @@ -48,8 +47,9 @@ var decodeReport = func(rwi ocr3types.ReportWithInfo[[]byte]) ([]decodedTriggerE return nil, nil } -// Transmit is called by libOCR when consensus is reached. Decodes the report and -// enqueues each event into the internal queue. Engine Wait() will return these. +// Transmit decodes the report after consensus is reached and +// enqueues each event into the internal queue. Engine Wait() will return the head +// of these events. func (t *Transmitter) Transmit(ctx context.Context, cd types.ConfigDigest, seqNr uint64, rwi ocr3types.ReportWithInfo[[]byte], sigs []types.AttributedOnchainSignature) error { events, err := decodeReport(rwi) if err != nil { diff --git a/core/services/workflows/v2/config.go b/core/services/workflows/v2/config.go index 5aec2bf8759..344c5e42363 100644 --- a/core/services/workflows/v2/config.go +++ b/core/services/workflows/v2/config.go @@ -29,28 +29,24 @@ import ( ) // TriggerQueueDeps holds dependencies for creating the OCR-backed trigger queue. -// Extend as needed when wiring the full OCR 3.1 oracle (bootstrap peers, monitoring, etc.). +// +// TODO: add fields as wiring progresses: +// - BootstrapPeers []commontypes.BootstrapperLocator (from DON or config) +// - MonitoringEndpointGen (delegate has it; may need synthetic contract ID) +// - OCR key bundle (workflow DON uses OCR keys) type TriggerQueueDeps struct { Lf limits.Factory Cfg *cresettings.Workflows DonSubscriber capabilities.DonSubscriber - - // TODO: add fields as wiring progresses: - // - BootstrapPeers []commontypes.BootstrapperLocator (from DON or config) - // - MonitoringEndpointGen (delegate has it; may need synthetic contract ID) - // - OCR key bundle (workflow DON uses OCR keys) } -// TriggerQueueCreator creates an OCR-backed trigger queue using the delegate's OCR infra. -// Implemented by the OCR delegate; called by cre.go when OCRTriggerEventQueueEnabled is on. -// DonSubscriber in deps enables DON sync: the queue stays current with dynamic DON info -// (members, F, bootstrap peers) delivered on the subscription channel. -type TriggerQueueCreator interface { - NewTriggerQueueOCRQueue(ctx context.Context, deps TriggerQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) +// TriggerQueueFactory returns a trigger queue implementation that uses OCR +// to reach consensus on the queue among a workflow DON. +type TriggerQueueFactory interface { + NewOCRTriggerQueue(ctx context.Context, deps TriggerQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) } // NewStandardTriggerQueue creates the default in-process trigger queue. -// Used by the delegate's NewTriggerQueueOCRQueue as a fallback until the full OCR queue is implemented. func NewStandardTriggerQueue(lf limits.Factory, cfg *cresettings.Workflows) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { return limits.MakeQueueLimiter[EnqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) } @@ -135,8 +131,6 @@ func NewLimiters(lf limits.Factory, cfgFn func(*cresettings.Workflows)) (*Engine } // NewLimitersWithTriggerQueue is like NewLimiters but accepts an optional pre-built trigger queue. -// When triggerQueue is non-nil, it is used instead of creating a standard queue. Used when -// OCRTriggerEventQueueEnabled is on and the OCR delegate provides an OCR-backed queue. func NewLimitersWithTriggerQueue(lf limits.Factory, cfgFn func(*cresettings.Workflows), triggerQueue limits.QueueLimiter[EnqueuedTriggerEvent]) (*EngineLimiters, error) { l := &EngineLimiters{} err := l.init(lf, cfgFn, triggerQueue) @@ -164,14 +158,15 @@ func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflo if err != nil { return } + l.TriggerEventQueue, err = limits.MakeQueueLimiter[EnqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) + if err != nil { + return + } + if triggerQueue != nil { l.TriggerEventQueue = triggerQueue - } else { - l.TriggerEventQueue, err = limits.MakeQueueLimiter[EnqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) - if err != nil { - return - } } + l.TriggerEventQueueTime, err = lf.MakeTimeLimiter(cfg.TriggerEventQueueTimeout) if err != nil { return diff --git a/core/services/workflows/v2/ocrqueue.go b/core/services/workflows/v2/ocrqueue.go index 142a8a11362..d684723b702 100644 --- a/core/services/workflows/v2/ocrqueue.go +++ b/core/services/workflows/v2/ocrqueue.go @@ -50,8 +50,9 @@ func (q *OCRQueue) Len(ctx context.Context) (int, error) { } func (q *OCRQueue) Put(ctx context.Context, event EnqueuedTriggerEvent) error { + // TODO: only add to buffer, delegate to Put for now. q.buffer.Add(event) - return nil + return q.inner.Put(ctx, event) } func (q *OCRQueue) Get(ctx context.Context) (EnqueuedTriggerEvent, error) {