Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err
if ocr2Delegate == nil {
return nil, errors.New("ocr2.NewDelegate() returned nil")
}
creServices.SetOCRTriggerQueueCreator(ocr2Delegate)
delegates[job.OffchainReporting2] = ocr2Delegate
delegates[job.Bootstrap] = ocrbootstrap.NewDelegateBootstrap(
opts.DS,
Expand Down
35 changes: 34 additions & 1 deletion core/services/cre/cre.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,14 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter"
wftypes "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types"
v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"

"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings"
)

// ocrTriggerEventQueueEnabled gates use of the OCR-backed trigger queue.
// TODO: replace with cresettings.OCRTriggerEventQueueEnabled when added to chainlink-common.
const ocrTriggerEventQueueEnabled = false

// Keystore is the minimal interface needed from keystore for CRE
type Keystore interface {
CSA() keystore.CSA
Expand Down Expand Up @@ -116,6 +122,15 @@ type Services struct {

// callback to wire Delegates into CRE services (e.g. Launcher) when ready
SetDelegatesDeps func(*standardcapabilities.Delegate) (commonsrv.Service, error)

// ocrTriggerQueueCreator is set by application.go after creating the OCR delegate.
ocrTriggerQueueCreator v2.TriggerQueueCreator
}

// SetOCRTriggerQueueCreator sets the creator for the OCR-backed trigger queue.
// Called by application.go after creating the OCR delegate.
func (s *Services) SetOCRTriggerQueueCreator(c v2.TriggerQueueCreator) {
s.ocrTriggerQueueCreator = c
}

func (s *Services) close() error {
Expand Down Expand Up @@ -222,6 +237,7 @@ func (s *Services) newSubservices(
cfg,
relayerChainInterops,
opts,
s.ocrTriggerQueueCreator,
lggr,
ds,
opts.DonTimeStore,
Expand Down Expand Up @@ -830,6 +846,7 @@ func newWorkflowRegistrySyncerV2(
relayerChainInterops RelayerChainInterops,
billingClient metering.BillingClient,
opts Opts,
ocrTriggerQueueCreator v2.TriggerQueueCreator,
lggr logger.Logger,
ds sqlutil.DataSource,
dontimeStore *dontime.Store,
Expand Down Expand Up @@ -874,7 +891,21 @@ func newWorkflowRegistrySyncerV2(

engineRegistry := syncerV2.NewEngineRegistry()

engineLimiters, err := v2.NewLimiters(lf, nil)
var triggerQueue limits.QueueLimiter[v2.EnqueuedTriggerEvent]
if ocrTriggerEventQueueEnabled && ocrTriggerQueueCreator != nil {
cfg := cresettings.Default.PerWorkflow
deps := v2.TriggerQueueDeps{
Lf: lf,
Cfg: &cfg,
DonSubscriber: workflowDonNotifier,
}
var tqErr error
triggerQueue, tqErr = ocrTriggerQueueCreator.NewTriggerQueueOCRQueue(context.Background(), deps)
if tqErr != nil {
return nil, nil, fmt.Errorf("could not create OCR trigger queue: %w", tqErr)
}
}
engineLimiters, err := v2.NewLimitersWithTriggerQueue(lf, nil, triggerQueue)
if err != nil {
return nil, nil, fmt.Errorf("could not instantiate engine limiters: %w", err)
}
Expand Down Expand Up @@ -984,6 +1015,7 @@ func newWorkflowRegistrySyncer(
cfg Config,
relayerChainInterops RelayerChainInterops,
opts Opts,
ocrTriggerQueueCreator v2.TriggerQueueCreator,
lggr logger.Logger,
ds sqlutil.DataSource,
dontimeStore *dontime.Store,
Expand Down Expand Up @@ -1032,6 +1064,7 @@ func newWorkflowRegistrySyncer(
relayerChainInterops,
billingClient,
opts,
ocrTriggerQueueCreator,
lggr,
ds,
dontimeStore,
Expand Down
42 changes: 40 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
syncerV2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/v2"
v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"

"github.com/smartcontractkit/smdkg/dkgocr/oracleargs"

Expand Down Expand Up @@ -89,6 +90,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/autotelemetry21"
ocr2keeper21core "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
ringconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ring/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/triggerqueue"
vaultocrplugin "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/vault"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr3_1/beholderwrapper"
Expand Down Expand Up @@ -336,6 +338,44 @@ func (d *Delegate) JobType() job.Type {
return job.OffchainReporting2
}

// triggerQueueContractID is a synthetic ID for monitoring; trigger queue has no on-chain contract.
const triggerQueueContractID = "trigger_queue"

// NewTriggerQueueOCRQueue creates the trigger queue for the workflow syncer.
// Delegate owns the oracle; builds OCR3_1OracleArgs with TODOs for unwired fields.
// Returns OCRQueue wrapping Inner; transmitter feeds consensus events into Inner.
func (d *Delegate) NewTriggerQueueOCRQueue(ctx context.Context, deps v2.TriggerQueueDeps) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) {
inner, err := v2.NewStandardTriggerQueue(deps.Lf, deps.Cfg)
if err != nil {
return nil, err
}
buffer := &v2.ObservationBuffer{}

// Build OCR3_1OracleArgs for the trigger queue. TODO: wire all fields and call libocr2.NewOracle.
ocrLogger := ocrcommon.NewOCRWrapper(d.lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) {
d.lggr.Warnw("OCR trigger queue", "msg", msg)
})
oracleArgs := libocr2.OCR3_1OracleArgs[[]byte]{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer3_1,
V2Bootstrappers: nil, // TODO: wire bootstrap peers from workflow DON (deps.DonSubscriber.WaitForDon or config)
ContractConfigTracker: nil, // TODO: in-process config; need static/dynamic config from DON (no on-chain contract)
ContractTransmitter: triggerqueue.NewTransmitter(inner, d.lggr),
Database: nil, // TODO: wire OCR DB (e.g. NewDB(d.ds, triggerQueuePluginID, 0, d.lggr)); need unique plugin ID
KeyValueDatabaseFactory: nil, // TODO: wire KV factory (e.g. kvdb.NewPebbleKeyValueDatabaseFactory(path)); path = KeyValueStoreRootDir/trigger_queue
LocalConfig: ocrtypes.LocalConfig{}, // TODO: wire LocalConfig (BlockDelta, etc.)
Logger: ocrLogger,
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": triggerQueueContractID}, prometheus.DefaultRegisterer),
MonitoringEndpoint: nil, // TODO: wire d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, triggerQueueContractID, ...); need rid for trigger queue
OffchainConfigDigester: nil, // TODO: in-process digester; derive config digest from DON (members, F, offchain config)
OffchainKeyring: nil, // TODO: wire OCR key bundle (d.ks.Get or workflow DON key)
OnchainKeyring: nil, // TODO: wire onchain keyring adapter (trigger queue may use same as vault/dontime for in-process)
ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr, buffer), d.lggr, "triggerqueue"),
}
_ = oracleArgs // TODO: pass to libocr2.NewOracle(oracleArgs) when all fields wired

return v2.NewOCRQueue(v2.OCRQueueDeps{Inner: inner, Buffer: buffer})
}

func (d *Delegate) BeforeJobCreated(_ job.Job) {
// This is only called first time the job is created
d.isNewlyCreatedJob = true
Expand Down Expand Up @@ -2445,7 +2485,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
return nil, fmt.Errorf("chain not supported for CCIP execution: %s", spec.Relay)
}
dstRid, err := spec.RelayID()

if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}
Expand Down Expand Up @@ -2506,7 +2545,6 @@ func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, plugi
return nil, fmt.Errorf("chain not supported for CCIP execution: %s", spec.Relay)
}
dstRid, err := spec.RelayID()

if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}
Expand Down
44 changes: 44 additions & 0 deletions core/services/ocr2/plugins/triggerqueue/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package triggerqueue

