diff --git a/config.go b/config.go index 31fba38d..eda9e714 100644 --- a/config.go +++ b/config.go @@ -219,4 +219,7 @@ var defaultConfig = Config{ Tracing: candlelight.Config{ ApplicationName: applicationName, }, + Sender: SenderConfig{ + Linger: 3 * time.Minute, + }, } diff --git a/http.go b/http.go index 272338d7..59e67f60 100644 --- a/http.go +++ b/http.go @@ -40,20 +40,20 @@ type ServerHandler struct { } type HandlerTelemetryIn struct { fx.In - ErrorRequests prometheus.Counter `name:"error_request_body_counter"` - EmptyRequests prometheus.Counter `name:"empty_request_boyd_counter"` - InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"` - IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"` - ModifiedWRPCount prometheus.CounterVec `name:"modified_wrp_count"` - IncomingQueueLatency prometheus.HistogramVec `name:"incoming_queue_latency_histogram_seconds"` + ErrorRequests prometheus.Counter `name:"error_request_body_count"` + EmptyRequests prometheus.Counter `name:"empty_request_body_count"` + InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"` + IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"` + ModifiedWRPCount *prometheus.CounterVec `name:"modified_wrp_count"` + IncomingQueueLatency prometheus.ObserverVec `name:"incoming_queue_latency_histogram_seconds"` } type HandlerTelemetry struct { errorRequests prometheus.Counter emptyRequests prometheus.Counter invalidCount prometheus.Counter incomingQueueDepthMetric prometheus.Gauge - modifiedWRPCount prometheus.CounterVec - incomingQueueLatency prometheus.HistogramVec + modifiedWRPCount *prometheus.CounterVec + incomingQueueLatency prometheus.ObserverVec } func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { @@ -144,7 +144,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) { endTime := sh.now() - sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(float64(endTime.Sub(startTime).Seconds())) + sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) } func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { diff --git a/httpClient.go b/httpClient.go index e478bb44..1ce85498 100644 --- a/httpClient.go +++ b/httpClient.go @@ -33,15 +33,15 @@ func (d doerFunc) Do(req *http.Request) (*http.Response, error) { type metricWrapper struct { now func() time.Time - queryLatency prometheus.HistogramVec + queryLatency prometheus.ObserverVec id string } -func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec, id string) (*metricWrapper, error) { +func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) { if now == nil { now = time.Now } - if queryLatency.MetricVec == nil { + if queryLatency == nil { return nil, errNilHistogram } return &metricWrapper{ diff --git a/http_test.go b/http_test.go index 755d8637..3cf5895e 100644 --- a/http_test.go +++ b/http_test.go @@ -1,5 +1,5 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 +// // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// // SPDX-License-Identifier: Apache-2.0 package main // import ( @@ -13,8 +13,8 @@ package main // "github.com/stretchr/testify/assert" // "github.com/stretchr/testify/mock" +// "go.uber.org/zap/zaptest" -// "github.com/xmidt-org/webpa-common/v2/adapter" // "github.com/xmidt-org/wrp-go/v3" // ) @@ -88,7 +88,7 @@ package main // for _, tc := range tcs { // assert := assert.New(t) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // if !tc.throwStatusBadRequest { // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), @@ -112,7 +112,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // errorRequests: fakeErrorRequests, // emptyRequests: fakeEmptyRequests, @@ -145,7 +145,7 @@ package main // assert := assert.New(t) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).Return().Once() @@ -172,7 +172,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // errorRequests: fakeErrorRequests, // emptyRequests: fakeEmptyRequests, @@ -206,7 +206,7 @@ package main // assert := assert.New(t) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) @@ -221,7 +221,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // incomingQueueDepthMetric: fakeQueueDepth, // maxOutstanding: 1, @@ -259,7 +259,7 @@ package main // req := httptest.NewRequest("POST", "localhost:8080", r) // req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) @@ -276,7 +276,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // emptyRequests: fakeEmptyRequests, // incomingQueueDepthMetric: fakeQueueDepth, @@ -314,7 +314,7 @@ package main // req := httptest.NewRequest("POST", "localhost:8080", r) // req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() @@ -331,7 +331,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // errorRequests: fakeErrorRequests, // incomingQueueDepthMetric: fakeQueueDepth, @@ -368,7 +368,7 @@ package main // req := httptest.NewRequest("POST", "localhost:8080", r) // req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), // mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() @@ -386,7 +386,7 @@ package main // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // invalidCount: fakeInvalidCount, // incomingQueueDepthMetric: fakeQueueDepth, @@ -420,13 +420,13 @@ package main // assert := assert.New(t) -// logger := adapter.DefaultLogger().Logger +// logger := zaptest.NewLogger(t) // fakeHandler := new(mockHandler) // fakeQueueDepth := new(mockGauge) // serverWrapper := &ServerHandler{ -// Logger: logger, +// log: logger, // caduceusHandler: fakeHandler, // incomingQueueDepthMetric: fakeQueueDepth, // maxOutstanding: 1, diff --git a/main.go b/main.go index 185554a1..18525b81 100644 --- a/main.go +++ b/main.go @@ -140,6 +140,7 @@ func caduceus(arguments []string, run bool) error { touchstone.Provide(), touchhttp.Provide(), ProvideMetrics(), + ProvideSenderMetrics(), // ancla.ProvideMetrics(), //TODO: need to add back in once we fix the ancla/argus dependency issue ) diff --git a/main_test.go b/main_test.go index 2fc15712..8e1c72b9 100644 --- a/main_test.go +++ b/main_test.go @@ -1,83 +1,132 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 + package main import ( - "os" "testing" + + _ "github.com/goschtalt/goschtalt/pkg/typical" + _ "github.com/goschtalt/yaml-decoder" + _ "github.com/goschtalt/yaml-encoder" + "github.com/stretchr/testify/assert" + "github.com/xmidt-org/sallust" ) -func TestMain(m *testing.M) { - os.Exit(m.Run()) +func Test_provideCLI(t *testing.T) { + tests := []struct { + description string + args cliArgs + want CLI + exits bool + expectedErr error + }{ + { + description: "no arguments, everything works", + }, { + description: "dev mode", + args: cliArgs{"-d"}, + want: CLI{Dev: true}, + }, { + description: "invalid argument", + args: cliArgs{"-w"}, + exits: true, + }, { + description: "invalid argument", + args: cliArgs{"-d", "-w"}, + exits: true, + }, { + description: "help", + args: cliArgs{"-h"}, + exits: true, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) + + if tc.exits { + assert.Panics(func() { + _, _ = provideCLIWithOpts(tc.args, true) + }) + } else { + got, err := provideCLI(tc.args) + + assert.ErrorIs(err, tc.expectedErr) + want := tc.want + assert.Equal(&want, got) + } + }) + } +} + +func Test_caduceus(t *testing.T) { + tests := []struct { + description string + args []string + panic bool + expectedErr error + }{ + { + description: "show config and exit", + args: []string{"-s"}, + panic: true, + }, { + description: "show help and exit", + args: []string{"-h"}, + panic: true, + }, { + description: "do everything but run", + args: []string{"-f", "minimal.yaml"}, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) + + if tc.panic { + assert.Panics(func() { + _ = caduceus(tc.args, false) + }) + } else { + err := caduceus(tc.args, false) + + assert.ErrorIs(err, tc.expectedErr) + } + }) + } } -// func TestPrintVersionInfo(t *testing.T) { -// testCases := []struct { -// name string -// expectedOutput []string -// overrideValues func() -// lineCount int -// }{ -// { -// "default", -// []string{ -// "caduceus:", -// "version: \tundefined", -// "go version: \tgo", -// "built time: \tundefined", -// "git commit: \tundefined", -// "os/arch: \t", -// }, -// func() {}, -// 6, -// }, -// { -// "set values", -// []string{ -// "caduceus:", -// "version: \t1.0.0\n", -// "go version: \tgo", -// "built time: \tsome time\n", -// "git commit: \tgit sha\n", -// "os/arch: \t", -// }, -// func() { -// Version = "1.0.0" -// BuildTime = "some time" -// GitCommit = "git sha" -// }, -// 6, -// }, -// } -// for _, tc := range testCases { -// t.Run(tc.name, func(t *testing.T) { -// resetGlobals() -// tc.overrideValues() -// buf := &bytes.Buffer{} -// printVersionInfo(buf) -// count := 0 -// for { -// line, err := buf.ReadString(byte('\n')) -// if err != nil { -// break -// } -// assert.Contains(t, line, tc.expectedOutput[count]) -// if strings.Contains(line, "\t") { -// keyAndValue := strings.Split(line, "\t") -// // The value after the tab should have more than 2 characters -// // 1) the first character of the value and the new line -// assert.True(t, len(keyAndValue[1]) > 2) -// } -// count++ -// } -// assert.Equal(t, tc.lineCount, count) -// resetGlobals() -// }) -// } -// } +func Test_provideLogger(t *testing.T) { + tests := []struct { + description string + cli *CLI + cfg sallust.Config + expectedErr error + }{ + { + description: "validate empty config", + cfg: sallust.Config{}, + cli: &CLI{}, + }, { + description: "validate dev config", + cfg: sallust.Config{}, + cli: &CLI{Dev: true}, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) -// func resetGlobals() { -// Version = "undefined" -// BuildTime = "undefined" -// GitCommit = "undefined" -// } + got, err := provideLogger(tc.cli, tc.cfg) + + if tc.expectedErr == nil { + assert.NotNil(got) + assert.NoError(err) + return + } + assert.ErrorIs(err, tc.expectedErr) + assert.Nil(got) + }) + } +} diff --git a/metrics.go b/metrics.go index e822ce50..68494350 100644 --- a/metrics.go +++ b/metrics.go @@ -6,7 +6,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/fx" - // nolint:staticcheck "github.com/xmidt-org/touchstone" ) @@ -50,20 +49,20 @@ const ( type SenderMetricsIn struct { fx.In - QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"` - EventType prometheus.CounterVec `name:"incoming_event_type_count"` - DeliveryCounter prometheus.CounterVec `name:"delivery_count"` - DeliveryRetryCounter prometheus.CounterVec `name:"DeliveryRetryCounter"` - DeliveryRetryMaxGauge prometheus.GaugeVec `name:"delivery_retry_max"` - CutOffCounter prometheus.CounterVec `name:"slow_consumer_cut_off_count"` - SlowConsumerDroppedMsgCounter prometheus.CounterVec `name:"slow_consumer_dropped_message_count"` - DropsDueToPanic prometheus.CounterVec `name:"drops_due_to_panic"` - ConsumerDeliverUntilGauge prometheus.GaugeVec `name:"consumer_deliver_until"` - ConsumerDropUntilGauge prometheus.GaugeVec `name:"consumer_drop_until"` - ConsumerDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers"` - ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers_max"` - OutgoingQueueDepth prometheus.GaugeVec `name:"outgoing_queue_depths"` - ConsumerRenewalTimeGauge prometheus.GaugeVec `name:"consumer_renewal_time"` + QueryLatency prometheus.ObserverVec `name:"query_duration_histogram_seconds"` + EventType *prometheus.CounterVec `name:"incoming_event_type_count"` + DeliveryCounter *prometheus.CounterVec `name:"delivery_count"` + DeliveryRetryCounter *prometheus.CounterVec `name:"delivery_retry_count"` + DeliveryRetryMaxGauge *prometheus.GaugeVec `name:"delivery_retry_max"` + CutOffCounter *prometheus.CounterVec `name:"slow_consumer_cut_off_count"` + SlowConsumerDroppedMsgCounter *prometheus.CounterVec `name:"slow_consumer_dropped_message_count"` + DropsDueToPanic *prometheus.CounterVec `name:"drops_due_to_panic"` + ConsumerDeliverUntilGauge *prometheus.GaugeVec `name:"consumer_deliver_until"` + ConsumerDropUntilGauge *prometheus.GaugeVec `name:"consumer_drop_until"` + ConsumerDeliveryWorkersGauge *prometheus.GaugeVec `name:"consumer_delivery_workers"` + ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec `name:"consumer_delivery_workers_max"` + OutgoingQueueDepth *prometheus.GaugeVec `name:"outgoing_queue_depths"` + ConsumerRenewalTimeGauge *prometheus.GaugeVec `name:"consumer_renewal_time"` } // TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called diff --git a/metrics_test.go b/metrics_test.go index 8092f9f0..db6c3ddf 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -32,3 +32,11 @@ func TestMetrics(t *testing.T) { assert.NotNil(m) } + +func TestProvideSenderMetrics(t *testing.T) { + assert := assert.New(t) + + m := ProvideSenderMetrics() + + assert.NotNil(m) +} diff --git a/outboundSender.go b/outboundSender.go index c4ccc6ff..3117a935 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -64,18 +64,18 @@ type OutboundSenderFactory struct { } type OutboundSenderMetrics struct { - DeliveryCounter prometheus.CounterVec - DeliveryRetryCounter prometheus.CounterVec - DeliveryRetryMaxGauge prometheus.GaugeVec - CutOffCounter prometheus.CounterVec - SlowConsumerDroppedMsgCounter prometheus.CounterVec - DropsDueToPanic prometheus.CounterVec - ConsumerDeliverUntilGauge prometheus.GaugeVec - ConsumerDropUntilGauge prometheus.GaugeVec - ConsumerDeliveryWorkersGauge prometheus.GaugeVec - ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec - OutgoingQueueDepth prometheus.GaugeVec - ConsumerRenewalTimeGauge prometheus.GaugeVec + DeliveryCounter *prometheus.CounterVec + DeliveryRetryCounter *prometheus.CounterVec + DeliveryRetryMaxGauge *prometheus.GaugeVec + CutOffCounter *prometheus.CounterVec + SlowConsumerDroppedMsgCounter *prometheus.CounterVec + DropsDueToPanic *prometheus.CounterVec + ConsumerDeliverUntilGauge *prometheus.GaugeVec + ConsumerDropUntilGauge *prometheus.GaugeVec + ConsumerDeliveryWorkersGauge *prometheus.GaugeVec + ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec + OutgoingQueueDepth *prometheus.GaugeVec + ConsumerRenewalTimeGauge *prometheus.GaugeVec } // CaduceusOutboundSender is the outbound sender object. diff --git a/routes.go b/routes.go index 9bea1d3e..03369b81 100644 --- a/routes.go +++ b/routes.go @@ -68,18 +68,17 @@ func provideCoreOption(server string, in RoutesIn) arrangehttp.Option[http.Serve mux := chi.NewMux() - // TODO: should probably customize things a bit - mux.Use(recovery.Middleware(recovery.WithStatusCode(555))) - options := []otelmux.Option{ otelmux.WithTracerProvider(in.Tracing.TracerProvider()), otelmux.WithPropagators(in.Tracing.Propagator()), } + // TODO: should probably customize things a bit + mux.Use(recovery.Middleware(recovery.WithStatusCode(555)), otelmux.Middleware("server_primary", options...), + candlelight.EchoFirstTraceNodeInfo(in.Tracing.Propagator(), true)) + mux.Method("POST", urlPrefix+"/notify", in.Handler) if server == "primary" { - mux.Use(otelmux.Middleware("server_primary", options...), - candlelight.EchoFirstTraceNodeInfo(in.Tracing.Propagator(), true)) s.Handler = in.PrimaryMetrics.Then(mux) } else { s.Handler = in.AlternateMetrics.Then(mux) diff --git a/senderWrapper.go b/senderWrapper.go index 9dfa5e95..3e47f6d6 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -27,12 +27,11 @@ type CaduceusSenderWrapperIn struct { WrapperMetrics SenderWrapperMetrics OutbounderMetrics OutboundSenderMetrics Logger *zap.Logger - OutbounderFactory OutboundSenderFactory } type SenderWrapperMetrics struct { - QueryLatency prometheus.HistogramVec - EventType prometheus.CounterVec + QueryLatency prometheus.ObserverVec + EventType *prometheus.CounterVec } type SenderWrapper interface { @@ -69,8 +68,8 @@ type CaduceusSenderWrapper struct { mutex *sync.RWMutex senders map[string]OutboundSender - eventType prometheus.CounterVec - queryLatency prometheus.HistogramVec + eventType *prometheus.CounterVec + queryLatency prometheus.ObserverVec wg sync.WaitGroup shutdown chan struct{} @@ -111,17 +110,20 @@ func NewSenderWrapper(tr http.RoundTripper, in CaduceusSenderWrapperIn) (csw *Ca eventType: in.WrapperMetrics.EventType, queryLatency: in.WrapperMetrics.QueryLatency, } - - csw.outbounderSetUp.Config = in.SenderConfig - csw.outbounderSetUp.Logger = in.Logger - csw.outbounderSetUp.Metrics = in.OutbounderMetrics + obsSetUp := &OutboundSenderFactory{ + Config: in.SenderConfig, + Logger: in.Logger, + Metrics: in.OutbounderMetrics, + } + csw.outbounderSetUp = obsSetUp csw.sender = doerFunc((&http.Client{ Transport: tr, Timeout: in.SenderConfig.ClientTimeout, }).Do) if in.SenderConfig.Linger <= 0 { - err = errors.New("linger must be positive") + linger := fmt.Sprintf("linger not positive: %v", in.SenderConfig.Linger) + err = errors.New(linger) csw = nil return }