Skip to content

Commit

Permalink
test internal event stream
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <chengxuan.xing@kaleido.io>
  • Loading branch information
Chengxuan committed Jul 9, 2024
1 parent 6373034 commit ddfae10
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 32 deletions.
34 changes: 16 additions & 18 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func NewEventStream(
internalDispatcher InternalEventsDispatcher,
) (ees Stream, err error) {
esCtx := log.WithLogField(bgCtx, "eventstream", persistedSpec.ID.String())
if persistedSpec.Type != nil && *persistedSpec.Type == apitypes.EventStreamTypeInternal && internalDispatcher == nil {
return nil, i18n.NewError(esCtx, tmmsgs.MsgMissingInternalDispatcher)
}
es := &eventStream{
bgCtx: esCtx,
status: apitypes.EventStreamStatusStopped,
Expand Down Expand Up @@ -167,26 +170,21 @@ func NewEventStream(

func (es *eventStream) initAction(startedState *startedStreamState) error {
ctx := startedState.ctx
if es.internalDispatcher != nil {
if es.spec.Type != &apitypes.EventStreamTypeInternal {
// TODO: need to understand why this should be panic, copied from the default switch case
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamTypeForModuleMode, *es.spec.Type))

switch *es.spec.Type {
case apitypes.EventStreamTypeWebhook:
wa, err := newWebhookAction(ctx, es.spec.Webhook)
if err != nil {
return err
}
startedState.action = wa.attemptBatch
case apitypes.EventStreamTypeWebSocket:
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch
case apitypes.EventStreamTypeInternal:
startedState.action = es.internalDispatcher.ProcessBatchedEvents
} else {
switch *es.spec.Type {
case apitypes.EventStreamTypeWebhook:
wa, err := newWebhookAction(ctx, es.spec.Webhook)
if err != nil {
return err
}
startedState.action = wa.attemptBatch
case apitypes.EventStreamTypeWebSocket:
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch
default:
// mergeValidateEsConfig always be called previous to this
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type))
}
default:
// mergeValidateEsConfig always be called previous to this
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type))
}
return nil
}
Expand Down
134 changes: 127 additions & 7 deletions internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/internal/tmconfig"
"github.com/hyperledger/firefly-transaction-manager/internal/ws"
"github.com/hyperledger/firefly-transaction-manager/mocks/confirmationsmocks"
"github.com/hyperledger/firefly-transaction-manager/mocks/eventsmocks"
"github.com/hyperledger/firefly-transaction-manager/mocks/ffcapimocks"
"github.com/hyperledger/firefly-transaction-manager/mocks/metricsmocks"
"github.com/hyperledger/firefly-transaction-manager/mocks/persistencemocks"
Expand Down Expand Up @@ -69,12 +70,18 @@ func testESConf(t *testing.T, j string) (spec *apitypes.EventStream) {

func newTestEventStream(t *testing.T, conf string) (es *eventStream) {
tmconfig.Reset()
es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf)
es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf, nil)
assert.NoError(t, err)
return es
}
func newTestInternalEventStream(t *testing.T, conf string, iedm InternalEventsDispatcher) (es *eventStream) {
tmconfig.Reset()
es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf, iedm)
assert.NoError(t, err)
return es
}

