diff --git a/internal/events/eventstream.go b/internal/events/eventstream.go index 81ae016..2c45192 100644 --- a/internal/events/eventstream.go +++ b/internal/events/eventstream.go @@ -131,6 +131,9 @@ func NewEventStream( internalDispatcher InternalEventsDispatcher, ) (ees Stream, err error) { esCtx := log.WithLogField(bgCtx, "eventstream", persistedSpec.ID.String()) + if persistedSpec.Type != nil && *persistedSpec.Type == apitypes.EventStreamTypeInternal && internalDispatcher == nil { + return nil, i18n.NewError(esCtx, tmmsgs.MsgMissingInternalDispatcher) + } es := &eventStream{ bgCtx: esCtx, status: apitypes.EventStreamStatusStopped, @@ -167,26 +170,21 @@ func NewEventStream( func (es *eventStream) initAction(startedState *startedStreamState) error { ctx := startedState.ctx - if es.internalDispatcher != nil { - if es.spec.Type != &apitypes.EventStreamTypeInternal { - // TODO: need to understand why this should be panic, copied from the default switch case - panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamTypeForModuleMode, *es.spec.Type)) + + switch *es.spec.Type { + case apitypes.EventStreamTypeWebhook: + wa, err := newWebhookAction(ctx, es.spec.Webhook) + if err != nil { + return err } + startedState.action = wa.attemptBatch + case apitypes.EventStreamTypeWebSocket: + startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch + case apitypes.EventStreamTypeInternal: startedState.action = es.internalDispatcher.ProcessBatchedEvents - } else { - switch *es.spec.Type { - case apitypes.EventStreamTypeWebhook: - wa, err := newWebhookAction(ctx, es.spec.Webhook) - if err != nil { - return err - } - startedState.action = wa.attemptBatch - case apitypes.EventStreamTypeWebSocket: - startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch - default: - // mergeValidateEsConfig always be called previous to this - panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type)) - } + default: + // mergeValidateEsConfig always be called previous to this + panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type)) } return nil } diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 225e648..d465d58 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -34,6 +34,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/internal/tmconfig" "github.com/hyperledger/firefly-transaction-manager/internal/ws" "github.com/hyperledger/firefly-transaction-manager/mocks/confirmationsmocks" + "github.com/hyperledger/firefly-transaction-manager/mocks/eventsmocks" "github.com/hyperledger/firefly-transaction-manager/mocks/ffcapimocks" "github.com/hyperledger/firefly-transaction-manager/mocks/metricsmocks" "github.com/hyperledger/firefly-transaction-manager/mocks/persistencemocks" @@ -69,12 +70,18 @@ func testESConf(t *testing.T, j string) (spec *apitypes.EventStream) { func newTestEventStream(t *testing.T, conf string) (es *eventStream) { tmconfig.Reset() - es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf) + es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf, nil) + assert.NoError(t, err) + return es +} +func newTestInternalEventStream(t *testing.T, conf string, iedm InternalEventsDispatcher) (es *eventStream) { + tmconfig.Reset() + es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf, iedm) assert.NoError(t, err) return es } -func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf string, listeners ...*apitypes.Listener) (es *eventStream, err error) { +func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf string, iedm InternalEventsDispatcher, listeners ...*apitypes.Listener) (es *eventStream, err error) { tmconfig.Reset() config.Set(tmconfig.EventStreamsDefaultsBatchTimeout, "1us") InitDefaults() @@ -91,7 +98,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str &wsmocks.WebSocketChannels{}, listeners, emm, - nil, + iedm, ) mfc.On("EventStreamNewCheckpointStruct").Return(&utCheckpointType{}).Maybe() if err != nil { @@ -138,6 +145,24 @@ func TestNewTestEventStreamMissingID(t *testing.T) { assert.Regexp(t, "FF21048", err) } +func TestNewTestEventStreamMissingInternalDispatcher(t *testing.T) { + tmconfig.Reset() + InitDefaults() + emm := &metricsmocks.EventMetricsEmitter{} + + _, err := NewEventStream(context.Background(), &apitypes.EventStream{ + Type: &apitypes.EventStreamTypeInternal, + }, + &ffcapimocks.API{}, + &persistencemocks.Persistence{}, + &wsmocks.WebSocketChannels{}, + []*apitypes.Listener{}, + emm, + nil, + ) + assert.Regexp(t, "FF21091", err) +} + func TestNewTestEventStreamBadConfig(t *testing.T) { tmconfig.Reset() InitDefaults() @@ -536,6 +561,101 @@ func TestWebSocketEventStreamsE2EBlocks(t *testing.T) { mfc.AssertExpectations(t) } +func TestInternalEventStreamsE2EBlocks(t *testing.T) { + idem := &eventsmocks.InternalEventsDispatcher{} + + es := newTestInternalEventStream(t, `{ + "name": "ut_stream", + "type": "internal" + }`, idem) + + l := &apitypes.Listener{ + ID: apitypes.NewULID(), + Name: strPtr("ut_listener"), + Type: &apitypes.ListenerTypeBlocks, + FromBlock: strPtr(ffcapi.FromBlockLatest), + } + + started := make(chan (chan<- *ffcapi.ListenerEvent), 1) + mfc := es.connector.(*ffcapimocks.API) + + mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { + return r.ID.Equals(es.spec.ID) + })).Run(func(args mock.Arguments) { + r := args[1].(*ffcapi.EventStreamStartRequest) + assert.Empty(t, r.InitialListeners) + }).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil) + + mfc.On("EventStreamStopped", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStoppedRequest) bool { + return r.ID.Equals(es.spec.ID) + })).Return(&ffcapi.EventStreamStoppedResponse{}, ffcapi.ErrorReason(""), nil) + + mcm := es.confirmations.(*confirmationsmocks.Manager) + mcm.On("StartConfirmedBlockListener", mock.Anything, l.ID, "latest", mock.MatchedBy(func(cp *ffcapi.BlockListenerCheckpoint) bool { + return cp.Block == 10000 + }), mock.Anything).Run(func(args mock.Arguments) { + started <- args[4].(chan<- *ffcapi.ListenerEvent) + }).Return(nil) + mcm.On("StopConfirmedBlockListener", mock.Anything, l.ID).Return(nil) + + msp := es.persistence.(*persistencemocks.Persistence) + // load existing checkpoint on start + msp.On("GetCheckpoint", mock.Anything, mock.Anything).Return(&apitypes.EventStreamCheckpoint{ + StreamID: es.spec.ID, + Time: fftypes.Now(), + Listeners: map[fftypes.UUID]json.RawMessage{ + *l.ID: []byte(`{"block":10000}`), + }, + }, nil) + // write a valid checkpoint + msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool { + return cp.StreamID.Equals(es.spec.ID) && string(cp.Listeners[*l.ID]) == `{"block":10001}` + })).Return(nil) + // write a checkpoint when we delete + msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool { + return cp.StreamID.Equals(es.spec.ID) && cp.Listeners[*l.ID] == nil + })).Return(nil) + + _, err := es.AddOrUpdateListener(es.bgCtx, l.ID, l, false) + assert.NoError(t, err) + + err = es.Start(es.bgCtx) + assert.NoError(t, err) + + assert.Equal(t, apitypes.EventStreamStatusStarted, es.Status()) + + err = es.Start(es.bgCtx) // double start is error + assert.Regexp(t, "FF21027", err) + + r := <-started + + mockListenerEvent := &ffcapi.ListenerEvent{ + Checkpoint: &ffcapi.BlockListenerCheckpoint{Block: 10001}, + BlockEvent: &ffcapi.BlockEvent{ + ListenerID: l.ID, + BlockInfo: ffcapi.BlockInfo{ + BlockNumber: fftypes.NewFFBigInt(10001), + BlockHash: fftypes.NewRandB32().String(), + ParentHash: fftypes.NewRandB32().String(), + }, + }, + } + + idem.On("ProcessBatchedEvents", mock.Anything, 1, 1, mock.MatchedBy(func(events []*ffcapi.ListenerEvent) bool { + return events[0] == mockListenerEvent + })).Return() + + r <- mockListenerEvent + + err = es.RemoveListener(es.bgCtx, l.ID) + assert.NoError(t, err) + + err = es.Stop(es.bgCtx) + assert.NoError(t, err) + + mfc.AssertExpectations(t) +} + func TestStartEventStreamCheckpointReadFail(t *testing.T) { es := newTestEventStream(t, `{ @@ -740,7 +860,7 @@ func TestStartWithExistingStreamOk(t *testing.T) { _, err := newTestEventStreamWithListener(t, mfc, `{ "name": "ut_stream" - }`, l) + }`, nil, l) assert.NoError(t, err) mfc.AssertExpectations(t) @@ -760,7 +880,7 @@ func TestStartWithExistingStreamFail(t *testing.T) { _, err := newTestEventStreamWithListener(t, mfc, `{ "name": "ut_stream" - }`, l) + }`, nil, l) assert.Regexp(t, "pop", err) mfc.AssertExpectations(t) @@ -1166,7 +1286,7 @@ func TestStartWithExistingBlockListener(t *testing.T) { _, err := newTestEventStreamWithListener(t, mfc, `{ "name": "ut_stream" - }`, l) + }`, nil, l) assert.NoError(t, err) mfc.AssertExpectations(t) @@ -1184,7 +1304,7 @@ func TestStartAndAddBadListenerType(t *testing.T) { es, err := newTestEventStreamWithListener(t, mfc, `{ "name": "ut_stream" - }`) + }`, nil) assert.NoError(t, err) _, err = es.AddOrUpdateListener(es.bgCtx, l.ID, l, false) diff --git a/internal/tmmsgs/en_error_messages.go b/internal/tmmsgs/en_error_messages.go index ac6cd36..be1d4e3 100644 --- a/internal/tmmsgs/en_error_messages.go +++ b/internal/tmmsgs/en_error_messages.go @@ -105,5 +105,5 @@ var ( MsgBlockListenerNotStarted = ffe("FF21088", "Block listener %s not started", http.StatusConflict) MsgBadListenerType = ffe("FF21089", "Invalid listener type: %s", http.StatusBadRequest) MsgFromBlockInvalid = ffe("FF21090", "From block invalid. Must be 'earliest', 'latest' or a decimal: %s", http.StatusBadRequest) - MsgInvalidStreamTypeForModuleMode = ffe("FF21091", "Invalid event stream type '%s', only 'internal' type is supported for module mode", http.StatusBadRequest) + MsgMissingInternalDispatcher = ffe("FF21091", "'internal' type is supported for module mode") ) diff --git a/pkg/fftm/manager.go b/pkg/fftm/manager.go index ccfd8e3..cab8d74 100644 --- a/pkg/fftm/manager.go +++ b/pkg/fftm/manager.go @@ -43,6 +43,7 @@ import ( type Manager interface { Start() error StreamManager + ListenerManager txhandler.TransactionManager Close() } diff --git a/pkg/fftm/route_post_eventstream_listeners.go b/pkg/fftm/route_post_eventstream_listeners.go index 5d19c72..f401c4d 100644 --- a/pkg/fftm/route_post_eventstream_listeners.go +++ b/pkg/fftm/route_post_eventstream_listeners.go @@ -38,7 +38,7 @@ var postEventStreamListeners = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.CreateAndStoreNewStreamListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener)) + return m.CreateAndStoreNewListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener)) }, } } diff --git a/pkg/fftm/stream_management.go b/pkg/fftm/stream_management.go index 81161e6..8351d01 100644 --- a/pkg/fftm/stream_management.go +++ b/pkg/fftm/stream_management.go @@ -44,8 +44,8 @@ type StreamManager interface { DeleteStream(ctx context.Context, idStr string) error } -type StreamListenerManager interface { - CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) +type ListenerManager interface { + CreateAndStoreNewListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) GetListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error) GetListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error) UpdateListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error) @@ -306,7 +306,7 @@ func (m *manager) createAndStoreNewListenerDeprecated(ctx context.Context, def * return m._createOrUpdateListener(ctx, apitypes.NewULID(), def, false) } -func (m *manager) CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) { +func (m *manager) CreateAndStoreNewListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) { streamID, err := fftypes.ParseUUID(ctx, idStr) if err != nil { return nil, err diff --git a/pkg/fftm/stream_management_test.go b/pkg/fftm/stream_management_test.go index 2e8f19a..83d3953 100644 --- a/pkg/fftm/stream_management_test.go +++ b/pkg/fftm/stream_management_test.go @@ -338,11 +338,11 @@ func TestCreateStreamValidateFail(t *testing.T) { } -func TestCreateAndStoreNewStreamListenerBadID(t *testing.T) { +func TestCreateAndStoreNewListenerBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.CreateAndStoreNewStreamListener(m.ctx, "bad", nil) + _, err := m.CreateAndStoreNewListener(m.ctx, "bad", nil) assert.Regexp(t, "FF00138", err) }