Skip to content

Commit

Permalink
Merge pull request #479 from xmidt-org/main-rebase
Browse files Browse the repository at this point in the history
Main rebase
  • Loading branch information
denopink authored Apr 24, 2024
2 parents 15f8659 + e870f81 commit 9a1816f
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: xmidt-org/shared-go/.github/workflows/ci.yml@5bc4b83f25ff4c944cd6253ba189e50d1997ab3c # v4.1.0
with:
release-type: program
release-docker: true
release-docker: false
release-docker-latest: true
release-docker-major: true
release-docker-minor: true
Expand Down
18 changes: 10 additions & 8 deletions internal/client/httpClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type metricWrapper struct {
id string
}

func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) {
func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec) (*metricWrapper, error) {
if now == nil {
now = time.Now
}
Expand All @@ -33,7 +33,6 @@ func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec,
return &metricWrapper{
now: now,
queryLatency: queryLatency,
id: id,
}, nil
}

Expand All @@ -42,16 +41,19 @@ func (m *metricWrapper) RoundTripper(next Client) Client {
startTime := m.now()
resp, err := next.Do(req)
endTime := m.now()
code := metrics.NetworkError

if err == nil {
code := metrics.GenericDoReason
reason := metrics.NoErrReason
if err != nil {
reason = metrics.GetDoErrReason(err)
if resp != nil {
code = strconv.Itoa(resp.StatusCode)
}
} else {
code = strconv.Itoa(resp.StatusCode)
}

// find time difference, add to metric
var latency = endTime.Sub(startTime)
m.queryLatency.With(prometheus.Labels{metrics.UrlLabel: m.id, metrics.CodeLabel: code}).Observe(latency.Seconds())

m.queryLatency.With(prometheus.Labels{metrics.UrlLabel: req.URL.String(), metrics.CodeLabel: code, metrics.ReasonLabel: reason}).Observe(endTime.Sub(startTime).Seconds())
return resp, err
})
}
27 changes: 14 additions & 13 deletions internal/client/httpClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package client
// errTest := errors.New("test error")
// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC)
// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC)

// tests := []struct {
// description string
// startTime time.Time
Expand All @@ -33,32 +32,34 @@ package client
// description: "Success",
// startTime: date1,
// endTime: date2,
// expectedCode: "200",
// expectedCode: strconv.Itoa(http.StatusOK),
// request: exampleRequest(1),
// expectedErr: nil,
// expectedResponse: &http.Response{
// StatusCode: 200,
// StatusCode: http.StatusOK,
// },
// },
// {
// description: "503 Service Unavailable",
// startTime: date1,
// endTime: date2,
// expectedCode: "503",
// expectedCode: strconv.Itoa(http.StatusServiceUnavailable),
// request: exampleRequest(1),
// expectedErr: nil,
// expectedResponse: &http.Response{
// StatusCode: 503,
// StatusCode: http.StatusServiceUnavailable,
// },
// },
// {
// description: "Network Error",
// startTime: date1,
// endTime: date2,
// expectedCode: "network_err",
// request: exampleRequest(1),
// expectedErr: errTest,
// expectedResponse: nil,
// description: "Network Error",
// startTime: date1,
// endTime: date2,
// expectedCode: strconv.Itoa(http.StatusServiceUnavailable),
// request: exampleRequest(1),
// expectedErr: errors.New(genericDoReason),
// expectedResponse: &http.Response{
// StatusCode: http.StatusServiceUnavailable,
// },
// },
// }

Expand All @@ -69,7 +70,7 @@ package client
// fakeTime := mockTime(tc.startTime, tc.endTime)
// fakeHandler := new(mockHandler)
// fakeHist := new(mockHistogram)
// histogramFunctionCall := []string{"code", tc.expectedCode}
// histogramFunctionCall := []string{metrics.UrlLabel, tc.request.URL.String(), metrics.ReasonLabel, metrics.GetDoErrReason(tc.expectedErr), metrics.CodeLabel, tc.expectedCode}
// fakeLatency := date2.Sub(date1)
// fakeHist.On("With", histogramFunctionCall).Return().Once()
// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
Expand Down
4 changes: 2 additions & 2 deletions internal/handler/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (sh *ServerHandler) handleRequest(msg *wrp.Message) {

func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) {
endTime := sh.now()
sh.telemetry.IncomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds())
sh.telemetry.IncomingQueueLatency.With(prometheus.Labels{metrics.EventLabel: eventType}).Observe(endTime.Sub(startTime).Seconds())
}

func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
Expand All @@ -179,7 +179,7 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
}

if reason != "" {
sh.telemetry.ModifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0)
sh.telemetry.ModifiedWRPCount.With(prometheus.Labels{metrics.ReasonLabel: reason}).Add(1.0)
}

