From 36add17ab9ba4f5ad45653eebb519fd93fea6e9d Mon Sep 17 00:00:00 2001 From: maura fortino Date: Thu, 25 Jan 2024 14:51:18 -0500 Subject: [PATCH 01/10] added tests for main and metrics - updated code to get tests to pass --- config.go | 3 + http.go | 16 ++-- httpClient.go | 6 +- http_test.go | 34 ++++---- main.go | 1 + main_test.go | 197 +++++++++++++++++++++++++++++----------------- metrics.go | 30 ++++--- metrics_test.go | 8 ++ outboundSender.go | 24 +++--- routes.go | 9 +-- senderWrapper.go | 22 +++--- 11 files changed, 205 insertions(+), 145 deletions(-) diff --git a/config.go b/config.go index 31fba38d..a386e2c7 100644 --- a/config.go +++ b/config.go @@ -219,4 +219,7 @@ var defaultConfig = Config{ Tracing: candlelight.Config{ ApplicationName: applicationName, }, + Sender: SenderConfig{ + Linger: time.Duration(3 * time.Minute), + }, } diff --git a/http.go b/http.go index cebdd680..ebae1afb 100644 --- a/http.go +++ b/http.go @@ -40,20 +40,20 @@ type ServerHandler struct { } type HandlerTelemetryIn struct { fx.In - ErrorRequests prometheus.Counter `name:"error_request_body_counter"` - EmptyRequests prometheus.Counter `name:"empty_request_boyd_counter"` - InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"` - IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"` - ModifiedWRPCount prometheus.CounterVec `name:"modified_wrp_count"` - IncomingQueueLatency prometheus.HistogramVec `name:"incoming_queue_latency_histogram_seconds"` + ErrorRequests prometheus.Counter `name:"error_request_body_count"` + EmptyRequests prometheus.Counter `name:"empty_request_body_count"` + InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"` + IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"` + ModifiedWRPCount *prometheus.CounterVec `name:"modified_wrp_count"` + IncomingQueueLatency prometheus.ObserverVec `name:"incoming_queue_latency_histogram_seconds"` } type HandlerTelemetry struct { errorRequests prometheus.Counter emptyRequests prometheus.Counter invalidCount prometheus.Counter incomingQueueDepthMetric prometheus.Gauge - modifiedWRPCount prometheus.CounterVec - incomingQueueLatency prometheus.HistogramVec + modifiedWRPCount *prometheus.CounterVec + incomingQueueLatency prometheus.ObserverVec } func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { diff --git a/httpClient.go b/httpClient.go index e478bb44..1ce85498 100644 --- a/httpClient.go +++ b/httpClient.go @@ -33,15 +33,15 @@ func (d doerFunc) Do(req *http.Request) (*http.Response, error) { type metricWrapper struct { now func() time.Time - queryLatency prometheus.HistogramVec + queryLatency prometheus.ObserverVec id string } -func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec, id string) (*metricWrapper, error) { +func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) { if now == nil { now = time.Now } - if queryLatency.MetricVec == nil { + if queryLatency == nil { return nil, errNilHistogram } return &metricWrapper{ diff --git a/http_test.go b/http_test.go index 755d8637..3cf5895e 100644 --- a/http_test.go +++ b/http_test.go @@ -1,5 +1,5 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 +// // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// // SPDX-License-Identifier: Apache-2.0 package main // import ( @@ -13,8 +13,8 @@ package main // 
"github.com/stretchr/testify/assert" // "github.com/stretchr/testify/mock" +// "go.uber.org/zap/zaptest" -// "github.com/xmidt-org/webpa-common/v2/adapter" // "github.com/xmidt-org/wrp-go/v3" // ) @@ -88,7 +88,7 @@ package main // for _, tc := range tcs { // assert := assert.New(t) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // if !tc.throwStatusBadRequest { // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), @@ -112,7 +112,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // errorRequests: fakeErrorRequests, // emptyRequests: fakeEmptyRequests, @@ -145,7 +145,7 @@ package main // assert := assert.New(t) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).Return().Once() @@ -172,7 +172,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // errorRequests: fakeErrorRequests, // emptyRequests: fakeEmptyRequests, @@ -206,7 +206,7 @@ package main // assert := assert.New(t) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) @@ -221,7 +221,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // incomingQueueDepthMetric: fakeQueueDepth, // maxOutstanding: 1, @@ -259,7 +259,7 @@ package main // req := httptest.NewRequest("POST", "localhost:8080", r) // req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) @@ -276,7 +276,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // emptyRequests: fakeEmptyRequests, // incomingQueueDepthMetric: fakeQueueDepth, @@ -314,7 +314,7 @@ package main // req := httptest.NewRequest("POST", "localhost:8080", r) // req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() @@ -331,7 +331,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // errorRequests: fakeErrorRequests, // incomingQueueDepthMetric: fakeQueueDepth, @@ -368,7 +368,7 @@ package main // req := httptest.NewRequest("POST", "localhost:8080", r) // req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) 
// fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() @@ -386,7 +386,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // invalidCount: fakeInvalidCount, // incomingQueueDepthMetric: fakeQueueDepth, @@ -420,13 +420,13 @@ package main // assert := assert.New(t) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeQueueDepth := new(mockGauge) // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // incomingQueueDepthMetric: fakeQueueDepth, // maxOutstanding: 1, diff --git a/main.go b/main.go index 185554a1..18525b81 100644 --- a/main.go +++ b/main.go @@ -140,6 +140,7 @@ func caduceus(arguments []string, run bool) error { touchstone.Provide(), touchhttp.Provide(), ProvideMetrics(), + ProvideSenderMetrics(), // ancla.ProvideMetrics(), //TODO: need to add back in once we fix the ancla/argus dependency issue ) diff --git a/main_test.go b/main_test.go index 2fc15712..61d71934 100644 --- a/main_test.go +++ b/main_test.go @@ -1,83 +1,132 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 +//SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC +//SPDX-License-Identifier: Apache-2.0 + package main import ( - "os" "testing" + + _ "github.com/goschtalt/goschtalt/pkg/typical" + _ "github.com/goschtalt/yaml-decoder" + _ "github.com/goschtalt/yaml-encoder" + "github.com/stretchr/testify/assert" + "github.com/xmidt-org/sallust" ) -func TestMain(m *testing.M) { - os.Exit(m.Run()) +func Test_provideCLI(t *testing.T) { + tests := []struct { + description string + args cliArgs + want CLI + exits bool + expectedErr error + }{ + { + description: "no arguments, everything works", + }, { + description: "dev mode", + args: cliArgs{"-d"}, + want: CLI{Dev: true}, + }, { + description: "invalid argument", + args: cliArgs{"-w"}, + exits: true, + }, { + description: "invalid argument", + args: cliArgs{"-d", "-w"}, + exits: true, + }, { + description: "help", + args: cliArgs{"-h"}, + exits: true, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) + + if tc.exits { + assert.Panics(func() { + _, _ = provideCLIWithOpts(tc.args, true) + }) + } else { + got, err := provideCLI(tc.args) + + assert.ErrorIs(err, tc.expectedErr) + want := tc.want + assert.Equal(&want, got) + } + }) + } +} + +func Test_caduceus(t *testing.T) { + tests := []struct { + description string + args []string + panic bool + expectedErr error + }{ + { + description: "show config and exit", + args: []string{"-s"}, + panic: true, + }, { + description: "show help and exit", + args: []string{"-h"}, + panic: true, + }, { + description: "do everything but run", + args: []string{"-f", "minimal.yaml"}, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) + + if tc.panic { + assert.Panics(func() { + _ = caduceus(tc.args, false) + }) + } else { + err := caduceus(tc.args, false) + + assert.ErrorIs(err, tc.expectedErr) + } + }) + } } -// func TestPrintVersionInfo(t *testing.T) { -// testCases := []struct { -// name string -// expectedOutput []string -// overrideValues func() -// lineCount 
int -// }{ -// { -// "default", -// []string{ -// "caduceus:", -// "version: \tundefined", -// "go version: \tgo", -// "built time: \tundefined", -// "git commit: \tundefined", -// "os/arch: \t", -// }, -// func() {}, -// 6, -// }, -// { -// "set values", -// []string{ -// "caduceus:", -// "version: \t1.0.0\n", -// "go version: \tgo", -// "built time: \tsome time\n", -// "git commit: \tgit sha\n", -// "os/arch: \t", -// }, -// func() { -// Version = "1.0.0" -// BuildTime = "some time" -// GitCommit = "git sha" -// }, -// 6, -// }, -// } -// for _, tc := range testCases { -// t.Run(tc.name, func(t *testing.T) { -// resetGlobals() -// tc.overrideValues() -// buf := &bytes.Buffer{} -// printVersionInfo(buf) -// count := 0 -// for { -// line, err := buf.ReadString(byte('\n')) -// if err != nil { -// break -// } -// assert.Contains(t, line, tc.expectedOutput[count]) -// if strings.Contains(line, "\t") { -// keyAndValue := strings.Split(line, "\t") -// // The value after the tab should have more than 2 characters -// // 1) the first character of the value and the new line -// assert.True(t, len(keyAndValue[1]) > 2) -// } -// count++ -// } -// assert.Equal(t, tc.lineCount, count) -// resetGlobals() -// }) -// } -// } +func Test_provideLogger(t *testing.T) { + tests := []struct { + description string + cli *CLI + cfg sallust.Config + expectedErr error + }{ + { + description: "validate empty config", + cfg: sallust.Config{}, + cli: &CLI{}, + }, { + description: "validate dev config", + cfg: sallust.Config{}, + cli: &CLI{Dev: true}, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) -// func resetGlobals() { -// Version = "undefined" -// BuildTime = "undefined" -// GitCommit = "undefined" -// } + got, err := provideLogger(tc.cli, tc.cfg) + + if tc.expectedErr == nil { + assert.NotNil(got) + assert.NoError(err) + return + } + assert.ErrorIs(err, tc.expectedErr) + assert.Nil(got) + }) + } +} diff --git a/metrics.go b/metrics.go index 0e6e0a48..4e39b1a4 100644 --- a/metrics.go +++ b/metrics.go @@ -50,20 +50,20 @@ const ( type SenderMetricsIn struct { fx.In - QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"` - EventType prometheus.CounterVec `name:"incoming_event_type_count"` - DeliveryCounter prometheus.CounterVec `name:"delivery_count"` - DeliveryRetryCounter prometheus.CounterVec `name:"DeliveryRetryCounter"` - DeliveryRetryMaxGauge prometheus.GaugeVec `name:"delivery_retry_max"` - CutOffCounter prometheus.CounterVec `name:"slow_consumer_cut_off_count"` - SlowConsumerDroppedMsgCounter prometheus.CounterVec `name:"slow_consumer_dropped_message_count"` - DropsDueToPanic prometheus.CounterVec `name:"drops_due_to_panic"` - ConsumerDeliverUntilGauge prometheus.GaugeVec `name:"consumer_deliver_until"` - ConsumerDropUntilGauge prometheus.GaugeVec `name:"consumer_drop_until"` - ConsumerDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers"` - ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers_max"` - OutgoingQueueDepth prometheus.GaugeVec `name:"outgoing_queue_depths"` - ConsumerRenewalTimeGauge prometheus.GaugeVec `name:"consumer_renewal_time"` + QueryLatency prometheus.ObserverVec `name:"query_duration_histogram_seconds"` + EventType *prometheus.CounterVec `name:"incoming_event_type_count"` + DeliveryCounter *prometheus.CounterVec `name:"delivery_count"` + DeliveryRetryCounter *prometheus.CounterVec `name:"delivery_retry_count"` + DeliveryRetryMaxGauge 
*prometheus.GaugeVec `name:"delivery_retry_max"` + CutOffCounter *prometheus.CounterVec `name:"slow_consumer_cut_off_count"` + SlowConsumerDroppedMsgCounter *prometheus.CounterVec `name:"slow_consumer_dropped_message_count"` + DropsDueToPanic *prometheus.CounterVec `name:"drops_due_to_panic"` + ConsumerDeliverUntilGauge *prometheus.GaugeVec `name:"consumer_deliver_until"` + ConsumerDropUntilGauge *prometheus.GaugeVec `name:"consumer_drop_until"` + ConsumerDeliveryWorkersGauge *prometheus.GaugeVec `name:"consumer_delivery_workers"` + ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec `name:"consumer_delivery_workers_max"` + OutgoingQueueDepth *prometheus.GaugeVec `name:"outgoing_queue_depths"` + ConsumerRenewalTimeGauge *prometheus.GaugeVec `name:"consumer_renewal_time"` } // TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called @@ -180,5 +180,3 @@ func ProvideSenderMetrics() fx.Option { }, ) } - - diff --git a/metrics_test.go b/metrics_test.go index 8092f9f0..db6c3ddf 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -32,3 +32,11 @@ func TestMetrics(t *testing.T) { assert.NotNil(m) } + +func TestProvideSenderMetrics(t *testing.T) { + assert := assert.New(t) + + m := ProvideSenderMetrics() + + assert.NotNil(m) +} diff --git a/outboundSender.go b/outboundSender.go index de75fd59..97b79b3a 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -65,18 +65,18 @@ type OutboundSenderFactory struct { } type OutboundSenderMetrics struct { - DeliveryCounter prometheus.CounterVec - DeliveryRetryCounter prometheus.CounterVec - DeliveryRetryMaxGauge prometheus.GaugeVec - CutOffCounter prometheus.CounterVec - SlowConsumerDroppedMsgCounter prometheus.CounterVec - DropsDueToPanic prometheus.CounterVec - ConsumerDeliverUntilGauge prometheus.GaugeVec - ConsumerDropUntilGauge prometheus.GaugeVec - ConsumerDeliveryWorkersGauge prometheus.GaugeVec - ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec - OutgoingQueueDepth prometheus.GaugeVec - ConsumerRenewalTimeGauge prometheus.GaugeVec + DeliveryCounter *prometheus.CounterVec + DeliveryRetryCounter *prometheus.CounterVec + DeliveryRetryMaxGauge *prometheus.GaugeVec + CutOffCounter *prometheus.CounterVec + SlowConsumerDroppedMsgCounter *prometheus.CounterVec + DropsDueToPanic *prometheus.CounterVec + ConsumerDeliverUntilGauge *prometheus.GaugeVec + ConsumerDropUntilGauge *prometheus.GaugeVec + ConsumerDeliveryWorkersGauge *prometheus.GaugeVec + ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec + OutgoingQueueDepth *prometheus.GaugeVec + ConsumerRenewalTimeGauge *prometheus.GaugeVec } // CaduceusOutboundSender is the outbound sender object. 
diff --git a/routes.go b/routes.go index 9bea1d3e..03369b81 100644 --- a/routes.go +++ b/routes.go @@ -68,18 +68,17 @@ func provideCoreOption(server string, in RoutesIn) arrangehttp.Option[http.Serve mux := chi.NewMux() - // TODO: should probably customize things a bit - mux.Use(recovery.Middleware(recovery.WithStatusCode(555))) - options := []otelmux.Option{ otelmux.WithTracerProvider(in.Tracing.TracerProvider()), otelmux.WithPropagators(in.Tracing.Propagator()), } + // TODO: should probably customize things a bit + mux.Use(recovery.Middleware(recovery.WithStatusCode(555)), otelmux.Middleware("server_primary", options...), + candlelight.EchoFirstTraceNodeInfo(in.Tracing.Propagator(), true)) + mux.Method("POST", urlPrefix+"/notify", in.Handler) if server == "primary" { - mux.Use(otelmux.Middleware("server_primary", options...), - candlelight.EchoFirstTraceNodeInfo(in.Tracing.Propagator(), true)) s.Handler = in.PrimaryMetrics.Then(mux) } else { s.Handler = in.AlternateMetrics.Then(mux) diff --git a/senderWrapper.go b/senderWrapper.go index 9dfa5e95..3e47f6d6 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -27,12 +27,11 @@ type CaduceusSenderWrapperIn struct { WrapperMetrics SenderWrapperMetrics OutbounderMetrics OutboundSenderMetrics Logger *zap.Logger - OutbounderFactory OutboundSenderFactory } type SenderWrapperMetrics struct { - QueryLatency prometheus.HistogramVec - EventType prometheus.CounterVec + QueryLatency prometheus.ObserverVec + EventType *prometheus.CounterVec } type SenderWrapper interface { @@ -69,8 +68,8 @@ type CaduceusSenderWrapper struct { mutex *sync.RWMutex senders map[string]OutboundSender - eventType prometheus.CounterVec - queryLatency prometheus.HistogramVec + eventType *prometheus.CounterVec + queryLatency prometheus.ObserverVec wg sync.WaitGroup shutdown chan struct{} @@ -111,17 +110,20 @@ func NewSenderWrapper(tr http.RoundTripper, in CaduceusSenderWrapperIn) (csw *Ca eventType: in.WrapperMetrics.EventType, queryLatency: in.WrapperMetrics.QueryLatency, } - - csw.outbounderSetUp.Config = in.SenderConfig - csw.outbounderSetUp.Logger = in.Logger - csw.outbounderSetUp.Metrics = in.OutbounderMetrics + obsSetUp := &OutboundSenderFactory{ + Config: in.SenderConfig, + Logger: in.Logger, + Metrics: in.OutbounderMetrics, + } + csw.outbounderSetUp = obsSetUp csw.sender = doerFunc((&http.Client{ Transport: tr, Timeout: in.SenderConfig.ClientTimeout, }).Do) if in.SenderConfig.Linger <= 0 { - err = errors.New("linger must be positive") + linger := fmt.Sprintf("linger not positive: %v", in.SenderConfig.Linger) + err = errors.New(linger) csw = nil return } From ab7a1b7f549a19d5f5890217dd8fc1accae5029f Mon Sep 17 00:00:00 2001 From: maura fortino Date: Thu, 25 Jan 2024 15:25:39 -0500 Subject: [PATCH 02/10] fixed formatting and copyright issues --- config.go | 2 +- http.go | 2 +- main_test.go | 4 ++-- metrics.go | 1 - primaryHandler.go | 4 ++-- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/config.go b/config.go index a386e2c7..eda9e714 100644 --- a/config.go +++ b/config.go @@ -220,6 +220,6 @@ var defaultConfig = Config{ ApplicationName: applicationName, }, Sender: SenderConfig{ - Linger: time.Duration(3 * time.Minute), + Linger: 3 * time.Minute, }, } diff --git a/http.go b/http.go index ebae1afb..c8207e94 100644 --- a/http.go +++ b/http.go @@ -143,7 +143,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) { endTime := sh.now() - 
sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(float64(endTime.Sub(startTime).Seconds())) + sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) } func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { diff --git a/main_test.go b/main_test.go index 61d71934..8e1c72b9 100644 --- a/main_test.go +++ b/main_test.go @@ -1,5 +1,5 @@ -//SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC -//SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 package main diff --git a/metrics.go b/metrics.go index 4e39b1a4..68494350 100644 --- a/metrics.go +++ b/metrics.go @@ -6,7 +6,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/fx" - // nolint:staticcheck "github.com/xmidt-org/touchstone" ) diff --git a/primaryHandler.go b/primaryHandler.go index 8b72931d..cb2d38e4 100644 --- a/primaryHandler.go +++ b/primaryHandler.go @@ -211,7 +211,7 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, endpoints = append(endpoints, r) } m := basculehelper.MetricValidator{ - C: basculehelper.CapabilitiesValidator{Checker: c}, + C: basculehelper.CapabilitiesValidator{Checker: c}, // Measures: capabilityCheckMeasures, Endpoints: endpoints, } @@ -227,7 +227,7 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, // basculehttp.WithEErrorResponseFunc(listener.OnErrorResponse), ) - authChain := alice.New(setLogger(logger), authConstructor, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener). commenting for now in case needed later + authChain := alice.New(setLogger(logger), authConstructor, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener). 
commenting for now in case needed later authChainLegacy := alice.New(setLogger(logger), authConstructorLegacy, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener) commenting for now in case needed later versionCompatibleAuth := alice.New(func(next http.Handler) http.Handler { From e480530e1c1602d5b47210e40510161b565c116f Mon Sep 17 00:00:00 2001 From: maura fortino Date: Thu, 25 Jan 2024 15:29:53 -0500 Subject: [PATCH 03/10] added licensing to listenerStub --- listenerStub.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/listenerStub.go b/listenerStub.go index 4803f681..f259eb56 100644 --- a/listenerStub.go +++ b/listenerStub.go @@ -1,3 +1,5 @@ +// SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 package main import "time" From dcb7f0d3bcfd9d3214780fe90fc638af4b09d558 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Mon, 5 Feb 2024 10:13:00 -0500 Subject: [PATCH 04/10] updated to retry repo; added middleware for update request, updating counter/logging --- go.mod | 1 + go.sum | 2 ++ outboundSender.go | 60 +++++++++++++++++++++++++++++++++++------------ 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 36314c58..06753277 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/xmidt-org/candlelight v0.0.21 github.com/xmidt-org/clortho v0.0.4 github.com/xmidt-org/httpaux v0.4.0 + github.com/xmidt-org/retry v0.0.3 github.com/xmidt-org/sallust v0.2.2 github.com/xmidt-org/touchstone v0.1.3 github.com/xmidt-org/webpa-common/v2 v2.2.2 diff --git a/go.sum b/go.sum index 678d69eb..7eb947ea 100644 --- a/go.sum +++ b/go.sum @@ -1690,6 +1690,8 @@ github.com/xmidt-org/httpaux v0.2.1/go.mod h1:mviIlg5fHGb3lAv3l0sbiwVG/q9rqvXaud github.com/xmidt-org/httpaux v0.3.2/go.mod h1:qmlPisXf80FTi3y4gX43eYbCVruSQyvu+FPx1jzvQG8= github.com/xmidt-org/httpaux v0.4.0 h1:cAL/MzIBpSsv4xZZeq/Eu1J5M3vfNe49xr41mP3COKU= github.com/xmidt-org/httpaux v0.4.0/go.mod h1:UypqZwuZV1nn8D6+K1JDb+im9IZrLNg/2oO/Bgiybxc= +github.com/xmidt-org/retry v0.0.3 h1:wvmBnEEn1OKwSZaQtr1RZ2Vey8JIvP72mGTgR+3wPiM= +github.com/xmidt-org/retry v0.0.3/go.mod h1:I7FO3VVrxPckNuotwGYZIxfBnmjMSyOTitTKNL0VkIA= github.com/xmidt-org/sallust v0.1.5/go.mod h1:azcKBypudADIeZ3Em8zGjVq3yQ7n4ueSvM/degHMIxo= github.com/xmidt-org/sallust v0.2.0/go.mod h1:HCQQn7po8czynjxtNVyZ5vzWuTqJyJwnPWkQoqBX67s= github.com/xmidt-org/sallust v0.2.1/go.mod h1:68C0DLwD5xlhRznXTWmfUhx0etyrFpSOzYGU7jzmpzs= diff --git a/outboundSender.go b/outboundSender.go index 3117a935..52b4f8ed 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -13,6 +13,7 @@ import ( "fmt" "io" "net/http" + "net/url" "regexp" "strconv" "strings" @@ -20,11 +21,12 @@ import ( "sync/atomic" "time" - "github.com/xmidt-org/httpaux/retry" "go.uber.org/zap" "github.com/prometheus/client_golang/prometheus" + "github.com/xmidt-org/retry" + "github.com/xmidt-org/retry/retryhttp" "github.com/xmidt-org/webpa-common/v2/semaphore" "github.com/xmidt-org/wrp-go/v3" "github.com/xmidt-org/wrp-go/v3/wrphttp" @@ -92,7 +94,7 @@ type CaduceusOutboundSender struct { deliveryRetries int deliveryInterval time.Duration deliveryCounter prometheus.CounterVec - deliveryRetryCounter prometheus.Counter + deliveryRetryCounter *prometheus.CounterVec droppedQueueFullCounter prometheus.Counter droppedCutoffCounter prometheus.Counter droppedExpiredCounter prometheus.Counter @@ -578,20 +580,13 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri // find the event "short 
name" event := msg.FindEventStringSubMatch() - /*TODO: need middleware for: - Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event) - Logger - Update Request - */ - retryConfig := retry.Config{ - Retries: obs.deliveryRetries, - Interval: obs.deliveryInterval, - } - // Send it obs.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) - - client := retry.New(retryConfig, obs.clientMiddleware(obs.sender)) + client, _ := retryhttp.NewClient( + retryhttp.WithHTTPClient(obs.clientMiddleware(obs.sender)), + retryhttp.WithRunner(obs.addRunner(req, event)), + retryhttp.WithRequesters(obs.updateRequest(urls)), + ) resp, err := client.Do(req) code := "failure" @@ -690,8 +685,43 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } } +func (obs *CaduceusOutboundSender) addRunner(request *http.Request, event string) retry.Runner[*http.Response] { + //TODO: need to handle error + runner, _ := retry.NewRunner[*http.Response]( + retry.WithPolicyFactory[*http.Response](retry.Config{ + Interval: obs.deliveryInterval, + MaxRetries: obs.deliveryRetries, + }), + retry.WithOnAttempt[*http.Response](obs.onAttempt(request, event)), + ) + return runner + +} + +func (obs *CaduceusOutboundSender) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request { + return func(request *http.Request) *http.Request { + urls = urls.Next() + tmp, err := url.Parse(urls.Value.(string)) + if err != nil { + obs.logger.Error("failed to update url", zap.String(UrlLabel, urls.Value.(string)), zap.Error(err)) + } + request.URL = tmp + return request + } +} + +func (obs *CaduceusOutboundSender) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] { + + return func(attempt retry.Attempt[*http.Response]) { + obs.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: obs.id, EventLabel: event}).Add(1.0) + obs.logger.Debug("retrying HTTP transaction", zap.String("url", request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode)) + + } + +} + func CreateOutbounderMetrics(m OutboundSenderMetrics, c *CaduceusOutboundSender) { - c.deliveryRetryCounter = m.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: c.id}) + c.deliveryRetryCounter = m.DeliveryRetryCounter c.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{UrlLabel: c.id}) c.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{UrlLabel: c.id}) c.droppedQueueFullCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: c.id, ReasonLabel: "queue_full"}) From c9023c2be96fbd18eb0412417f40da9d4644f3dd Mon Sep 17 00:00:00 2001 From: maura fortino Date: Wed, 7 Feb 2024 10:22:04 -0500 Subject: [PATCH 05/10] updated listenerstub to reflect new schema --- config.go | 3 - go.mod | 2 + go.sum | 4 + listenerStub.go | 149 ++++++++++++++++++++++------- main_test.go | 2 +- outboundSender.go | 237 +++++++++++++++++++++++----------------------- senderWrapper.go | 3 +- 7 files changed, 244 insertions(+), 156 deletions(-) diff --git a/config.go b/config.go index eda9e714..31fba38d 100644 --- a/config.go +++ b/config.go @@ -219,7 +219,4 @@ var defaultConfig = Config{ Tracing: candlelight.Config{ ApplicationName: applicationName, }, - Sender: SenderConfig{ - Linger: 3 * time.Minute, - }, } diff --git a/go.mod b/go.mod index 06753277..02c5b94d 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/xmidt-org/retry v0.0.3 github.com/xmidt-org/sallust 
v0.2.2 github.com/xmidt-org/touchstone v0.1.3 + github.com/xmidt-org/webhook-schema v0.1.0 github.com/xmidt-org/webpa-common/v2 v2.2.2 github.com/xmidt-org/wrp-go/v3 v3.2.3 go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1 @@ -87,6 +88,7 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/xmidt-org/chronon v0.1.1 // indirect + github.com/xmidt-org/urlegit v0.1.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect diff --git a/go.sum b/go.sum index 7eb947ea..90865caa 100644 --- a/go.sum +++ b/go.sum @@ -1705,6 +1705,10 @@ github.com/xmidt-org/touchstone v0.1.1/go.mod h1:7Rgqs44l1VndkvFUZewr8WpItzxfJSx github.com/xmidt-org/touchstone v0.1.2/go.mod h1:2xVJVO8FE393Aofw/FD8Cu9wXES4n1AlJP109Nk7/gg= github.com/xmidt-org/touchstone v0.1.3 h1:AtqUBa8U4lyVQj6eRZ9Uo8NShkWWq3StObRRkrv7N0Q= github.com/xmidt-org/touchstone v0.1.3/go.mod h1:vzFD11v5urS3RKVP0jDva/j9aHEjwVZCg3nNMdRSk6E= +github.com/xmidt-org/urlegit v0.1.0 h1:WZLlWo0e5JNZabLEi7/1+sK/np9qrH9XnoB+ZdsHieM= +github.com/xmidt-org/urlegit v0.1.0/go.mod h1:ih/VtgW3xfpV7FNIrHUpNdP0GapcfLOND8y0JwH51vA= +github.com/xmidt-org/webhook-schema v0.1.0 h1:QYutPymtGd6OvukVHTDwpQIEvmk5uNnN8CgbppS089A= +github.com/xmidt-org/webhook-schema v0.1.0/go.mod h1:x3G1lmhryIbr6QXLyzagVkcfY1ZhBxGlP0CdvRD3zZI= github.com/xmidt-org/webpa-common v1.1.0/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI= github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI= github.com/xmidt-org/webpa-common v1.10.2-0.20200604164000-f07406b4eb63/go.mod h1:Fmt3wIxBzwJY0KeRHX6RaLZx2xpKTbXCLEA3Xtd6kq8= diff --git a/listenerStub.go b/listenerStub.go index f259eb56..a91c84a8 100644 --- a/listenerStub.go +++ b/listenerStub.go @@ -2,57 +2,140 @@ // SPDX-License-Identifier: Apache-2.0 package main -import "time" +import ( + "time" -//This is a stub for the ancla listener. This will be removed once we can add ancla back into caduceus + webhook "github.com/xmidt-org/webhook-schema" +) + +//This is a stub for the webhook and kafka listeners. This will be removed once the webhook-schema configuration is approved type ListenerStub struct { PartnerIds []string - Webhook Webhook + Webhook webhook.Registration } +// Webhook is a substructure with data related to event delivery. type Webhook struct { - // Address is the subscription request origin HTTP Address. - Address string `json:"registered_from_address"` + // Accept is the encoding type of outgoing events. The following encoding types are supported, otherwise + // a 406 response code is returned: application/octet-stream, application/json, application/jsonl, application/msgpack. + // Note: An `Accept` of application/octet-stream or application/json will result in a single response for batch sizes of 0 or 1 + // and batch sizes greater than 1 will result in a multipart response. An `Accept` of application/jsonl or application/msgpack + // will always result in a single response with a list of batched events for any batch size. + Accept string `json:"accept"` - // Config contains data to inform how events are delivered. - Config DeliveryConfig `json:"config"` + // AcceptEncoding is the content type of outgoing events. The following content types are supported, otherwise + // a 406 response code is returned: gzip. 
+ AcceptEncoding string `json:"accept_encoding"` - // FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow. - // Optional, set to "" to disable notifications. - FailureURL string `json:"failure_url"` + // Secret is the string value. + // (Optional, set to "" to disable behavior). + Secret string `json:"secret,omitempty"` - // Events is the list of regular expressions to match an event type against. - Events []string `json:"events"` + // SecretHash is the hash algorithm to be used. Only sha256 HMAC and sha512 HMAC are supported. + // (Optional). + // The Default value is the largest sha HMAC supported, sha512 HMAC. + SecretHash string `json:"secret_hash"` - // Matcher type contains values to match against the metadata. - Matcher MetadataMatcherConfig `json:"matcher,omitempty"` + // If true, response will use the device content-type and wrp payload as its body + // Otherwise, response will Accecpt as the content-type and wrp message as its body + // Default: False (the entire wrp message is sent) + PayloadOnly bool `json:"payload_only"` - // Duration describes how long the subscription lasts once added. - Duration time.Duration `json:"duration"` + // ReceiverUrls is the list of receiver urls that will be used where as if the first url fails, + // then the second url would be used and so on. + // Note: either `ReceiverURLs` or `DNSSrvRecord` must be used but not both. + ReceiverURLs []string `json:"receiver_urls"` - // Until describes the time this subscription expires. - Until time.Time `json:"until"` + // DNSSrvRecord is the substructure for configuration related to load balancing. + // Note: either `ReceiverURLs` or `DNSSrvRecord` must be used but not both. + DNSSrvRecord struct { + // FQDNs is a list of FQDNs pointing to dns srv records + FQDNs []string `json:"fqdns"` + + // LoadBalancingScheme is the scheme to use for load balancing. Either the + // srv record attribute `weight` or `priortiy` can be used. + LoadBalancingScheme string `json:"load_balancing_scheme"` + } `json:"dns_srv_record"` } -// DeliveryConfig is a Webhook substructure with data related to event delivery. -type DeliveryConfig struct { - // URL is the HTTP URL to deliver messages to. - URL string `json:"url"` +// Kafka is a substructure with data related to event delivery. +type Kafka struct { + // Accept is content type value to set WRP messages to (unless already specified in the WRP). + Accept string `json:"accept"` - // ContentType is content type value to set WRP messages to (unless already specified in the WRP). - ContentType string `json:"content_type"` + // BootstrapServers is a list of kafka broker addresses. + BootstrapServers []string `json:"bootstrap_servers"` - // Secret is the string value for the SHA1 HMAC. - // (Optional, set to "" to disable behavior). - Secret string `json:"secret,omitempty"` + // TODO: figure out which kafka configuration substructures we want to expose to users (to be set by users) + // going to be based on https://pkg.go.dev/github.com/IBM/sarama#Config + // this substructures also includes auth related secrets, noted `MaxOpenRequests` will be excluded since it's already exposed + KafkaProducer struct{} `json:"kafka_producer"` +} - // AlternativeURLs is a list of explicit URLs that should be round robin through on failure cases to the main URL. - AlternativeURLs []string `json:"alt_urls,omitempty"` +type BatchHint struct { + // MaxLingerDuration is the maximum delay for batching if MaxMesasges has not been reached. 
+ // Default value will set no maximum value. + MaxLingerDuration time.Duration `json:"max_linger_duration"` + // MaxMesasges is the maximum number of events that will be sent in a single batch. + // Default value will set no maximum value. + MaxMesasges int `json:"max_messages"` } -// MetadataMatcherConfig is Webhook substructure with config to match event metadata. -type MetadataMatcherConfig struct { - // DeviceID is the list of regular expressions to match device id type against. - DeviceID []string `json:"device_id"` +// FieldRegex is a substructure with data related to regular expressions. +type FieldRegex struct { + // Field is the wrp field to be used for regex. + // All wrp field can be used, refer to the schema for examples. + Field string `json:"field"` + + // FieldRegex is the regular expression to match `Field` against to. + Regex string `json:"regex"` +} + +type ContactInfo struct { + Name string `json:"name"` + Phone string `json:"phone"` + Email string `json:"email"` +} + +// RegistrationV2 is a special struct for unmarshaling sink information as part of a sink registration request. +type RegistrationV2 struct { + // ContactInfo contains contact information used to reach the owner of the registration. + // (Optional). + ContactInfo ContactInfo `json:"contact_info,omitempty"` + + // CanonicalName is the canonical name of the registration request. + // Reusing a CanonicalName will override the configurations set in that previous + // registration request with the same CanonicalName. + CanonicalName string `json:"canonical_name"` + + // Address is the subscription request origin HTTP Address. + Address string `json:"registered_from_address"` + + // Webhooks contains data to inform how events are delivered to multiple urls. + Webhooks []Webhook `json:"webhooks"` + + // Kafkas contains data to inform how events are delivered to multiple kafkas. + Kafkas []Kafka `json:"kafkas"` + + // Hash is a substructure for configuration related to distributing events among sinks. + // Note. Any failures due to a bad regex feild or regex expression will result in a silent failure. + Hash FieldRegex `json:"hash"` + + // BatchHint is the substructure for configuration related to event batching. + // (Optional, if omited then batches of singal events will be sent) + // Default value will disable batch. All zeros will also disable batch. + BatchHint BatchHint `json:"batch_hints"` + + // FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow. + // Optional, set to "" to disable notifications. + FailureURL string `json:"failure_url"` + + // Matcher is the list of regular expressions to match incoming events against to. + // Note. Any failures due to a bad regex feild or regex expression will result in a silent failure. + Matcher []FieldRegex `json:"matcher,omitempty"` + + // Expires describes the time this subscription expires. 
+ // TODO: list of supported formats + Expires time.Time `json:"expires"` } diff --git a/main_test.go b/main_test.go index 8e1c72b9..2e138e31 100644 --- a/main_test.go +++ b/main_test.go @@ -77,7 +77,7 @@ func Test_caduceus(t *testing.T) { panic: true, }, { description: "do everything but run", - args: []string{"-f", "minimal.yaml"}, + args: []string{"-f", "caduceus.yaml"}, }, } for _, tc := range tests { diff --git a/outboundSender.go b/outboundSender.go index 52b4f8ed..259383eb 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -12,6 +12,7 @@ import ( "errors" "fmt" "io" + "math/rand" "net/http" "net/url" "regexp" @@ -42,11 +43,11 @@ const failureText = `Unfortunately, your endpoint is not able to keep up with th // FailureMessage is a helper that lets us easily create a json struct to send // when we have to cut and endpoint off. type FailureMessage struct { - Text string `json:"text"` - // Original ancla.InternalWebhook `json:"webhook_registration"` //TODO: add back in once ancla/argus dependency issue is fixed. - CutOffPeriod string `json:"cut_off_period"` - QueueSize int `json:"queue_size"` - Workers int `json:"worker_count"` + Text string `json:"text"` + Original ListenerStub `json:"webhook_registration"` //TODO: remove listener stub once ancla/argus issues fixed + CutOffPeriod string `json:"cut_off_period"` + QueueSize int `json:"queue_size"` + Workers int `json:"worker_count"` } type OutboundSender interface { @@ -82,9 +83,9 @@ type OutboundSenderMetrics struct { // CaduceusOutboundSender is the outbound sender object. type CaduceusOutboundSender struct { - id string - urls *ring.Ring - // listener ancla.InternalWebhook //TODO: add back in once ancla/argus dependency issue is fixed + id string + urls *ring.Ring + listener ListenerStub deliverUntil time.Time dropUntil time.Time sender httpClient @@ -152,7 +153,7 @@ func (osf *OutboundSenderFactory) New() (obs OutboundSender, err error) { caduceusOutboundSender := &CaduceusOutboundSender{ // id: osf.Listener.Webhook.Config.URL, - // listener: osf.Listener, + listener: osf.Listener, sender: osf.Sender, queueSize: osf.Config.QueueSizePerSender, cutOffPeriod: osf.Config.CutOffPeriod, @@ -162,7 +163,7 @@ func (osf *OutboundSenderFactory) New() (obs OutboundSender, err error) { deliveryInterval: osf.Config.DeliveryInterval, maxWorkers: osf.Config.NumWorkersPerSender, failureMsg: FailureMessage{ - // Original: osf.Listener, + Original: osf.Listener, Text: failureText, CutOffPeriod: osf.Config.CutOffPeriod.String(), QueueSize: osf.Config.QueueSizePerSender, @@ -199,111 +200,111 @@ func (osf *OutboundSenderFactory) New() (obs OutboundSender, err error) { // Update applies user configurable values for the outbound sender when a // webhook is registered -//TODO: commenting out for now until argus/ancla dependency issue is fixed -// func (obs *CaduceusOutboundSender) Update(wh ancla.InternalWebhook) (err error) { - -// // Validate the failure URL, if present -// if "" != wh.Webhook.FailureURL { -// if _, err = url.ParseRequestURI(wh.Webhook.FailureURL); nil != err { -// return -// } -// } - -// // Create and validate the event regex objects -// // nolint:prealloc -// var events []*regexp.Regexp -// for _, event := range wh.Webhook.Events { -// var re *regexp.Regexp -// if re, err = regexp.Compile(event); nil != err { -// return -// } - -// events = append(events, re) -// } -// if len(events) < 1 { -// err = errors.New("events must not be empty.") -// return -// } - -// // Create the matcher regex objects -// matcher := 
[]*regexp.Regexp{} -// for _, item := range wh.Webhook.Matcher.DeviceID { -// if ".*" == item { -// // Match everything - skip the filtering -// matcher = []*regexp.Regexp{} -// break -// } - -// var re *regexp.Regexp -// if re, err = regexp.Compile(item); nil != err { -// err = fmt.Errorf("invalid matcher item: '%s'", item) -// return -// } -// matcher = append(matcher, re) -// } - -// // Validate the various urls -// urlCount := len(wh.Webhook.Config.AlternativeURLs) -// for i := 0; i < urlCount; i++ { -// _, err = url.Parse(wh.Webhook.Config.AlternativeURLs[i]) -// if err != nil { -// obs.logger.Error("failed to update url", zap.Any("url", wh.Webhook.Config.AlternativeURLs[i]), zap.Error(err)) -// return -// } -// } - -// obs.renewalTimeGauge.Set(float64(time.Now().Unix())) - -// // write/update obs -// obs.mutex.Lock() - -// obs.listener = wh - -// obs.failureMsg.Original = wh -// // Don't share the secret with others when there is an error. -// obs.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" - -// obs.listener.Webhook.FailureURL = wh.Webhook.FailureURL -// obs.deliverUntil = wh.Webhook.Until -// obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix())) - -// obs.events = events - -// obs.deliveryRetryMaxGauge.Set(float64(obs.deliveryRetries)) - -// // if matcher list is empty set it nil for Queue() logic -// obs.matcher = nil -// if 0 < len(matcher) { -// obs.matcher = matcher -// } - -// if 0 == urlCount { -// obs.urls = ring.New(1) -// obs.urls.Value = obs.id -// } else { -// r := ring.New(urlCount) -// for i := 0; i < urlCount; i++ { -// r.Value = wh.Webhook.Config.AlternativeURLs[i] -// r = r.Next() -// } -// obs.urls = r -// } - -// // Randomize where we start so all the instances don't synchronize -// r := rand.New(rand.NewSource(time.Now().UnixNano())) -// offset := r.Intn(obs.urls.Len()) -// for 0 < offset { -// obs.urls = obs.urls.Next() -// offset-- -// } - -// // Update this here in case we make this configurable later -// obs.maxWorkersGauge.Set(float64(obs.maxWorkers)) - -// obs.mutex.Unlock() - -// return -// } +// TODO: commenting out for now until argus/ancla dependency issue is fixed +func (obs *CaduceusOutboundSender) Update(wh ListenerStub) (err error) { + + // Validate the failure URL, if present + if wh.Webhook.FailureURL != "" { + if _, err = url.ParseRequestURI(wh.Webhook.FailureURL); nil != err { + return + } + } + + // Create and validate the event regex objects + // nolint:prealloc + var events []*regexp.Regexp + for _, event := range wh.Webhook.Events { + var re *regexp.Regexp + if re, err = regexp.Compile(event); nil != err { + return + } + + events = append(events, re) + } + if len(events) < 1 { + err = errors.New("events must not be empty.") + return + } + + // Create the matcher regex objects + matcher := []*regexp.Regexp{} + for _, item := range wh.Webhook.Matcher.DeviceID { + if ".*" == item { + // Match everything - skip the filtering + matcher = []*regexp.Regexp{} + break + } + + var re *regexp.Regexp + if re, err = regexp.Compile(item); nil != err { + err = fmt.Errorf("invalid matcher item: '%s'", item) + return + } + matcher = append(matcher, re) + } + + // Validate the various urls + urlCount := len(wh.Webhook.Config.AlternativeURLs) + for i := 0; i < urlCount; i++ { + _, err = url.Parse(wh.Webhook.Config.AlternativeURLs[i]) + if err != nil { + obs.logger.Error("failed to update url", zap.Any("url", wh.Webhook.Config.AlternativeURLs[i]), zap.Error(err)) + return + } + } + + obs.renewalTimeGauge.Set(float64(time.Now().Unix())) + + // 
write/update obs + obs.mutex.Lock() + + obs.listener = wh + + obs.failureMsg.Original = wh + // Don't share the secret with others when there is an error. + obs.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" + + obs.listener.Webhook.FailureURL = wh.Webhook.FailureURL + obs.deliverUntil = wh.Webhook.Until + obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix())) + + obs.events = events + + obs.deliveryRetryMaxGauge.Set(float64(obs.deliveryRetries)) + + // if matcher list is empty set it nil for Queue() logic + obs.matcher = nil + if 0 < len(matcher) { + obs.matcher = matcher + } + + if 0 == urlCount { + obs.urls = ring.New(1) + obs.urls.Value = obs.id + } else { + r := ring.New(urlCount) + for i := 0; i < urlCount; i++ { + r.Value = wh.Webhook.Config.AlternativeURLs[i] + r = r.Next() + } + obs.urls = r + } + + // Randomize where we start so all the instances don't synchronize + r := rand.New(rand.NewSource(time.Now().UnixNano())) + offset := r.Intn(obs.urls.Len()) + for 0 < offset { + obs.urls = obs.urls.Next() + offset-- + } + + // Update this here in case we make this configurable later + obs.maxWorkersGauge.Set(float64(obs.maxWorkers)) + + obs.mutex.Unlock() + + return +} // Shutdown causes the CaduceusOutboundSender to stop its activities either gently or // abruptly based on the gentle parameter. If gentle is false, all queued @@ -713,8 +714,10 @@ func (obs *CaduceusOutboundSender) updateRequest(urls *ring.Ring) func(*http.Req func (obs *CaduceusOutboundSender) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] { return func(attempt retry.Attempt[*http.Response]) { - obs.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: obs.id, EventLabel: event}).Add(1.0) - obs.logger.Debug("retrying HTTP transaction", zap.String("url", request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode)) + if attempt.Retries > 0 { + obs.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: obs.id, EventLabel: event}).Add(1.0) + obs.logger.Debug("retrying HTTP transaction", zap.String("url", request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode)) + } } diff --git a/senderWrapper.go b/senderWrapper.go index 3e47f6d6..b5554597 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -127,7 +127,6 @@ func NewSenderWrapper(tr http.RoundTripper, in CaduceusSenderWrapperIn) (csw *Ca csw = nil return } - csw.senders = make(map[string]OutboundSender) csw.shutdown = make(chan struct{}) @@ -165,7 +164,7 @@ func (sw *CaduceusSenderWrapper) Update(list []ListenerStub) { for i, v := range list { ids[i].Listener = v - ids[i].ID = v.Webhook.Config.URL + ids[i].ID = v.Webhook.Config.ReceiverURL } sw.mutex.Lock() From 519b4321b7f03f8e1246b089ead9084e55ecba9d Mon Sep 17 00:00:00 2001 From: maura fortino Date: Wed, 14 Feb 2024 16:10:48 -0500 Subject: [PATCH 06/10] renamed main structs and removed the factory as it was no longer needed --- caduceus_type.go | 4 +- caduceus_type_test.go | 2 +- client.go | 9 + http.go | 14 +- httpClient.go | 8 +- http_test.go | 940 +++++++++---------- main.go | 4 +- metrics.go | 10 +- metrics_test.go | 2 +- mocks_test.go | 23 + outboundSender.go => sinkSender.go | 335 ++++--- outboundSender_test.go => sinkSender_test.go | 0 senderWrapper.go => sinkWrapper.go | 141 +-- senderWrapper_test.go => sinkWrapper_test.go | 0 14 files changed, 733 insertions(+), 759 deletions(-) create mode 100644 client.go rename 
outboundSender.go => sinkSender.go (64%) rename outboundSender_test.go => sinkSender_test.go (100%) rename senderWrapper.go => sinkWrapper.go (54%) rename senderWrapper_test.go => sinkWrapper_test.go (100%) diff --git a/caduceus_type.go b/caduceus_type.go index cd2b6bfc..8575dfa1 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -41,11 +41,11 @@ type RequestHandler interface { } type CaduceusHandler struct { - senderWrapper SenderWrapper + wrapper Wrapper *zap.Logger } func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) { ch.Logger.Info("Worker received a request, now passing to sender", zap.Int("workerId", workerID)) - ch.senderWrapper.Queue(msg) + ch.wrapper.Queue(msg) } diff --git a/caduceus_type_test.go b/caduceus_type_test.go index 84580531..73ea159a 100644 --- a/caduceus_type_test.go +++ b/caduceus_type_test.go @@ -18,7 +18,7 @@ func TestCaduceusHandler(t *testing.T) { fakeSenderWrapper.On("Queue", mock.AnythingOfType("*wrp.Message")).Return().Once() testHandler := CaduceusHandler{ - senderWrapper: fakeSenderWrapper, + wrapper: fakeSenderWrapper, Logger: logger, } diff --git a/client.go b/client.go new file mode 100644 index 00000000..dfad9c57 --- /dev/null +++ b/client.go @@ -0,0 +1,9 @@ +package main + +import "net/http" + +// Client is the interface used to requests messages to the desired location. +// The Client can be either and HTTP Client or a Kafka Producer. +type Client interface { + Do(*http.Request) (*http.Response, error) +} diff --git a/http.go b/http.go index 59e67f60..5cbe98b2 100644 --- a/http.go +++ b/http.go @@ -20,9 +20,9 @@ import ( type ServerHandlerIn struct { fx.In - CaduceusSenderWrapper *CaduceusSenderWrapper - Logger *zap.Logger - Telemetry *HandlerTelemetry + SinkWrapper *SinkWrapper + Logger *zap.Logger + Telemetry *HandlerTelemetry } type ServerHandlerOut struct { @@ -189,18 +189,18 @@ func ProvideHandler() fx.Option { }, func(in ServerHandlerIn) (ServerHandlerOut, error) { //Hard coding maxOutstanding and incomingQueueDepth for now - handler, err := New(in.CaduceusSenderWrapper, in.Logger, in.Telemetry, 0.0, 0.0) + handler, err := New(in.SinkWrapper, in.Logger, in.Telemetry, 0.0, 0.0) return ServerHandlerOut{ Handler: handler, }, err }, ) } -func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { +func New(sw *SinkWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { return &ServerHandler{ caduceusHandler: &CaduceusHandler{ - senderWrapper: senderWrapper, - Logger: log, + wrapper: sw, + Logger: log, }, telemetry: t, maxOutstanding: maxOutstanding, diff --git a/httpClient.go b/httpClient.go index 1ce85498..51057378 100644 --- a/httpClient.go +++ b/httpClient.go @@ -16,11 +16,7 @@ var ( errNilHistogram = errors.New("histogram cannot be nil") ) -type httpClient interface { - Do(*http.Request) (*http.Response, error) -} - -func nopHTTPClient(next httpClient) httpClient { +func nopHTTPClient(next Client) Client { return next } @@ -51,7 +47,7 @@ func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, }, nil } -func (m *metricWrapper) roundTripper(next httpClient) httpClient { +func (m *metricWrapper) roundTripper(next Client) Client { return doerFunc(func(req *http.Request) (*http.Response, error) { startTime := m.now() resp, err := next.Do(req) diff --git a/http_test.go b/http_test.go index 3cf5895e..ac91c1c2 100644 --- a/http_test.go +++ 
b/http_test.go @@ -2,473 +2,473 @@ // // SPDX-License-Identifier: Apache-2.0 package main -// import ( -// "bytes" -// "io" -// "net/http" -// "net/http/httptest" -// "testing" -// "testing/iotest" -// "time" - -// "github.com/stretchr/testify/assert" -// "github.com/stretchr/testify/mock" -// "go.uber.org/zap/zaptest" - -// "github.com/xmidt-org/wrp-go/v3" -// ) - -// func exampleRequest(msgType int, list ...string) *http.Request { -// var buffer bytes.Buffer - -// trans := "1234" -// ct := wrp.MimeTypeMsgpack -// url := "localhost:8080" - -// for i := range list { -// switch { -// case i == 0: -// trans = list[i] -// case i == 1: -// ct = list[i] -// case i == 2: -// url = list[i] -// } - -// } -// wrp.NewEncoder(&buffer, wrp.Msgpack).Encode( -// &wrp.Message{ -// Type: wrp.MessageType(msgType), -// Source: "mac:112233445566/lmlite", -// TransactionUUID: trans, -// ContentType: ct, -// Destination: "event:bob/magic/dog", -// Payload: []byte("Hello, world."), -// }) - -// r := bytes.NewReader(buffer.Bytes()) -// req := httptest.NewRequest("POST", url, r) -// req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - -// return req -// } - -// func TestServerHandler(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// tcs := []struct { -// desc string -// expectedResponse int -// request *http.Request -// throwStatusBadRequest bool -// expectedEventType string -// startTime time.Time -// endTime time.Time -// }{ -// { -// desc: "TestServeHTTPHappyPath", -// expectedResponse: http.StatusAccepted, -// request: exampleRequest(4), -// expectedEventType: "bob", -// startTime: date1, -// endTime: date2, -// }, -// { -// desc: "TestServeHTTPInvalidMessageType", -// expectedResponse: http.StatusBadRequest, -// request: exampleRequest(1), -// throwStatusBadRequest: true, -// expectedEventType: unknownEventType, -// startTime: date1, -// endTime: date2, -// }, -// } - -// for _, tc := range tcs { -// assert := assert.New(t) - -// logger := zaptest.NewLogger(t) -// fakeHandler := new(mockHandler) -// if !tc.throwStatusBadRequest { -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).Return().Times(1) -// } - -// fakeEmptyRequests := new(mockCounter) -// fakeErrorRequests := new(mockCounter) -// fakeInvalidCount := new(mockCounter) -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) -// if tc.throwStatusBadRequest { -// fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() -// } - -// fakeTime := mockTime(tc.startTime, tc.endTime) -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", tc.expectedEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// log: logger, -// caduceusHandler: fakeHandler, -// errorRequests: fakeErrorRequests, -// emptyRequests: fakeEmptyRequests, -// invalidCount: fakeInvalidCount, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: fakeTime, -// } -// t.Run(tc.desc, func(t *testing.T) { -// w := httptest.NewRecorder() - -// serverWrapper.ServeHTTP(w, tc.request) -// resp := w.Result() - -// assert.Equal(tc.expectedResponse, resp.StatusCode) -// if nil != resp.Body { -// 
io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// fakeHandler.AssertExpectations(t) -// fakeHist.AssertExpectations(t) -// }) -// } -// } - -// func TestServerHandlerFixWrp(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// logger := zaptest.NewLogger(t) -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).Return().Once() - -// fakeEmptyRequests := new(mockCounter) -// fakeErrorRequests := new(mockCounter) -// fakeInvalidCount := new(mockCounter) -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) - -// fakeIncomingContentTypeCount := new(mockCounter) -// fakeIncomingContentTypeCount.On("With", []string{"content_type", wrp.MimeTypeMsgpack}).Return(fakeIncomingContentTypeCount) -// fakeIncomingContentTypeCount.On("With", []string{"content_type", ""}).Return(fakeIncomingContentTypeCount) -// fakeIncomingContentTypeCount.On("Add", 1.0).Return() - -// fakeModifiedWRPCount := new(mockCounter) -// fakeModifiedWRPCount.On("With", []string{"reason", bothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() -// fakeModifiedWRPCount.On("Add", 1.0).Return().Once() - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", "bob"} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// log: logger, -// caduceusHandler: fakeHandler, -// errorRequests: fakeErrorRequests, -// emptyRequests: fakeEmptyRequests, -// invalidCount: fakeInvalidCount, -// modifiedWRPCount: fakeModifiedWRPCount, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPHappyPath", func(t *testing.T) { -// w := httptest.NewRecorder() - -// serverWrapper.ServeHTTP(w, exampleRequest(4, "", "")) -// resp := w.Result() - -// assert.Equal(http.StatusAccepted, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// fakeHandler.AssertExpectations(t) -// fakeHist.AssertExpectations(t) -// }) -// } - -// func TestServerHandlerFull(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// logger := zaptest.NewLogger(t) -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) - -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// log: logger, -// caduceusHandler: fakeHandler, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPTooMany", func(t 
*testing.T) { -// w := httptest.NewRecorder() - -// /* Act like we have 1 in flight */ -// serverWrapper.incomingQueueDepth = 1 - -// /* Make the call that goes over the limit */ -// serverWrapper.ServeHTTP(w, exampleRequest(4)) -// resp := w.Result() - -// assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// fakeHist.AssertExpectations(t) -// }) -// } - -// func TestServerEmptyPayload(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// var buffer bytes.Buffer -// r := bytes.NewReader(buffer.Bytes()) -// req := httptest.NewRequest("POST", "localhost:8080", r) -// req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - -// logger := zaptest.NewLogger(t) -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) - -// fakeEmptyRequests := new(mockCounter) -// fakeEmptyRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// log: logger, -// caduceusHandler: fakeHandler, -// emptyRequests: fakeEmptyRequests, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPTooMany", func(t *testing.T) { -// w := httptest.NewRecorder() - -// /* Make the call that goes over the limit */ -// serverWrapper.ServeHTTP(w, req) -// resp := w.Result() - -// assert.Equal(http.StatusBadRequest, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// fakeHist.AssertExpectations(t) -// }) -// } - -// func TestServerUnableToReadBody(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// var buffer bytes.Buffer -// r := iotest.TimeoutReader(bytes.NewReader(buffer.Bytes())) - -// _, _ = r.Read(nil) -// req := httptest.NewRequest("POST", "localhost:8080", r) -// req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - -// logger := zaptest.NewLogger(t) -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() - -// fakeErrorRequests := new(mockCounter) -// fakeErrorRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// log: logger, -// caduceusHandler: fakeHandler, -// 
errorRequests: fakeErrorRequests, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPTooMany", func(t *testing.T) { -// w := httptest.NewRecorder() - -// /* Make the call that goes over the limit */ -// serverWrapper.ServeHTTP(w, req) -// resp := w.Result() - -// assert.Equal(http.StatusBadRequest, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// }) -// fakeHist.AssertExpectations(t) -// } - -// func TestServerInvalidBody(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// r := bytes.NewReader([]byte("Invalid payload.")) - -// _, _ = r.Read(nil) -// req := httptest.NewRequest("POST", "localhost:8080", r) -// req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - -// logger := zaptest.NewLogger(t) -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() - -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - -// fakeInvalidCount := new(mockCounter) -// fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// log: logger, -// caduceusHandler: fakeHandler, -// invalidCount: fakeInvalidCount, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPTooMany", func(t *testing.T) { -// w := httptest.NewRecorder() - -// /* Make the call that goes over the limit */ -// serverWrapper.ServeHTTP(w, req) -// resp := w.Result() - -// assert.Equal(http.StatusBadRequest, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// }) -// fakeHist.AssertExpectations(t) -// } - -// func TestHandlerUnsupportedMediaType(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) - -// assert := assert.New(t) - -// logger := zaptest.NewLogger(t) -// fakeHandler := new(mockHandler) - -// fakeQueueDepth := new(mockGauge) - -// serverWrapper := &ServerHandler{ -// log: logger, -// caduceusHandler: fakeHandler, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// } -// testCases := []struct { -// name string -// headers []string -// }{ -// { -// name: "No Content Type Header", -// }, { -// name: "Wrong Content Type Header", -// headers: []string{"application/json"}, -// }, { -// name: "Multiple Content Type Headers", -// headers: []string{"application/msgpack", "application/msgpack", "application/msgpack"}, -// }, -// } - -// for _, testCase := range testCases { -// t.Run(testCase.name, func(t *testing.T) { -// fakeHist := new(mockHistogram) -// serverWrapper.incomingQueueLatency = fakeHist -// 
serverWrapper.now = mockTime(date1, date2) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// w := httptest.NewRecorder() -// req := exampleRequest(4) -// req.Header.Del("Content-Type") -// for _, h := range testCase.headers { -// req.Header.Add("Content-Type", h) -// } -// serverWrapper.ServeHTTP(w, req) -// resp := w.Result() - -// assert.Equal(http.StatusUnsupportedMediaType, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// }) -// } - -// } +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "testing" + "testing/iotest" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap/zaptest" + + "github.com/xmidt-org/wrp-go/v3" +) + +func exampleRequest(msgType int, list ...string) *http.Request { + var buffer bytes.Buffer + + trans := "1234" + ct := wrp.MimeTypeMsgpack + url := "localhost:8080" + + for i := range list { + switch { + case i == 0: + trans = list[i] + case i == 1: + ct = list[i] + case i == 2: + url = list[i] + } + + } + wrp.NewEncoder(&buffer, wrp.Msgpack).Encode( + &wrp.Message{ + Type: wrp.MessageType(msgType), + Source: "mac:112233445566/lmlite", + TransactionUUID: trans, + ContentType: ct, + Destination: "event:bob/magic/dog", + Payload: []byte("Hello, world."), + }) + + r := bytes.NewReader(buffer.Bytes()) + req := httptest.NewRequest("POST", url, r) + req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) + + return req +} + +func TestServerHandler(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + tcs := []struct { + desc string + expectedResponse int + request *http.Request + throwStatusBadRequest bool + expectedEventType string + startTime time.Time + endTime time.Time + }{ + { + desc: "TestServeHTTPHappyPath", + expectedResponse: http.StatusAccepted, + request: exampleRequest(4), + expectedEventType: "bob", + startTime: date1, + endTime: date2, + }, + { + desc: "TestServeHTTPInvalidMessageType", + expectedResponse: http.StatusBadRequest, + request: exampleRequest(1), + throwStatusBadRequest: true, + expectedEventType: unknownEventType, + startTime: date1, + endTime: date2, + }, + } + + for _, tc := range tcs { + assert := assert.New(t) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + if !tc.throwStatusBadRequest { + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).Return().Times(1) + } + + fakeEmptyRequests := new(mockCounter) + fakeErrorRequests := new(mockCounter) + fakeInvalidCount := new(mockCounter) + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) + if tc.throwStatusBadRequest { + fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() + } + + fakeTime := mockTime(tc.startTime, tc.endTime) + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", tc.expectedEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + errorRequests: fakeErrorRequests, + emptyRequests: fakeEmptyRequests, + invalidCount: fakeInvalidCount, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + 
incomingQueueLatency: fakeHist, + now: fakeTime, + } + t.Run(tc.desc, func(t *testing.T) { + w := httptest.NewRecorder() + + serverWrapper.ServeHTTP(w, tc.request) + resp := w.Result() + + assert.Equal(tc.expectedResponse, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + fakeHandler.AssertExpectations(t) + fakeHist.AssertExpectations(t) + }) + } +} + +func TestServerHandlerFixWrp(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).Return().Once() + + fakeEmptyRequests := new(mockCounter) + fakeErrorRequests := new(mockCounter) + fakeInvalidCount := new(mockCounter) + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) + + fakeIncomingContentTypeCount := new(mockCounter) + fakeIncomingContentTypeCount.On("With", []string{"content_type", wrp.MimeTypeMsgpack}).Return(fakeIncomingContentTypeCount) + fakeIncomingContentTypeCount.On("With", []string{"content_type", ""}).Return(fakeIncomingContentTypeCount) + fakeIncomingContentTypeCount.On("Add", 1.0).Return() + + fakeModifiedWRPCount := new(mockCounter) + fakeModifiedWRPCount.On("With", []string{"reason", bothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() + fakeModifiedWRPCount.On("Add", 1.0).Return().Once() + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", "bob"} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + errorRequests: fakeErrorRequests, + emptyRequests: fakeEmptyRequests, + invalidCount: fakeInvalidCount, + modifiedWRPCount: fakeModifiedWRPCount, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPHappyPath", func(t *testing.T) { + w := httptest.NewRecorder() + + serverWrapper.ServeHTTP(w, exampleRequest(4, "", "")) + resp := w.Result() + + assert.Equal(http.StatusAccepted, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + fakeHandler.AssertExpectations(t) + fakeHist.AssertExpectations(t) + }) +} + +func TestServerHandlerFull(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) + + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + 
incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPTooMany", func(t *testing.T) { + w := httptest.NewRecorder() + + /* Act like we have 1 in flight */ + serverWrapper.incomingQueueDepth = 1 + + /* Make the call that goes over the limit */ + serverWrapper.ServeHTTP(w, exampleRequest(4)) + resp := w.Result() + + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + fakeHist.AssertExpectations(t) + }) +} + +func TestServerEmptyPayload(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + var buffer bytes.Buffer + r := bytes.NewReader(buffer.Bytes()) + req := httptest.NewRequest("POST", "localhost:8080", r) + req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) + + fakeEmptyRequests := new(mockCounter) + fakeEmptyRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + emptyRequests: fakeEmptyRequests, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPTooMany", func(t *testing.T) { + w := httptest.NewRecorder() + + /* Make the call that goes over the limit */ + serverWrapper.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(http.StatusBadRequest, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + fakeHist.AssertExpectations(t) + }) +} + +func TestServerUnableToReadBody(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + var buffer bytes.Buffer + r := iotest.TimeoutReader(bytes.NewReader(buffer.Bytes())) + + _, _ = r.Read(nil) + req := httptest.NewRequest("POST", "localhost:8080", r) + req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() + + fakeErrorRequests := new(mockCounter) + fakeErrorRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + errorRequests: fakeErrorRequests, + incomingQueueDepthMetric: 
fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPTooMany", func(t *testing.T) { + w := httptest.NewRecorder() + + /* Make the call that goes over the limit */ + serverWrapper.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(http.StatusBadRequest, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + }) + fakeHist.AssertExpectations(t) +} + +func TestServerInvalidBody(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + r := bytes.NewReader([]byte("Invalid payload.")) + + _, _ = r.Read(nil) + req := httptest.NewRequest("POST", "localhost:8080", r) + req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() + + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) + + fakeInvalidCount := new(mockCounter) + fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + invalidCount: fakeInvalidCount, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPTooMany", func(t *testing.T) { + w := httptest.NewRecorder() + + /* Make the call that goes over the limit */ + serverWrapper.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(http.StatusBadRequest, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + }) + fakeHist.AssertExpectations(t) +} + +func TestHandlerUnsupportedMediaType(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + + assert := assert.New(t) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + + fakeQueueDepth := new(mockGauge) + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + } + testCases := []struct { + name string + headers []string + }{ + { + name: "No Content Type Header", + }, { + name: "Wrong Content Type Header", + headers: []string{"application/json"}, + }, { + name: "Multiple Content Type Headers", + headers: []string{"application/msgpack", "application/msgpack", "application/msgpack"}, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + fakeHist := new(mockHistogram) + serverWrapper.incomingQueueLatency = fakeHist + serverWrapper.now = mockTime(date1, date2) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + w := httptest.NewRecorder() + req := exampleRequest(4) + req.Header.Del("Content-Type") + for _, h 
:= range testCase.headers { + req.Header.Add("Content-Type", h) + } + serverWrapper.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(http.StatusUnsupportedMediaType, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + }) + } + +} diff --git a/main.go b/main.go index 18525b81..9a73d3e0 100644 --- a/main.go +++ b/main.go @@ -136,11 +136,11 @@ func caduceus(arguments []string, run bool) error { arrangehttp.ProvideServer("servers.alternate"), ProvideHandler(), - ProvideSenderWrapper(), + ProvideWrapper(), touchstone.Provide(), touchhttp.Provide(), ProvideMetrics(), - ProvideSenderMetrics(), + ProvideSinkMetrics(), // ancla.ProvideMetrics(), //TODO: need to add back in once we fix the ancla/argus dependency issue ) diff --git a/metrics.go b/metrics.go index 68494350..516704d2 100644 --- a/metrics.go +++ b/metrics.go @@ -153,10 +153,10 @@ func ProvideMetrics() fx.Option { ) } -func ProvideSenderMetrics() fx.Option { +func ProvideSinkMetrics() fx.Option { return fx.Provide( - func(in SenderMetricsIn) (SenderWrapperMetrics, OutboundSenderMetrics) { - outbounderMetrics := OutboundSenderMetrics{ + func(in SenderMetricsIn) (SinkWrapperMetrics, SinkSenderMetrics) { + senderMetrics := SinkSenderMetrics{ DeliveryCounter: in.DeliveryCounter, DeliveryRetryCounter: in.DeliveryRetryCounter, DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge, @@ -170,12 +170,12 @@ func ProvideSenderMetrics() fx.Option { OutgoingQueueDepth: in.OutgoingQueueDepth, ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge, } - wrapperMetrics := SenderWrapperMetrics{ + wrapperMetrics := SinkWrapperMetrics{ QueryLatency: in.QueryLatency, EventType: in.EventType, } - return wrapperMetrics, outbounderMetrics + return wrapperMetrics, senderMetrics }, ) } diff --git a/metrics_test.go b/metrics_test.go index db6c3ddf..a3b957b5 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -36,7 +36,7 @@ func TestMetrics(t *testing.T) { func TestProvideSenderMetrics(t *testing.T) { assert := assert.New(t) - m := ProvideSenderMetrics() + m := ProvideSinkMetrics() assert.NotNil(m) } diff --git a/mocks_test.go b/mocks_test.go index 6f69a97a..5a2c19b8 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -4,7 +4,9 @@ package main import ( "time" + "unicode/utf8" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" "github.com/xmidt-org/wrp-go/v3" ) @@ -46,3 +48,24 @@ func mockTime(one, two time.Time) func() time.Time { return one } } + +type mockCounter struct { + mock.Mock +} + +func (m *mockCounter) Add(delta float64) { + m.Called(delta) +} + +func (m *mockCounter) Inc (){ + m.Called(1) +} +func (m *mockCounter) With(labelValues ...string) prometheus.Counter { + for _, v := range labelValues { + if !utf8.ValidString(v) { + panic("not UTF-8") + } + } + args := m.Called(labelValues) + return args.Get(0).(prometheus.Counter) +} diff --git a/outboundSender.go b/sinkSender.go similarity index 64% rename from outboundSender.go rename to sinkSender.go index 259383eb..57153d29 100644 --- a/outboundSender.go +++ b/sinkSender.go @@ -50,23 +50,14 @@ type FailureMessage struct { Workers int `json:"worker_count"` } -type OutboundSender interface { +type Sender interface { // Update(ancla.InternalWebhook) error Shutdown(bool) RetiredSince() time.Time Queue(*wrp.Message) } -type OutboundSenderFactory struct { - Config SenderConfig - Logger *zap.Logger - Metrics OutboundSenderMetrics - Sender httpClient - Listener ListenerStub - ClientMiddleware func(httpClient) httpClient -} - -type 
OutboundSenderMetrics struct { +type SinkSenderMetrics struct { DeliveryCounter *prometheus.CounterVec DeliveryRetryCounter *prometheus.CounterVec DeliveryRetryMaxGauge *prometheus.GaugeVec @@ -82,13 +73,13 @@ type OutboundSenderMetrics struct { } // CaduceusOutboundSender is the outbound sender object. -type CaduceusOutboundSender struct { +type SinkSender struct { id string urls *ring.Ring listener ListenerStub deliverUntil time.Time dropUntil time.Time - sender httpClient + client Client events []*regexp.Regexp matcher []*regexp.Regexp queueSize int @@ -121,79 +112,73 @@ type CaduceusOutboundSender struct { queue atomic.Value customPIDs []string disablePartnerIDs bool - clientMiddleware func(httpClient) httpClient + clientMiddleware func(Client) Client } -// New creates a new OutboundSender object from the factory, or returns an error. -func (osf *OutboundSenderFactory) New() (obs OutboundSender, err error) { - // if _, err = url.ParseRequestURI(osf.Listener.Webhook.Config.URL); nil != err { - // return - // } - - if osf.ClientMiddleware == nil { - osf.ClientMiddleware = nopHTTPClient +func newSinkSender(sw *SinkWrapper) (s Sender, err error) { + if sw.clientMiddleware == nil { + sw.clientMiddleware = nopHTTPClient } - - if osf.Sender == nil { - err = errors.New("nil Sender()") + if sw.client == nil { + err = errors.New("nil Client") return } - if osf.Config.CutOffPeriod.Nanoseconds() == 0 { + if sw.config.CutOffPeriod.Nanoseconds() == 0 { err = errors.New("invalid CutOffPeriod") return } - if osf.Logger == nil { + if sw.logger == nil { err = errors.New("logger required") return } // decoratedLogger := osf.Logger.With(zap.String("webhook.address", osf.Listener.Webhook.Address)) - caduceusOutboundSender := &CaduceusOutboundSender{ + sinkSender := &SinkSender{ // id: osf.Listener.Webhook.Config.URL, - listener: osf.Listener, - sender: osf.Sender, - queueSize: osf.Config.QueueSizePerSender, - cutOffPeriod: osf.Config.CutOffPeriod, + listener: sw.listener, + client: sw.client, + queueSize: sw.config.QueueSizePerSender, + cutOffPeriod: sw.config.CutOffPeriod, // deliverUntil: osf.Listener.Webhook.Until, // logger: decoratedLogger, - deliveryRetries: osf.Config.DeliveryRetries, - deliveryInterval: osf.Config.DeliveryInterval, - maxWorkers: osf.Config.NumWorkersPerSender, + deliveryRetries: sw.config.DeliveryRetries, + deliveryInterval: sw.config.DeliveryInterval, + maxWorkers: sw.config.NumWorkersPerSender, failureMsg: FailureMessage{ - Original: osf.Listener, + Original: sw.listener, Text: failureText, - CutOffPeriod: osf.Config.CutOffPeriod.String(), - QueueSize: osf.Config.QueueSizePerSender, - Workers: osf.Config.NumWorkersPerSender, + CutOffPeriod: sw.config.CutOffPeriod.String(), + QueueSize: sw.config.QueueSizePerSender, + Workers: sw.config.NumWorkersPerSender, }, - customPIDs: osf.Config.CustomPIDs, - disablePartnerIDs: osf.Config.DisablePartnerIDs, - clientMiddleware: osf.ClientMiddleware, + customPIDs: sw.config.CustomPIDs, + disablePartnerIDs: sw.config.DisablePartnerIDs, + clientMiddleware: sw.clientMiddleware, } // Don't share the secret with others when there is an error. 
// caduceusOutboundSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" - CreateOutbounderMetrics(osf.Metrics, caduceusOutboundSender) + CreateOutbounderMetrics(sw.metrics, sinkSender) // update queue depth and current workers gauge to make sure they start at 0 - caduceusOutboundSender.queueDepthGauge.Set(0) - caduceusOutboundSender.currentWorkersGauge.Set(0) + sinkSender.queueDepthGauge.Set(0) + sinkSender.currentWorkersGauge.Set(0) - caduceusOutboundSender.queue.Store(make(chan *wrp.Message, osf.Config.QueueSizePerSender)) + sinkSender.queue.Store(make(chan *wrp.Message, sw.config.QueueSizePerSender)) // if err = caduceusOutboundSender.Update(osf.Listener); nil != err { // return // } - caduceusOutboundSender.workers = semaphore.New(caduceusOutboundSender.maxWorkers) - caduceusOutboundSender.wg.Add(1) - go caduceusOutboundSender.dispatcher() + sinkSender.workers = semaphore.New(sinkSender.maxWorkers) + sinkSender.wg.Add(1) + go sinkSender.dispatcher() - obs = caduceusOutboundSender + s = sinkSender return } @@ -201,7 +186,7 @@ func (osf *OutboundSenderFactory) New() (obs OutboundSender, err error) { // Update applies user configurable values for the outbound sender when a // webhook is registered // TODO: commenting out for now until argus/ancla dependency issue is fixed -func (obs *CaduceusOutboundSender) Update(wh ListenerStub) (err error) { +func (s *SinkSender) Update(wh ListenerStub) (err error) { // Validate the failure URL, if present if wh.Webhook.FailureURL != "" { @@ -248,60 +233,60 @@ func (obs *CaduceusOutboundSender) Update(wh ListenerStub) (err error) { for i := 0; i < urlCount; i++ { _, err = url.Parse(wh.Webhook.Config.AlternativeURLs[i]) if err != nil { - obs.logger.Error("failed to update url", zap.Any("url", wh.Webhook.Config.AlternativeURLs[i]), zap.Error(err)) + s.logger.Error("failed to update url", zap.Any("url", wh.Webhook.Config.AlternativeURLs[i]), zap.Error(err)) return } } - obs.renewalTimeGauge.Set(float64(time.Now().Unix())) + s.renewalTimeGauge.Set(float64(time.Now().Unix())) // write/update obs - obs.mutex.Lock() + s.mutex.Lock() - obs.listener = wh + s.listener = wh - obs.failureMsg.Original = wh + s.failureMsg.Original = wh // Don't share the secret with others when there is an error. 
- obs.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" + s.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" - obs.listener.Webhook.FailureURL = wh.Webhook.FailureURL - obs.deliverUntil = wh.Webhook.Until - obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix())) + s.listener.Webhook.FailureURL = wh.Webhook.FailureURL + s.deliverUntil = wh.Webhook.Until + s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix())) - obs.events = events + s.events = events - obs.deliveryRetryMaxGauge.Set(float64(obs.deliveryRetries)) + s.deliveryRetryMaxGauge.Set(float64(s.deliveryRetries)) // if matcher list is empty set it nil for Queue() logic - obs.matcher = nil + s.matcher = nil if 0 < len(matcher) { - obs.matcher = matcher + s.matcher = matcher } if 0 == urlCount { - obs.urls = ring.New(1) - obs.urls.Value = obs.id + s.urls = ring.New(1) + s.urls.Value = s.id } else { r := ring.New(urlCount) for i := 0; i < urlCount; i++ { r.Value = wh.Webhook.Config.AlternativeURLs[i] r = r.Next() } - obs.urls = r + s.urls = r } // Randomize where we start so all the instances don't synchronize r := rand.New(rand.NewSource(time.Now().UnixNano())) - offset := r.Intn(obs.urls.Len()) + offset := r.Intn(s.urls.Len()) for 0 < offset { - obs.urls = obs.urls.Next() + s.urls = s.urls.Next() offset-- } // Update this here in case we make this configurable later - obs.maxWorkersGauge.Set(float64(obs.maxWorkers)) + s.maxWorkersGauge.Set(float64(s.maxWorkers)) - obs.mutex.Unlock() + s.mutex.Unlock() return } @@ -309,29 +294,29 @@ func (obs *CaduceusOutboundSender) Update(wh ListenerStub) (err error) { // Shutdown causes the CaduceusOutboundSender to stop its activities either gently or // abruptly based on the gentle parameter. If gentle is false, all queued // messages will be dropped without an attempt to send made. -func (obs *CaduceusOutboundSender) Shutdown(gentle bool) { +func (s *SinkSender) Shutdown(gentle bool) { if !gentle { // need to close the channel we're going to replace, in case it doesn't // have any events in it. - close(obs.queue.Load().(chan *wrp.Message)) - obs.Empty(obs.droppedExpiredCounter) + close(s.queue.Load().(chan *wrp.Message)) + s.Empty(s.droppedExpiredCounter) } - close(obs.queue.Load().(chan *wrp.Message)) - obs.wg.Wait() + close(s.queue.Load().(chan *wrp.Message)) + s.wg.Wait() - obs.mutex.Lock() - obs.deliverUntil = time.Time{} - obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix())) - obs.queueDepthGauge.Set(0) //just in case - obs.mutex.Unlock() + s.mutex.Lock() + s.deliverUntil = time.Time{} + s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix())) + s.queueDepthGauge.Set(0) //just in case + s.mutex.Unlock() } // RetiredSince returns the time the CaduceusOutboundSender retired (which could be in // the future). -func (obs *CaduceusOutboundSender) RetiredSince() time.Time { - obs.mutex.RLock() - deliverUntil := obs.deliverUntil - obs.mutex.RUnlock() +func (s *SinkSender) RetiredSince() time.Time { + s.mutex.RLock() + deliverUntil := s.deliverUntil + s.mutex.RUnlock() return deliverUntil } @@ -349,28 +334,28 @@ func overlaps(sl1 []string, sl2 []string) bool { // Queue is given a request to evaluate and optionally enqueue in the list // of messages to deliver. The request is checked to see if it matches the // criteria before being accepted or silently dropped. 
-func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { - obs.mutex.RLock() - deliverUntil := obs.deliverUntil - dropUntil := obs.dropUntil - events := obs.events - matcher := obs.matcher - obs.mutex.RUnlock() +func (s *SinkSender) Queue(msg *wrp.Message) { + s.mutex.RLock() + deliverUntil := s.deliverUntil + dropUntil := s.dropUntil + events := s.events + matcher := s.matcher + s.mutex.RUnlock() now := time.Now() - if !obs.isValidTimeWindow(now, dropUntil, deliverUntil) { - obs.logger.Debug("invalid time window for event", zap.Any("now", now), zap.Any("dropUntil", dropUntil), zap.Any("deliverUntil", deliverUntil)) + if !s.isValidTimeWindow(now, dropUntil, deliverUntil) { + s.logger.Debug("invalid time window for event", zap.Any("now", now), zap.Any("dropUntil", dropUntil), zap.Any("deliverUntil", deliverUntil)) return } //check the partnerIDs - if !obs.disablePartnerIDs { + if !s.disablePartnerIDs { if len(msg.PartnerIDs) == 0 { - msg.PartnerIDs = obs.customPIDs + msg.PartnerIDs = s.customPIDs } - // if !overlaps(obs.listener.PartnerIDs, msg.PartnerIDs) { - // obs.logger.Debug("parter id check failed", zap.Strings("webhook.partnerIDs", obs.listener.PartnerIDs), zap.Strings("event.partnerIDs", msg.PartnerIDs)) + // if !overlaps(s.listener.PartnerIDs, msg.PartnerIDs) { + // s.logger.Debug("parter id check failed", zap.Strings("webhook.partnerIDs", s.listener.PartnerIDs), zap.Strings("event.partnerIDs", msg.PartnerIDs)) // return // } } @@ -386,7 +371,7 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { } } if !matchEvent { - obs.logger.Debug("destination regex doesn't match", zap.String("event.dest", msg.Destination)) + s.logger.Debug("destination regex doesn't match", zap.String("event.dest", msg.Destination)) return } @@ -401,31 +386,31 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { } if !matchDevice { - obs.logger.Debug("device regex doesn't match", zap.String("event.source", msg.Source)) + s.logger.Debug("device regex doesn't match", zap.String("event.source", msg.Source)) return } select { - case obs.queue.Load().(chan *wrp.Message) <- msg: - obs.queueDepthGauge.Add(1.0) - obs.logger.Debug("event added to outbound queue", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) + case s.queue.Load().(chan *wrp.Message) <- msg: + s.queueDepthGauge.Add(1.0) + s.logger.Debug("event added to outbound queue", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) default: - obs.logger.Debug("queue full. event dropped", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) - obs.queueOverflow() - obs.droppedQueueFullCounter.Add(1.0) + s.logger.Debug("queue full. 
event dropped", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) + s.queueOverflow() + s.droppedQueueFullCounter.Add(1.0) } } -func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { +func (s *SinkSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { if !now.After(dropUntil) { // client was cut off - obs.droppedCutoffCounter.Add(1.0) + s.droppedCutoffCounter.Add(1.0) return false } if !now.Before(deliverUntil) { // outside delivery window - obs.droppedExpiredBeforeQueueCounter.Add(1.0) + s.droppedExpiredBeforeQueueCounter.Add(1.0) return false } @@ -436,15 +421,15 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti // a fresh one, counting any current messages in the queue as dropped. // It should never close a queue, as a queue not referenced anywhere will be // cleaned up by the garbage collector without needing to be closed. -func (obs *CaduceusOutboundSender) Empty(droppedCounter prometheus.Counter) { - droppedMsgs := obs.queue.Load().(chan *wrp.Message) - obs.queue.Store(make(chan *wrp.Message, obs.queueSize)) +func (s *SinkSender) Empty(droppedCounter prometheus.Counter) { + droppedMsgs := s.queue.Load().(chan *wrp.Message) + s.queue.Store(make(chan *wrp.Message, s.queueSize)) droppedCounter.Add(float64(len(droppedMsgs))) - obs.queueDepthGauge.Set(0.0) + s.queueDepthGauge.Set(0.0) } -func (obs *CaduceusOutboundSender) dispatcher() { - defer obs.wg.Done() +func (s *SinkSender) dispatcher() { + defer s.wg.Done() var ( msg *wrp.Message urls *ring.Ring @@ -456,7 +441,7 @@ Loop: for { // Always pull a new queue in case we have been cutoff or are shutting // down. - msgQueue := obs.queue.Load().(chan *wrp.Message) + msgQueue := s.queue.Load().(chan *wrp.Message) // nolint:gosimple select { // The dispatcher cannot get stuck blocking here forever (caused by an @@ -483,50 +468,50 @@ Loop: if !ok { break Loop } - obs.queueDepthGauge.Add(-1.0) - obs.mutex.RLock() - urls = obs.urls + s.queueDepthGauge.Add(-1.0) + s.mutex.RLock() + urls = s.urls // Move to the next URL to try 1st the next time. // This is okay because we run a single dispatcher and it's the // only one updating this field. 
- obs.urls = obs.urls.Next() - deliverUntil := obs.deliverUntil - dropUntil := obs.dropUntil - // secret = obs.listener.Webhook.Config.Secret - // accept = obs.listener.Webhook.Config.ContentType - obs.mutex.RUnlock() + s.urls = s.urls.Next() + deliverUntil := s.deliverUntil + dropUntil := s.dropUntil + // secret = s.listener.Webhook.Config.Secret + // accept = s.listener.Webhook.Config.ContentType + s.mutex.RUnlock() now := time.Now() if now.Before(dropUntil) { - obs.droppedCutoffCounter.Add(1.0) + s.droppedCutoffCounter.Add(1.0) continue } if now.After(deliverUntil) { - obs.Empty(obs.droppedExpiredCounter) + s.Empty(s.droppedExpiredCounter) continue } - obs.workers.Acquire() - obs.currentWorkersGauge.Add(1.0) + s.workers.Acquire() + s.currentWorkersGauge.Add(1.0) - go obs.send(urls, secret, accept, msg) + go s.send(urls, secret, accept, msg) } } - for i := 0; i < obs.maxWorkers; i++ { - obs.workers.Acquire() + for i := 0; i < s.maxWorkers; i++ { + s.workers.Acquire() } } // worker is the routine that actually takes the queued messages and delivers // them to the listeners outside webpa -func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType string, msg *wrp.Message) { +func (s *SinkSender) send(urls *ring.Ring, secret, acceptType string, msg *wrp.Message) { defer func() { if r := recover(); nil != r { - obs.droppedPanic.Add(1.0) - obs.logger.Error("goroutine send() panicked", zap.String("id", obs.id), zap.Any("panic", r)) + s.droppedPanic.Add(1.0) + s.logger.Error("goroutine send() panicked", zap.String("id", s.id), zap.Any("panic", r)) } - obs.workers.Release() - obs.currentWorkersGauge.Add(-1.0) + s.workers.Release() + s.currentWorkersGauge.Add(-1.0) }() payload := msg.Payload @@ -550,8 +535,8 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri req, err := http.NewRequest("POST", urls.Value.(string), payloadReader) if nil != err { // Report drop - obs.droppedInvalidConfig.Add(1.0) - obs.logger.Error("Invalid URL", zap.String("url", urls.Value.(string)), zap.String("id", obs.id), zap.Error(err)) + s.droppedInvalidConfig.Add(1.0) + s.logger.Error("Invalid URL", zap.String("url", urls.Value.(string)), zap.String("id", s.id), zap.Error(err)) return } @@ -582,20 +567,20 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri event := msg.FindEventStringSubMatch() // Send it - obs.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) + s.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) client, _ := retryhttp.NewClient( - retryhttp.WithHTTPClient(obs.clientMiddleware(obs.sender)), - retryhttp.WithRunner(obs.addRunner(req, event)), - retryhttp.WithRequesters(obs.updateRequest(urls)), + retryhttp.WithHTTPClient(s.clientMiddleware(s.client)), + retryhttp.WithRunner(s.addRunner(req, event)), + retryhttp.WithRequesters(s.updateRequest(urls)), ) resp, err := client.Do(req) code := "failure" - l := obs.logger + l := s.logger if nil != err { // Report failure - obs.droppedNetworkErrCounter.Add(1.0) - l = obs.logger.With(zap.Error(err)) + s.droppedNetworkErrCounter.Add(1.0) + l = s.logger.With(zap.Error(err)) } else { // Report Result code = strconv.Itoa(resp.StatusCode) @@ -607,37 +592,37 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri resp.Body.Close() } } - obs.deliveryCounter.With(prometheus.Labels{UrlLabel: obs.id, CodeLabel: 
code, EventLabel: event}).Add(1.0) + s.deliveryCounter.With(prometheus.Labels{UrlLabel: s.id, CodeLabel: code, EventLabel: event}).Add(1.0) l.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String("code", code), zap.String("url", req.URL.String())) } // queueOverflow handles the logic of what to do when a queue overflows: // cutting off the webhook for a time and sending a cut off notification // to the failure URL. -func (obs *CaduceusOutboundSender) queueOverflow() { - obs.mutex.Lock() - if time.Now().Before(obs.dropUntil) { - obs.mutex.Unlock() +func (s *SinkSender) queueOverflow() { + s.mutex.Lock() + if time.Now().Before(s.dropUntil) { + s.mutex.Unlock() return } - obs.dropUntil = time.Now().Add(obs.cutOffPeriod) - obs.dropUntilGauge.Set(float64(obs.dropUntil.Unix())) - // secret := obs.listener.Webhook.Config.Secret + s.dropUntil = time.Now().Add(s.cutOffPeriod) + s.dropUntilGauge.Set(float64(s.dropUntil.Unix())) + // secret := s.listener.Webhook.Config.Secret secret := "placeholderSecret" - failureMsg := obs.failureMsg - // failureURL := obs.listener.Webhook.FailureURL + failureMsg := s.failureMsg + // failureURL := s.listener.Webhook.FailureURL failureURL := "placeholderURL" - obs.mutex.Unlock() + s.mutex.Unlock() - obs.cutOffCounter.Add(1.0) + s.cutOffCounter.Add(1.0) // We empty the queue but don't close the channel, because we're not // shutting down. - obs.Empty(obs.droppedCutoffCounter) + s.Empty(s.droppedCutoffCounter) msg, err := json.Marshal(failureMsg) if nil != err { - obs.logger.Error("Cut-off notification json.Marshal failed", zap.Any("failureMessage", obs.failureMsg), zap.String("for", obs.id), zap.Error(err)) + s.logger.Error("Cut-off notification json.Marshal failed", zap.Any("failureMessage", s.failureMsg), zap.String("for", s.id), zap.Error(err)) return } @@ -651,8 +636,8 @@ func (obs *CaduceusOutboundSender) queueOverflow() { req, err := http.NewRequest("POST", failureURL, payload) if nil != err { // Failure - obs.logger.Error("Unable to send cut-off notification", zap.String("notification", - failureURL), zap.String("for", obs.id), zap.Error(err)) + s.logger.Error("Unable to send cut-off notification", zap.String("notification", + failureURL), zap.String("for", s.id), zap.Error(err)) return } req.Header.Set("Content-Type", wrp.MimeTypeJson) @@ -664,16 +649,16 @@ func (obs *CaduceusOutboundSender) queueOverflow() { req.Header.Set("X-Webpa-Signature", sig) } - resp, err := obs.sender.Do(req) + resp, err := s.client.Do(req) if nil != err { // Failure - obs.logger.Error("Unable to send cut-off notification", zap.String("notification", failureURL), zap.String("for", obs.id), zap.Error(err)) + s.logger.Error("Unable to send cut-off notification", zap.String("notification", failureURL), zap.String("for", s.id), zap.Error(err)) return } if nil == resp { // Failure - obs.logger.Error("Unable to send cut-off notification, nil response", zap.String("notification", failureURL)) + s.logger.Error("Unable to send cut-off notification, nil response", zap.String("notification", failureURL)) return } @@ -686,44 +671,44 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } } -func (obs *CaduceusOutboundSender) addRunner(request *http.Request, event string) retry.Runner[*http.Response] { +func (s *SinkSender) addRunner(request *http.Request, event string) retry.Runner[*http.Response] { //TODO: need to handle error runner, _ := retry.NewRunner[*http.Response]( 
retry.WithPolicyFactory[*http.Response](retry.Config{
- Interval: obs.deliveryInterval,
- MaxRetries: obs.deliveryRetries,
+ Interval: s.deliveryInterval,
+ MaxRetries: s.deliveryRetries,
 }),
- retry.WithOnAttempt[*http.Response](obs.onAttempt(request, event)),
+ retry.WithOnAttempt[*http.Response](s.onAttempt(request, event)),
 )
 return runner
 }
-func (obs *CaduceusOutboundSender) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request {
+func (s *SinkSender) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request {
 return func(request *http.Request) *http.Request {
 urls = urls.Next()
 tmp, err := url.Parse(urls.Value.(string))
 if err != nil {
- obs.logger.Error("failed to update url", zap.String(UrlLabel, urls.Value.(string)), zap.Error(err))
+ s.logger.Error("failed to update url", zap.String(UrlLabel, urls.Value.(string)), zap.Error(err))
 }
 request.URL = tmp
 return request
 }
 }
-func (obs *CaduceusOutboundSender) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] {
+func (s *SinkSender) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] {
 return func(attempt retry.Attempt[*http.Response]) {
 if attempt.Retries > 0 {
- obs.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: obs.id, EventLabel: event}).Add(1.0)
- obs.logger.Debug("retrying HTTP transaction", zap.String("url", request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode))
+ s.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: s.id, EventLabel: event}).Add(1.0)
+ s.logger.Debug("retrying HTTP transaction", zap.String("url", request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode))
 }
 }
 }
-func CreateOutbounderMetrics(m OutboundSenderMetrics, c *CaduceusOutboundSender) {
+func CreateOutbounderMetrics(m SinkSenderMetrics, c *SinkSender) {
 c.deliveryRetryCounter = m.DeliveryRetryCounter
 c.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{UrlLabel: c.id})
 c.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{UrlLabel: c.id})
diff --git a/outboundSender_test.go b/sinkSender_test.go
similarity index 100%
rename from outboundSender_test.go
rename to sinkSender_test.go
diff --git a/senderWrapper.go b/sinkWrapper.go
similarity index 54%
rename from senderWrapper.go
rename to sinkWrapper.go
index b5554597..dd4899aa 100644
--- a/senderWrapper.go
+++ b/sinkWrapper.go
@@ -18,47 +18,31 @@ import (
 "go.uber.org/zap"
 )
-// SenderWrapperFactory configures the CaduceusSenderWrapper for creation
-type CaduceusSenderWrapperIn struct {
+// SinkWrapperIn configures the SinkWrapper for creation
+type SinkWrapperIn struct {
 fx.In
- Tracing candlelight.Tracing
- SenderConfig SenderConfig
- WrapperMetrics SenderWrapperMetrics
- OutbounderMetrics OutboundSenderMetrics
- Logger *zap.Logger
+ Tracing candlelight.Tracing
+ SenderConfig SenderConfig
+ WrapperMetrics SinkWrapperMetrics
+ SenderMetrics SinkSenderMetrics
+ Logger *zap.Logger
 }
-type SenderWrapperMetrics struct {
+type SinkWrapperMetrics struct {
 QueryLatency prometheus.ObserverVec
 EventType *prometheus.CounterVec
 }
-type SenderWrapper interface {
+// Wrapper is the interface implemented by SinkWrapper; it exists to support unit testing.
+type Wrapper interface {
 // Update([]ancla.InternalWebhook)
 Queue(*wrp.Message)
 Shutdown(bool)
 }
-// CaduceusSenderWrapper contains no external parameters.
-type CaduceusSenderWrapper struct {
- // The http client Do() function to share with OutboundSenders. 
- sender httpClient
- // The number of workers to assign to each OutboundSender created.
- numWorkersPerSender int
-
- // The queue size to assign to each OutboundSender created.
- queueSizePerSender int
-
- // Number of delivery retries before giving up
- deliveryRetries int
-
- // Time in between delivery retries
- deliveryInterval time.Duration
-
- // The cut off time to assign to each OutboundSender created.
- cutOffPeriod time.Duration
-
+// SinkWrapper contains the configuration that will be shared with each outbound sender. It contains no external parameters.
+type SinkWrapper struct {
 // The amount of time to let expired OutboundSenders linger before
 // shutting them down and cleaning up the resources associated with them.
 linger time.Duration
@@ -66,76 +50,54 @@ type CaduceusSenderWrapper struct {
 // The logger implementation to share with OutboundSenders.
 logger *zap.Logger
- mutex *sync.RWMutex
- senders map[string]OutboundSender
- eventType *prometheus.CounterVec
- queryLatency prometheus.ObserverVec
- wg sync.WaitGroup
- shutdown chan struct{}
-
- // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message
- // has no partner IDs.
- customPIDs []string
-
- // DisablePartnerIDs dictates whether or not to enforce the partner ID check.
- disablePartnerIDs bool
- outbounderSetUp *OutboundSenderFactory
+ mutex *sync.RWMutex
+ senders map[string]Sender
+ eventType *prometheus.CounterVec
+ queryLatency prometheus.ObserverVec
+ wg sync.WaitGroup
+ shutdown chan struct{}
+ config SenderConfig
+ metrics SinkSenderMetrics
+ client Client // TODO: should this be a part of the wrapper or the sender?
+ listener ListenerStub // TODO: should this be a part of the wrapper or the sender?
+ clientMiddleware func(Client) Client // TODO: should this be a part of the wrapper or the sender?
} -func ProvideSenderWrapper() fx.Option { +func ProvideWrapper() fx.Option { return fx.Provide( - func(in CaduceusSenderWrapperIn) http.RoundTripper { - return NewRoundTripper(in.SenderConfig, in.Tracing) - }, - func(tr http.RoundTripper, in CaduceusSenderWrapperIn) (*CaduceusSenderWrapper, error) { - csw, err := NewSenderWrapper(tr, in) + func(in SinkWrapperIn) (*SinkWrapper, error) { + csw, err := NewSinkWrapper(in) return csw, err }, ) } -// New produces a new CaduceusSenderWrapper -// based on the SenderConfig -func NewSenderWrapper(tr http.RoundTripper, in CaduceusSenderWrapperIn) (csw *CaduceusSenderWrapper, err error) { - csw = &CaduceusSenderWrapper{ - numWorkersPerSender: in.SenderConfig.NumWorkersPerSender, - queueSizePerSender: in.SenderConfig.QueueSizePerSender, - deliveryRetries: in.SenderConfig.DeliveryRetries, - deliveryInterval: in.SenderConfig.DeliveryInterval, - cutOffPeriod: in.SenderConfig.CutOffPeriod, - linger: in.SenderConfig.Linger, - logger: in.Logger, - customPIDs: in.SenderConfig.CustomPIDs, - disablePartnerIDs: in.SenderConfig.DisablePartnerIDs, - eventType: in.WrapperMetrics.EventType, - queryLatency: in.WrapperMetrics.QueryLatency, - } - obsSetUp := &OutboundSenderFactory{ - Config: in.SenderConfig, - Logger: in.Logger, - Metrics: in.OutbounderMetrics, +func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) { + sw = &SinkWrapper{ + linger: in.SenderConfig.Linger, + logger: in.Logger, + eventType: in.WrapperMetrics.EventType, + queryLatency: in.WrapperMetrics.QueryLatency, + config: in.SenderConfig, + metrics: in.SenderMetrics, } - csw.outbounderSetUp = obsSetUp - csw.sender = doerFunc((&http.Client{ - Transport: tr, - Timeout: in.SenderConfig.ClientTimeout, - }).Do) if in.SenderConfig.Linger <= 0 { linger := fmt.Sprintf("linger not positive: %v", in.SenderConfig.Linger) err = errors.New(linger) - csw = nil + sw = nil return } - csw.senders = make(map[string]OutboundSender) - csw.shutdown = make(chan struct{}) + sw.senders = make(map[string]Sender) + sw.shutdown = make(chan struct{}) - csw.wg.Add(1) - go undertaker(csw) + sw.wg.Add(1) + go undertaker(sw) return } +// no longer being initialized at start up - needs to be initialized by the creation of the outbound sender func NewRoundTripper(config SenderConfig, tracing candlelight.Tracing) (tr http.RoundTripper) { tr = &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation}, @@ -155,7 +117,7 @@ func NewRoundTripper(config SenderConfig, tracing candlelight.Tracing) (tr http. // Update is called when we get changes to our webhook listeners with either // additions, or updates. This code takes care of building new OutboundSenders // and maintaining the existing OutboundSenders. 
-func (sw *CaduceusSenderWrapper) Update(list []ListenerStub) { +func (sw *SinkWrapper) Update(list []ListenerStub) { ids := make([]struct { Listener ListenerStub @@ -173,18 +135,17 @@ func (sw *CaduceusSenderWrapper) Update(list []ListenerStub) { for _, inValue := range ids { sender, ok := sw.senders[inValue.ID] if !ok { - osf := sw.outbounderSetUp - osf.Sender = sw.sender - osf.Listener = inValue.Listener + // osf.Sender = sw.sender + sw.listener = inValue.Listener metricWrapper, err := newMetricWrapper(time.Now, sw.queryLatency, inValue.ID) if err != nil { continue } - osf.ClientMiddleware = metricWrapper.roundTripper - obs, err := osf.New() + sw.clientMiddleware = metricWrapper.roundTripper + ss, err := newSinkSender(sw) if nil == err { - sw.senders[inValue.ID] = obs + sw.senders[inValue.ID] = ss } continue } @@ -195,7 +156,7 @@ func (sw *CaduceusSenderWrapper) Update(list []ListenerStub) { // Queue is used to send all the possible outbound senders a request. This // function performs the fan-out and filtering to multiple possible endpoints. -func (sw *CaduceusSenderWrapper) Queue(msg *wrp.Message) { +func (sw *SinkWrapper) Queue(msg *wrp.Message) { sw.mutex.RLock() defer sw.mutex.RUnlock() @@ -209,7 +170,7 @@ func (sw *CaduceusSenderWrapper) Queue(msg *wrp.Message) { // Shutdown closes down the delivery mechanisms and cleans up the underlying // OutboundSenders either gently (waiting for delivery queues to empty) or not // (dropping enqueued messages) -func (sw *CaduceusSenderWrapper) Shutdown(gentle bool) { +func (sw *SinkWrapper) Shutdown(gentle bool) { sw.mutex.Lock() defer sw.mutex.Unlock() for k, v := range sw.senders { @@ -221,7 +182,7 @@ func (sw *CaduceusSenderWrapper) Shutdown(gentle bool) { // undertaker looks at the OutboundSenders periodically and prunes the ones // that have been retired for too long, freeing up resources. -func undertaker(sw *CaduceusSenderWrapper) { +func undertaker(sw *SinkWrapper) { defer sw.wg.Done() // Collecting unused OutboundSenders isn't a huge priority, so do it // slowly. 
@@ -247,12 +208,12 @@ func undertaker(sw *CaduceusSenderWrapper) { } } -func createDeadlist(sw *CaduceusSenderWrapper, threshold time.Time) map[string]OutboundSender { +func createDeadlist(sw *SinkWrapper, threshold time.Time) map[string]Sender { if sw == nil || threshold.IsZero() { return nil } - deadList := make(map[string]OutboundSender) + deadList := make(map[string]Sender) sw.mutex.Lock() defer sw.mutex.Unlock() for k, v := range sw.senders { diff --git a/senderWrapper_test.go b/sinkWrapper_test.go similarity index 100% rename from senderWrapper_test.go rename to sinkWrapper_test.go From 66ee034590493922c6276a3004aa9f92401add20 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Thu, 15 Feb 2024 16:18:15 -0500 Subject: [PATCH 07/10] updated names and moved listener out of the sinkWrapper --- caduceus_type.go | 54 ++++++++++++++++++++++++++++++++++++------------ config.go | 2 +- httpClient.go | 2 +- main.go | 2 +- sinkSender.go | 34 ++++++++++++++---------------- sinkWrapper.go | 30 ++++++++++++++------------- 6 files changed, 76 insertions(+), 48 deletions(-) diff --git a/caduceus_type.go b/caduceus_type.go index 8575dfa1..3ccfcdcb 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -16,24 +16,52 @@ type CaduceusConfig struct { AuthHeader []string NumWorkerThreads int JobQueueSize int - Sender SenderConfig + Sink SinkConfig JWTValidators []JWTValidator AllowInsecureTLS bool } -type SenderConfig struct { - NumWorkersPerSender int - QueueSizePerSender int - CutOffPeriod time.Duration - Linger time.Duration - ClientTimeout time.Duration +type SinkConfig struct { + // The number of workers to assign to each SinkSender created. + NumWorkersPerSender int + + // The queue size to assign to each SinkSender created. + QueueSizePerSender int + + // The cut off time to assign to each SinkSender created. + CutOffPeriod time.Duration + + // The amount of time to let expired SinkSenders linger before + // shutting them down and cleaning up the resources associated with them. + Linger time.Duration + + // Number of delivery retries before giving up + DeliveryRetries int + + // Time in between delivery retries + DeliveryInterval time.Duration + + // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message + // has no partner IDs. + CustomPIDs []string + + // DisablePartnerIDs dictates whether or not to enforce the partner ID check. + DisablePartnerIDs bool + + // ClientTimeout specifies a time limit for requests made by the SinkSender's Client + ClientTimeout time.Duration + + //DisableClientHostnameValidation used in HTTP Client creation DisableClientHostnameValidation bool - ResponseHeaderTimeout time.Duration - IdleConnTimeout time.Duration - DeliveryRetries int - DeliveryInterval time.Duration - CustomPIDs []string - DisablePartnerIDs bool + + // ResponseHeaderTimeout specifies the amount of time to wait for a server's response headers after fully + // writing the request + ResponseHeaderTimeout time.Duration + + //IdleConnTimeout is the maximum amount of time an idle + // (keep-alive) connection will remain idle before closing + // itself. 
+ IdleConnTimeout time.Duration } type RequestHandler interface { diff --git a/config.go b/config.go index 31fba38d..182bacc3 100644 --- a/config.go +++ b/config.go @@ -26,7 +26,7 @@ type Config struct { Servers Servers ArgusClientTimeout HttpClientTimeout JWTValidator JWTValidator - Sender SenderConfig + Sink SinkConfig Service Service AuthHeader []string Server string diff --git a/httpClient.go b/httpClient.go index 51057378..c420108c 100644 --- a/httpClient.go +++ b/httpClient.go @@ -16,7 +16,7 @@ var ( errNilHistogram = errors.New("histogram cannot be nil") ) -func nopHTTPClient(next Client) Client { +func nopClient(next Client) Client { return next } diff --git a/main.go b/main.go index 9a73d3e0..58416a99 100644 --- a/main.go +++ b/main.go @@ -74,7 +74,7 @@ func caduceus(arguments []string, run bool) error { goschtalt.UnmarshalFunc[sallust.Config]("logging"), goschtalt.UnmarshalFunc[candlelight.Config]("tracing"), goschtalt.UnmarshalFunc[touchstone.Config]("prometheus"), - goschtalt.UnmarshalFunc[SenderConfig]("sender"), + goschtalt.UnmarshalFunc[SinkConfig]("sender"), goschtalt.UnmarshalFunc[Service]("service"), goschtalt.UnmarshalFunc[[]string]("authHeader"), goschtalt.UnmarshalFunc[bool]("previousVersionSupport"), diff --git a/sinkSender.go b/sinkSender.go index 57153d29..f10350e2 100644 --- a/sinkSender.go +++ b/sinkSender.go @@ -115,9 +115,9 @@ type SinkSender struct { clientMiddleware func(Client) Client } -func newSinkSender(sw *SinkWrapper) (s Sender, err error) { +func newSinkSender(sw *SinkWrapper, listener ListenerStub) (s Sender, err error) { if sw.clientMiddleware == nil { - sw.clientMiddleware = nopHTTPClient + sw.clientMiddleware = nopClient } if sw.client == nil { err = errors.New("nil Client") @@ -134,21 +134,19 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) { return } - // decoratedLogger := osf.Logger.With(zap.String("webhook.address", osf.Listener.Webhook.Address)) + decoratedLogger := sw.logger.With(zap.String("webhook.address", listener.Webhook.Address)) sinkSender := &SinkSender{ - // id: osf.Listener.Webhook.Config.URL, - listener: sw.listener, - client: sw.client, - queueSize: sw.config.QueueSizePerSender, - cutOffPeriod: sw.config.CutOffPeriod, - // deliverUntil: osf.Listener.Webhook.Until, - // logger: decoratedLogger, + client: sw.client, + queueSize: sw.config.QueueSizePerSender, + cutOffPeriod: sw.config.CutOffPeriod, + deliverUntil: listener.Webhook.Until, + logger: decoratedLogger, deliveryRetries: sw.config.DeliveryRetries, deliveryInterval: sw.config.DeliveryInterval, maxWorkers: sw.config.NumWorkersPerSender, failureMsg: FailureMessage{ - Original: sw.listener, + Original: listener, Text: failureText, CutOffPeriod: sw.config.CutOffPeriod.String(), QueueSize: sw.config.QueueSizePerSender, @@ -160,7 +158,7 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) { } // Don't share the secret with others when there is an error. 
- // caduceusOutboundSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" + sinkSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" CreateOutbounderMetrics(sw.metrics, sinkSender) @@ -170,9 +168,9 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) { sinkSender.queue.Store(make(chan *wrp.Message, sw.config.QueueSizePerSender)) - // if err = caduceusOutboundSender.Update(osf.Listener); nil != err { - // return - // } + if err = sinkSender.Update(listener); nil != err { + return + } sinkSender.workers = semaphore.New(sinkSender.maxWorkers) sinkSender.wg.Add(1) @@ -207,14 +205,14 @@ func (s *SinkSender) Update(wh ListenerStub) (err error) { events = append(events, re) } if len(events) < 1 { - err = errors.New("events must not be empty.") + err = errors.New("events must not be empty") return } // Create the matcher regex objects matcher := []*regexp.Regexp{} for _, item := range wh.Webhook.Matcher.DeviceID { - if ".*" == item { + if item == ".*" { // Match everything - skip the filtering matcher = []*regexp.Regexp{} break @@ -263,7 +261,7 @@ func (s *SinkSender) Update(wh ListenerStub) (err error) { s.matcher = matcher } - if 0 == urlCount { + if urlCount == 0 { s.urls = ring.New(1) s.urls.Value = s.id } else { diff --git a/sinkWrapper.go b/sinkWrapper.go index dd4899aa..d0aaa6e1 100644 --- a/sinkWrapper.go +++ b/sinkWrapper.go @@ -23,7 +23,7 @@ type SinkWrapperIn struct { fx.In Tracing candlelight.Tracing - SenderConfig SenderConfig + SinkConfig SinkConfig WrapperMetrics SinkWrapperMetrics SenderMetrics SinkSenderMetrics Logger *zap.Logger @@ -43,24 +43,26 @@ type Wrapper interface { // Wrapper contains the configuration that will be shared with each outbound sender. It contains no external parameters. type SinkWrapper struct { - // The amount of time to let expired OutboundSenders linger before + // The amount of time to let expired SinkSenders linger before // shutting them down and cleaning up the resources associated with them. linger time.Duration - // The logger implementation to share with OutboundSenders. + // The logger implementation to share with sinkSenders. logger *zap.Logger + //the configuration needed for eash sinkSender + config SinkConfig + mutex *sync.RWMutex senders map[string]Sender eventType *prometheus.CounterVec queryLatency prometheus.ObserverVec wg sync.WaitGroup shutdown chan struct{} - config SenderConfig metrics SinkSenderMetrics - client Client //should this be a part of wrapper or sender? - listener ListenerStub //should this be a part of wrapper or sender? - clientMiddleware func(Client) Client //should this be a part of wrapper or sender? 
+ client Client //TODO: keeping here for now - but might move to SinkSender in a later PR + clientMiddleware func(Client) Client //TODO: keeping here for now - but might move to SinkSender in a later PR + } func ProvideWrapper() fx.Option { @@ -74,16 +76,16 @@ func ProvideWrapper() fx.Option { func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) { sw = &SinkWrapper{ - linger: in.SenderConfig.Linger, + linger: in.SinkConfig.Linger, logger: in.Logger, eventType: in.WrapperMetrics.EventType, queryLatency: in.WrapperMetrics.QueryLatency, - config: in.SenderConfig, + config: in.SinkConfig, metrics: in.SenderMetrics, } - if in.SenderConfig.Linger <= 0 { - linger := fmt.Sprintf("linger not positive: %v", in.SenderConfig.Linger) + if in.SinkConfig.Linger <= 0 { + linger := fmt.Sprintf("linger not positive: %v", in.SinkConfig.Linger) err = errors.New(linger) sw = nil return @@ -98,7 +100,7 @@ func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) { } // no longer being initialized at start up - needs to be initialized by the creation of the outbound sender -func NewRoundTripper(config SenderConfig, tracing candlelight.Tracing) (tr http.RoundTripper) { +func NewRoundTripper(config SinkConfig, tracing candlelight.Tracing) (tr http.RoundTripper) { tr = &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation}, MaxIdleConnsPerHost: config.NumWorkersPerSender, @@ -136,14 +138,14 @@ func (sw *SinkWrapper) Update(list []ListenerStub) { sender, ok := sw.senders[inValue.ID] if !ok { // osf.Sender = sw.sender - sw.listener = inValue.Listener + listener := inValue.Listener metricWrapper, err := newMetricWrapper(time.Now, sw.queryLatency, inValue.ID) if err != nil { continue } sw.clientMiddleware = metricWrapper.roundTripper - ss, err := newSinkSender(sw) + ss, err := newSinkSender(sw, listener) if nil == err { sw.senders[inValue.ID] = ss } From 8a14214ebc97d89a8e532fc570efab782051d3ae Mon Sep 17 00:00:00 2001 From: maura fortino Date: Fri, 16 Feb 2024 12:20:19 -0500 Subject: [PATCH 08/10] updated the providewrapper function with the sink metrics and removed the providesinkmetrics function --- main.go | 1 - metrics.go | 27 --------------------------- metrics_test.go | 8 -------- sinkSender.go | 1 + sinkWrapper.go | 43 +++++++++++++++++++++++++++---------------- 5 files changed, 28 insertions(+), 52 deletions(-) diff --git a/main.go b/main.go index 58416a99..f8f08ed8 100644 --- a/main.go +++ b/main.go @@ -140,7 +140,6 @@ func caduceus(arguments []string, run bool) error { touchstone.Provide(), touchhttp.Provide(), ProvideMetrics(), - ProvideSinkMetrics(), // ancla.ProvideMetrics(), //TODO: need to add back in once we fix the ancla/argus dependency issue ) diff --git a/metrics.go b/metrics.go index 516704d2..e9e7be75 100644 --- a/metrics.go +++ b/metrics.go @@ -152,30 +152,3 @@ func ProvideMetrics() fx.Option { }, EventLabel), ) } - -func ProvideSinkMetrics() fx.Option { - return fx.Provide( - func(in SenderMetricsIn) (SinkWrapperMetrics, SinkSenderMetrics) { - senderMetrics := SinkSenderMetrics{ - DeliveryCounter: in.DeliveryCounter, - DeliveryRetryCounter: in.DeliveryRetryCounter, - DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge, - CutOffCounter: in.CutOffCounter, - SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter, - DropsDueToPanic: in.DropsDueToPanic, - ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge, - ConsumerDropUntilGauge: in.ConsumerDropUntilGauge, - ConsumerDeliveryWorkersGauge: 
in.ConsumerDeliveryWorkersGauge, - ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge, - OutgoingQueueDepth: in.OutgoingQueueDepth, - ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge, - } - wrapperMetrics := SinkWrapperMetrics{ - QueryLatency: in.QueryLatency, - EventType: in.EventType, - } - - return wrapperMetrics, senderMetrics - }, - ) -} diff --git a/metrics_test.go b/metrics_test.go index a3b957b5..8092f9f0 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -32,11 +32,3 @@ func TestMetrics(t *testing.T) { assert.NotNil(m) } - -func TestProvideSenderMetrics(t *testing.T) { - assert := assert.New(t) - - m := ProvideSinkMetrics() - - assert.NotNil(m) -} diff --git a/sinkSender.go b/sinkSender.go index f10350e2..1ba81360 100644 --- a/sinkSender.go +++ b/sinkSender.go @@ -70,6 +70,7 @@ type SinkSenderMetrics struct { ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec OutgoingQueueDepth *prometheus.GaugeVec ConsumerRenewalTimeGauge *prometheus.GaugeVec + QueryLatency prometheus.ObserverVec } // CaduceusOutboundSender is the outbound sender object. diff --git a/sinkWrapper.go b/sinkWrapper.go index d0aaa6e1..3a1e5065 100644 --- a/sinkWrapper.go +++ b/sinkWrapper.go @@ -22,16 +22,11 @@ import ( type SinkWrapperIn struct { fx.In - Tracing candlelight.Tracing - SinkConfig SinkConfig - WrapperMetrics SinkWrapperMetrics - SenderMetrics SinkSenderMetrics - Logger *zap.Logger -} - -type SinkWrapperMetrics struct { - QueryLatency prometheus.ObserverVec - EventType *prometheus.CounterVec + Tracing candlelight.Tracing + SinkConfig SinkConfig + SenderMetrics SinkSenderMetrics + EventType *prometheus.CounterVec + Logger *zap.Logger } // SinkWrapper interface is needed for unit testing. @@ -67,6 +62,23 @@ type SinkWrapper struct { func ProvideWrapper() fx.Option { return fx.Provide( + func(in SenderMetricsIn) SinkSenderMetrics { + senderMetrics := SinkSenderMetrics{ + DeliveryCounter: in.DeliveryCounter, + DeliveryRetryCounter: in.DeliveryRetryCounter, + DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge, + CutOffCounter: in.CutOffCounter, + SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter, + DropsDueToPanic: in.DropsDueToPanic, + ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge, + ConsumerDropUntilGauge: in.ConsumerDropUntilGauge, + ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge, + ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge, + OutgoingQueueDepth: in.OutgoingQueueDepth, + ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge, + } + return senderMetrics + }, func(in SinkWrapperIn) (*SinkWrapper, error) { csw, err := NewSinkWrapper(in) return csw, err @@ -76,12 +88,11 @@ func ProvideWrapper() fx.Option { func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) { sw = &SinkWrapper{ - linger: in.SinkConfig.Linger, - logger: in.Logger, - eventType: in.WrapperMetrics.EventType, - queryLatency: in.WrapperMetrics.QueryLatency, - config: in.SinkConfig, - metrics: in.SenderMetrics, + linger: in.SinkConfig.Linger, + logger: in.Logger, + eventType: in.EventType, + config: in.SinkConfig, + metrics: in.SenderMetrics, } if in.SinkConfig.Linger <= 0 { From b236db590955febfc8110c215f583d85567d1717 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Fri, 16 Feb 2024 12:21:16 -0500 Subject: [PATCH 09/10] added queryLatency to sender metrics as it's not used by the wrapper --- sinkWrapper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sinkWrapper.go b/sinkWrapper.go index 3a1e5065..9e4d0b6e 100644 --- a/sinkWrapper.go +++ 
b/sinkWrapper.go @@ -76,6 +76,7 @@ func ProvideWrapper() fx.Option { ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge, OutgoingQueueDepth: in.OutgoingQueueDepth, ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge, + QueryLatency: in.QueryLatency, } return senderMetrics }, From 659b1fe0756b0c4fc9f309e156d88ba43633fffb Mon Sep 17 00:00:00 2001 From: maura fortino Date: Fri, 16 Feb 2024 12:24:55 -0500 Subject: [PATCH 10/10] updated queryLatency --- metrics.go | 1 - sinkWrapper.go | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/metrics.go b/metrics.go index e9e7be75..5e563205 100644 --- a/metrics.go +++ b/metrics.go @@ -50,7 +50,6 @@ const ( type SenderMetricsIn struct { fx.In QueryLatency prometheus.ObserverVec `name:"query_duration_histogram_seconds"` - EventType *prometheus.CounterVec `name:"incoming_event_type_count"` DeliveryCounter *prometheus.CounterVec `name:"delivery_count"` DeliveryRetryCounter *prometheus.CounterVec `name:"delivery_retry_count"` DeliveryRetryMaxGauge *prometheus.GaugeVec `name:"delivery_retry_max"` diff --git a/sinkWrapper.go b/sinkWrapper.go index 9e4d0b6e..a8676bab 100644 --- a/sinkWrapper.go +++ b/sinkWrapper.go @@ -51,7 +51,6 @@ type SinkWrapper struct { mutex *sync.RWMutex senders map[string]Sender eventType *prometheus.CounterVec - queryLatency prometheus.ObserverVec wg sync.WaitGroup shutdown chan struct{} metrics SinkSenderMetrics @@ -151,7 +150,7 @@ func (sw *SinkWrapper) Update(list []ListenerStub) { if !ok { // osf.Sender = sw.sender listener := inValue.Listener - metricWrapper, err := newMetricWrapper(time.Now, sw.queryLatency, inValue.ID) + metricWrapper, err := newMetricWrapper(time.Now, sw.metrics.QueryLatency, inValue.ID) if err != nil { continue
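
As an aside on the latency-metric pattern used throughout these patches: the wrapper builds a metricWrapper from its query_duration_histogram_seconds ObserverVec and the listener ID (newMetricWrapper(time.Now, sw.metrics.QueryLatency, inValue.ID)) and installs the resulting roundTripper as the sender's client middleware. The minimal, self-contained Go sketch below shows only the general idea of observing per-destination request latency with the Prometheus client; the "url" label name, the bucket choice, and the timedClient helper are assumptions made for illustration and are not part of this patch series.

package main

import (
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// queryLatency mirrors the kind of ObserverVec the wrapper hands to newMetricWrapper.
// The "url" label name is an assumption made for this illustration.
var queryLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "query_duration_histogram_seconds",
		Help:    "Latency of outbound requests, partitioned by destination.",
		Buckets: prometheus.DefBuckets,
	},
	[]string{"url"},
)

// timedClient is a hypothetical helper showing the middleware idea: wrap an
// http.Client call and record how long the request took against a specific ID.
func timedClient(client *http.Client, id string, req *http.Request) (*http.Response, error) {
	start := time.Now()
	resp, err := client.Do(req)
	queryLatency.With(prometheus.Labels{"url": id}).Observe(time.Since(start).Seconds())
	return resp, err
}

func main() {
	prometheus.MustRegister(queryLatency)
	req, _ := http.NewRequest(http.MethodGet, "https://example.com", nil)
	_, _ = timedClient(http.DefaultClient, "example-listener-id", req)
}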