From 811125058ffbafff5bfc74019519743e02d0be23 Mon Sep 17 00:00:00 2001 From: Ravi Suhag Date: Thu, 31 Oct 2024 13:23:31 +0530 Subject: [PATCH] chore: StatsD code removal --- agent/agent.go | 13 +- agent/agent_test.go | 56 +++---- agent/config.go | 2 +- cmd/run.go | 21 +-- config/config.go | 2 - config/config_test.go | 4 - config/meteor.yaml.sample | 2 - config/testdata/invalid-config.yaml | 2 +- config/testdata/valid-config.yaml | 5 +- go.mod | 1 - go.sum | 2 - metrics/statsd.go | 103 ------------ metrics/statsd_test.go | 244 ---------------------------- 13 files changed, 38 insertions(+), 419 deletions(-) delete mode 100644 metrics/statsd.go delete mode 100644 metrics/statsd_test.go diff --git a/agent/agent.go b/agent/agent.go index b685bfcae..3f3752a93 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -27,7 +27,7 @@ type Agent struct { extractorFactory *registry.ExtractorFactory processorFactory *registry.ProcessorFactory sinkFactory *registry.SinkFactory - monitor []Monitor + monitor Monitor logger log.Logger retrier *retrier stopOnSinkError bool @@ -279,8 +279,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } retryNotification := func(e error, d time.Duration) { - for _, mt := range r.monitor { - mt.RecordSinkRetryCount(ctx, pluginInfo) + if r.monitor != nil { + r.monitor.RecordSinkRetryCount(ctx, pluginInfo) } r.logger.Warn( @@ -302,9 +302,6 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s ) pluginInfo.Success = err == nil - for _, mt := range r.monitor { - mt.RecordPlugin(ctx, pluginInfo) // this can be deleted when statsd is removed - } if err != nil { // once it reaches here, it means that the retry has been exhausted and still got error r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error()) @@ -328,8 +325,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } func (r *Agent) logAndRecordMetrics(ctx context.Context, run Run) { - for _, monitor := range r.monitor { - monitor.RecordRun(ctx, run) + if r.monitor != nil { + r.monitor.RecordRun(ctx, run) } if run.Success { diff --git a/agent/agent_test.go b/agent/agent_test.go index 3394057fb..149acba48 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -82,7 +82,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -113,7 +113,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: registry.NewProcessorFactory(), SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -146,7 +146,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: registry.NewSinkFactory(), Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -183,7 +183,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -222,7 +222,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -263,7 +263,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -305,7 +305,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -346,7 +346,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -394,7 +394,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -441,7 +441,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -483,7 +483,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -491,7 +491,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.True(t, run.Success) @@ -533,7 +533,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -542,7 +542,7 @@ func TestAgentRun(t *testing.T) { SinkFactory: sf, Logger: utils.Logger, StopOnSinkError: true, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) @@ -585,7 +585,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -594,7 +594,7 @@ func TestAgentRun(t *testing.T) { SinkFactory: sf, Logger: utils.Logger, StopOnSinkError: false, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) @@ -649,7 +649,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -657,7 +657,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.NoError(t, run.Error) @@ -706,14 +706,14 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, Logger: utils.Logger, TimerFn: timerFn, }) @@ -761,7 +761,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -769,7 +769,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -815,7 +815,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() monitor.On("RecordSinkRetryCount", mockCtx, mock.AnythingOfType("agent.PluginInfo")) defer monitor.AssertExpectations(t) @@ -824,7 +824,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -884,7 +884,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 5, RetryInitialInterval: 10 * time.Second, }) @@ -1057,7 +1057,7 @@ func TestAgentRunMultiple(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")) - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -1065,7 +1065,7 @@ func TestAgentRunMultiple(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) runs := r.RunMultiple(ctx, recipeList) @@ -1152,7 +1152,7 @@ func TestValidate(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{newMockMonitor()}, + Monitor: newMockMonitor(), }) var expectedErrs []error diff --git a/agent/config.go b/agent/config.go index 879971f62..879b64e0e 100644 --- a/agent/config.go +++ b/agent/config.go @@ -11,7 +11,7 @@ type Config struct { ExtractorFactory *registry.ExtractorFactory ProcessorFactory *registry.ProcessorFactory SinkFactory *registry.SinkFactory - Monitor []Monitor + Monitor Monitor Logger log.Logger MaxRetries int RetryInitialInterval time.Duration diff --git a/cmd/run.go b/cmd/run.go index d544f0296..b61087249 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -70,16 +70,7 @@ func RunCmd() *cobra.Command { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - var mts []agent.Monitor - if cfg.StatsdEnabled { - mt, err := newStatsdMonitor(cfg) - if err != nil { - return err - } - - mts = append(mts, mt) - - } + var mts agent.Monitor if cfg.OtelEnabled { doneOtlp, err := metrics.InitOtel(ctx, cfg, lg, Version) @@ -88,7 +79,7 @@ func RunCmd() *cobra.Command { } defer doneOtlp() - mts = append(mts, metrics.NewOtelMonitor()) + mts = metrics.NewOtelMonitor() } runner := agent.NewAgent(agent.Config{ @@ -156,11 +147,3 @@ func RunCmd() *cobra.Command { return cmd } - -func newStatsdMonitor(cfg config.Config) (*metrics.StatsdMonitor, error) { - client, err := metrics.NewStatsdClient(cfg.StatsdHost) - if err != nil { - return nil, err - } - return metrics.NewStatsdMonitor(client, cfg.AppName), nil -} diff --git a/config/config.go b/config/config.go index c26eaf998..cc2009e70 100644 --- a/config/config.go +++ b/config/config.go @@ -14,8 +14,6 @@ type Config struct { MaxRetries int `mapstructure:"MAX_RETRIES" default:"5"` RetryInitialIntervalSeconds int `mapstructure:"RETRY_INITIAL_INTERVAL_SECONDS" default:"5"` StopOnSinkError bool `mapstructure:"STOP_ON_SINK_ERROR" default:"false"` - StatsdEnabled bool `mapstructure:"STATSD_ENABLED" default:"false"` - StatsdHost string `mapstructure:"STATSD_HOST" default:"localhost:8125"` OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"` OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"` OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"` diff --git a/config/config_test.go b/config/config_test.go index 3f8b33c1d..d28afe9cc 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -25,8 +25,6 @@ func TestLoad(t *testing.T) { expected: config.Config{ AppName: "meteor", LogLevel: "info", - StatsdEnabled: false, - StatsdHost: "localhost:8125", OtelEnabled: false, OtelCollectorAddr: "localhost:4317", OtelTraceSampleProbability: 1, @@ -43,8 +41,6 @@ func TestLoad(t *testing.T) { expected: config.Config{ AppName: "meteor", LogLevel: "info", - StatsdEnabled: false, - StatsdHost: "localhost:8125", OtelEnabled: false, OtelCollectorAddr: "localhost:4317", OtelTraceSampleProbability: 1, diff --git a/config/meteor.yaml.sample b/config/meteor.yaml.sample index cc580e041..f660860ba 100644 --- a/config/meteor.yaml.sample +++ b/config/meteor.yaml.sample @@ -1,6 +1,4 @@ LOG_LEVEL: info -STATSD_ENABLED: false -STATSD_HOST: "localhost:8125" MAX_RETRIES: 5 RETRY_INITIAL_INTERVAL_SECONDS: 5 STOP_ON_SINK_ERROR: false diff --git a/config/testdata/invalid-config.yaml b/config/testdata/invalid-config.yaml index fb63b1b38..e756900e4 100755 --- a/config/testdata/invalid-config.yaml +++ b/config/testdata/invalid-config.yaml @@ -1 +1 @@ -STATSD_ENABLED: not-a-boolean \ No newline at end of file +OTEL_ENABLED: not-a-boolean diff --git a/config/testdata/valid-config.yaml b/config/testdata/valid-config.yaml index b2edb1b3b..87e48bb8d 100755 --- a/config/testdata/valid-config.yaml +++ b/config/testdata/valid-config.yaml @@ -1,7 +1,4 @@ LOG_LEVEL: info -STATSD_ENABLED: false -STATSD_HOST: "localhost:8125" -STATSD_PREFIX: meteor MAX_RETRIES: 5 RETRY_INITIAL_INTERVAL_SECONDS: 5 -STOP_ON_SINK_ERROR: false \ No newline at end of file +STOP_ON_SINK_ERROR: false diff --git a/go.mod b/go.mod index 1909118cb..d40af98c9 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,6 @@ require ( github.com/dnaeon/go-vcr/v2 v2.0.1 github.com/elastic/go-elasticsearch v0.0.0 github.com/elastic/go-elasticsearch/v8 v8.0.0-20210708134649-33f644c8e327 - github.com/etsy/statsd v0.9.0 github.com/go-kivik/couchdb v2.0.0+incompatible github.com/go-kivik/kivik v2.0.0+incompatible github.com/go-playground/validator/v10 v10.10.0 diff --git a/go.sum b/go.sum index b0497e416..02f6d872e 100644 --- a/go.sum +++ b/go.sum @@ -566,8 +566,6 @@ github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= -github.com/etsy/statsd v0.9.0 h1:GLP1pAzn1fGE7/kM2S5QXSU0ZTUV6QnZsyZVMx7IVF4= -github.com/etsy/statsd v0.9.0/go.mod h1:rmx2gVm1TEkQUIcU/KAM4prmC/AAUU8Wndeule9gvW4= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= diff --git a/metrics/statsd.go b/metrics/statsd.go deleted file mode 100644 index 5671afc6c..000000000 --- a/metrics/statsd.go +++ /dev/null @@ -1,103 +0,0 @@ -package metrics - -import ( - "context" - "fmt" - "net" - "strconv" - - statsd "github.com/etsy/statsd/examples/go" - "github.com/raystack/meteor/agent" - "github.com/raystack/meteor/recipe" -) - -const ( - runDurationMetricName = "runDuration" - runRecordCountMetricName = "runRecordCount" - runMetricName = "run" - pluginRunMetricName = "runPlugin" -) - -// StatsdMonitor represents the statsd monitor. -type StatsdMonitor struct { - client statsdClient - prefix string -} - -// NewStatsdMonitor creates a new StatsdMonitor -func NewStatsdMonitor(client statsdClient, prefix string) *StatsdMonitor { - return &StatsdMonitor{ - client: client, - prefix: prefix, - } -} - -// RecordRun records a run behavior -func (m *StatsdMonitor) RecordRun(_ context.Context, run agent.Run) { - m.client.Timing( - m.createMetricName(runDurationMetricName, run.Recipe, run.Success), - int64(run.DurationInMs), - ) - m.client.Increment( - m.createMetricName(runMetricName, run.Recipe, run.Success), - ) - m.client.IncrementByValue( - m.createMetricName(runRecordCountMetricName, run.Recipe, run.Success), - run.RecordCount, - ) -} - -// RecordPlugin records a individual plugin behavior in a run -func (m *StatsdMonitor) RecordPlugin(_ context.Context, pluginInfo agent.PluginInfo) { - m.client.Increment( - fmt.Sprintf( - "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", - m.prefix, - pluginRunMetricName, - pluginInfo.RecipeName, - pluginInfo.PluginName, - pluginInfo.PluginType, - pluginInfo.Success, - ), - ) -} - -func (*StatsdMonitor) RecordSinkRetryCount(context.Context, agent.PluginInfo) {} - -// createMetricName creates a metric name for a given recipe and success -func (m *StatsdMonitor) createMetricName(metricName string, recipe recipe.Recipe, success bool) string { - successText := "false" - if success { - successText = "true" - } - - return fmt.Sprintf( - "%s.%s,name=%s,success=%s,extractor=%s", - m.prefix, - metricName, - recipe.Name, - successText, - recipe.Source.Name, - ) -} - -type statsdClient interface { - Timing(string, int64) - Increment(string) - IncrementByValue(string, int) -} - -// NewStatsdClient returns a new statsd client if the given address is valid -func NewStatsdClient(statsdAddress string) (*statsd.StatsdClient, error) { - host, portStr, err := net.SplitHostPort(statsdAddress) - if err != nil { - return nil, fmt.Errorf("split the network address: %w", err) - } - - port, err := strconv.Atoi(portStr) - if err != nil { - return nil, fmt.Errorf("convert port type: %w", err) - } - - return statsd.New(host, port), nil -} diff --git a/metrics/statsd_test.go b/metrics/statsd_test.go deleted file mode 100644 index 6318d8a21..000000000 --- a/metrics/statsd_test.go +++ /dev/null @@ -1,244 +0,0 @@ -package metrics_test - -import ( - "context" - "fmt" - "log" - "os" - "testing" - - "github.com/ory/dockertest/v3" - "github.com/ory/dockertest/v3/docker" - "github.com/raystack/meteor/agent" - "github.com/raystack/meteor/metrics" - "github.com/raystack/meteor/recipe" - "github.com/raystack/meteor/test/utils" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -type mockStatsdClient struct { - mock.Mock -} - -func (c *mockStatsdClient) Timing(name string, val int64) { - c.Called(name, val) -} - -func (c *mockStatsdClient) IncrementByValue(name string, val int) { - c.Called(name, val) -} - -func (c *mockStatsdClient) Increment(name string) { - c.Called(name) -} - -var port = "8125" - -func TestMain(m *testing.M) { - // setup test - opts := dockertest.RunOptions{ - Repository: "statsd/statsd", - Platform: "linux/amd64", - Tag: "latest", - Env: []string{ - "MYSQL_ALLOW_EMPTY_PASSWORD=true", - }, - ExposedPorts: []string{"8125", port}, - PortBindings: map[docker.Port][]docker.PortBinding{ - "8125": { - {HostIP: "0.0.0.0", HostPort: port}, - }, - }, - } - // exponential backoff-retry, because the application in the container might not be ready to accept connections yet - retryFn := func(resource *dockertest.Resource) error { - c, err := metrics.NewStatsdClient("127.0.0.1:" + port) - if err != nil { - return err - } - c.Open() - return nil - } - purgeFn, err := utils.CreateContainer(opts, retryFn) - if err != nil { - log.Fatal(err) - } - - // run tests - code := m.Run() - - // clean tests - if err := purgeFn(); err != nil { - log.Fatal(err) - } - os.Exit(code) -} - -func TestStatsdMonitorRecordRun(t *testing.T) { - statsdPrefix := "testprefix" - - t.Run("should create metrics with the correct name and value", func(t *testing.T) { - recipe := recipe.Recipe{ - Name: "test-recipe", - Source: recipe.PluginRecipe{ - Name: "mysql", - }, - } - duration := 100 - recordCount := 2 - timingMetric := fmt.Sprintf( - "%s.runDuration,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "false", - recipe.Source.Name, - ) - incrementMetric := fmt.Sprintf( - "%s.run,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "false", - recipe.Source.Name, - ) - recordIncrementMetric := fmt.Sprintf( - "%s.runRecordCount,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "false", - recipe.Source.Name, - ) - - client := new(mockStatsdClient) - client.On("Timing", timingMetric, int64(duration)) - client.On("Increment", incrementMetric) - client.On("IncrementByValue", recordIncrementMetric, recordCount) - defer client.AssertExpectations(t) - - monitor := metrics.NewStatsdMonitor(client, statsdPrefix) - monitor.RecordRun(context.Background(), agent.Run{Recipe: recipe, DurationInMs: duration, RecordCount: recordCount, Success: false}) - }) - - t.Run("should set success field to true on success", func(t *testing.T) { - recipe := recipe.Recipe{ - Name: "test-recipe", - Source: recipe.PluginRecipe{ - Name: "bigquery", - }, - } - duration := 100 - recordCount := 2 - timingMetric := fmt.Sprintf( - "%s.runDuration,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "true", - recipe.Source.Name, - ) - incrementMetric := fmt.Sprintf( - "%s.run,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "true", - recipe.Source.Name, - ) - recordIncrementMetric := fmt.Sprintf( - "%s.runRecordCount,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "true", - recipe.Source.Name, - ) - - client := new(mockStatsdClient) - client.On("Timing", timingMetric, int64(duration)) - client.On("Increment", incrementMetric) - client.On("IncrementByValue", recordIncrementMetric, recordCount) - defer client.AssertExpectations(t) - - monitor := metrics.NewStatsdMonitor(client, statsdPrefix) - monitor.RecordRun(context.Background(), agent.Run{Recipe: recipe, DurationInMs: duration, RecordCount: recordCount, Success: true}) - }) -} - -func TestStatsdMonitorRecordPlugin(t *testing.T) { - statsdPrefix := "testprefix" - - t.Run("should create metrics with the correct name and value", func(t *testing.T) { - recipe := recipe.Recipe{ - Name: "test-recipe", - Source: recipe.PluginRecipe{ - Name: "mysql", - }, - Sinks: []recipe.PluginRecipe{ - {Name: "test-sink"}, - }, - } - incrementMetric := fmt.Sprintf( - "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", - statsdPrefix, - "runPlugin", - recipe.Name, - recipe.Sinks[0].Name, - "sink", - false, - ) - - client := new(mockStatsdClient) - client.On("Increment", incrementMetric) - defer client.AssertExpectations(t) - - monitor := metrics.NewStatsdMonitor(client, statsdPrefix) - monitor.RecordPlugin(context.Background(), agent.PluginInfo{ - RecipeName: recipe.Name, - PluginName: recipe.Sinks[0].Name, - PluginType: "sink", - Success: false, - }) - }) - - t.Run("should set success field to true on success", func(t *testing.T) { - recipe := recipe.Recipe{ - Name: "test-recipe", - Source: recipe.PluginRecipe{ - Name: "bigquery", - }, - Sinks: []recipe.PluginRecipe{ - {Name: "test-sink"}, - }, - } - incrementMetric := fmt.Sprintf( - "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", - statsdPrefix, - "runPlugin", - recipe.Name, - recipe.Sinks[0].Name, - "sink", - true, - ) - - client := new(mockStatsdClient) - client.On("Increment", incrementMetric) - defer client.AssertExpectations(t) - - monitor := metrics.NewStatsdMonitor(client, statsdPrefix) - monitor.RecordPlugin(context.Background(), - agent.PluginInfo{ - RecipeName: recipe.Name, - PluginName: recipe.Sinks[0].Name, - PluginType: "sink", - Success: true, - }) - }) -} - -func TestNewStatsClient(t *testing.T) { - t.Run("should return error for invalid address", func(t *testing.T) { - _, err := metrics.NewStatsdClient("127.0.0.1") - assert.Error(t, err) - }) - t.Run("should return error for invalid port", func(t *testing.T) { - _, err := metrics.NewStatsdClient("127.0.0.1:81A5") - assert.Error(t, err) - }) -}