return msg
Expand Down
16 changes: 8 additions & 8 deletions internal/handler/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestServerHandler(t *testing.T) {

fakeTime := mocks.Time(tc.startTime, tc.endTime)
fakeHist := new(mocks.Histogram)
histogramFunctionCall := []string{"event", tc.expectedEventType}
histogramFunctionCall := []string{metrics.EventLabel, tc.expectedEventType}
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
Expand Down Expand Up @@ -167,11 +167,11 @@ func TestServerHandlerFixWrp(t *testing.T) {
fakeIncomingContentTypeCount.On("Add", 1.0).Return()

fakeModifiedWRPCount := new(mocks.Counter)
fakeModifiedWRPCount.On("With", []string{"reason", metrics.BothEmptyReason}).Return(fakeIncomingContentTypeCount).Once()
fakeModifiedWRPCount.On("With", []string{metrics.ReasonLabel, metrics.BothEmptyReason}).Return(fakeIncomingContentTypeCount).Once()
fakeModifiedWRPCount.On("Add", 1.0).Return().Once()

fakeHist := new(mocks.Histogram)
histogramFunctionCall := []string{"event", "bob"}
histogramFunctionCall := []string{metrics.EventLabel, "bob"}
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestServerHandlerFull(t *testing.T) {
fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4)

fakeHist := new(mocks.Histogram)
histogramFunctionCall := []string{"event", metrics.UnknownEventType}
histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType}
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestServerEmptyPayload(t *testing.T) {
fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4)

fakeHist := new(mocks.Histogram)
histogramFunctionCall := []string{"event", metrics.UnknownEventType}
histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType}
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestServerUnableToReadBody(t *testing.T) {
fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4)

fakeHist := new(mocks.Histogram)
histogramFunctionCall := []string{"event", metrics.UnknownEventType}
histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType}
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestServerInvalidBody(t *testing.T) {
fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once()

fakeHist := new(mocks.Histogram)
histogramFunctionCall := []string{"event", metrics.UnknownEventType}
histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType}
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestHandlerUnsupportedMediaType(t *testing.T) {
date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC)
date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC)

histogramFunctionCall := []string{"event", metrics.UnknownEventType}
histogramFunctionCall := []string{metrics.EventLabel, metrics.UnknownEventType}
fakeLatency := date2.Sub(date1)

assert := assert.New(t)
Expand Down
81 changes: 77 additions & 4 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
package metrics

import (
"context"
"errors"
"net"
"net/url"
"strings"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"

Expand Down Expand Up @@ -33,18 +39,39 @@ const (
)

const (
NoErrReason = "no_err"
GenericDoReason = "do_error"
EmptyContentTypeReason = "empty_content_type"
EmptyUUIDReason = "empty_uuid"
BothEmptyReason = "empty_uuid_and_content_type"
NetworkError = "network_err"
UnknownEventType = "unknown"
)
// metric labels

const (
ReasonLabel = "reason"
CodeLabel = "code"
UrlLabel = "url"
EventLabel = "event"
CodeLabel = "code"
ReasonLabel = "reason"

// metric label values
// dropped messages reasons
genericDoReason = "do_error"
deadlineExceededReason = "context_deadline_exceeded"
contextCanceledReason = "context_canceled"
addressErrReason = "address_error"
parseAddrErrReason = "parse_address_error"
invalidAddrReason = "invalid_address"
dnsErrReason = "dns_error"
hostNotFoundReason = "host_not_found"
connClosedReason = "connection_closed"
opErrReason = "op_error"
networkErrReason = "unknown_network_err"
UpdateRequestURLFailedReason = "update_request_url_failed"
connectionUnexpectedlyClosedEOFReason = "connection_unexpectedly_closed_eof"
noErrReason = "no_err"

// dropped message codes
MessageDroppedCode = "message_dropped"
)

// MetricsIn will be populated automatically by the ProvideMetrics function
Expand Down Expand Up @@ -170,3 +197,49 @@ func Provide() fx.Option {
}, EventLabel),
)
}

func GetDoErrReason(err error) string {
var d *net.DNSError
if err == nil {
return NoErrReason
} else if errors.Is(err, context.DeadlineExceeded) {
return deadlineExceededReason

} else if errors.Is(err, context.Canceled) {
return contextCanceledReason

} else if errors.Is(err, &net.AddrError{}) {
return addressErrReason

} else if errors.Is(err, &net.ParseError{}) {
return parseAddrErrReason

} else if errors.Is(err, net.InvalidAddrError("")) {
return invalidAddrReason

} else if errors.As(err, &d) {
if d.IsNotFound {
return hostNotFoundReason
}

return dnsErrReason
} else if errors.Is(err, net.ErrClosed) {
return connClosedReason

} else if errors.Is(err, &net.OpError{}) {
return opErrReason

} else if errors.Is(err, net.UnknownNetworkError("")) {
return networkErrReason

}
// nolint: errorlint
if err, ok := err.(*url.Error); ok {
if strings.TrimSpace(strings.ToLower(err.Unwrap().Error())) == "eof" {
return connectionUnexpectedlyClosedEOFReason
}

}

return GenericDoReason
}
5 changes: 3 additions & 2 deletions internal/sink/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/xmidt-org/caduceus/internal/metrics"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -93,7 +94,7 @@ func (m1 *MatcherV1) Update(l ListenerV1) error {
for i := 0; i < urlCount; i++ {
_, err := url.Parse(l.Registration.Config.AlternativeURLs[i])
if err != nil {
m1.logger.Error("failed to update url", zap.Any("url", l.Registration.Config.AlternativeURLs[i]), zap.Error(err))
m1.logger.Error("failed to update url", zap.Any(metrics.UrlLabel, l.Registration.Config.AlternativeURLs[i]), zap.Error(err))
return err
}
}
Expand Down Expand Up @@ -160,7 +161,7 @@ func (m1 *MatcherV1) IsMatch(msg *wrp.Message) bool {
if matcher != nil {
matchDevice = false
for _, deviceRegex := range matcher {
if deviceRegex.MatchString(msg.Source) {
if deviceRegex.MatchString(msg.Source) || deviceRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) {
matchDevice = true
break
}
Expand Down
Loading

0 comments on commit 9a1816f

Please sign in to comment.