func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf string, listeners ...*apitypes.Listener) (es *eventStream, err error) {
func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf string, iedm InternalEventsDispatcher, listeners ...*apitypes.Listener) (es *eventStream, err error) {
tmconfig.Reset()
config.Set(tmconfig.EventStreamsDefaultsBatchTimeout, "1us")
InitDefaults()
Expand All @@ -91,7 +98,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str
&wsmocks.WebSocketChannels{},
listeners,
emm,
nil,
iedm,
)
mfc.On("EventStreamNewCheckpointStruct").Return(&utCheckpointType{}).Maybe()
if err != nil {
Expand Down Expand Up @@ -138,6 +145,24 @@ func TestNewTestEventStreamMissingID(t *testing.T) {
assert.Regexp(t, "FF21048", err)
}

func TestNewTestEventStreamMissingInternalDispatcher(t *testing.T) {
tmconfig.Reset()
InitDefaults()
emm := &metricsmocks.EventMetricsEmitter{}

_, err := NewEventStream(context.Background(), &apitypes.EventStream{
Type: &apitypes.EventStreamTypeInternal,
},
&ffcapimocks.API{},
&persistencemocks.Persistence{},
&wsmocks.WebSocketChannels{},
[]*apitypes.Listener{},
emm,
nil,
)
assert.Regexp(t, "FF21091", err)
}

func TestNewTestEventStreamBadConfig(t *testing.T) {
tmconfig.Reset()
InitDefaults()
Expand Down Expand Up @@ -536,6 +561,101 @@ func TestWebSocketEventStreamsE2EBlocks(t *testing.T) {
mfc.AssertExpectations(t)
}

func TestInternalEventStreamsE2EBlocks(t *testing.T) {
idem := &eventsmocks.InternalEventsDispatcher{}

es := newTestInternalEventStream(t, `{
"name": "ut_stream",
"type": "internal"
}`, idem)

l := &apitypes.Listener{
ID: apitypes.NewULID(),
Name: strPtr("ut_listener"),
Type: &apitypes.ListenerTypeBlocks,
FromBlock: strPtr(ffcapi.FromBlockLatest),
}

started := make(chan (chan<- *ffcapi.ListenerEvent), 1)
mfc := es.connector.(*ffcapimocks.API)

mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool {
return r.ID.Equals(es.spec.ID)
})).Run(func(args mock.Arguments) {
r := args[1].(*ffcapi.EventStreamStartRequest)
assert.Empty(t, r.InitialListeners)
}).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil)

mfc.On("EventStreamStopped", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStoppedRequest) bool {
return r.ID.Equals(es.spec.ID)
})).Return(&ffcapi.EventStreamStoppedResponse{}, ffcapi.ErrorReason(""), nil)

mcm := es.confirmations.(*confirmationsmocks.Manager)
mcm.On("StartConfirmedBlockListener", mock.Anything, l.ID, "latest", mock.MatchedBy(func(cp *ffcapi.BlockListenerCheckpoint) bool {
return cp.Block == 10000
}), mock.Anything).Run(func(args mock.Arguments) {
started <- args[4].(chan<- *ffcapi.ListenerEvent)
}).Return(nil)
mcm.On("StopConfirmedBlockListener", mock.Anything, l.ID).Return(nil)

msp := es.persistence.(*persistencemocks.Persistence)
// load existing checkpoint on start
msp.On("GetCheckpoint", mock.Anything, mock.Anything).Return(&apitypes.EventStreamCheckpoint{
StreamID: es.spec.ID,
Time: fftypes.Now(),
Listeners: map[fftypes.UUID]json.RawMessage{
*l.ID: []byte(`{"block":10000}`),
},
}, nil)
// write a valid checkpoint
msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool {
return cp.StreamID.Equals(es.spec.ID) && string(cp.Listeners[*l.ID]) == `{"block":10001}`
})).Return(nil)
// write a checkpoint when we delete
msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool {
return cp.StreamID.Equals(es.spec.ID) && cp.Listeners[*l.ID] == nil
})).Return(nil)

_, err := es.AddOrUpdateListener(es.bgCtx, l.ID, l, false)
assert.NoError(t, err)

err = es.Start(es.bgCtx)
assert.NoError(t, err)

assert.Equal(t, apitypes.EventStreamStatusStarted, es.Status())

err = es.Start(es.bgCtx) // double start is error
assert.Regexp(t, "FF21027", err)

r := <-started

mockListenerEvent := &ffcapi.ListenerEvent{
Checkpoint: &ffcapi.BlockListenerCheckpoint{Block: 10001},
BlockEvent: &ffcapi.BlockEvent{
ListenerID: l.ID,
BlockInfo: ffcapi.BlockInfo{
BlockNumber: fftypes.NewFFBigInt(10001),
BlockHash: fftypes.NewRandB32().String(),
ParentHash: fftypes.NewRandB32().String(),
},
},
}

