From 5b147288d386748eb47089910aef7658c2ff56b3 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Mon, 15 Apr 2024 10:43:58 -0400 Subject: [PATCH] updated new caduceus with some of the updates from old caduceus --- .github/workflows/ci.yml | 2 +- internal/client/httpClient.go | 18 ++++--- internal/client/httpClient_test.go | 27 +++++----- internal/handler/http.go | 4 +- internal/handler/http_test.go | 16 +++--- internal/metrics/metrics.go | 81 ++++++++++++++++++++++++++++-- internal/sink/matcher.go | 5 +- internal/sink/sink.go | 28 ++++++++--- internal/sink/sinkSender.go | 4 +- internal/sink/sinkSender_test.go | 50 +++++++++--------- internal/sink/sinkWrapper.go | 5 +- internal/sink/sinkWrapper_test.go | 58 ++++++++++----------- routes.go | 1 + 13 files changed, 197 insertions(+), 102 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 825d6ea1..ff024fd0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: uses: xmidt-org/shared-go/.github/workflows/ci.yml@5bc4b83f25ff4c944cd6253ba189e50d1997ab3c # v4.1.0 with: release-type: program - release-docker: true + release-docker: false release-docker-latest: true release-docker-major: true release-docker-minor: true diff --git a/internal/client/httpClient.go b/internal/client/httpClient.go index 43d02d6b..da9a4631 100644 --- a/internal/client/httpClient.go +++ b/internal/client/httpClient.go @@ -23,7 +23,7 @@ type metricWrapper struct { id string } -func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) { +func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec) (*metricWrapper, error) { if now == nil { now = time.Now } @@ -33,7 +33,6 @@ func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, return &metricWrapper{ now: now, queryLatency: queryLatency, - id: id, }, nil } @@ -42,16 +41,19 @@ func (m *metricWrapper) RoundTripper(next Client) Client { startTime := m.now() resp, err := next.Do(req) endTime := m.now() - code := metrics.NetworkError - - if err == nil { + code := metrics.GenericDoReason + reason := metrics.NoErrReason + if err != nil { + reason = metrics.GetDoErrReason(err) + if resp != nil { + code = strconv.Itoa(resp.StatusCode) + } + } else { code = strconv.Itoa(resp.StatusCode) } // find time difference, add to metric - var latency = endTime.Sub(startTime) - m.queryLatency.With(prometheus.Labels{metrics.UrlLabel: m.id, metrics.CodeLabel: code}).Observe(latency.Seconds()) - + m.queryLatency.With(prometheus.Labels{metrics.UrlLabel: req.URL.String(), metrics.CodeLabel: code, metrics.ReasonLabel: reason}).Observe(endTime.Sub(startTime).Seconds()) return resp, err }) } diff --git a/internal/client/httpClient_test.go b/internal/client/httpClient_test.go index ce8e9b8e..a0589e0d 100644 --- a/internal/client/httpClient_test.go +++ b/internal/client/httpClient_test.go @@ -19,7 +19,6 @@ package client // errTest := errors.New("test error") // date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) // date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - // tests := []struct { // description string // startTime time.Time @@ -33,32 +32,34 @@ package client // description: "Success", // startTime: date1, // endTime: date2, -// expectedCode: "200", +// expectedCode: strconv.Itoa(http.StatusOK), // request: exampleRequest(1), // expectedErr: nil, // expectedResponse: &http.Response{ -// StatusCode: 200, +// StatusCode: http.StatusOK, // }, // }, // { // description: "503 Service Unavailable", // startTime: date1, // endTime: date2, -// expectedCode: "503", +// expectedCode: strconv.Itoa(http.StatusServiceUnavailable), // request: exampleRequest(1), // expectedErr: nil, // expectedResponse: &http.Response{ -// StatusCode: 503, +// StatusCode: http.StatusServiceUnavailable, // }, // }, // { -// description: "Network Error", -// startTime: date1, -// endTime: date2, -// expectedCode: "network_err", -// request: exampleRequest(1), -// expectedErr: errTest, -// expectedResponse: nil, +// description: "Network Error", +// startTime: date1, +// endTime: date2, +// expectedCode: strconv.Itoa(http.StatusServiceUnavailable), +// request: exampleRequest(1), +// expectedErr: errors.New(genericDoReason), +// expectedResponse: &http.Response{ +// StatusCode: http.StatusServiceUnavailable, +// }, // }, // } @@ -69,7 +70,7 @@ package client // fakeTime := mockTime(tc.startTime, tc.endTime) // fakeHandler := new(mockHandler) // fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"code", tc.expectedCode} +// histogramFunctionCall := []string{metrics.UrlLabel, tc.request.URL.String(), metrics.ReasonLabel, metrics.GetDoErrReason(tc.expectedErr), metrics.CodeLabel, tc.expectedCode} // fakeLatency := date2.Sub(date1) // fakeHist.On("With", histogramFunctionCall).Return().Once() // fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() diff --git a/internal/handler/http.go b/internal/handler/http.go index 13621c9e..8a14f310 100644 --- a/internal/handler/http.go +++ b/internal/handler/http.go @@ -154,7 +154,7 @@ func (sh *ServerHandler) handleRequest(msg *wrp.Message) { func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) { endTime := sh.now() - sh.telemetry.IncomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) + sh.telemetry.IncomingQueueLatency.With(prometheus.Labels{metrics.EventLabel: eventType}).Observe(endTime.Sub(startTime).Seconds()) } func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { @@ -179,7 +179,7 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { } if reason != "" { - sh.telemetry.ModifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0) + sh.telemetry.ModifiedWRPCount.With(prometheus.Labels{metrics.ReasonLabel: reason}).Add(1.0) } return msg diff --git a/internal/handler/http_test.go b/internal/handler/http_test.go index 901db7ee..ec526ac9 100644 --- a/internal/handler/http_test.go +++ b/internal/handler/http_test.go @@ -109,7 +109,7 @@ func TestServerHandler(t *testing.T) { fakeTime := mocks.Time(tc.startTime, tc.endTime) fakeHist := new(mocks.Histogram) - histogramFunctionCall := []string{"event", tc.expectedEventType} + histogramFunctionCall := []string{metrics.EventLabel, tc.expectedEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -167,11 +167,11 @@ func TestServerHandlerFixWrp(t *testing.T) { fakeIncomingContentTypeCount.On("Add", 1.0).Return() fakeModifiedWRPCount := new(mocks.Counter) - fakeModifiedWRPCount.On("With", []string{"reason", metrics.BothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() + fakeModifiedWRPCount.On("With", []string{metrics.ReasonLabel, metrics.BothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() fakeModifiedWRPCount.On("Add", 1.0).Return().Once() fakeHist := new(mocks.Histogram) - histogramFunctionCall := []string{"event", "bob"} + histogramFunctionCall := []string{metrics.EventLabel, "bob"} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -220,7 +220,7 @@ func TestServerHandlerFull(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mocks.Histogram) - histogramFunctionCall := []string{"event", metrics.UnknownEventType} + histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -275,7 +275,7 @@ func TestServerEmptyPayload(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mocks.Histogram) - histogramFunctionCall := []string{"event", metrics.UnknownEventType} + histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -331,7 +331,7 @@ func TestServerUnableToReadBody(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mocks.Histogram) - histogramFunctionCall := []string{"event", metrics.UnknownEventType} + histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -387,7 +387,7 @@ func TestServerInvalidBody(t *testing.T) { fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() fakeHist := new(mocks.Histogram) - histogramFunctionCall := []string{"event", metrics.UnknownEventType} + histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -423,7 +423,7 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - histogramFunctionCall := []string{"event", metrics.UnknownEventType} + histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType} fakeLatency := date2.Sub(date1) assert := assert.New(t) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 353c415e..628deefd 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -3,6 +3,12 @@ package metrics import ( + "context" + "errors" + "net" + "net/url" + "strings" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/fx" @@ -33,18 +39,39 @@ const ( ) const ( + NoErrReason = "no_err" + GenericDoReason = "do_error" EmptyContentTypeReason = "empty_content_type" EmptyUUIDReason = "empty_uuid" BothEmptyReason = "empty_uuid_and_content_type" NetworkError = "network_err" UnknownEventType = "unknown" -) + // metric labels -const ( - ReasonLabel = "reason" + CodeLabel = "code" UrlLabel = "url" EventLabel = "event" - CodeLabel = "code" + ReasonLabel = "reason" + + // metric label values + // dropped messages reasons + genericDoReason = "do_error" + deadlineExceededReason = "context_deadline_exceeded" + contextCanceledReason = "context_canceled" + addressErrReason = "address_error" + parseAddrErrReason = "parse_address_error" + invalidAddrReason = "invalid_address" + dnsErrReason = "dns_error" + hostNotFoundReason = "host_not_found" + connClosedReason = "connection_closed" + opErrReason = "op_error" + networkErrReason = "unknown_network_err" + UpdateRequestURLFailedReason = "update_request_url_failed" + connectionUnexpectedlyClosedEOFReason = "connection_unexpectedly_closed_eof" + noErrReason = "no_err" + + // dropped message codes + MessageDroppedCode = "message_dropped" ) // MetricsIn will be populated automatically by the ProvideMetrics function @@ -170,3 +197,49 @@ func Provide() fx.Option { }, EventLabel), ) } + +func GetDoErrReason(err error) string { + var d *net.DNSError + if err == nil { + return NoErrReason + } else if errors.Is(err, context.DeadlineExceeded) { + return deadlineExceededReason + + } else if errors.Is(err, context.Canceled) { + return contextCanceledReason + + } else if errors.Is(err, &net.AddrError{}) { + return addressErrReason + + } else if errors.Is(err, &net.ParseError{}) { + return parseAddrErrReason + + } else if errors.Is(err, net.InvalidAddrError("")) { + return invalidAddrReason + + } else if errors.As(err, &d) { + if d.IsNotFound { + return hostNotFoundReason + } + + return dnsErrReason + } else if errors.Is(err, net.ErrClosed) { + return connClosedReason + + } else if errors.Is(err, &net.OpError{}) { + return opErrReason + + } else if errors.Is(err, net.UnknownNetworkError("")) { + return networkErrReason + + } + // nolint: errorlint + if err, ok := err.(*url.Error); ok { + if strings.TrimSpace(strings.ToLower(err.Unwrap().Error())) == "eof" { + return connectionUnexpectedlyClosedEOFReason + } + + } + + return GenericDoReason +} diff --git a/internal/sink/matcher.go b/internal/sink/matcher.go index 15d15133..0cf0bd59 100644 --- a/internal/sink/matcher.go +++ b/internal/sink/matcher.go @@ -14,6 +14,7 @@ import ( "sync" "time" + "github.com/xmidt-org/caduceus/internal/metrics" "github.com/xmidt-org/wrp-go/v3" "go.uber.org/zap" ) @@ -93,7 +94,7 @@ func (m1 *MatcherV1) Update(l ListenerV1) error { for i := 0; i < urlCount; i++ { _, err := url.Parse(l.Registration.Config.AlternativeURLs[i]) if err != nil { - m1.logger.Error("failed to update url", zap.Any("url", l.Registration.Config.AlternativeURLs[i]), zap.Error(err)) + m1.logger.Error("failed to update url", zap.Any(metrics.UrlLabel, l.Registration.Config.AlternativeURLs[i]), zap.Error(err)) return err } } @@ -160,7 +161,7 @@ func (m1 *MatcherV1) IsMatch(msg *wrp.Message) bool { if matcher != nil { matchDevice = false for _, deviceRegex := range matcher { - if deviceRegex.MatchString(msg.Source) { + if deviceRegex.MatchString(msg.Source) || deviceRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) { matchDevice = true break } diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 6f9128b9..bb9bfc5c 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -86,7 +86,7 @@ func (v1 *WebhookV1) Send(urls *ring.Ring, secret, acceptType string, msg *wrp.M if err != nil { // Report drop // s.droppedInvalidConfig.Add(1.0) - v1.logger.Error("Invalid URL", zap.String("url", urls.Value.(string)), zap.String("id", v1.id), zap.Error(err)) + v1.logger.Error("Invalid URL", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.String("id", v1.id), zap.Error(err)) return err } @@ -125,11 +125,22 @@ func (v1 *WebhookV1) Send(urls *ring.Ring, secret, acceptType string, msg *wrp.M ) resp, err := client.Do(req) - code := "failure" + var deliveryCounterLabels []string + code := metrics.MessageDroppedCode + reason := metrics.NoErrReason logger := v1.logger if err != nil { // Report failure - // s.droppedNetworkErrCounter.Add(1.0) + //TODO: add droppedMessage to webhook metrics and remove from sink sender? + // v1.droppedMessage.Add(1.0) + reason = metrics.GetDoErrReason(err) + if resp != nil { + code = strconv.Itoa(resp.StatusCode) + } + + logger = v1.logger.With(zap.String(metrics.ReasonLabel, reason), zap.Error(err)) + deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event} + // v1.droppedMessage.With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1) logger.Error("Dropped Network Error", zap.Error(err)) return err } else { @@ -142,9 +153,12 @@ func (v1 *WebhookV1) Send(urls *ring.Ring, secret, acceptType string, msg *wrp.M io.Copy(io.Discard, resp.Body) resp.Body.Close() } + + deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event} } - // s.deliveryCounter.With(prometheus.Labels{UrlLabel: s.id, CodeLabel: code, EventLabel: event}).Add(1.0) - logger.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String("code", code), zap.String("url", req.URL.String())) + //TODO: do we add deliveryCounter to webhook metrics and remove from sink sender? + // v1.deliveryCounter.With(prometheus.Labels{deliveryCounterLabels}).Add(1.0) + logger.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String(metrics.CodeLabel, code), zap.String(metrics.UrlLabel, req.URL.String())) return nil } @@ -165,6 +179,8 @@ func (v1 *WebhookV1) updateRequest(urls *ring.Ring) func(*http.Request) *http.Re tmp, err := url.Parse(urls.Value.(string)) if err != nil { v1.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err)) + //TODO: do we add droppedMessage metric to webhook and remove from sink sender? + // v1.droppedMessage.With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1) } request.URL = tmp return request @@ -176,7 +192,7 @@ func (v1 *WebhookV1) onAttempt(request *http.Request, event string) retry.OnAtte return func(attempt retry.Attempt[*http.Response]) { if attempt.Retries > 0 { // s.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0) - v1.logger.Debug("retrying HTTP transaction", zap.String("url", request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode)) + v1.logger.Debug("retrying HTTP transaction", zap.String(metrics.UrlLabel, request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode)) } } diff --git a/internal/sink/sinkSender.go b/internal/sink/sinkSender.go index e7e91668..720b228a 100644 --- a/internal/sink/sinkSender.go +++ b/internal/sink/sinkSender.go @@ -84,7 +84,7 @@ type SinkMetrics struct { droppedCutoffCounter prometheus.Counter droppedExpiredCounter prometheus.Counter droppedExpiredBeforeQueueCounter prometheus.Counter - droppedNetworkErrCounter prometheus.Counter + droppedMessage prometheus.Counter droppedInvalidConfig prometheus.Counter droppedPanic prometheus.Counter cutOffCounter prometheus.Counter @@ -454,7 +454,7 @@ func (s *sender) CreateMetrics(m metrics.Metrics) { s.droppedExpiredBeforeQueueCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"}) s.droppedCutoffCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}) s.droppedInvalidConfig = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}) - s.droppedNetworkErrCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) + s.droppedMessage = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) s.droppedPanic = m.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}) s.queueDepthGauge = m.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}) s.renewalTimeGauge = m.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) diff --git a/internal/sink/sinkSender_test.go b/internal/sink/sinkSender_test.go index 7dd524f2..1ef99143 100644 --- a/internal/sink/sinkSender_test.go +++ b/internal/sink/sinkSender_test.go @@ -59,8 +59,8 @@ package sink // // // 3. Trigger the On method on that "mockMetric" with various different cases of that metric, // // // in both senderWrapper_test.go and outboundSender_test.go // // // i.e: -// // // case 1: On("With", []string{"event", iot} -// // // case 2: On("With", []string{"event", unknown} +// // // case 1: On("With", []string{metrics.EventLabel, iot} +// // // case 2: On("With", []string{metrics.EventLabel, unknown} // // // 4. Mimic the metric behavior using On: // // // fakeSlow.On("Add", 1.0).Return() // // func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []string) *OutboundSenderFactory { @@ -89,34 +89,34 @@ package sink // // // test dc metric // // fakeDC := new(mockCounter) -// // fakeDC.On("With", []string{"url", w.Webhook.Config.URL, "code", "200", "event", "test"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "code", "200", "event", "iot"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "code", "200", "event", "unknown"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "code", "failure", "event", "iot"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "event", "test"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "event", "iot"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "event", "unknown"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "code", "201"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "code", "202"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "code", "204"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "code", "429", "event", "iot"}).Return(fakeDC). -// // On("With", []string{"url", w.Webhook.Config.URL, "code", "failure"}).Return(fakeDC) +// // fakeDC.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "200", metrics.EventLabel, "test"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "200", metrics.EventLabel, "iot"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "200", metrics.EventLabel, "unknown"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "failure", metrics.EventLabel, "iot"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.EventLabel, "test"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.EventLabel, "iot"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.EventLabel, "unknown"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "201"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "202"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "204"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "429", metrics.EventLabel, "iot"}).Return(fakeDC). +// // On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "failure"}).Return(fakeDC) // // fakeDC.On("Add", 1.0).Return() // // fakeDC.On("Add", 0.0).Return() // // // test slow metric // // fakeSlow := new(mockCounter) -// // fakeSlow.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakeSlow) +// // fakeSlow.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL}).Return(fakeSlow) // // fakeSlow.On("Add", 1.0).Return() // // // test dropped metric // // fakeDroppedSlow := new(mockCounter) -// // fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "queue_full"}).Return(fakeDroppedSlow) -// // fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "cut_off"}).Return(fakeDroppedSlow) -// // fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "expired"}).Return(fakeDroppedSlow) -// // fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "expired_before_queueing"}).Return(fakeDroppedSlow) -// // fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "invalid_config"}).Return(fakeDroppedSlow) -// // fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "network_err"}).Return(fakeDroppedSlow) +// // fakeDroppedSlow.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.ReasonLabel, "queue_full"}).Return(fakeDroppedSlow) +// // fakeDroppedSlow.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.ReasonLabel, "cut_off"}).Return(fakeDroppedSlow) +// // fakeDroppedSlow.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.ReasonLabel, "expired"}).Return(fakeDroppedSlow) +// // fakeDroppedSlow.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.ReasonLabel, "expired_before_queueing"}).Return(fakeDroppedSlow) +// // fakeDroppedSlow.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.ReasonLabel, "invalid_config"}).Return(fakeDroppedSlow) +// // fakeDroppedSlow.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.ReasonLabel, "network_err"}).Return(fakeDroppedSlow) // // fakeDroppedSlow.On("Add", mock.Anything).Return() // // // IncomingContentType cases @@ -129,18 +129,18 @@ package sink // // // QueueDepth case // // fakeQdepth := new(mockGauge) -// // fakeQdepth.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakeQdepth) +// // fakeQdepth.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL}).Return(fakeQdepth) // // fakeQdepth.On("Add", 1.0).Return().On("Add", -1.0).Return() // // // DropsDueToPanic case // // fakePanicDrop := new(mockCounter) -// // fakePanicDrop.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakePanicDrop) +// // fakePanicDrop.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL}).Return(fakePanicDrop) // // fakePanicDrop.On("Add", 1.0).Return() // // // Fake Latency // // fakeLatency := new(mockHistogram) -// // fakeLatency.On("With", []string{"url", w.Webhook.Config.URL, "code", "200"}).Return(fakeLatency) -// // fakeLatency.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakeLatency) +// // fakeLatency.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL, metrics.CodeLabel, "200"}).Return(fakeLatency) +// // fakeLatency.On("With", []string{metrics.UrlLabel, w.Webhook.Config.URL}).Return(fakeLatency) // // fakeLatency.On("Observe", 1.0).Return() // // // Build a registry and register all fake metrics, these are synymous with the metrics in diff --git a/internal/sink/sinkWrapper.go b/internal/sink/sinkWrapper.go index 37fd1419..c953a4f4 100644 --- a/internal/sink/sinkWrapper.go +++ b/internal/sink/sinkWrapper.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/go-kit/kit/metrics/provider" "github.com/prometheus/client_golang/prometheus" "github.com/xmidt-org/caduceus/internal/client" "github.com/xmidt-org/caduceus/internal/metrics" @@ -155,7 +156,7 @@ func (w *wrapper) Update(list []Listener) { var err error listener := inValue.Listener - metricWrapper, err := client.NewMetricWrapper(time.Now, w.metrics.QueryLatency, inValue.ID) + metricWrapper, err := client.NewMetricWrapper(time.Now, w.metrics.QueryLatency.With(prometheus.Labels{metrics.UrlLabel: inValue.ID})) if err != nil { continue @@ -184,7 +185,7 @@ func (w *wrapper) Queue(msg *wrp.Message) { w.mutex.RLock() defer w.mutex.RUnlock() - w.eventType.With(prometheus.Labels{"event": msg.FindEventStringSubMatch()}).Add(1) + w.eventType.With(prometheus.Labels{metrics.EventLabel: msg.FindEventStringSubMatch()}).Add(1) for _, v := range w.senders { v.Queue(msg) diff --git a/internal/sink/sinkWrapper_test.go b/internal/sink/sinkWrapper_test.go index 5bceb2cf..8c62e6f9 100644 --- a/internal/sink/sinkWrapper_test.go +++ b/internal/sink/sinkWrapper_test.go @@ -49,42 +49,42 @@ func (t *swTransport) RoundTrip(req *http.Request) (*http.Response, error) { // fakeGauge := new(mockGauge) // fakeGauge.On("Add", 1.0).Return(). // On("Add", -1.0).Return(). -// //On("With", []string{"url", "unknown"}).Return(fakeGauge). -// On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeGauge). -// On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeGauge) +// //On("With", []string{metrics.UrlLabel, "unknown"}).Return(fakeGauge). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo"}).Return(fakeGauge). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo"}).Return(fakeGauge) // // Fake Latency // fakeLatency := new(mockHistogram) -// fakeLatency.On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeLatency) -// fakeLatency.On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeLatency) -// fakeLatency.On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeLatency) -// fakeLatency.On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeLatency) +// fakeLatency.On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.CodeLabel, "200"}).Return(fakeLatency) +// fakeLatency.On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.CodeLabel, "200"}).Return(fakeLatency) +// fakeLatency.On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo"}).Return(fakeLatency) +// fakeLatency.On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo"}).Return(fakeLatency) // fakeLatency.On("Observe", 1.0).Return() // fakeIgnore := new(mockCounter) // fakeIgnore.On("Add", 1.0).Return().On("Add", 0.0).Return(). -// On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:8888/foo", "event", "unknown"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo", "event", "unknown"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:8888/foo", "reason", "cut_off"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:8888/foo", "reason", "queue_full"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired_before_queueing"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:8888/foo", "reason", "network_err"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:8888/foo", "reason", "invalid_config"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo", "reason", "cut_off"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo", "reason", "queue_full"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired_before_queueing"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo", "reason", "network_err"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo", "reason", "invalid_config"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:8888/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). -// On("With", []string{"url", "http://localhost:9999/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). -// On("With", []string{"event", "iot"}).Return(fakeIgnore). -// On("With", []string{"event", "test/extra-stuff"}).Return(fakeIgnore). -// On("With", []string{"event", "bob/magic/dog"}).Return(fakeIgnore). -// On("With", []string{"event", "unknown"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.EventLabel, "unknown"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.EventLabel, "unknown"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.ReasonLabel, "cut_off"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.ReasonLabel, "queue_full"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.ReasonLabel, "expired"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.ReasonLabel, "expired_before_queueing"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.ReasonLabel, "network_err"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.ReasonLabel, "invalid_config"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.ReasonLabel, "cut_off"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.ReasonLabel, "queue_full"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.ReasonLabel, "expired"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.ReasonLabel, "expired_before_queueing"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.ReasonLabel, "network_err"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.ReasonLabel, "invalid_config"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:8888/foo", metrics.CodeLabel, "200", metrics.EventLabel, "unknown"}).Return(fakeIgnore). +// On("With", []string{metrics.UrlLabel, "http://localhost:9999/foo", metrics.CodeLabel, "200", metrics.EventLabel, "unknown"}).Return(fakeIgnore). +// On("With", []string{metrics.EventLabel, "iot"}).Return(fakeIgnore). +// On("With", []string{metrics.EventLabel, "test/extra-stuff"}).Return(fakeIgnore). +// On("With", []string{metrics.EventLabel, "bob/magic/dog"}).Return(fakeIgnore). +// On("With", []string{metrics.EventLabel, "unknown"}).Return(fakeIgnore). // On("With", []string{"content_type", "msgpack"}).Return(fakeIgnore). // On("With", []string{"content_type", "json"}).Return(fakeIgnore). // On("With", []string{"content_type", "http"}).Return(fakeIgnore). diff --git a/routes.go b/routes.go index 1159d7ab..e3a5b222 100644 --- a/routes.go +++ b/routes.go @@ -81,6 +81,7 @@ func provideCoreOption(server string, in RoutesIn) arrangehttp.Option[http.Serve otelmux.WithPropagators(in.Tracing.Propagator()), } + // TODO: should probably customize things a bit mux.Use(recovery.Middleware(recovery.WithStatusCode(555)), otelmux.Middleware("server_primary", options...), candlelight.EchoFirstTraceNodeInfo(in.Tracing.Propagator(), true))