Skip to content

Commit

Permalink
add module mode and extract public interface
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <chengxuan.xing@kaleido.io>
  • Loading branch information
Chengxuan committed Jul 8, 2024
1 parent 1ac00b5 commit 6373034
Show file tree
Hide file tree
Showing 31 changed files with 451 additions and 267 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mocks-$(strip $(1))-$(strip $(2)): ${MOCKERY}
endef

$(eval $(call makemock, pkg/ffcapi, API, ffcapimocks))
$(eval $(call makemock, pkg/fftm, ModuleFunctions, fftmmocks))
$(eval $(call makemock, pkg/txhandler, TransactionHandler, txhandlermocks))
$(eval $(call makemock, pkg/txhandler, ManagedTxEventHandler, txhandlermocks))
$(eval $(call makemock, internal/metrics, TransactionHandlerMetrics, metricsmocks))
Expand All @@ -41,6 +42,7 @@ $(eval $(call makemock, internal/persistence, RichQuery, per
$(eval $(call makemock, internal/ws, WebSocketChannels, wsmocks))
$(eval $(call makemock, internal/ws, WebSocketServer, wsmocks))
$(eval $(call makemock, internal/events, Stream, eventsmocks))
$(eval $(call makemock, internal/events, InternalEventsDispatcher, eventsmocks))
$(eval $(call makemock, internal/apiclient, FFTMClient, apiclientmocks))

go-mod-tidy: .ALWAYS
Expand Down
41 changes: 28 additions & 13 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func InitDefaults() {
}
}

type eventStreamAction func(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error
type InternalEventsDispatcher interface {
ProcessBatchedEvents(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error
}

type eventStreamBatch struct {
number int64
Expand All @@ -92,7 +94,7 @@ type startedStreamState struct {
ctx context.Context
cancelCtx func()
startTime *fftypes.FFTime
action eventStreamAction
action func(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error
eventLoopDone chan struct{}
batchLoopDone chan struct{}
blockListenerDone chan struct{}
Expand All @@ -110,6 +112,7 @@ type eventStream struct {
confirmations confirmations.Manager
confirmationsRequired int
listeners map[fftypes.UUID]*listener
internalDispatcher InternalEventsDispatcher
wsChannels ws.WebSocketChannels
retry *retry.Retry
currentState *startedStreamState
Expand All @@ -125,6 +128,7 @@ func NewEventStream(
wsChannels ws.WebSocketChannels,
initialListeners []*apitypes.Listener,
eme metrics.EventMetricsEmitter,
internalDispatcher InternalEventsDispatcher,
) (ees Stream, err error) {
esCtx := log.WithLogField(bgCtx, "eventstream", persistedSpec.ID.String())
es := &eventStream{
Expand All @@ -134,6 +138,7 @@ func NewEventStream(
connector: connector,
persistence: persistence,
listeners: make(map[fftypes.UUID]*listener),
internalDispatcher: internalDispatcher,
wsChannels: wsChannels,
retry: esDefaults.retry,
checkpointInterval: config.GetDuration(tmconfig.EventStreamsCheckpointInterval),
Expand Down Expand Up @@ -162,18 +167,26 @@ func NewEventStream(

func (es *eventStream) initAction(startedState *startedStreamState) error {
ctx := startedState.ctx
switch *es.spec.Type {
case apitypes.EventStreamTypeWebhook:
wa, err := newWebhookAction(ctx, es.spec.Webhook)
if err != nil {
return err
if es.internalDispatcher != nil {
if es.spec.Type != &apitypes.EventStreamTypeInternal {
// TODO: need to understand why this should be panic, copied from the default switch case
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamTypeForModuleMode, *es.spec.Type))
}
startedState.action = es.internalDispatcher.ProcessBatchedEvents
} else {
switch *es.spec.Type {
case apitypes.EventStreamTypeWebhook:
wa, err := newWebhookAction(ctx, es.spec.Webhook)
if err != nil {
return err
}
startedState.action = wa.attemptBatch
case apitypes.EventStreamTypeWebSocket:
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch
default:
// mergeValidateEsConfig always be called previous to this
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type))
}
startedState.action = wa.attemptBatch
case apitypes.EventStreamTypeWebSocket:
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch
default:
// mergeValidateEsConfig always be called previous to this
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type))
}
return nil
}
Expand Down Expand Up @@ -251,6 +264,8 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda
if merged.Webhook, changed, err = mergeValidateWhConfig(ctx, changed, base.Webhook, updates.Webhook); err != nil {
return nil, false, err
}
case apitypes.EventStreamTypeInternal:
// no checks are required for internal listener
default:
return nil, false, i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *merged.Type)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str
&wsmocks.WebSocketChannels{},
listeners,
emm,
nil,
)
mfc.On("EventStreamNewCheckpointStruct").Return(&utCheckpointType{}).Maybe()
if err != nil {
Expand Down Expand Up @@ -125,12 +126,14 @@ func TestNewTestEventStreamMissingID(t *testing.T) {
tmconfig.Reset()
InitDefaults()
emm := &metricsmocks.EventMetricsEmitter{}

_, err := NewEventStream(context.Background(), &apitypes.EventStream{},
&ffcapimocks.API{},
&persistencemocks.Persistence{},
&wsmocks.WebSocketChannels{},
[]*apitypes.Listener{},
emm,
nil,
)
assert.Regexp(t, "FF21048", err)
}
Expand All @@ -139,12 +142,14 @@ func TestNewTestEventStreamBadConfig(t *testing.T) {
tmconfig.Reset()
InitDefaults()
emm := &metricsmocks.EventMetricsEmitter{}

_, err := NewEventStream(context.Background(), testESConf(t, `{}`),
&ffcapimocks.API{},
&persistencemocks.Persistence{},
&wsmocks.WebSocketChannels{},
[]*apitypes.Listener{},
emm,
nil,
)
assert.Regexp(t, "FF21028", err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/tmmsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,5 @@ var (
MsgBlockListenerNotStarted = ffe("FF21088", "Block listener %s not started", http.StatusConflict)
MsgBadListenerType = ffe("FF21089", "Invalid listener type: %s", http.StatusBadRequest)
MsgFromBlockInvalid = ffe("FF21090", "From block invalid. Must be 'earliest', 'latest' or a decimal: %s", http.StatusBadRequest)
MsgInvalidStreamTypeForModuleMode = ffe("FF21091", "Invalid event stream type '%s', only 'internal' type is supported for module mode", http.StatusBadRequest)
)
48 changes: 48 additions & 0 deletions mocks/eventsmocks/internal_events_dispatcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions mocks/fftmmocks/module_functions.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apitypes/api_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type EventStreamType = fftypes.FFEnum
var (
EventStreamTypeWebhook = fftypes.FFEnumValue("estype", "webhook")
EventStreamTypeWebSocket = fftypes.FFEnumValue("estype", "websocket")
EventStreamTypeInternal = fftypes.FFEnumValue("estype", "internal")
)

type ErrorHandlingType = fftypes.FFEnum
Expand Down
Loading

0 comments on commit 6373034

Please sign in to comment.