Skip to content

Commit

Permalink
feat: add e2e testing (#243)
Browse files Browse the repository at this point in the history
* feat: add e2e recipe testing

* refactor: shift e2e to test folder

* refactor: restructure test folder

* fix: close method to close any connection

* refactor: e2e test

* feat: add make test-e2e command

* chore: update test yaml

* refactor: change build tag

* chore: add count
  • Loading branch information
scortier authored Oct 8, 2021
1 parent 8a5bc4c commit 97e3e44
Show file tree
Hide file tree
Showing 31 changed files with 509 additions and 104 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ jobs:
- name: Install packages
run: go mod tidy
- name: Run Test
run: make test
run: make test
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ test:
test-coverage: test
go tool cover -html=coverage.out

test-e2e:
go test ./test/e2e -tags=integration -count=1

generate-proto: ## regenerate protos
@echo " > cloning protobuf from odpf/proton"
@echo " > generating protobuf"
Expand Down
6 changes: 6 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.SinkRecipe, stream *str
return err
}, defaultBatchSize)

stream.onClose(func() {
if err = sink.Close(); err != nil {
r.logger.Warn("error closing sink", "sink", sr.Name, "error", err)
}
})

return
}

Expand Down
51 changes: 30 additions & 21 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ package agent_test
import (
"context"
"errors"
"github.com/odpf/meteor/test"
"testing"
"time"

"github.com/odpf/meteor/agent"
"github.com/odpf/meteor/models"
"github.com/odpf/meteor/models/odpf/assets"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/recipe"
"github.com/odpf/meteor/registry"
"github.com/odpf/meteor/test/mocks"
"github.com/odpf/meteor/test/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"testing"
"time"
)

var mockCtx = mock.AnythingOfType("*context.emptyCtx")
Expand Down Expand Up @@ -43,7 +42,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: registry.NewExtractorFactory(),
ProcessorFactory: registry.NewProcessorFactory(),
SinkFactory: registry.NewSinkFactory(),
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.IsType(t, agent.Run{}, run)
Expand All @@ -67,7 +66,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: registry.NewExtractorFactory(),
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand All @@ -92,7 +91,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: registry.NewProcessorFactory(),
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand All @@ -119,7 +118,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: registry.NewSinkFactory(),
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -150,7 +149,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -183,7 +182,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -218,7 +217,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand All @@ -243,6 +242,7 @@ func TestRunnerRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -253,7 +253,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand All @@ -277,6 +277,7 @@ func TestRunnerRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -287,7 +288,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -318,6 +319,7 @@ func TestRunnerRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -328,7 +330,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -358,6 +360,7 @@ func TestRunnerRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -368,7 +371,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -400,6 +403,7 @@ func TestRunnerRun(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(errors.New("some error"))
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -410,7 +414,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
Expand Down Expand Up @@ -442,6 +446,7 @@ func TestRunnerRun(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(errors.New("some error"))
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -452,7 +457,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
StopOnSinkError: true,
})
run := r.Run(validRecipe)
Expand Down Expand Up @@ -485,6 +490,7 @@ func TestRunnerRun(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -495,7 +501,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
Expand Down Expand Up @@ -528,6 +534,7 @@ func TestRunnerRun(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -544,7 +551,7 @@ func TestRunnerRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Monitor: monitor,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
Expand Down Expand Up @@ -579,6 +586,7 @@ func TestRunnerRun(t *testing.T) {
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(plugins.NewRetryError(err)).Once()
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -589,7 +597,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
})
Expand Down Expand Up @@ -628,6 +636,7 @@ func TestRunnerRunMultiple(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil)
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -638,7 +647,7 @@ func TestRunnerRunMultiple(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
runs := r.RunMultiple(recipeList)

Expand Down
14 changes: 13 additions & 1 deletion agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type subscriber struct {
type stream struct {
middlewares []streamMiddleware
subscribers []*subscriber
onCloses []func()
closed bool
err error
}
Expand All @@ -38,6 +39,13 @@ func (s *stream) subscribe(callback func(batchedData []models.Record) error, bat
return s
}

// onClose() is used to register callback for after stream is closed.
func (s *stream) onClose(callback func()) *stream {
s.onCloses = append(s.onCloses, callback)

return s
}

// broadcast() will start listening to emitter for any pushed data.
// This process is blocking, so most times you would want to call this inside a goroutine.
func (s *stream) broadcast() error {
Expand All @@ -55,7 +63,7 @@ func (s *stream) broadcast() error {
batch := newBatch(l.batchSize)
// listen to channel and emit data to subscriber callback if batch is full
for d := range l.channel {
if err := batch.add(d); err != nil {
if err := batch.add(d); err != nil {
s.closeWithError(err)
}
if batch.isFull() {
Expand Down Expand Up @@ -116,6 +124,10 @@ func (s *stream) Close() {
close(l.channel)
}
s.closed = true

for _, onClose := range s.onCloses {
onClose()
}
}

func (s *stream) runMiddlewares(d models.Record) (res models.Record, err error) {
Expand Down
6 changes: 3 additions & 3 deletions plugins/extractors/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ package bigquery_test

import (
"context"
"github.com/odpf/meteor/test/utils"
"testing"

"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/plugins/extractors/bigquery"
"github.com/odpf/meteor/test"
"github.com/stretchr/testify/assert"
)

func TestInit(t *testing.T) {
t.Run("should return error if config is invalid", func(t *testing.T) {
extr := bigquery.New(test.Logger)
extr := bigquery.New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, map[string]interface{}{
Expand All @@ -24,7 +24,7 @@ func TestInit(t *testing.T) {
assert.Equal(t, plugins.InvalidConfigError{}, err)
})
t.Run("should not return invalid config error if config is valid", func(t *testing.T) {
extr := bigquery.New(test.Logger)
extr := bigquery.New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, map[string]interface{}{
Expand Down
Loading

0 comments on commit 97e3e44

Please sign in to comment.