Skip to content

Commit

Permalink
feat(agent): add retry mechanism (#245)
Browse files Browse the repository at this point in the history
* refactor: move Config from agent to config package

* refactor(agent): create a config struct to store agent's configurations

* feat(agent): add retry mechanism for sink

* feat(agent): log retry attempts

* feat(agent): change retry interval strategy to become exponential

* feat: make agent retry configurable

* feat(columbus): retry if columbus returns 5** status code

* feat(agent): skip record instead of returning error on failed sink

* refactor: change RetryTimes to MaxRetries

* refactor(agent): move retry func to a struct

* feat(agent): make skipping record on sink error configurable
  • Loading branch information
StewartJingga authored Oct 4, 2021
1 parent b343574 commit 93e110a
Show file tree
Hide file tree
Showing 13 changed files with 400 additions and 55 deletions.
42 changes: 32 additions & 10 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,26 @@ type Agent struct {
sinkFactory *registry.SinkFactory
monitor Monitor
logger log.Logger
retrier *retrier
stopOnSinkError bool
}

// NewAgent returns an Agent with plugin factories.
func NewAgent(ef *registry.ExtractorFactory, pf *registry.ProcessorFactory, sf *registry.SinkFactory, mt Monitor, logger log.Logger) *Agent {
func NewAgent(config Config) *Agent {
mt := config.Monitor
if isNilMonitor(mt) {
mt = new(defaultMonitor)
}

retrier := newRetrier(config.MaxRetries, config.RetryInitialInterval)
return &Agent{
extractorFactory: ef,
processorFactory: pf,
sinkFactory: sf,
extractorFactory: config.ExtractorFactory,
processorFactory: config.ProcessorFactory,
sinkFactory: config.SinkFactory,
stopOnSinkError: config.StopOnSinkError,
monitor: mt,
logger: logger,
logger: config.Logger,
retrier: retrier,
}
}

Expand Down Expand Up @@ -234,14 +241,29 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.SinkRecipe, stream *str
return
}

stream.subscribe(func(records []models.Record) (err error) {
err = sink.Sink(ctx, records)
// retryNotification is handed to r.retrier.retry as its notification
// callback. It logs the error that caused the retry, the name of the sink
// being retried, and the delay d it is given.
retryNotification := func(e error, d time.Duration) {
	// %s formats the Duration through its String method (e.g. "1.5s").
	// %d would print the bare nanosecond count with no unit.
	r.logger.Info(
		fmt.Sprintf("retrying sink in %s", d),
		"sink", sr.Name,
		"error", e.Error())
}
stream.subscribe(func(records []models.Record) error {
err := r.retrier.retry(func() error {
err := sink.Sink(ctx, records)
return err
}, retryNotification)

// once retries are exhausted the error is logged; it is returned only when stopOnSinkError is set, otherwise the batch is skipped
if err != nil {
err = errors.Wrapf(err, "error running sink \"%s\"", sr.Name)
return
r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error())
if !r.stopOnSinkError {
err = nil
}
}

return
// TODO: create a new error to signal stopping stream.
// err is nil here unless stopOnSinkError is set, so by default the stream won't stop.
return err
}, defaultBatchSize)

return
Expand Down
188 changes: 172 additions & 16 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/odpf/meteor/agent"
"github.com/odpf/meteor/models"
Expand Down Expand Up @@ -52,7 +53,12 @@ var finalData = []models.Record{

func TestRunnerRun(t *testing.T) {
t.Run("should return run", func(t *testing.T) {
r := agent.NewAgent(registry.NewExtractorFactory(), registry.NewProcessorFactory(), registry.NewSinkFactory(), nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: registry.NewExtractorFactory(),
ProcessorFactory: registry.NewProcessorFactory(),
SinkFactory: registry.NewSinkFactory(),
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.IsType(t, agent.Run{}, run)
assert.Equal(t, validRecipe, run.Recipe)
Expand All @@ -67,7 +73,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(registry.NewExtractorFactory(), pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: registry.NewExtractorFactory(),
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand All @@ -83,7 +94,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, registry.NewProcessorFactory(), sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: registry.NewProcessorFactory(),
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand All @@ -101,7 +117,12 @@ func TestRunnerRun(t *testing.T) {
pf := registry.NewProcessorFactory()
pf.Register("test-processor", newProcessor(proc))

r := agent.NewAgent(ef, pf, registry.NewSinkFactory(), nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: registry.NewSinkFactory(),
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand All @@ -121,7 +142,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand All @@ -143,7 +169,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand All @@ -167,7 +198,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand All @@ -191,7 +227,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand All @@ -214,7 +255,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand Down Expand Up @@ -244,7 +290,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand Down Expand Up @@ -273,12 +324,17 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})

t.Run("should return error when sink fails", func(t *testing.T) {
t.Run("should not return error when sink fails", func(t *testing.T) {
data := []models.Record{
models.NewRecord(&assets.Table{}),
}
Expand All @@ -304,7 +360,49 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
})

// Verifies that a failing sink surfaces as run.Error when the agent is
// built with StopOnSinkError set to true.
t.Run("should return error when sink fails if StopOnSinkError is true", func(t *testing.T) {
// A single record is enough to reach the sink once.
data := []models.Record{
models.NewRecord(&assets.Table{}),
}

// Extractor mock: Init and Extract both succeed; data is registered via SetEmit.
extr := mocks.NewExtractor()
extr.SetEmit(data)
extr.On("Init", mockCtx, validRecipe.Source.Config).Return(nil).Once()
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(nil)
ef := registry.NewExtractorFactory()
ef.Register("test-extractor", newExtractor(extr))

// Processor mock: returns the record it receives, with no error.
proc := mocks.NewProcessor()
proc.On("Init", mockCtx, validRecipe.Processors[0].Config).Return(nil).Once()
proc.On("Process", mockCtx, data[0]).Return(data[0], nil)
defer proc.AssertExpectations(t)
pf := registry.NewProcessorFactory()
pf.Register("test-processor", newProcessor(proc))

// Sink mock: Init succeeds, but every Sink call returns an error
// (no Once(), so the same error is returned on any repeated call).
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(errors.New("some error"))
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

// MaxRetries and RetryInitialInterval are left at their zero values;
// only StopOnSinkError differs from the "should not return error" case.
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
StopOnSinkError: true,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})
Expand Down Expand Up @@ -335,7 +433,12 @@ func TestRunnerRun(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
assert.Equal(t, validRecipe, run.Recipe)
Expand Down Expand Up @@ -372,7 +475,55 @@ func TestRunnerRun(t *testing.T) {
monitor.On("RecordRun", monitor_run).Once()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(ef, pf, sf, monitor, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Monitor: monitor,
Logger: test.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
assert.Equal(t, validRecipe, run.Recipe)
})

t.Run("should retry if sink returns retry error", func(t *testing.T) {
err := errors.New("some-error")
data := []models.Record{
models.NewRecord(&assets.Table{}),
}

extr := mocks.NewExtractor()
extr.SetEmit(data)
extr.On("Init", mockCtx, validRecipe.Source.Config).Return(nil).Once()
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(nil)
ef := registry.NewExtractorFactory()
ef.Register("test-extractor", newExtractor(extr))

proc := mocks.NewProcessor()
proc.On("Init", mockCtx, validRecipe.Processors[0].Config).Return(nil).Once()
proc.On("Process", mockCtx, data[0]).Return(data[0], nil)
defer proc.AssertExpectations(t)
pf := registry.NewProcessorFactory()
pf.Register("test-processor", newProcessor(proc))

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(plugins.NewRetryError(err)).Once()
sink.On("Sink", mockCtx, data).Return(plugins.NewRetryError(err)).Once()
sink.On("Sink", mockCtx, data).Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
assert.Equal(t, validRecipe, run.Recipe)
Expand Down Expand Up @@ -408,7 +559,12 @@ func TestRunnerRunMultiple(t *testing.T) {
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
})
runs := r.RunMultiple(recipeList)

assert.Len(t, runs, len(recipeList))
Expand Down
20 changes: 15 additions & 5 deletions agent/config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package agent

// Config contains the configuration for the agent.
import (
"time"

"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
)

// Config carries the values NewAgent reads to build an Agent: the plugin
// factories, the monitor, the logger, and the sink retry settings.
type Config struct {
// The struct tags on the next four fields name a mapstructure key and a default value.
LogLevel string `mapstructure:"LOG_LEVEL" default:"info"`
StatsdEnabled bool `mapstructure:"STATSD_ENABLED" default:"false"`
StatsdHost string `mapstructure:"STATSD_HOST" default:"localhost:8125"`
StatsdPrefix string `mapstructure:"STATSD_PREFIX" default:"meteor"`
// Plugin factories; NewAgent copies them onto the Agent unchanged.
ExtractorFactory *registry.ExtractorFactory
ProcessorFactory *registry.ProcessorFactory
SinkFactory *registry.SinkFactory
// Monitor may be left nil; NewAgent then substitutes a defaultMonitor.
Monitor Monitor
Logger log.Logger
// MaxRetries and RetryInitialInterval are passed to newRetrier by NewAgent.
MaxRetries int
RetryInitialInterval time.Duration
// StopOnSinkError controls what happens after a sink still fails once
// retrying is done: when false the error is logged and dropped, when
// true it is returned from the sink subscriber.
StopOnSinkError bool
}
Loading

0 comments on commit 93e110a

Please sign in to comment.