import (
"context"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var _ ocr3_1types.ReportingPluginFactory[[]byte] = (*Factory)(nil)

// Factory creates ReportingPlugin instances for the trigger queue.
type Factory struct {
lggr logger.Logger
buffer *v2.ObservationBuffer
}

// NewFactory creates a new trigger queue plugin factory.
func NewFactory(lggr logger.Logger, buffer *v2.ObservationBuffer) *Factory {
return &Factory{lggr: lggr.Named("TriggerQueueFactory"), buffer: buffer}
}

func (f *Factory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig, fetcher ocr3_1types.BlobBroadcastFetcher) (ocr3_1types.ReportingPlugin[[]byte], ocr3_1types.ReportingPluginInfo, error) {
plugin := NewReportingPlugin(f.lggr, f.buffer)
info := ocr3_1types.ReportingPluginInfo1{
Name: "triggerqueue",
Limits: ocr3_1types.ReportingPluginLimits{
MaxQueryBytes: 100,
MaxObservationBytes: 500 * 1024, // 500KB per design doc
MaxReportsPlusPrecursorBytes: 500 * 1024,
MaxReportBytes: 500 * 1024,
MaxReportCount: 1,
MaxKeyValueModifiedKeys: 500,
MaxKeyValueModifiedKeysPlusValuesBytes: 1024 * 1024, // 1MB
MaxBlobPayloadBytes: 25 * 1024, // 25KB per design doc
MaxPerOracleUnexpiredBlobCumulativePayloadBytes: 30 * 1024 * 1024,
MaxPerOracleUnexpiredBlobCount: 1000,
},
}
return plugin, info, nil
}
80 changes: 80 additions & 0 deletions core/services/ocr2/plugins/triggerqueue/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package triggerqueue

import (
"context"
"encoding/json"
"errors"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// errNotImplemented is returned by all plugin methods in this draft.
var errNotImplemented = errors.New("triggerqueue plugin: draft implementation, not yet implemented")

var _ ocr3_1types.ReportingPlugin[[]byte] = (*ReportingPlugin)(nil)

// ReportingPlugin implements OCR 3.1 ReportingPlugin for the trigger queue.
type ReportingPlugin struct {
lggr logger.Logger
buffer *v2.ObservationBuffer
}

// NewReportingPlugin creates a new ReportingPlugin.
func NewReportingPlugin(lggr logger.Logger, buffer *v2.ObservationBuffer) *ReportingPlugin {
return &ReportingPlugin{lggr: lggr.Named("TriggerQueuePlugin"), buffer: buffer}
}

func (p *ReportingPlugin) Query(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Query, error) {
return nil, errNotImplemented
}

// Observation reads from the buffer (filled by OCRQueue.Put) and produces an observation.
// Draft: returns minimal observation (event IDs as JSON); full impl would BroadcastBlob for payloads.
func (p *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Observation, error) {
events := p.buffer.TakeForObservation()
if len(events) == 0 {
return []byte("[]"), nil
}
ids := make([]string, len(events))
for i, ev := range events {
ids[i] = ev.Event().Event.ID
}
return json.Marshal(ids)
}

func (p *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, ao types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) error {
return errNotImplemented
}

func (p *ReportingPlugin) ObservationQuorum(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) (bool, error) {
return false, errNotImplemented
}

func (p *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) {
return ocr3_1types.ReportsPlusPrecursor{}, errNotImplemented
}

func (p *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlusPrecursor ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[[]byte], error) {
return nil, errNotImplemented
}

func (p *ReportingPlugin) Committed(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader) error {
return errNotImplemented
}

func (p *ReportingPlugin) ShouldAcceptAttestedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[[]byte]) (bool, error) {
return false, errNotImplemented
}

func (p *ReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[[]byte]) (bool, error) {
return false, errNotImplemented
}

func (p *ReportingPlugin) Close() error {
return nil
}
72 changes: 72 additions & 0 deletions core/services/ocr2/plugins/triggerqueue/transmitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package triggerqueue

import (
"context"
"time"

"github.com/smartcontractkit/libocr/offchainreporting2/types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
"github.com/smartcontractkit/chainlink/v2/core/logger"
v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"
)

var _ ocr3types.ContractTransmitter[[]byte] = (*Transmitter)(nil)

// Transmitter receives OCR consensus reports and enqueues decoded trigger events.
// No on-chain transmit; reports are consumed locally to feed the internal queue.
type Transmitter struct {
queue limits.QueueLimiter[v2.EnqueuedTriggerEvent]
lggr logger.Logger
}

// NewTransmitter creates a transmitter that decodes reports and Puts into the queue.
func NewTransmitter(queue limits.QueueLimiter[v2.EnqueuedTriggerEvent], lggr logger.Logger) *Transmitter {
return &Transmitter{queue: queue, lggr: lggr.Named("TriggerQueueTransmitter")}
}

// decodedTriggerEvent is the result of decoding a report; maps to EnqueuedTriggerEvent fields.
type decodedTriggerEvent struct {
triggerCapID string
triggerIndex int
timestamp time.Time
event capabilities.TriggerResponse
}

// decodeReport extracts the consensus slice of events from the OCR report.
// Overridable in tests to mock the decode.
var decodeReport = func(rwi ocr3types.ReportWithInfo[[]byte]) ([]decodedTriggerEvent, error) {
// TODO: implement real decode. Report format per design: ordered event IDs in rwi.Report;
// fetch payloads from KV (keyValueReader from plugin context; transmitter may need KV ref).
// Stub: no real decode. Real impl would:
// - Parse rwi.Report (proto: ordered event IDs)
// - For each event ID, fetch payload from KV via "Event::"+eventID
// - Unmarshal to TriggerResponse, build decodedTriggerEvent
_ = rwi
return nil, nil
}

// Transmit is called by libOCR when consensus is reached. Decodes the report and
// enqueues each event into the internal queue. Engine Wait() will return these.
func (t *Transmitter) Transmit(ctx context.Context, cd types.ConfigDigest, seqNr uint64, rwi ocr3types.ReportWithInfo[[]byte], sigs []types.AttributedOnchainSignature) error {
events, err := decodeReport(rwi)
if err != nil {
t.lggr.Errorw("Failed to decode trigger queue report", "err", err, "seqNr", seqNr)
return err
}
for _, ev := range events {
enqueued := v2.NewEnqueuedTriggerEvent(ev.triggerCapID, ev.triggerIndex, ev.timestamp, ev.event)
if err := t.queue.Put(ctx, enqueued); err != nil {
t.lggr.Errorw("Failed to enqueue consensus event", "triggerCapID", ev.triggerCapID, "err", err)
return err
}
t.lggr.Debugw("Enqueued consensus event", "triggerCapID", ev.triggerCapID, "triggerIndex", ev.triggerIndex)
}
return nil
}

func (t *Transmitter) FromAccount(_ context.Context) (types.Account, error) {
return types.Account(""), nil
}
Loading
Loading