diff --git a/Makefile b/Makefile index bf9930e7..adbb2d99 100644 --- a/Makefile +++ b/Makefile @@ -30,6 +30,7 @@ mocks-$(strip $(1))-$(strip $(2)): ${MOCKERY} endef $(eval $(call makemock, pkg/ffcapi, API, ffcapimocks)) +$(eval $(call makemock, pkg/fftm, ModuleFunctions, fftmmocks)) $(eval $(call makemock, pkg/txhandler, TransactionHandler, txhandlermocks)) $(eval $(call makemock, pkg/txhandler, ManagedTxEventHandler, txhandlermocks)) $(eval $(call makemock, internal/metrics, TransactionHandlerMetrics, metricsmocks)) @@ -41,6 +42,7 @@ $(eval $(call makemock, internal/persistence, RichQuery, per $(eval $(call makemock, internal/ws, WebSocketChannels, wsmocks)) $(eval $(call makemock, internal/ws, WebSocketServer, wsmocks)) $(eval $(call makemock, internal/events, Stream, eventsmocks)) +$(eval $(call makemock, internal/events, InternalEventsDispatcher, eventsmocks)) $(eval $(call makemock, internal/apiclient, FFTMClient, apiclientmocks)) go-mod-tidy: .ALWAYS diff --git a/internal/events/eventstream.go b/internal/events/eventstream.go index 9de8b39e..81ae0161 100644 --- a/internal/events/eventstream.go +++ b/internal/events/eventstream.go @@ -79,7 +79,9 @@ func InitDefaults() { } } -type eventStreamAction func(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error +type InternalEventsDispatcher interface { + ProcessBatchedEvents(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error +} type eventStreamBatch struct { number int64 @@ -92,7 +94,7 @@ type startedStreamState struct { ctx context.Context cancelCtx func() startTime *fftypes.FFTime - action eventStreamAction + action func(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error eventLoopDone chan struct{} batchLoopDone chan struct{} blockListenerDone chan struct{} @@ -110,6 +112,7 @@ type eventStream struct { confirmations confirmations.Manager confirmationsRequired int listeners map[fftypes.UUID]*listener + internalDispatcher InternalEventsDispatcher wsChannels ws.WebSocketChannels retry *retry.Retry currentState *startedStreamState @@ -125,6 +128,7 @@ func NewEventStream( wsChannels ws.WebSocketChannels, initialListeners []*apitypes.Listener, eme metrics.EventMetricsEmitter, + internalDispatcher InternalEventsDispatcher, ) (ees Stream, err error) { esCtx := log.WithLogField(bgCtx, "eventstream", persistedSpec.ID.String()) es := &eventStream{ @@ -134,6 +138,7 @@ func NewEventStream( connector: connector, persistence: persistence, listeners: make(map[fftypes.UUID]*listener), + internalDispatcher: internalDispatcher, wsChannels: wsChannels, retry: esDefaults.retry, checkpointInterval: config.GetDuration(tmconfig.EventStreamsCheckpointInterval), @@ -162,18 +167,26 @@ func NewEventStream( func (es *eventStream) initAction(startedState *startedStreamState) error { ctx := startedState.ctx - switch *es.spec.Type { - case apitypes.EventStreamTypeWebhook: - wa, err := newWebhookAction(ctx, es.spec.Webhook) - if err != nil { - return err + if es.internalDispatcher != nil { + if es.spec.Type != &apitypes.EventStreamTypeInternal { + // TODO: need to understand why this should be panic, copied from the default switch case + panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamTypeForModuleMode, *es.spec.Type)) + } + startedState.action = es.internalDispatcher.ProcessBatchedEvents + } else { + switch *es.spec.Type { + case apitypes.EventStreamTypeWebhook: + wa, err := newWebhookAction(ctx, es.spec.Webhook) + if err != nil { + return err + } + startedState.action = wa.attemptBatch + case apitypes.EventStreamTypeWebSocket: + startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch + default: + // mergeValidateEsConfig always be called previous to this + panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type)) } - startedState.action = wa.attemptBatch - case apitypes.EventStreamTypeWebSocket: - startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch - default: - // mergeValidateEsConfig always be called previous to this - panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type)) } return nil } @@ -251,6 +264,8 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda if merged.Webhook, changed, err = mergeValidateWhConfig(ctx, changed, base.Webhook, updates.Webhook); err != nil { return nil, false, err } + case apitypes.EventStreamTypeInternal: + // no checks are required for internal listener default: return nil, false, i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *merged.Type) } diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 290d0924..225e6487 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -91,6 +91,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str &wsmocks.WebSocketChannels{}, listeners, emm, + nil, ) mfc.On("EventStreamNewCheckpointStruct").Return(&utCheckpointType{}).Maybe() if err != nil { @@ -125,12 +126,14 @@ func TestNewTestEventStreamMissingID(t *testing.T) { tmconfig.Reset() InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} + _, err := NewEventStream(context.Background(), &apitypes.EventStream{}, &ffcapimocks.API{}, &persistencemocks.Persistence{}, &wsmocks.WebSocketChannels{}, []*apitypes.Listener{}, emm, + nil, ) assert.Regexp(t, "FF21048", err) } @@ -139,12 +142,14 @@ func TestNewTestEventStreamBadConfig(t *testing.T) { tmconfig.Reset() InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} + _, err := NewEventStream(context.Background(), testESConf(t, `{}`), &ffcapimocks.API{}, &persistencemocks.Persistence{}, &wsmocks.WebSocketChannels{}, []*apitypes.Listener{}, emm, + nil, ) assert.Regexp(t, "FF21028", err) } diff --git a/internal/tmmsgs/en_error_messages.go b/internal/tmmsgs/en_error_messages.go index 3d358316..ac6cd362 100644 --- a/internal/tmmsgs/en_error_messages.go +++ b/internal/tmmsgs/en_error_messages.go @@ -105,4 +105,5 @@ var ( MsgBlockListenerNotStarted = ffe("FF21088", "Block listener %s not started", http.StatusConflict) MsgBadListenerType = ffe("FF21089", "Invalid listener type: %s", http.StatusBadRequest) MsgFromBlockInvalid = ffe("FF21090", "From block invalid. Must be 'earliest', 'latest' or a decimal: %s", http.StatusBadRequest) + MsgInvalidStreamTypeForModuleMode = ffe("FF21091", "Invalid event stream type '%s', only 'internal' type is supported for module mode", http.StatusBadRequest) ) diff --git a/mocks/eventsmocks/internal_events_dispatcher.go b/mocks/eventsmocks/internal_events_dispatcher.go new file mode 100644 index 00000000..daccb299 --- /dev/null +++ b/mocks/eventsmocks/internal_events_dispatcher.go @@ -0,0 +1,48 @@ +// Code generated by mockery v2.40.2. DO NOT EDIT. + +package eventsmocks + +import ( + context "context" + + apitypes "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + + mock "github.com/stretchr/testify/mock" +) + +// InternalEventsDispatcher is an autogenerated mock type for the InternalEventsDispatcher type +type InternalEventsDispatcher struct { + mock.Mock +} + +// ProcessBatchedEvents provides a mock function with given fields: ctx, batchNumber, attempt, _a3 +func (_m *InternalEventsDispatcher) ProcessBatchedEvents(ctx context.Context, batchNumber int64, attempt int, _a3 []*apitypes.EventWithContext) error { + ret := _m.Called(ctx, batchNumber, attempt, _a3) + + if len(ret) == 0 { + panic("no return value specified for ProcessBatchedEvents") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int, []*apitypes.EventWithContext) error); ok { + r0 = rf(ctx, batchNumber, attempt, _a3) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewInternalEventsDispatcher creates a new instance of InternalEventsDispatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewInternalEventsDispatcher(t interface { + mock.TestingT + Cleanup(func()) +}) *InternalEventsDispatcher { + mock := &InternalEventsDispatcher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/fftmmocks/module_functions.go b/mocks/fftmmocks/module_functions.go new file mode 100644 index 00000000..e68defc1 --- /dev/null +++ b/mocks/fftmmocks/module_functions.go @@ -0,0 +1,48 @@ +// Code generated by mockery v2.40.2. DO NOT EDIT. + +package fftmmocks + +import ( + context "context" + + apitypes "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + + mock "github.com/stretchr/testify/mock" +) + +// ModuleFunctions is an autogenerated mock type for the ModuleFunctions type +type ModuleFunctions struct { + mock.Mock +} + +// ProcessBatchedEvents provides a mock function with given fields: ctx, batchNumber, attempt, events +func (_m *ModuleFunctions) ProcessBatchedEvents(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error { + ret := _m.Called(ctx, batchNumber, attempt, events) + + if len(ret) == 0 { + panic("no return value specified for ProcessBatchedEvents") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int, []*apitypes.EventWithContext) error); ok { + r0 = rf(ctx, batchNumber, attempt, events) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewModuleFunctions creates a new instance of ModuleFunctions. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewModuleFunctions(t interface { + mock.TestingT + Cleanup(func()) +}) *ModuleFunctions { + mock := &ModuleFunctions{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/apitypes/api_types.go b/pkg/apitypes/api_types.go index d2f4b44a..10a40634 100644 --- a/pkg/apitypes/api_types.go +++ b/pkg/apitypes/api_types.go @@ -43,6 +43,7 @@ type EventStreamType = fftypes.FFEnum var ( EventStreamTypeWebhook = fftypes.FFEnumValue("estype", "webhook") EventStreamTypeWebSocket = fftypes.FFEnumValue("estype", "websocket") + EventStreamTypeInternal = fftypes.FFEnumValue("estype", "internal") ) type ErrorHandlingType = fftypes.FFEnum diff --git a/pkg/fftm/manager.go b/pkg/fftm/manager.go index 1ea07ce6..ccfd8e31 100644 --- a/pkg/fftm/manager.go +++ b/pkg/fftm/manager.go @@ -42,6 +42,8 @@ import ( type Manager interface { Start() error + StreamManager + txhandler.TransactionManager Close() } @@ -50,9 +52,6 @@ type manager struct { cancelCtx func() confirmations confirmations.Manager txHandler txhandler.TransactionHandler - apiServer httpserver.HTTPServer - metricsServer httpserver.HTTPServer - wsServer ws.WebSocketServer persistence persistence.Persistence richQueryEnabled bool @@ -65,20 +64,32 @@ type manager struct { blockListenerDone chan struct{} txHandlerDone <-chan struct{} started bool + + // configurations that are specific to FFTM running as a go module + moduleFunctions ModuleFunctions + + // configurations that are specific to FFTM running as an HTTP server + apiServer httpserver.HTTPServer + metricsServer httpserver.HTTPServer + wsServer ws.WebSocketServer apiServerDone chan error metricsServerDone chan error metricsEnabled bool metricsManager metrics.Metrics } +type ModuleFunctions interface { + events.InternalEventsDispatcher +} + func InitConfig() { tmconfig.Reset() events.InitDefaults() } -func NewManager(ctx context.Context, connector ffcapi.API) (Manager, error) { +func NewManager(ctx context.Context, connector ffcapi.API, mf ModuleFunctions) (Manager, error) { var err error - m := newManager(ctx, connector) + m := newManager(ctx, connector, mf) if err = m.initPersistence(ctx); err != nil { return nil, err } @@ -88,16 +99,18 @@ func NewManager(ctx context.Context, connector ffcapi.API) (Manager, error) { return m, nil } -func newManager(ctx context.Context, connector ffcapi.API) *manager { +func newManager(ctx context.Context, connector ffcapi.API, mf ModuleFunctions) *manager { m := &manager{ connector: connector, apiServerDone: make(chan error), metricsServerDone: make(chan error), - metricsEnabled: config.GetBool(tmconfig.MetricsEnabled), + metricsEnabled: config.GetBool(tmconfig.MetricsEnabled) && mf == nil, eventStreams: make(map[fftypes.UUID]events.Stream), streamsByName: make(map[string]*fftypes.UUID), metricsManager: metrics.NewMetricsManager(ctx), + moduleFunctions: mf, } + m.toolkit = &txhandler.Toolkit{ Connector: m.connector, MetricsManager: m.metricsManager, @@ -108,10 +121,12 @@ func newManager(ctx context.Context, connector ffcapi.API) *manager { func (m *manager) initServices(ctx context.Context) (err error) { m.confirmations = confirmations.NewBlockConfirmationManager(ctx, m.connector, "receipts", m.metricsManager) - m.wsServer = ws.NewWebSocketServer(ctx) - m.apiServer, err = httpserver.NewHTTPServer(ctx, "api", m.router(m.metricsEnabled), m.apiServerDone, tmconfig.APIConfig, tmconfig.CorsConfig) - if err != nil { - return err + if m.moduleFunctions == nil { + m.wsServer = ws.NewWebSocketServer(ctx) + m.apiServer, err = httpserver.NewHTTPServer(ctx, "api", m.router(m.metricsEnabled), m.apiServerDone, tmconfig.APIConfig, tmconfig.CorsConfig) + if err != nil { + return err + } } // check whether a policy engine name is provided @@ -129,13 +144,15 @@ func (m *manager) initServices(ctx context.Context) (err error) { m.toolkit.EventHandler = NewManagedTransactionEventHandler(ctx, m.confirmations, m.wsServer, m.txHandler) m.txHandler.Init(ctx, m.toolkit) - // metrics service must be initialized after transaction handler - // in case the transaction handler has logic in the Init function - // to add more metrics - if m.metricsEnabled { - m.metricsServer, err = httpserver.NewHTTPServer(ctx, "metrics", m.createMetricsMuxRouter(), m.metricsServerDone, tmconfig.MetricsConfig, tmconfig.CorsConfig) - if err != nil { - return err + if m.moduleFunctions == nil { + // metrics service must be initialized after transaction handler + // in case the transaction handler has logic in the Init function + // to add more metrics + if m.metricsEnabled { + m.metricsServer, err = httpserver.NewHTTPServer(ctx, "metrics", m.createMetricsMuxRouter(), m.metricsServerDone, tmconfig.MetricsConfig, tmconfig.CorsConfig) + if err != nil { + return err + } } } return nil @@ -166,9 +183,11 @@ func (m *manager) initPersistence(ctx context.Context) (err error) { } func (m *manager) Start() error { - go httpserver.RunDebugServer(m.ctx, tmconfig.DebugConfig) + if m.moduleFunctions == nil { + go httpserver.RunDebugServer(m.ctx, tmconfig.DebugConfig) + } - if err := m.restoreStreams(); err != nil { + if err := m._restoreStreams(); err != nil { return err } @@ -179,9 +198,11 @@ func (m *manager) Start() error { return err } - go m.runAPIServer() - if m.metricsEnabled { - go m.runMetricsServer() + if m.moduleFunctions == nil { + go m.runAPIServer() + if m.metricsEnabled { + go m.runMetricsServer() + } } go m.confirmations.Start() diff --git a/pkg/fftm/manager_test.go b/pkg/fftm/manager_test.go index bddffbc0..b7ab9bb0 100644 --- a/pkg/fftm/manager_test.go +++ b/pkg/fftm/manager_test.go @@ -86,7 +86,7 @@ func newTestManager(t *testing.T) (string, *manager, func()) { mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() - mm, err := NewManager(context.Background(), mca) + mm, err := NewManager(context.Background(), mca, nil) assert.NoError(t, err) m := mm.(*manager) @@ -105,9 +105,7 @@ func newTestManagerMockNoRichDB(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t, false) - mca := &ffcapimocks.API{} - - m := newManager(context.Background(), mca) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) mpm := &persistencemocks.Persistence{} mpm.On("Close", mock.Anything).Return(nil) @@ -133,9 +131,7 @@ func newTestManagerMockRichDB(t *testing.T) (string, *manager, *persistencemocks url := testManagerCommonInit(t, false) - mca := &ffcapimocks.API{} - - m := newManager(context.Background(), mca) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) mpm := &persistencemocks.Persistence{} mpm.On("Close", mock.Anything).Return(nil) @@ -170,7 +166,8 @@ func newTestManagerWithMetrics(t *testing.T) (string, *manager, func()) { mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() - mm, err := NewManager(context.Background(), mca) + + mm, err := NewManager(context.Background(), mca, nil) assert.NoError(t, err) m := mm.(*manager) @@ -188,8 +185,7 @@ func newTestManagerWithMetrics(t *testing.T) (string, *manager, func()) { func newTestManagerMockPersistence(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t, false) - - m := newManager(context.Background(), &ffcapimocks.API{}) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() m.persistence = mp @@ -210,7 +206,7 @@ func TestNewManagerBadPersistencePathConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Error(t, err) assert.Regexp(t, "FF21050", err) @@ -233,7 +229,7 @@ func TestNewManagerWithLegacyConfiguration(t *testing.T) { tmconfig.DeprecatedPolicyEngineBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - m := newManager(context.Background(), &ffcapimocks.API{}) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() m.persistence = mp @@ -253,7 +249,7 @@ func TestNewManagerBadHttpConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Error(t, err) assert.Regexp(t, "FF00151", err) @@ -272,7 +268,7 @@ func TestNewManagerBadLevelDBConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err = NewManager(context.Background(), nil) + _, err = NewManager(context.Background(), nil, nil) assert.Regexp(t, "FF21049", err) } @@ -286,7 +282,7 @@ func TestNewManagerBadPersistenceConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Regexp(t, "FF21043", err) } @@ -298,7 +294,7 @@ func TestNewManagerInvalidTransactionHandlerName(t *testing.T) { config.Set(tmconfig.PersistenceLevelDBPath, dir) config.Set(tmconfig.TransactionsHandlerName, "wrong") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Regexp(t, "FF21070", err) } @@ -307,7 +303,7 @@ func TestNewManagerMetricsOffByDefault(t *testing.T) { tmconfig.Reset() - m := newManager(context.Background(), nil) + m := newManager(context.Background(), nil, nil) assert.False(t, m.metricsEnabled) } @@ -333,7 +329,7 @@ func TestNewManagerWithMetricsBadConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Error(t, err) assert.Regexp(t, "FF00151", err) } @@ -381,7 +377,7 @@ func TestPSQLInitFail(t *testing.T) { _ = testManagerCommonInit(t, false) config.Set(tmconfig.PersistenceType, "postgres") - m := newManager(context.Background(), &ffcapimocks.API{}) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) err := m.initPersistence(context.Background()) assert.Regexp(t, "FF21049", err) @@ -393,7 +389,7 @@ func TestPSQLInitRichQueryEnabled(t *testing.T) { config.Set(tmconfig.PersistenceType, "postgres") tmconfig.PostgresSection.Set(dbsql.SQLConfDatasourceURL, "unused") - m := newManager(context.Background(), &ffcapimocks.API{}) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) err := m.initPersistence(context.Background()) assert.NoError(t, err) diff --git a/pkg/fftm/route_delete_eventstream.go b/pkg/fftm/route_delete_eventstream.go index 841a037c..0053acc7 100644 --- a/pkg/fftm/route_delete_eventstream.go +++ b/pkg/fftm/route_delete_eventstream.go @@ -37,7 +37,7 @@ var deleteEventStream = func(m *manager) *ffapi.Route { JSONOutputValue: nil, JSONOutputCodes: []int{http.StatusNoContent}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - err = m.deleteStream(r.Req.Context(), r.PP["streamId"]) + err = m.DeleteStream(r.Req.Context(), r.PP["streamId"]) return nil, err }, } diff --git a/pkg/fftm/route_delete_eventstream_listener.go b/pkg/fftm/route_delete_eventstream_listener.go index 7dc890d8..32fbebcd 100644 --- a/pkg/fftm/route_delete_eventstream_listener.go +++ b/pkg/fftm/route_delete_eventstream_listener.go @@ -39,7 +39,7 @@ var deleteEventStreamListener = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusNoContent}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return nil, m.deleteListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"]) + return nil, m.DeleteListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"]) }, } } diff --git a/pkg/fftm/route_delete_subscription.go b/pkg/fftm/route_delete_subscription.go index fd097996..f1f7b86d 100644 --- a/pkg/fftm/route_delete_subscription.go +++ b/pkg/fftm/route_delete_subscription.go @@ -39,7 +39,7 @@ var deleteSubscription = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusNoContent}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return nil, m.deleteListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"]) + return nil, m.DeleteListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"]) }, } } diff --git a/pkg/fftm/route_get_eventstream.go b/pkg/fftm/route_get_eventstream.go index 19694c98..6c81ecdc 100644 --- a/pkg/fftm/route_get_eventstream.go +++ b/pkg/fftm/route_get_eventstream.go @@ -38,7 +38,7 @@ var getEventStream = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.EventStreamWithStatus{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getStream(r.Req.Context(), r.PP["streamId"]) + return m.GetStream(r.Req.Context(), r.PP["streamId"]) }, } } diff --git a/pkg/fftm/route_get_eventstream_listener.go b/pkg/fftm/route_get_eventstream_listener.go index 2e80ea76..4643d279 100644 --- a/pkg/fftm/route_get_eventstream_listener.go +++ b/pkg/fftm/route_get_eventstream_listener.go @@ -39,7 +39,7 @@ var getEventStreamListener = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"]) + return m.GetListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"]) }, } } diff --git a/pkg/fftm/route_get_eventstreams.go b/pkg/fftm/route_get_eventstreams.go index ee5f6c5a..f54be14f 100644 --- a/pkg/fftm/route_get_eventstreams.go +++ b/pkg/fftm/route_get_eventstreams.go @@ -48,7 +48,7 @@ var getEventStreams = func(m *manager) *ffapi.Route { {Name: "after", Description: tmmsgs.APIParamAfter}, } route.JSONHandler = func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getStreams(r.Req.Context(), r.QP["after"], r.QP["limit"]) + return m.GetStreams(r.Req.Context(), r.QP["after"], r.QP["limit"]) } } return route diff --git a/pkg/fftm/route_get_subscription.go b/pkg/fftm/route_get_subscription.go index 68d47e05..1ba040de 100644 --- a/pkg/fftm/route_get_subscription.go +++ b/pkg/fftm/route_get_subscription.go @@ -39,7 +39,7 @@ var getSubscription = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"]) + return m.GetListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"]) }, } } diff --git a/pkg/fftm/route_get_subscriptions.go b/pkg/fftm/route_get_subscriptions.go index ad7d4910..263a3816 100644 --- a/pkg/fftm/route_get_subscriptions.go +++ b/pkg/fftm/route_get_subscriptions.go @@ -50,7 +50,7 @@ var getSubscriptions = func(m *manager) *ffapi.Route { {Name: "after", Description: tmmsgs.APIParamAfter}, } route.JSONHandler = func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getListeners(r.Req.Context(), r.QP["after"], r.QP["limit"]) + return m.GetListeners(r.Req.Context(), r.QP["after"], r.QP["limit"]) } } return route diff --git a/pkg/fftm/route_patch_eventstream.go b/pkg/fftm/route_patch_eventstream.go index 8ebb395e..736c768e 100644 --- a/pkg/fftm/route_patch_eventstream.go +++ b/pkg/fftm/route_patch_eventstream.go @@ -38,7 +38,7 @@ var patchEventStream = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.EventStream{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateStream(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.EventStream)) + return m.UpdateStream(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.EventStream)) }, } } diff --git a/pkg/fftm/route_patch_eventstream_listener.go b/pkg/fftm/route_patch_eventstream_listener.go index 76302cc4..802df769 100644 --- a/pkg/fftm/route_patch_eventstream_listener.go +++ b/pkg/fftm/route_patch_eventstream_listener.go @@ -39,7 +39,7 @@ var patchEventStreamListener = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateExistingListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"], r.Input.(*apitypes.Listener), false) + return m.UpdateListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"], r.Input.(*apitypes.Listener), false) }, } } diff --git a/pkg/fftm/route_patch_subscription.go b/pkg/fftm/route_patch_subscription.go index 9ae7ce22..12a44e75 100644 --- a/pkg/fftm/route_patch_subscription.go +++ b/pkg/fftm/route_patch_subscription.go @@ -39,7 +39,7 @@ var patchSubscription = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateExistingListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"], r.Input.(*apitypes.Listener), false) + return m.UpdateListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"], r.Input.(*apitypes.Listener), false) }, } } diff --git a/pkg/fftm/route_post_eventstream.go b/pkg/fftm/route_post_eventstream.go index c41d70cb..8d7a69cd 100644 --- a/pkg/fftm/route_post_eventstream.go +++ b/pkg/fftm/route_post_eventstream.go @@ -36,7 +36,7 @@ var postEventStream = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.EventStream{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.createAndStoreNewStream(r.Req.Context(), r.Input.(*apitypes.EventStream)) + return m.CreateAndStoreNewStream(r.Req.Context(), r.Input.(*apitypes.EventStream)) }, } } diff --git a/pkg/fftm/route_post_eventstream_listener_reset.go b/pkg/fftm/route_post_eventstream_listener_reset.go index 39741977..2ca83d72 100644 --- a/pkg/fftm/route_post_eventstream_listener_reset.go +++ b/pkg/fftm/route_post_eventstream_listener_reset.go @@ -39,7 +39,7 @@ var postEventStreamListenerReset = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateExistingListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"], r.Input.(*apitypes.Listener), true) + return m.UpdateListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"], r.Input.(*apitypes.Listener), true) }, } } diff --git a/pkg/fftm/route_post_eventstream_listeners.go b/pkg/fftm/route_post_eventstream_listeners.go index c5e03f08..5d19c723 100644 --- a/pkg/fftm/route_post_eventstream_listeners.go +++ b/pkg/fftm/route_post_eventstream_listeners.go @@ -38,7 +38,7 @@ var postEventStreamListeners = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.createAndStoreNewStreamListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener)) + return m.CreateAndStoreNewStreamListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener)) }, } } diff --git a/pkg/fftm/route_post_eventstream_resume.go b/pkg/fftm/route_post_eventstream_resume.go index ce9c2931..64037c28 100644 --- a/pkg/fftm/route_post_eventstream_resume.go +++ b/pkg/fftm/route_post_eventstream_resume.go @@ -39,7 +39,7 @@ var postEventStreamResume = func(m *manager) *ffapi.Route { JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { falsy := false - _, err = m.updateStream(r.Req.Context(), r.PP["streamId"], &apitypes.EventStream{ + _, err = m.UpdateStream(r.Req.Context(), r.PP["streamId"], &apitypes.EventStream{ Suspended: &falsy, }) return &struct{}{}, err diff --git a/pkg/fftm/route_post_eventstream_suspend.go b/pkg/fftm/route_post_eventstream_suspend.go index 3551ee42..5a82160f 100644 --- a/pkg/fftm/route_post_eventstream_suspend.go +++ b/pkg/fftm/route_post_eventstream_suspend.go @@ -39,7 +39,7 @@ var postEventStreamSuspend = func(m *manager) *ffapi.Route { JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { truthy := true - _, err = m.updateStream(r.Req.Context(), r.PP["streamId"], &apitypes.EventStream{ + _, err = m.UpdateStream(r.Req.Context(), r.PP["streamId"], &apitypes.EventStream{ Suspended: &truthy, }) return &struct{}{}, err diff --git a/pkg/fftm/route_post_subscription_reset.go b/pkg/fftm/route_post_subscription_reset.go index d6f82f91..ced75c31 100644 --- a/pkg/fftm/route_post_subscription_reset.go +++ b/pkg/fftm/route_post_subscription_reset.go @@ -39,7 +39,7 @@ var postSubscriptionReset = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateExistingListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"], r.Input.(*apitypes.Listener), true) + return m.UpdateListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"], r.Input.(*apitypes.Listener), true) }, } } diff --git a/pkg/fftm/route_post_subscriptions.go b/pkg/fftm/route_post_subscriptions.go index 313a5fd9..0f4c5a30 100644 --- a/pkg/fftm/route_post_subscriptions.go +++ b/pkg/fftm/route_post_subscriptions.go @@ -37,7 +37,7 @@ var postSubscriptions = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.createAndStoreNewListener(r.Req.Context(), r.Input.(*apitypes.Listener)) + return m.createAndStoreNewListenerDeprecated(r.Req.Context(), r.Input.(*apitypes.Listener)) }, } } diff --git a/pkg/fftm/stream_management.go b/pkg/fftm/stream_management.go index 0a6e3df7..81161e60 100644 --- a/pkg/fftm/stream_management.go +++ b/pkg/fftm/stream_management.go @@ -36,7 +36,24 @@ const ( startupPaginationLimit = 25 ) -func (m *manager) restoreStreams() error { +type StreamManager interface { + CreateAndStoreNewStream(ctx context.Context, def *apitypes.EventStream) (*apitypes.EventStream, error) + GetStreams(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.EventStream, err error) + GetStream(ctx context.Context, idStr string) (*apitypes.EventStreamWithStatus, error) + UpdateStream(ctx context.Context, idStr string, updates *apitypes.EventStream) (*apitypes.EventStream, error) + DeleteStream(ctx context.Context, idStr string) error +} + +type StreamListenerManager interface { + CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) + GetListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error) + GetListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error) + UpdateListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error) + DeleteListener(ctx context.Context, streamIDStr, listenerIDStr string) error +} + +// Event stream functions +func (m *manager) _restoreStreams() error { var lastInPage *fftypes.UUID for { streamDefs, err := m.persistence.ListStreamsByCreateTime(m.ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending) @@ -54,10 +71,10 @@ func (m *manager) restoreStreams() error { } // check to see if it's already started if _, ok := m.eventStreams[*def.ID]; !ok { - closeoutName, err := m.reserveStreamName(m.ctx, *def.Name, def.ID) + closeoutName, err := m._reserveStreamName(m.ctx, *def.Name, def.ID) var s events.Stream if err == nil { - s, err = m.addRuntimeStream(def, streamListeners) + s, err = m._addRuntimeStream(def, streamListeners) } if err == nil && !*def.Suspended { err = s.Start(m.ctx) @@ -72,7 +89,7 @@ func (m *manager) restoreStreams() error { return nil } -func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftypes.UUID) error { +func (m *manager) _deleteAllStreamListeners(ctx context.Context, streamID *fftypes.UUID) error { for { // Do not specify after as we just delete everything listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, nil, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) @@ -91,8 +108,8 @@ func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftype return nil } -func (m *manager) addRuntimeStream(def *apitypes.EventStream, listeners []*apitypes.Listener) (events.Stream, error) { - s, err := events.NewEventStream(m.ctx, def, m.connector, m.persistence, m.wsServer, listeners, m.metricsManager) +func (m *manager) _addRuntimeStream(def *apitypes.EventStream, listeners []*apitypes.Listener) (events.Stream, error) { + s, err := events.NewEventStream(m.ctx, def, m.connector, m.persistence, m.wsServer, listeners, m.metricsManager, m.moduleFunctions) if err != nil { return nil, err } @@ -103,31 +120,7 @@ func (m *manager) addRuntimeStream(def *apitypes.EventStream, listeners []*apity return s, nil } -func (m *manager) deleteStream(ctx context.Context, idStr string) error { - id, err := fftypes.ParseUUID(ctx, idStr) - if err != nil { - return err - } - m.mux.Lock() - s := m.eventStreams[*id] - delete(m.eventStreams, *id) - if s != nil { - delete(m.streamsByName, *s.Spec().Name) - } - m.mux.Unlock() - if err := m.deleteAllStreamListeners(ctx, id); err != nil { - return err - } - if err := m.persistence.DeleteStream(ctx, id); err != nil { - return err - } - if s != nil { - return s.Delete(ctx) - } - return nil -} - -func (m *manager) reserveStreamName(ctx context.Context, name string, id *fftypes.UUID) (func(bool), error) { +func (m *manager) _reserveStreamName(ctx context.Context, name string, id *fftypes.UUID) (func(bool), error) { m.mux.Lock() defer m.mux.Unlock() @@ -157,7 +150,7 @@ func (m *manager) reserveStreamName(ctx context.Context, name string, id *fftype }, nil } -func (m *manager) createAndStoreNewStream(ctx context.Context, def *apitypes.EventStream) (*apitypes.EventStream, error) { +func (m *manager) CreateAndStoreNewStream(ctx context.Context, def *apitypes.EventStream) (*apitypes.EventStream, error) { def.ID = apitypes.NewULID() def.Created = nil // set to updated time by events.NewEventStream if def.Name == nil || *def.Name == "" { @@ -165,13 +158,13 @@ func (m *manager) createAndStoreNewStream(ctx context.Context, def *apitypes.Eve } stored := false - closeoutName, err := m.reserveStreamName(ctx, *def.Name, def.ID) + closeoutName, err := m._reserveStreamName(ctx, *def.Name, def.ID) if err != nil { return nil, err } defer func() { closeoutName(stored) }() - s, err := m.addRuntimeStream(def, nil /* no listeners when a new stream is first created */) + s, err := m._addRuntimeStream(def, nil /* no listeners when a new stream is first created */) if err != nil { return nil, err } @@ -192,71 +185,32 @@ func (m *manager) createAndStoreNewStream(ctx context.Context, def *apitypes.Eve return spec, nil } -func (m *manager) createAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) { - streamID, err := fftypes.ParseUUID(ctx, idStr) +func (m *manager) GetStreams(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.EventStream, err error) { + after, limit, err := m._parseAfterAndLimit(ctx, afterStr, limitStr) if err != nil { return nil, err } - def.StreamID = streamID - return m.createAndStoreNewListener(ctx, def) -} - -func (m *manager) createAndStoreNewListener(ctx context.Context, def *apitypes.Listener) (*apitypes.Listener, error) { - return m.createOrUpdateListener(ctx, apitypes.NewULID(), def, false) -} - -func (m *manager) updateExistingListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error) { - l, err := m.getListenerSpec(ctx, streamIDStr, listenerIDStr) // Verify the listener exists in storage - if err != nil { - return nil, err - } - updates.StreamID = l.StreamID - return m.createOrUpdateListener(ctx, l.ID, updates, reset) + return m.persistence.ListStreamsByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending) } -func (m *manager) createOrUpdateListener(ctx context.Context, id *fftypes.UUID, newOrUpdates *apitypes.Listener, reset bool) (*apitypes.Listener, error) { - if err := mergeEthCompatMethods(ctx, newOrUpdates); err != nil { - return nil, err - } - var s events.Stream - if newOrUpdates.StreamID != nil { - m.mux.Lock() - s = m.eventStreams[*newOrUpdates.StreamID] - m.mux.Unlock() - } - if s == nil { - return nil, i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, newOrUpdates.StreamID) - } - def, err := s.AddOrUpdateListener(ctx, id, newOrUpdates, reset) +func (m *manager) GetStream(ctx context.Context, idStr string) (*apitypes.EventStreamWithStatus, error) { + id, err := fftypes.ParseUUID(ctx, idStr) if err != nil { return nil, err } - if err := m.persistence.WriteListener(ctx, def); err != nil { - err1 := s.RemoveListener(ctx, def.ID) - log.L(ctx).Infof("Cleaned up runtime listener after write failed (err?=%v)", err1) - return nil, err - } - return def, nil -} - -func (m *manager) deleteListener(ctx context.Context, streamIDStr, listenerIDStr string) error { - spec, err := m.getListenerSpec(ctx, streamIDStr, listenerIDStr) // Verify the listener exists in storage - if err != nil { - return err - } m.mux.Lock() - s := m.eventStreams[*spec.StreamID] + s := m.eventStreams[*id] m.mux.Unlock() if s == nil { - return i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, spec.StreamID) - } - if err := s.RemoveListener(ctx, spec.ID); err != nil { - return err + return nil, i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, idStr) } - return m.persistence.DeleteListener(ctx, spec.ID) + return &apitypes.EventStreamWithStatus{ + EventStream: *s.Spec(), + Status: s.Status(), + }, nil } -func (m *manager) updateStream(ctx context.Context, idStr string, updates *apitypes.EventStream) (*apitypes.EventStream, error) { +func (m *manager) UpdateStream(ctx context.Context, idStr string, updates *apitypes.EventStream) (*apitypes.EventStream, error) { id, err := fftypes.ParseUUID(ctx, idStr) if err != nil { return nil, err @@ -270,7 +224,7 @@ func (m *manager) updateStream(ctx context.Context, idStr string, updates *apity nameChanged := false if updates.Name != nil && *updates.Name != "" { - closeoutName, err := m.reserveStreamName(ctx, *updates.Name, id) + closeoutName, err := m._reserveStreamName(ctx, *updates.Name, id) if err != nil { return nil, err } @@ -297,53 +251,71 @@ func (m *manager) updateStream(ctx context.Context, idStr string, updates *apity return spec, nil } -func (m *manager) getStream(ctx context.Context, idStr string) (*apitypes.EventStreamWithStatus, error) { +func (m *manager) DeleteStream(ctx context.Context, idStr string) error { id, err := fftypes.ParseUUID(ctx, idStr) if err != nil { - return nil, err + return err } m.mux.Lock() s := m.eventStreams[*id] + delete(m.eventStreams, *id) + if s != nil { + delete(m.streamsByName, *s.Spec().Name) + } m.mux.Unlock() - if s == nil { - return nil, i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, idStr) + if err := m._deleteAllStreamListeners(ctx, id); err != nil { + return err } - return &apitypes.EventStreamWithStatus{ - EventStream: *s.Spec(), - Status: s.Status(), - }, nil -} - -func (m *manager) parseLimit(ctx context.Context, limitStr string) (limit int, err error) { - if limitStr != "" { - if limit, err = strconv.Atoi(limitStr); err != nil { - return -1, i18n.NewError(ctx, tmmsgs.MsgInvalidLimit, limitStr, err) - } + if err := m.persistence.DeleteStream(ctx, id); err != nil { + return err } - return limit, nil + if s != nil { + return s.Delete(ctx) + } + return nil } -func (m *manager) parseAfterAndLimit(ctx context.Context, afterStr, limitStr string) (after *fftypes.UUID, limit int, err error) { - if limit, err = m.parseLimit(ctx, limitStr); err != nil { - return nil, -1, i18n.NewError(ctx, tmmsgs.MsgInvalidLimit, limitStr, err) +// Stream listener functions + +func (m *manager) _createOrUpdateListener(ctx context.Context, id *fftypes.UUID, newOrUpdates *apitypes.Listener, reset bool) (*apitypes.Listener, error) { + if err := _mergeEthCompatMethods(ctx, newOrUpdates); err != nil { + return nil, err } - if afterStr != "" { - if after, err = fftypes.ParseUUID(ctx, afterStr); err != nil { - return nil, -1, err - } + var s events.Stream + if newOrUpdates.StreamID != nil { + m.mux.Lock() + s = m.eventStreams[*newOrUpdates.StreamID] + m.mux.Unlock() } - return after, limit, nil + if s == nil { + return nil, i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, newOrUpdates.StreamID) + } + def, err := s.AddOrUpdateListener(ctx, id, newOrUpdates, reset) + if err != nil { + return nil, err + } + if err := m.persistence.WriteListener(ctx, def); err != nil { + err1 := s.RemoveListener(ctx, def.ID) + log.L(ctx).Infof("Cleaned up runtime listener after write failed (err?=%v)", err1) + return nil, err + } + return def, nil +} + +func (m *manager) createAndStoreNewListenerDeprecated(ctx context.Context, def *apitypes.Listener) (*apitypes.Listener, error) { + return m._createOrUpdateListener(ctx, apitypes.NewULID(), def, false) } -func (m *manager) getStreams(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.EventStream, err error) { - after, limit, err := m.parseAfterAndLimit(ctx, afterStr, limitStr) +func (m *manager) CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) { + streamID, err := fftypes.ParseUUID(ctx, idStr) if err != nil { return nil, err } - return m.persistence.ListStreamsByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending) + def.StreamID = streamID + return m.createAndStoreNewListenerDeprecated(ctx, def) } -func (m *manager) getListenerSpec(ctx context.Context, streamIDStr, listenerIDStr string) (spec *apitypes.Listener, err error) { +func (m *manager) _getListenerSpec(ctx context.Context, streamIDStr, listenerIDStr string) (spec *apitypes.Listener, err error) { var streamID *fftypes.UUID if streamIDStr != "" { streamID, err = fftypes.ParseUUID(ctx, streamIDStr) @@ -367,8 +339,36 @@ func (m *manager) getListenerSpec(ctx context.Context, streamIDStr, listenerIDSt return spec, nil } -func (m *manager) getListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error) { - spec, err := m.getListenerSpec(ctx, streamIDStr, listenerIDStr) +func (m *manager) getStreamListenersByCreateTime(ctx context.Context, afterStr, limitStr, idStr string) (streams []*apitypes.Listener, err error) { + after, limit, err := m._parseAfterAndLimit(ctx, afterStr, limitStr) + if err != nil { + return nil, err + } + id, err := fftypes.ParseUUID(ctx, idStr) + if err != nil { + return nil, err + } + return m.persistence.ListStreamListenersByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending, id) +} + +func (m *manager) getStreamListenersRich(ctx context.Context, streamID string, filter ffapi.AndFilter) ([]*apitypes.Listener, *ffapi.FilterResult, error) { + id, err := fftypes.ParseUUID(ctx, streamID) + if err != nil { + return nil, nil, err + } + return m.persistence.RichQuery().ListStreamListeners(ctx, id, filter) +} + +func (m *manager) GetListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error) { + after, limit, err := m._parseAfterAndLimit(ctx, afterStr, limitStr) + if err != nil { + return nil, err + } + return m.persistence.ListListenersByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending) +} + +func (m *manager) GetListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error) { + spec, err := m._getListenerSpec(ctx, streamIDStr, listenerIDStr) if err != nil { return nil, err } @@ -385,35 +385,35 @@ func (m *manager) getListener(ctx context.Context, streamIDStr, listenerIDStr st return l, nil } -func (m *manager) getListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error) { - after, limit, err := m.parseAfterAndLimit(ctx, afterStr, limitStr) +func (m *manager) UpdateListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error) { + l, err := m._getListenerSpec(ctx, streamIDStr, listenerIDStr) // Verify the listener exists in storage if err != nil { return nil, err } - return m.persistence.ListListenersByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending) + updates.StreamID = l.StreamID + return m._createOrUpdateListener(ctx, l.ID, updates, reset) } -func (m *manager) getStreamListenersByCreateTime(ctx context.Context, afterStr, limitStr, idStr string) (streams []*apitypes.Listener, err error) { - after, limit, err := m.parseAfterAndLimit(ctx, afterStr, limitStr) +func (m *manager) DeleteListener(ctx context.Context, streamIDStr, listenerIDStr string) error { + spec, err := m._getListenerSpec(ctx, streamIDStr, listenerIDStr) // Verify the listener exists in storage if err != nil { - return nil, err + return err } - id, err := fftypes.ParseUUID(ctx, idStr) - if err != nil { - return nil, err + m.mux.Lock() + s := m.eventStreams[*spec.StreamID] + m.mux.Unlock() + if s == nil { + return i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, spec.StreamID) } - return m.persistence.ListStreamListenersByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending, id) -} - -func (m *manager) getStreamListenersRich(ctx context.Context, streamID string, filter ffapi.AndFilter) ([]*apitypes.Listener, *ffapi.FilterResult, error) { - id, err := fftypes.ParseUUID(ctx, streamID) - if err != nil { - return nil, nil, err + if err := s.RemoveListener(ctx, spec.ID); err != nil { + return err } - return m.persistence.RichQuery().ListStreamListeners(ctx, id, filter) + return m.persistence.DeleteListener(ctx, spec.ID) } -func mergeEthCompatMethods(ctx context.Context, listener *apitypes.Listener) error { +// other internal functions + +func _mergeEthCompatMethods(ctx context.Context, listener *apitypes.Listener) error { if listener.EthCompatMethods != nil { if listener.Options == nil { listener.Options = fftypes.JSONAnyPtr("{}") @@ -434,3 +434,24 @@ func mergeEthCompatMethods(ctx context.Context, listener *apitypes.Listener) err } return nil } + +func (m *manager) _parseLimit(ctx context.Context, limitStr string) (limit int, err error) { + if limitStr != "" { + if limit, err = strconv.Atoi(limitStr); err != nil { + return -1, i18n.NewError(ctx, tmmsgs.MsgInvalidLimit, limitStr, err) + } + } + return limit, nil +} + +func (m *manager) _parseAfterAndLimit(ctx context.Context, afterStr, limitStr string) (after *fftypes.UUID, limit int, err error) { + if limit, err = m._parseLimit(ctx, limitStr); err != nil { + return nil, -1, i18n.NewError(ctx, tmmsgs.MsgInvalidLimit, limitStr, err) + } + if afterStr != "" { + if after, err = fftypes.ParseUUID(ctx, afterStr); err != nil { + return nil, -1, err + } + } + return after, limit, nil +} diff --git a/pkg/fftm/stream_management_test.go b/pkg/fftm/stream_management_test.go index 054f458a..2e8f19a1 100644 --- a/pkg/fftm/stream_management_test.go +++ b/pkg/fftm/stream_management_test.go @@ -83,7 +83,7 @@ func TestRestoreStreamsReadFailed(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("ListStreamsByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending).Return(nil, fmt.Errorf("pop")) - err := m.restoreStreams() + err := m._restoreStreams() assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -100,7 +100,7 @@ func TestRestoreListenersReadFailed(t *testing.T) { }, nil) mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), 0, txhandler.SortDirectionAscending, mock.Anything).Return(nil, fmt.Errorf("pop")) - err := m.restoreStreams() + err := m._restoreStreams() assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -116,7 +116,7 @@ func TestRestoreStreamsValidateFail(t *testing.T) { err := m.persistence.WriteStream(m.ctx, es1) assert.NoError(t, err) - err = m.restoreStreams() + err = m._restoreStreams() assert.Regexp(t, "FF21028", err) } @@ -139,7 +139,7 @@ func TestRestoreListenersStartFail(t *testing.T) { err = m.persistence.WriteListener(m.ctx, e1l1) assert.NoError(t, err) - err = m.restoreStreams() + err = m._restoreStreams() assert.Regexp(t, "pop", err) mfc.AssertExpectations(t) @@ -168,7 +168,7 @@ func TestDeleteStartedListener(t *testing.T) { err = m.Start() assert.NoError(t, err) - err = m.deleteStream(m.ctx, es1.ID.String()) + err = m.DeleteStream(m.ctx, es1.ID.String()) assert.NoError(t, err) mfc.AssertExpectations(t) @@ -188,7 +188,7 @@ func TestDeleteStartedListenerFail(t *testing.T) { }, nil) mp.On("DeleteListener", m.ctx, lID).Return(fmt.Errorf("pop")) - err := m.deleteAllStreamListeners(m.ctx, esID) + err := m._deleteAllStreamListeners(m.ctx, esID) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -219,7 +219,7 @@ func TestDeleteStartedListenerWithPagination(t *testing.T) { mp.On("DeleteListener", m.ctx, secondID).Return(nil) mp.On("DeleteListener", m.ctx, thirdID).Return(nil) - err := m.deleteAllStreamListeners(m.ctx, esID) + err := m._deleteAllStreamListeners(m.ctx, esID) assert.NoError(t, err) mp.AssertExpectations(t) @@ -230,7 +230,7 @@ func TestDeleteStreamBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - err := m.deleteStream(m.ctx, "Bad ID") + err := m.DeleteStream(m.ctx, "Bad ID") assert.Regexp(t, "FF00138", err) } @@ -244,7 +244,7 @@ func TestDeleteStreamListenerPersistenceFail(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return(nil, fmt.Errorf("pop")) - err := m.deleteStream(m.ctx, esID.String()) + err := m.DeleteStream(m.ctx, esID.String()) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -260,7 +260,7 @@ func TestDeleteStreamPersistenceFail(t *testing.T) { mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return([]*apitypes.Listener{}, nil) mp.On("DeleteStream", m.ctx, esID).Return(fmt.Errorf("pop")) - err := m.deleteStream(m.ctx, esID.String()) + err := m.DeleteStream(m.ctx, esID.String()) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -276,7 +276,7 @@ func TestDeleteStreamNotInitialized(t *testing.T) { mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return([]*apitypes.Listener{}, nil) mp.On("DeleteStream", m.ctx, esID).Return(nil) - err := m.deleteStream(m.ctx, esID.String()) + err := m.DeleteStream(m.ctx, esID.String()) assert.NoError(t, err) mp.AssertExpectations(t) @@ -297,31 +297,31 @@ func TestCreateRenameStreamNameReservation(t *testing.T) { mp.On("GetCheckpoint", m.ctx, mock.Anything).Return(nil, nil) // Reject missing name - _, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{}) + _, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{}) assert.Regexp(t, "FF21028", err) // Attempt to start and encounter a temporary error - _, err = m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) + _, err = m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) assert.Regexp(t, "temporary", err) // Ensure we still allow use of the name after the glitch is fixed - es1, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) + es1, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) assert.NoError(t, err) // Ensure we can't create another stream of same name - _, err = m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) + _, err = m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) assert.Regexp(t, "FF21047", err) // Create a second stream to test clash on rename - es2, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name2")}) + es2, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name2")}) assert.NoError(t, err) // Check for clash - _, err = m.updateStream(m.ctx, es1.ID.String(), &apitypes.EventStream{Name: strPtr("Name2")}) + _, err = m.UpdateStream(m.ctx, es1.ID.String(), &apitypes.EventStream{Name: strPtr("Name2")}) assert.Regexp(t, "FF21047", err) // Check for no-op rename to self - _, err = m.updateStream(m.ctx, es2.ID.String(), &apitypes.EventStream{Name: strPtr("Name2")}) + _, err = m.UpdateStream(m.ctx, es2.ID.String(), &apitypes.EventStream{Name: strPtr("Name2")}) assert.NoError(t, err) mp.AssertExpectations(t) @@ -333,7 +333,7 @@ func TestCreateStreamValidateFail(t *testing.T) { defer close() wrongType := apitypes.DistributionMode("wrong") - _, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1"), Type: &wrongType}) + _, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1"), Type: &wrongType}) assert.Regexp(t, "FF21029", err) } @@ -342,7 +342,7 @@ func TestCreateAndStoreNewStreamListenerBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.createAndStoreNewStreamListener(m.ctx, "bad", nil) + _, err := m.CreateAndStoreNewStreamListener(m.ctx, "bad", nil) assert.Regexp(t, "FF00138", err) } @@ -353,7 +353,7 @@ func TestUpdateExistingListenerNotFound(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("GetListener", m.ctx, mock.Anything).Return(nil, nil) - _, err := m.updateExistingListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String(), &apitypes.Listener{}, false) + _, err := m.UpdateListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String(), &apitypes.Listener{}, false) assert.Regexp(t, "FF21046", err) mp.AssertExpectations(t) @@ -363,7 +363,7 @@ func TestCreateOrUpdateListenerNotFound(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: apitypes.NewULID()}, false) + _, err := m._createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: apitypes.NewULID()}, false) assert.Regexp(t, "FF21045", err) } @@ -381,9 +381,9 @@ func TestCreateOrUpdateListenerFail(t *testing.T) { mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) mfc.On("EventListenerAdd", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) - _, err = m.createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) + _, err = m._createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -402,14 +402,14 @@ func TestCreateOrUpdateListenerFailMergeEthCompatMethods(t *testing.T) { mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) mfc.On("EventListenerAdd", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) l := &apitypes.Listener{ StreamID: es.ID, EthCompatMethods: fftypes.JSONAnyPtr(`{}`), } - _, err = m.createOrUpdateListener(m.ctx, apitypes.NewULID(), l, false) + _, err = m._createOrUpdateListener(m.ctx, apitypes.NewULID(), l, false) assert.Error(t, err) mp.AssertExpectations(t) @@ -430,9 +430,9 @@ func TestCreateOrUpdateListenerWriteFail(t *testing.T) { mfc.On("EventListenerAdd", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil) mfc.On("EventListenerRemove", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerRemoveResponse{}, ffcapi.ErrorReason(""), nil) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) - _, err = m.createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) + _, err = m._createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -442,7 +442,7 @@ func TestDeleteListenerBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - err := m.deleteListener(m.ctx, "bad ID", "bad ID") + err := m.DeleteListener(m.ctx, "bad ID", "bad ID") assert.Regexp(t, "FF00138", err) } @@ -455,7 +455,7 @@ func TestDeleteListenerStreamNotFound(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("GetListener", m.ctx, mock.Anything).Return(l1, nil) - err := m.deleteListener(m.ctx, l1.StreamID.String(), l1.ID.String()) + err := m.DeleteListener(m.ctx, l1.StreamID.String(), l1.ID.String()) assert.Regexp(t, "FF21045", err) mp.AssertExpectations(t) @@ -477,14 +477,14 @@ func TestDeleteListenerFail(t *testing.T) { mfc.On("EventListenerAdd", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil) mfc.On("EventListenerRemove", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) - l1, err := m.createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) + l1, err := m._createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) assert.NoError(t, err) mp.On("GetListener", m.ctx, mock.Anything).Return(l1, nil) - err = m.deleteListener(m.ctx, l1.StreamID.String(), l1.ID.String()) + err = m.DeleteListener(m.ctx, l1.StreamID.String(), l1.ID.String()) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -495,7 +495,7 @@ func TestUpdateStreamBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.updateStream(m.ctx, "bad ID", &apitypes.EventStream{}) + _, err := m.UpdateStream(m.ctx, "bad ID", &apitypes.EventStream{}) assert.Regexp(t, "FF00138", err) } @@ -504,7 +504,7 @@ func TestUpdateStreamNotFound(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.updateStream(m.ctx, apitypes.NewULID().String(), &apitypes.EventStream{}) + _, err := m.UpdateStream(m.ctx, apitypes.NewULID().String(), &apitypes.EventStream{}) assert.Regexp(t, "FF21045", err) } @@ -520,10 +520,10 @@ func TestUpdateStreamBadChanges(t *testing.T) { mp.On("WriteStream", m.ctx, mock.Anything).Return(nil) mp.On("GetCheckpoint", m.ctx, mock.Anything).Return(nil, nil) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) wrongType := apitypes.DistributionMode("wrong") - _, err = m.updateStream(m.ctx, es.ID.String(), &apitypes.EventStream{Type: &wrongType}) + _, err = m.UpdateStream(m.ctx, es.ID.String(), &apitypes.EventStream{Type: &wrongType}) assert.Regexp(t, "FF21029", err) } @@ -539,9 +539,9 @@ func TestUpdateStreamWriteFail(t *testing.T) { mp.On("WriteStream", m.ctx, mock.Anything).Return(fmt.Errorf("pop")) mp.On("GetCheckpoint", m.ctx, mock.Anything).Return(nil, nil) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) - _, err = m.updateStream(m.ctx, es.ID.String(), &apitypes.EventStream{}) + _, err = m.UpdateStream(m.ctx, es.ID.String(), &apitypes.EventStream{}) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -552,7 +552,7 @@ func TestGetStreamBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getStream(m.ctx, "bad ID") + _, err := m.GetStream(m.ctx, "bad ID") assert.Regexp(t, "FF00138", err) } @@ -561,7 +561,7 @@ func TestGetStreamNotFound(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getStream(m.ctx, apitypes.NewULID().String()) + _, err := m.GetStream(m.ctx, apitypes.NewULID().String()) assert.Regexp(t, "FF21045", err) } @@ -570,7 +570,7 @@ func TestGetStreamsBadLimit(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getStreams(m.ctx, "", "wrong") + _, err := m.GetStreams(m.ctx, "", "wrong") assert.Regexp(t, "FF21044", err) } @@ -579,7 +579,7 @@ func TestGetListenerBadAfter(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getListeners(m.ctx, "!bad UUID", "") + _, err := m.GetListeners(m.ctx, "!bad UUID", "") assert.Regexp(t, "FF00138", err) } @@ -588,7 +588,7 @@ func TestGetListenerBadStreamID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getListener(m.ctx, "bad ID", apitypes.NewULID().String()) + _, err := m.GetListener(m.ctx, "bad ID", apitypes.NewULID().String()) assert.Regexp(t, "FF00138", err) } @@ -597,7 +597,7 @@ func TestGetListenerBadListenerID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getListener(m.ctx, apitypes.NewULID().String(), "bad ID") + _, err := m.GetListener(m.ctx, apitypes.NewULID().String(), "bad ID") assert.Regexp(t, "FF00138", err) } @@ -609,7 +609,7 @@ func TestGetListenerLookupErr(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("GetListener", m.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) - _, err := m.getListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String()) + _, err := m.GetListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String()) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -623,7 +623,7 @@ func TestGetListenerNotFound(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("GetListener", m.ctx, mock.Anything).Return(nil, nil) - _, err := m.getListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String()) + _, err := m.GetListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String()) assert.Regexp(t, "FF21046", err) mp.AssertExpectations(t) @@ -662,7 +662,7 @@ func TestMergeEthCompatMethods(t *testing.T) { EthCompatMethods: fftypes.JSONAnyPtr(`[{"method1": "awesomeMethod"}]`), Options: fftypes.JSONAnyPtr(`{"otherOption": "otherValue"}`), } - err := mergeEthCompatMethods(context.Background(), l) + err := _mergeEthCompatMethods(context.Background(), l) assert.NoError(t, err) b, err := json.Marshal(l.Options) assert.NoError(t, err) @@ -673,7 +673,7 @@ func TestMergeEthCompatMethods(t *testing.T) { EthCompatMethods: fftypes.JSONAnyPtr(`[{"method1": "awesomeMethod"}]`), Options: nil, } - err = mergeEthCompatMethods(context.Background(), l) + err = _mergeEthCompatMethods(context.Background(), l) assert.NoError(t, err) b, err = json.Marshal(l.Options) assert.NoError(t, err) @@ -686,14 +686,14 @@ func TestMergeEthCompatMethodsFail(t *testing.T) { EthCompatMethods: fftypes.JSONAnyPtr(`[{"method1": "awesomeMethod"}`), Options: fftypes.JSONAnyPtr(`{"otherOption": "otherValue"}`), } - err := mergeEthCompatMethods(context.Background(), l) + err := _mergeEthCompatMethods(context.Background(), l) assert.Error(t, err) l = &apitypes.Listener{ EthCompatMethods: fftypes.JSONAnyPtr(`[{"method1": "awesomeMethod"}]`), Options: fftypes.JSONAnyPtr(`{"otherOption": "otherValue"`), } - err = mergeEthCompatMethods(context.Background(), l) + err = _mergeEthCompatMethods(context.Background(), l) assert.Error(t, err) } @@ -708,7 +708,7 @@ func TestGetListenerStatusFailStillReturn(t *testing.T) { mfc := m.connector.(*ffcapimocks.API) mfc.On("EventListenerHWM", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")).Maybe() - l, err := m.getListener(m.ctx, l1.StreamID.String(), l1.ID.String()) + l, err := m.GetListener(m.ctx, l1.StreamID.String(), l1.ID.String()) assert.NoError(t, err) assert.Nil(t, l.Checkpoint) assert.False(t, l.Catchup) diff --git a/pkg/fftm/transaction_management.go b/pkg/fftm/transaction_management.go index c8d4ebc1..4dcaa0a4 100644 --- a/pkg/fftm/transaction_management.go +++ b/pkg/fftm/transaction_management.go @@ -40,7 +40,7 @@ func (m *manager) getTransactionByIDWithStatus(ctx context.Context, txID string, } func (m *manager) getTransactions(ctx context.Context, afterStr, limitStr, signer string, pending bool, dirString string) (transactions []*apitypes.ManagedTX, err error) { - limit, err := m.parseLimit(ctx, limitStr) + limit, err := m._parseLimit(ctx, limitStr) if err != nil { return nil, err } @@ -120,3 +120,24 @@ func (m *manager) requestTransactionResume(ctx context.Context, txID string) (st return http.StatusAccepted, canceledTx, nil } + +// exposing txhandler functions through manager +func (m *manager) HandleNewTransaction(ctx context.Context, txReq *apitypes.TransactionRequest) (mtx *apitypes.ManagedTX, submissionRejected bool, err error) { + return m.txHandler.HandleNewTransaction(ctx, txReq) +} + +func (m *manager) HandleNewContractDeployment(ctx context.Context, txReq *apitypes.ContractDeployRequest) (mtx *apitypes.ManagedTX, submissionRejected bool, err error) { + return m.txHandler.HandleNewContractDeployment(ctx, txReq) +} + +func (m *manager) HandleCancelTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + return m.txHandler.HandleCancelTransaction(ctx, txID) +} + +func (m *manager) HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + return m.txHandler.HandleSuspendTransaction(ctx, txID) +} + +func (m *manager) HandleResumeTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + return m.txHandler.HandleResumeTransaction(ctx, txID) +} diff --git a/pkg/txhandler/txhandler.go b/pkg/txhandler/txhandler.go index 5802456f..882fa04f 100644 --- a/pkg/txhandler/txhandler.go +++ b/pkg/txhandler/txhandler.go @@ -154,7 +154,18 @@ type TransactionHandler interface { Start(ctx context.Context) (done <-chan struct{}, err error) // Event handling functions + // Instructional events: + TransactionManager + + // Informational events: + // HandleTransactionConfirmations - handles confirmations of blockchain transactions for a managed transaction + HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) (err error) + // HandleTransactionReceiptReceived - handles receipt of blockchain transactions for a managed transaction + HandleTransactionReceiptReceived(ctx context.Context, txID string, receipt *ffcapi.TransactionReceiptResponse) (err error) +} + +type TransactionManager interface { // HandleNewTransaction - handles event of adding new transactions onto blockchain HandleNewTransaction(ctx context.Context, txReq *apitypes.TransactionRequest) (mtx *apitypes.ManagedTX, submissionRejected bool, err error) // HandleNewContractDeployment - handles event of adding new smart contract deployment onto blockchain @@ -165,10 +176,4 @@ type TransactionHandler interface { HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) // HandleResumeTransaction - handles event of resuming a suspended managed transaction HandleResumeTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) - - // Informational events: - // HandleTransactionConfirmations - handles confirmations of blockchain transactions for a managed transaction - HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) (err error) - // HandleTransactionReceiptReceived - handles receipt of blockchain transactions for a managed transaction - HandleTransactionReceiptReceived(ctx context.Context, txID string, receipt *ffcapi.TransactionReceiptResponse) (err error) }