idem.On("ProcessBatchedEvents", mock.Anything, 1, 1, mock.MatchedBy(func(events []*ffcapi.ListenerEvent) bool {
return events[0] == mockListenerEvent
})).Return()

r <- mockListenerEvent

err = es.RemoveListener(es.bgCtx, l.ID)
assert.NoError(t, err)

err = es.Stop(es.bgCtx)
assert.NoError(t, err)

mfc.AssertExpectations(t)
}

func TestStartEventStreamCheckpointReadFail(t *testing.T) {

es := newTestEventStream(t, `{
Expand Down Expand Up @@ -740,7 +860,7 @@ func TestStartWithExistingStreamOk(t *testing.T) {

_, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`, l)
}`, nil, l)
assert.NoError(t, err)

mfc.AssertExpectations(t)
Expand All @@ -760,7 +880,7 @@ func TestStartWithExistingStreamFail(t *testing.T) {

_, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`, l)
}`, nil, l)
assert.Regexp(t, "pop", err)

mfc.AssertExpectations(t)
Expand Down Expand Up @@ -1166,7 +1286,7 @@ func TestStartWithExistingBlockListener(t *testing.T) {

_, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`, l)
}`, nil, l)
assert.NoError(t, err)

mfc.AssertExpectations(t)
Expand All @@ -1184,7 +1304,7 @@ func TestStartAndAddBadListenerType(t *testing.T) {

es, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`)
}`, nil)
assert.NoError(t, err)

_, err = es.AddOrUpdateListener(es.bgCtx, l.ID, l, false)
Expand Down
2 changes: 1 addition & 1 deletion internal/tmmsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,5 @@ var (
MsgBlockListenerNotStarted = ffe("FF21088", "Block listener %s not started", http.StatusConflict)
MsgBadListenerType = ffe("FF21089", "Invalid listener type: %s", http.StatusBadRequest)
MsgFromBlockInvalid = ffe("FF21090", "From block invalid. Must be 'earliest', 'latest' or a decimal: %s", http.StatusBadRequest)
MsgInvalidStreamTypeForModuleMode = ffe("FF21091", "Invalid event stream type '%s', only 'internal' type is supported for module mode", http.StatusBadRequest)
MsgMissingInternalDispatcher = ffe("FF21091", "'internal' type is supported for module mode")
)
1 change: 1 addition & 0 deletions pkg/fftm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
type Manager interface {
Start() error
StreamManager
ListenerManager
txhandler.TransactionManager
Close()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fftm/route_post_eventstream_listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var postEventStreamListeners = func(m *manager) *ffapi.Route {
JSONOutputValue: func() interface{} { return &apitypes.Listener{} },
JSONOutputCodes: []int{http.StatusOK},
JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) {
return m.CreateAndStoreNewStreamListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener))
return m.CreateAndStoreNewListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener))
},
}
}
6 changes: 3 additions & 3 deletions pkg/fftm/stream_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type StreamManager interface {
DeleteStream(ctx context.Context, idStr string) error
}

type StreamListenerManager interface {
CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error)
type ListenerManager interface {
CreateAndStoreNewListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error)
GetListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error)
GetListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error)
UpdateListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error)
Expand Down Expand Up @@ -306,7 +306,7 @@ func (m *manager) createAndStoreNewListenerDeprecated(ctx context.Context, def *
return m._createOrUpdateListener(ctx, apitypes.NewULID(), def, false)
}

func (m *manager) CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) {
func (m *manager) CreateAndStoreNewListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) {
streamID, err := fftypes.ParseUUID(ctx, idStr)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/fftm/stream_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,11 @@ func TestCreateStreamValidateFail(t *testing.T) {

}

func TestCreateAndStoreNewStreamListenerBadID(t *testing.T) {
func TestCreateAndStoreNewListenerBadID(t *testing.T) {
_, m, close := newTestManagerMockPersistence(t)
defer close()

_, err := m.CreateAndStoreNewStreamListener(m.ctx, "bad", nil)
_, err := m.CreateAndStoreNewListener(m.ctx, "bad", nil)
assert.Regexp(t, "FF00138", err)
}

Expand Down

0 comments on commit ddfae10

Please sign in to comment.