diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 9ee3b2bc8c9..9081c9465db 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -742,6 +742,7 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err if ocr2Delegate == nil { return nil, errors.New("ocr2.NewDelegate() returned nil") } + creServices.SetOCRTriggerQueueFactory(ocr2Delegate) delegates[job.OffchainReporting2] = ocr2Delegate delegates[job.Bootstrap] = ocrbootstrap.NewDelegateBootstrap( opts.DS, diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index baf87584cc8..3c1bf7cf72c 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -58,8 +58,14 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" wftypes "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" + + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" ) +// ocrTriggerEventQueueEnabled gates use of the OCR-backed trigger queue. +// TODO: replace with cresettings.OCRTriggerEventQueueEnabled when added to chainlink-common. +const ocrTriggerEventQueueEnabled = false + // Keystore is the minimal interface needed from keystore for CRE type Keystore interface { CSA() keystore.CSA @@ -116,6 +122,16 @@ type Services struct { // callback to wire Delegates into CRE services (e.g. Launcher) when ready SetDelegatesDeps func(*standardcapabilities.Delegate) (commonsrv.Service, error) + + // triggerQueueFactory builds the trigger queue constructor, which is used + // to manage the queue of trigger events in the engine. + triggerQueueFactory v2.TriggerQueueFactory +} + +// SetOCRTriggerQueueFactory sets the creator for the OCR-backed trigger queue. +// Called by application.go after creating the OCR delegate. 
+func (s *Services) SetOCRTriggerQueueFactory(c v2.TriggerQueueFactory) { + s.triggerQueueFactory = c } func (s *Services) close() error { @@ -222,6 +238,7 @@ func (s *Services) newSubservices( cfg, relayerChainInterops, opts, + s.triggerQueueFactory, lggr, ds, opts.DonTimeStore, @@ -830,6 +847,7 @@ func newWorkflowRegistrySyncerV2( relayerChainInterops RelayerChainInterops, billingClient metering.BillingClient, opts Opts, + triggerQueueFactory v2.TriggerQueueFactory, lggr logger.Logger, ds sqlutil.DataSource, dontimeStore *dontime.Store, @@ -874,7 +892,23 @@ func newWorkflowRegistrySyncerV2( engineRegistry := syncerV2.NewEngineRegistry() - engineLimiters, err := v2.NewLimiters(lf, nil) + // Enable override of default queue behavior + // TODO: use cre settings package for feature flag + var triggerQueue limits.QueueLimiter[v2.EnqueuedTriggerEvent] + if ocrTriggerEventQueueEnabled && triggerQueueFactory != nil { + cfg := cresettings.Default.PerWorkflow + deps := v2.TriggerQueueDeps{ + Lf: lf, + Cfg: &cfg, + DonSubscriber: workflowDonNotifier, + } + var tqErr error + triggerQueue, tqErr = triggerQueueFactory.NewOCRTriggerQueue(context.Background(), deps) + if tqErr != nil { + return nil, nil, fmt.Errorf("could not create OCR trigger queue: %w", tqErr) + } + } + engineLimiters, err := v2.NewLimitersWithTriggerQueue(lf, nil, triggerQueue) if err != nil { return nil, nil, fmt.Errorf("could not instantiate engine limiters: %w", err) } @@ -984,6 +1018,7 @@ func newWorkflowRegistrySyncer( cfg Config, relayerChainInterops RelayerChainInterops, opts Opts, + triggerQueueFactory v2.TriggerQueueFactory, lggr logger.Logger, ds sqlutil.DataSource, dontimeStore *dontime.Store, @@ -1032,6 +1067,7 @@ func newWorkflowRegistrySyncer( relayerChainInterops, billingClient, opts, + triggerQueueFactory, lggr, ds, dontimeStore, diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 831e257d6cc..784824ba6cf 100644 --- a/core/services/ocr2/delegate.go +++ 
b/core/services/ocr2/delegate.go @@ -41,6 +41,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes" "github.com/smartcontractkit/chainlink/v2/core/config/env" syncerV2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/v2" + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" "github.com/smartcontractkit/smdkg/dkgocr/oracleargs" @@ -89,6 +90,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/autotelemetry21" ocr2keeper21core "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" ringconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ring/config" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/triggerqueue" vaultocrplugin "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/vault" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate" "github.com/smartcontractkit/chainlink/v2/core/services/ocr3_1/beholderwrapper" @@ -336,6 +338,44 @@ func (d *Delegate) JobType() job.Type { return job.OffchainReporting2 } +// triggerQueueContractID is a synthetic ID for monitoring; trigger queue has no on-chain contract. +const triggerQueueContractID = "trigger_queue" + +// NewOCRTriggerQueue creates the trigger queue for the workflow syncer. +// Delegate owns the oracle; builds OCR3_1OracleArgs with TODOs for unwired fields. +// Returns OCRQueue wrapping Inner; transmitter feeds consensus events into Inner. +func (d *Delegate) NewOCRTriggerQueue(ctx context.Context, deps v2.TriggerQueueDeps) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) { + inner, err := v2.NewStandardTriggerQueue(deps.Lf, deps.Cfg) + if err != nil { + return nil, err + } + buffer := &v2.ObservationBuffer{} + + // Build OCR3_1OracleArgs for the trigger queue. TODO: wire all fields and call libocr2.NewOracle. 
+ ocrLogger := ocrcommon.NewOCRWrapper(d.lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + d.lggr.Warnw("OCR trigger queue", "msg", msg) + }) + oracleArgs := libocr2.OCR3_1OracleArgs[[]byte]{ + BinaryNetworkEndpointFactory: d.peerWrapper.Peer3_1, + V2Bootstrappers: nil, // TODO: wire bootstrap peers from workflow DON (deps.DonSubscriber.WaitForDon or config) + ContractConfigTracker: nil, // TODO: in-process config; need static/dynamic config from DON (no on-chain contract) + ContractTransmitter: triggerqueue.NewTransmitter(inner, d.lggr), + Database: nil, // TODO: wire OCR DB (e.g. NewDB(d.ds, triggerQueuePluginID, 0, d.lggr)); need unique plugin ID + KeyValueDatabaseFactory: nil, // TODO: wire KV factory (e.g. kvdb.NewPebbleKeyValueDatabaseFactory(path)); path = KeyValueStoreRootDir/trigger_queue + LocalConfig: ocrtypes.LocalConfig{}, // TODO: wire LocalConfig (BlockDelta, etc.) + Logger: ocrLogger, + MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": triggerQueueContractID}, prometheus.DefaultRegisterer), + MonitoringEndpoint: nil, // TODO: wire d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, triggerQueueContractID, ...); need rid for trigger queue + OffchainConfigDigester: nil, // TODO: in-process digester; derive config digest from DON (members, F, offchain config) + OffchainKeyring: nil, // TODO: wire OCR key bundle (d.ks.Get or workflow DON key) + OnchainKeyring: nil, // TODO: wire onchain keyring adapter (trigger queue may use same as vault/dontime for in-process) + ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr, buffer), d.lggr, "triggerqueue"), + } + _ = oracleArgs // TODO: pass to libocr2.NewOracle(oracleArgs) when all fields wired + + return v2.NewOCRQueue(v2.OCRQueueDeps{Inner: inner, Buffer: buffer}) +} + func (d *Delegate) BeforeJobCreated(_ job.Job) { // This is only called first time the job is created d.isNewlyCreatedJob 
= true @@ -1046,9 +1086,6 @@ func (d *Delegate) newDonTimePlugin( return nil, err } - if err != nil { - return nil, err - } srvs = append(srvs, job.NewServiceAdapter(oracle)) return srvs, nil } @@ -2445,7 +2482,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug return nil, fmt.Errorf("chain not supported for CCIP execution: %s", spec.Relay) } dstRid, err := spec.RelayID() - if err != nil { return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)} } @@ -2506,7 +2542,6 @@ func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, plugi return nil, fmt.Errorf("chain not supported for CCIP execution: %s", spec.Relay) } dstRid, err := spec.RelayID() - if err != nil { return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)} } diff --git a/core/services/ocr2/plugins/triggerqueue/factory.go b/core/services/ocr2/plugins/triggerqueue/factory.go new file mode 100644 index 00000000000..c7133bcf233 --- /dev/null +++ b/core/services/ocr2/plugins/triggerqueue/factory.go @@ -0,0 +1,44 @@ +package triggerqueue + +import ( + "context" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var _ ocr3_1types.ReportingPluginFactory[[]byte] = (*Factory)(nil) + +// Factory creates ReportingPlugin instances for the trigger queue. +type Factory struct { + lggr logger.Logger + buffer *v2.ObservationBuffer +} + +// NewFactory creates a new trigger queue plugin factory. 
+func NewFactory(lggr logger.Logger, buffer *v2.ObservationBuffer) *Factory { + return &Factory{lggr: lggr.Named("TriggerQueueFactory"), buffer: buffer} +} + +func (f *Factory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig, fetcher ocr3_1types.BlobBroadcastFetcher) (ocr3_1types.ReportingPlugin[[]byte], ocr3_1types.ReportingPluginInfo, error) { + plugin := NewReportingPlugin(f.lggr, f.buffer) + info := ocr3_1types.ReportingPluginInfo{ + Name: "triggerqueue", + Limits: ocr3_1types.ReportingPluginLimits{ + MaxQueryBytes: 100, + MaxObservationBytes: 500 * 1024, // 500KB per design doc + MaxReportsPlusPrecursorBytes: 500 * 1024, + MaxReportBytes: 500 * 1024, + MaxReportCount: 1, + MaxKeyValueModifiedKeys: 500, + MaxKeyValueModifiedKeysPlusValuesBytes: 1024 * 1024, // 1MB + MaxBlobPayloadBytes: 25 * 1024, // 25KB per design doc + MaxPerOracleUnexpiredBlobCumulativePayloadBytes: 30 * 1024 * 1024, + MaxPerOracleUnexpiredBlobCount: 1000, + }, + } + return plugin, info, nil +} diff --git a/core/services/ocr2/plugins/triggerqueue/plugin.go b/core/services/ocr2/plugins/triggerqueue/plugin.go new file mode 100644 index 00000000000..ed89e362738 --- /dev/null +++ b/core/services/ocr2/plugins/triggerqueue/plugin.go @@ -0,0 +1,80 @@ +package triggerqueue + +import ( + "context" + "encoding/json" + "errors" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// errNotImplemented is returned by the not-yet-implemented plugin methods in this draft (Observation and Close are implemented).
+var errNotImplemented = errors.New("triggerqueue plugin: draft implementation, not yet implemented") + +var _ ocr3_1types.ReportingPlugin[[]byte] = (*ReportingPlugin)(nil) + +// ReportingPlugin implements OCR 3.1 ReportingPlugin for the trigger queue. +type ReportingPlugin struct { + lggr logger.Logger + buffer *v2.ObservationBuffer +} + +// NewReportingPlugin creates a new ReportingPlugin. +func NewReportingPlugin(lggr logger.Logger, buffer *v2.ObservationBuffer) *ReportingPlugin { + return &ReportingPlugin{lggr: lggr.Named("TriggerQueuePlugin"), buffer: buffer} +} + +func (p *ReportingPlugin) Query(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Query, error) { + return nil, errNotImplemented +} + +// Observation reads from the buffer (filled by OCRQueue.Put) and produces an observation. +// Draft: returns minimal observation (event IDs as JSON); full impl would BroadcastBlob for payloads. 
+func (p *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Observation, error) { + events := p.buffer.TakeForObservation() + if len(events) == 0 { + return []byte("[]"), nil + } + ids := make([]string, len(events)) + for i, ev := range events { + ids[i] = ev.Event().Event.ID + } + return json.Marshal(ids) +} + +func (p *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, ao types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) error { + return errNotImplemented +} + +func (p *ReportingPlugin) ObservationQuorum(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) (bool, error) { + return false, errNotImplemented +} + +func (p *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { + return ocr3_1types.ReportsPlusPrecursor{}, errNotImplemented +} + +func (p *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlusPrecursor ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[[]byte], error) { + return nil, errNotImplemented +} + +func (p *ReportingPlugin) Committed(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader) error { + return errNotImplemented +} + +func (p *ReportingPlugin) ShouldAcceptAttestedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[[]byte]) (bool, error) { + return false, errNotImplemented +} + +func (p *ReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context, seqNr 
uint64, reportWithInfo ocr3types.ReportWithInfo[[]byte]) (bool, error) { + return false, errNotImplemented +} + +func (p *ReportingPlugin) Close() error { + return nil +} diff --git a/core/services/ocr2/plugins/triggerqueue/transmitter.go b/core/services/ocr2/plugins/triggerqueue/transmitter.go new file mode 100644 index 00000000000..97f1065061d --- /dev/null +++ b/core/services/ocr2/plugins/triggerqueue/transmitter.go @@ -0,0 +1,72 @@ +package triggerqueue + +import ( + "context" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + "github.com/smartcontractkit/chainlink/v2/core/logger" + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" +) + +var _ ocr3types.ContractTransmitter[[]byte] = (*Transmitter)(nil) + +// Transmitter receives OCR consensus reports and enqueues decoded trigger events. +// No on-chain transmit; reports are consumed locally to feed the internal queue. +type Transmitter struct { + queue limits.QueueLimiter[v2.EnqueuedTriggerEvent] + lggr logger.Logger +} + +// NewTransmitter creates a transmitter that decodes reports and Puts into the queue. +func NewTransmitter(queue limits.QueueLimiter[v2.EnqueuedTriggerEvent], lggr logger.Logger) *Transmitter { + return &Transmitter{queue: queue, lggr: lggr.Named("TriggerQueueTransmitter")} +} + +// decodedTriggerEvent is the result of decoding a report +type decodedTriggerEvent struct { + triggerCapID string + triggerIndex int + timestamp time.Time + event capabilities.TriggerResponse +} + +// decodeReport extracts the consensus slice of events from the OCR report. +// +// TODO: make interface dependency to mock in tests +var decodeReport = func(rwi ocr3types.ReportWithInfo[[]byte]) ([]decodedTriggerEvent, error) { + // TODO: implement real decode. 
+ // - Parse rwi.Report (proto: ordered event IDs) + // - For each event ID, fetch payload from KV via "Event::"+eventID + // - Unmarshal to TriggerResponse, build decodedTriggerEvent + _ = rwi + return nil, nil +} + +// Transmit decodes the report after consensus is reached and +// enqueues each event into the internal queue. Engine Wait() will return the head +// of these events. +func (t *Transmitter) Transmit(ctx context.Context, cd types.ConfigDigest, seqNr uint64, rwi ocr3types.ReportWithInfo[[]byte], sigs []types.AttributedOnchainSignature) error { + events, err := decodeReport(rwi) + if err != nil { + t.lggr.Errorw("Failed to decode trigger queue report", "err", err, "seqNr", seqNr) + return err + } + for _, ev := range events { + enqueued := v2.NewEnqueuedTriggerEvent(ev.triggerCapID, ev.triggerIndex, ev.timestamp, ev.event) + if err := t.queue.Put(ctx, enqueued); err != nil { + t.lggr.Errorw("Failed to enqueue consensus event", "triggerCapID", ev.triggerCapID, "err", err) + return err + } + t.lggr.Debugw("Enqueued consensus event", "triggerCapID", ev.triggerCapID, "triggerIndex", ev.triggerIndex) + } + return nil +} + +func (t *Transmitter) FromAccount(_ context.Context) (types.Account, error) { + return types.Account(""), nil +} diff --git a/core/services/workflows/v2/config.go b/core/services/workflows/v2/config.go index b2636c42c66..344c5e42363 100644 --- a/core/services/workflows/v2/config.go +++ b/core/services/workflows/v2/config.go @@ -1,6 +1,7 @@ package v2 import ( + "context" "errors" "fmt" @@ -27,6 +28,29 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" ) +// TriggerQueueDeps holds dependencies for creating the OCR-backed trigger queue. 
+// +// TODO: add fields as wiring progresses: +// - BootstrapPeers []commontypes.BootstrapperLocator (from DON or config) +// - MonitoringEndpointGen (delegate has it; may need synthetic contract ID) +// - OCR key bundle (workflow DON uses OCR keys) +type TriggerQueueDeps struct { + Lf limits.Factory + Cfg *cresettings.Workflows + DonSubscriber capabilities.DonSubscriber +} + +// TriggerQueueFactory returns a trigger queue implementation that uses OCR +// to reach consensus on the queue among a workflow DON. +type TriggerQueueFactory interface { + NewOCRTriggerQueue(ctx context.Context, deps TriggerQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) +} + +// NewStandardTriggerQueue creates the default in-process trigger queue. +func NewStandardTriggerQueue(lf limits.Factory, cfg *cresettings.Workflows) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { + return limits.MakeQueueLimiter[EnqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) +} + type EngineConfig struct { Lggr logger.Logger Module host.ModuleV2 @@ -75,7 +99,7 @@ type EngineLimiters struct { TriggerSubscriptionTime limits.TimeLimiter TriggerRegistrationsTime limits.TimeLimiter TriggerSubscription limits.BoundLimiter[int] - TriggerEventQueue limits.QueueLimiter[enqueuedTriggerEvent] + TriggerEventQueue limits.QueueLimiter[EnqueuedTriggerEvent] TriggerEventQueueTime limits.TimeLimiter ExecutionConcurrency limits.ResourcePoolLimiter[int] @@ -103,12 +127,17 @@ type EngineLimiters struct { // NewLimiters returns a new set of EngineLimiters based on the default configuration, and optionally modified by cfgFn. func NewLimiters(lf limits.Factory, cfgFn func(*cresettings.Workflows)) (*EngineLimiters, error) { + return NewLimitersWithTriggerQueue(lf, cfgFn, nil) +} + +// NewLimitersWithTriggerQueue is like NewLimiters but accepts an optional pre-built trigger queue. 
+func NewLimitersWithTriggerQueue(lf limits.Factory, cfgFn func(*cresettings.Workflows), triggerQueue limits.QueueLimiter[EnqueuedTriggerEvent]) (*EngineLimiters, error) { l := &EngineLimiters{} - err := l.init(lf, cfgFn) + err := l.init(lf, cfgFn, triggerQueue) return l, err } -func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflows)) (err error) { +func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflows), triggerQueue limits.QueueLimiter[EnqueuedTriggerEvent]) (err error) { cfg := cresettings.Default.PerWorkflow // make copy if cfgFn != nil { cfgFn(&cfg) @@ -129,10 +158,15 @@ func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflo if err != nil { return } - l.TriggerEventQueue, err = limits.MakeQueueLimiter[enqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) + l.TriggerEventQueue, err = limits.MakeQueueLimiter[EnqueuedTriggerEvent](lf, cfg.TriggerEventQueueLimit) if err != nil { return } + + if triggerQueue != nil { + l.TriggerEventQueue = triggerQueue + } + l.TriggerEventQueueTime, err = lf.MakeTimeLimiter(cfg.TriggerEventQueueTimeout) if err != nil { return diff --git a/core/services/workflows/v2/engine.go b/core/services/workflows/v2/engine.go index 8cc9cfa992f..cbf815a108d 100644 --- a/core/services/workflows/v2/engine.go +++ b/core/services/workflows/v2/engine.go @@ -72,7 +72,7 @@ type Engine struct { // used to separate registration and unregistration phases triggersRegMu sync.Mutex - allTriggerEventsQueueCh limits.QueueLimiter[enqueuedTriggerEvent] + allTriggerEventsQueueCh limits.QueueLimiter[EnqueuedTriggerEvent] executionsSemaphore limits.ResourcePoolLimiter[int] capCallsSemaphore limits.ResourcePoolLimiter[int] @@ -90,6 +90,15 @@ type triggerCapability struct { method string } +// EnqueuedTriggerEvent is the type queued for workflow trigger execution. +// Implementations are opaque; consumers use the accessors. 
+type EnqueuedTriggerEvent interface { + TriggerCapID() string + TriggerIndex() int + Timestamp() time.Time + Event() capabilities.TriggerResponse +} + type enqueuedTriggerEvent struct { triggerCapID string triggerIndex int @@ -97,6 +106,16 @@ type enqueuedTriggerEvent struct { event capabilities.TriggerResponse } +func (e *enqueuedTriggerEvent) TriggerCapID() string { return e.triggerCapID } +func (e *enqueuedTriggerEvent) TriggerIndex() int { return e.triggerIndex } +func (e *enqueuedTriggerEvent) Timestamp() time.Time { return e.timestamp } +func (e *enqueuedTriggerEvent) Event() capabilities.TriggerResponse { return e.event } + +// NewEnqueuedTriggerEvent constructs an EnqueuedTriggerEvent for the queue. +func NewEnqueuedTriggerEvent(triggerCapID string, triggerIndex int, timestamp time.Time, event capabilities.TriggerResponse) EnqueuedTriggerEvent { + return &enqueuedTriggerEvent{triggerCapID, triggerIndex, timestamp, event} +} + func TriggerRegistrationID(workflowID string, triggerIndex int) string { return fmt.Sprintf("trigger_reg_%s_%d", workflowID, triggerIndex) } @@ -537,12 +556,7 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error { e.metrics.With(platform.KeyTriggerID, triggerID).IncrementWorkflowTriggerEventErrorCounter(ctx) continue } - if err := e.allTriggerEventsQueueCh.Put(ctx, enqueuedTriggerEvent{ - triggerCapID: triggerID, - triggerIndex: idx, - timestamp: e.cfg.Clock.Now(), - event: event, - }); err != nil { + if err := e.allTriggerEventsQueueCh.Put(ctx, NewEnqueuedTriggerEvent(triggerID, idx, e.cfg.Clock.Now(), event)); err != nil { var errFull limits.ErrorQueueFull if errors.As(err, &errFull) { // queue full, drop the event @@ -570,8 +584,8 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) { if err != nil { return } - eventAge := queueHead.timestamp.Sub(e.cfg.Clock.Now()) - eventID := queueHead.event.Event.ID + eventAge := e.cfg.Clock.Now().Sub(queueHead.Timestamp()) // NOTE(review): pre-existing inversion fixed; timestamp.Sub(now) was always <= 0, so the staleness check below never fired + eventID := queueHead.Event().Event.ID
e.logger().Debugw("Popped a trigger event from the queue", "eventID", eventID, "eventAgeMs", eventAge.Milliseconds()) triggerEventMaxAge, err := e.cfg.LocalLimiters.TriggerEventQueueTime.Limit(ctx) if err != nil { @@ -579,7 +593,7 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) { continue } if eventAge > triggerEventMaxAge { - e.logger().Warnw("Trigger event is too old, skipping execution", "triggerID", queueHead.triggerCapID, "eventID", eventID, "eventAgeMs", eventAge.Milliseconds()) + e.logger().Warnw("Trigger event is too old, skipping execution", "triggerID", queueHead.TriggerCapID(), "eventID", eventID, "eventAgeMs", eventAge.Milliseconds()) continue } free, err := e.executionsSemaphore.Wait(ctx, 1) // block if too many concurrent workflow executions @@ -603,10 +617,10 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) { } // startExecution initiates a new workflow execution, blocking until completed -func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueuedTriggerEvent) { - fullExecutionID, err := events.GenerateExecutionIDWithTriggerIndex(e.cfg.WorkflowID, wrappedTriggerEvent.event.Event.ID, wrappedTriggerEvent.triggerIndex) +func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent EnqueuedTriggerEvent) { + fullExecutionID, err := events.GenerateExecutionIDWithTriggerIndex(e.cfg.WorkflowID, wrappedTriggerEvent.Event().Event.ID, wrappedTriggerEvent.TriggerIndex()) if err != nil { - e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.triggerCapID) + e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.TriggerCapID()) return } @@ -644,7 +658,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue } } - triggerEvent := wrappedTriggerEvent.event.Event + triggerEvent := wrappedTriggerEvent.Event().Event var executionID string if 
e.cfg.FeatureFlags.FeatureMultiTriggerExecutionIDs.Check(ctx, config.Timestamp(executionTimestamp)) == nil { @@ -653,7 +667,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue } else { executionID, err = events.GenerateExecutionID(e.cfg.WorkflowID, triggerEvent.ID) if err != nil { - e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.triggerCapID) + e.logger().Errorw("Failed to generate execution ID", "err", err, "triggerID", wrappedTriggerEvent.TriggerCapID()) return } e.metrics.IncrementExecutionIDLegacyCounter(ctx) @@ -663,9 +677,9 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue _, addErr := e.cfg.ExecutionsStore.Add(ctx, nil, executionID, e.cfg.WorkflowID, store.StatusStarted) if addErr != nil { if errors.Is(addErr, store.ErrDuplicateExecution) { - lggr.Infow("Skipping duplicate execution", "executionID", executionID, "triggerID", wrappedTriggerEvent.triggerCapID, "triggerIndex", wrappedTriggerEvent.triggerIndex) - e.metrics.With(platform.KeyTriggerID, wrappedTriggerEvent.triggerCapID).IncrementWorkflowTriggerEventErrorCounter(ctx) - registrationID := TriggerRegistrationID(e.cfg.WorkflowID, wrappedTriggerEvent.triggerIndex) + lggr.Infow("Skipping duplicate execution", "executionID", executionID, "triggerID", wrappedTriggerEvent.TriggerCapID(), "triggerIndex", wrappedTriggerEvent.TriggerIndex()) + e.metrics.With(platform.KeyTriggerID, wrappedTriggerEvent.TriggerCapID()).IncrementWorkflowTriggerEventErrorCounter(ctx) + registrationID := TriggerRegistrationID(e.cfg.WorkflowID, wrappedTriggerEvent.TriggerIndex()) err = e.ackTriggerEvent(ctx, registrationID, &triggerEvent) if err != nil { e.lggr.Errorw("failed to re-ACK trigger event", "eventID", triggerEvent.ID, "err", err) @@ -712,7 +726,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue return } defer execCancel() - executionLogger := logger.With(lggr, 
"executionID", executionID, "triggerID", wrappedTriggerEvent.triggerCapID, "triggerIndex", wrappedTriggerEvent.triggerIndex) + executionLogger := logger.With(lggr, "executionID", executionID, "triggerID", wrappedTriggerEvent.TriggerCapID(), "triggerIndex", wrappedTriggerEvent.TriggerIndex()) maxUserLogEventsPerExecution, err := e.cfg.LocalLimiters.LogEvent.Limit(ctx) if err != nil { @@ -725,7 +739,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue e.emitUserLogs(execCtx, userLogChan, executionID, loggerLabels) }) - tid, err := safe.IntToUint64(wrappedTriggerEvent.triggerIndex) + tid, err := safe.IntToUint64(wrappedTriggerEvent.TriggerIndex()) if err != nil { executionLogger.Errorw("Failed to convert trigger index to uint64", "err", err) return @@ -735,7 +749,7 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue executionLogger.Infow("Workflow execution starting ...") _ = events.EmitExecutionStartedEvent(ctx, loggerLabels, triggerEvent.ID, executionID) - registrationID := TriggerRegistrationID(e.cfg.WorkflowID, wrappedTriggerEvent.triggerIndex) + registrationID := TriggerRegistrationID(e.cfg.WorkflowID, wrappedTriggerEvent.TriggerIndex()) err = e.ackTriggerEvent(ctx, registrationID, &triggerEvent) if err != nil { e.lggr.Errorf("failed to ACK trigger event (eventID=%s): %v", triggerEvent.ID, err) diff --git a/core/services/workflows/v2/observation_buffer.go b/core/services/workflows/v2/observation_buffer.go new file mode 100644 index 00000000000..3f41ddcb184 --- /dev/null +++ b/core/services/workflows/v2/observation_buffer.go @@ -0,0 +1,29 @@ +package v2 + +import ( + "sync" +) + +// ObservationBuffer holds events from Engine.Put for the plugin's Observation to read. +// Draft: in-memory buffer; full impl would integrate with OCR 3.1 KV store. +type ObservationBuffer struct { + mu sync.Mutex + events []EnqueuedTriggerEvent +} + +// Add appends an event to the buffer. Called by OCRQueue.Put. 
+func (b *ObservationBuffer) Add(event EnqueuedTriggerEvent) { + b.mu.Lock() + defer b.mu.Unlock() + b.events = append(b.events, event) +} + +// TakeForObservation returns and clears buffered events for this round. +// Called by the plugin's Observation to produce its observation. +func (b *ObservationBuffer) TakeForObservation() []EnqueuedTriggerEvent { + b.mu.Lock() + defer b.mu.Unlock() + out := b.events + b.events = nil + return out +} diff --git a/core/services/workflows/v2/ocrqueue.go b/core/services/workflows/v2/ocrqueue.go new file mode 100644 index 00000000000..d684723b702 --- /dev/null +++ b/core/services/workflows/v2/ocrqueue.go @@ -0,0 +1,68 @@ +package v2 + +import ( + "context" + "errors" + + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" +) + +var errOCRQueueNotImplemented = errors.New("OCRQueue: draft, use NewOCRQueueWithInnerQueue for delegating implementation") + +// OCRQueueDeps holds dependencies for NewOCRQueue. +// Delegate owns the oracle; OCRQueue wraps the queue the transmitter feeds. +type OCRQueueDeps struct { + Inner limits.QueueLimiter[EnqueuedTriggerEvent] + Buffer *ObservationBuffer // events from Put feed plugin's Observation +} + +// OCRQueue wraps a QueueLimiter. Put buffers to ObservationBuffer (feeds Observation). +// Get/Wait read from Inner (fed by transmitter when consensus reports arrive). +type OCRQueue struct { + inner limits.QueueLimiter[EnqueuedTriggerEvent] + buffer *ObservationBuffer +} + +// NewOCRQueue creates an OCRQueue from the planned dependencies. +func NewOCRQueue(deps OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { + if deps.Inner == nil { + return nil, errOCRQueueNotImplemented + } + if deps.Buffer == nil { + return nil, errors.New("OCRQueue requires Buffer") + } + return &OCRQueue{inner: deps.Inner, buffer: deps.Buffer}, nil +} + +// NewOCRQueueWithInnerQueue returns a constructor with the same signature as NewOCRQueue. 
+func NewOCRQueueWithInnerQueue(inner limits.QueueLimiter[EnqueuedTriggerEvent], buffer *ObservationBuffer) func(OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { + return func(_ OCRQueueDeps) (limits.QueueLimiter[EnqueuedTriggerEvent], error) { + return &OCRQueue{inner: inner, buffer: buffer}, nil + } +} + +func (q *OCRQueue) Limit(ctx context.Context) (int, error) { + return q.inner.Limit(ctx) +} + +func (q *OCRQueue) Len(ctx context.Context) (int, error) { + return q.inner.Len(ctx) +} + +func (q *OCRQueue) Put(ctx context.Context, event EnqueuedTriggerEvent) error { + // TODO: only add to buffer, delegate to Put for now. + q.buffer.Add(event) + return q.inner.Put(ctx, event) +} + +func (q *OCRQueue) Get(ctx context.Context) (EnqueuedTriggerEvent, error) { + return q.inner.Get(ctx) +} + +func (q *OCRQueue) Wait(ctx context.Context) (EnqueuedTriggerEvent, error) { + return q.inner.Wait(ctx) +} + +func (q *OCRQueue) Close() error { + return q.inner.Close() +}