diff --git a/caduceus_type.go b/caduceus_type.go index cd2b6bfc..3ccfcdcb 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -16,24 +16,52 @@ type CaduceusConfig struct { AuthHeader []string NumWorkerThreads int JobQueueSize int - Sender SenderConfig + Sink SinkConfig JWTValidators []JWTValidator AllowInsecureTLS bool } -type SenderConfig struct { - NumWorkersPerSender int - QueueSizePerSender int - CutOffPeriod time.Duration - Linger time.Duration - ClientTimeout time.Duration +type SinkConfig struct { + // The number of workers to assign to each SinkSender created. + NumWorkersPerSender int + + // The queue size to assign to each SinkSender created. + QueueSizePerSender int + + // The cut off time to assign to each SinkSender created. + CutOffPeriod time.Duration + + // The amount of time to let expired SinkSenders linger before + // shutting them down and cleaning up the resources associated with them. + Linger time.Duration + + // Number of delivery retries before giving up. + DeliveryRetries int + + // Time in between delivery retries. + DeliveryInterval time.Duration + + // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message + // has no partner IDs. + CustomPIDs []string + + // DisablePartnerIDs dictates whether or not to enforce the partner ID check. + DisablePartnerIDs bool + + // ClientTimeout specifies a time limit for requests made by the SinkSender's Client. + ClientTimeout time.Duration + + // DisableClientHostnameValidation is used in HTTP Client creation to skip TLS hostname validation. DisableClientHostnameValidation bool - ResponseHeaderTimeout time.Duration - IdleConnTimeout time.Duration - DeliveryRetries int - DeliveryInterval time.Duration - CustomPIDs []string - DisablePartnerIDs bool + + // ResponseHeaderTimeout specifies the amount of time to wait for a server's response headers after fully + // writing the request. + ResponseHeaderTimeout time.Duration + + // IdleConnTimeout is the maximum amount of time an idle + // (keep-alive) connection will remain idle before closing + // itself. + IdleConnTimeout time.Duration } type RequestHandler interface { @@ -41,11 +69,11 @@ type RequestHandler interface { } type CaduceusHandler struct { - senderWrapper SenderWrapper + wrapper Wrapper *zap.Logger } func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) { ch.Logger.Info("Worker received a request, now passing to sender", zap.Int("workerId", workerID)) - ch.senderWrapper.Queue(msg) + ch.wrapper.Queue(msg) } diff --git a/caduceus_type_test.go b/caduceus_type_test.go index 84580531..73ea159a 100644 --- a/caduceus_type_test.go +++ b/caduceus_type_test.go @@ -18,7 +18,7 @@ func TestCaduceusHandler(t *testing.T) { fakeSenderWrapper.On("Queue", mock.AnythingOfType("*wrp.Message")).Return().Once() testHandler := CaduceusHandler{ - senderWrapper: fakeSenderWrapper, + wrapper: fakeSenderWrapper, Logger: logger, } diff --git a/client.go b/client.go new file mode 100644 index 00000000..dfad9c57 --- /dev/null +++ b/client.go @@ -0,0 +1,9 @@ +package main + +import "net/http" + +// Client is the interface used to send messages to the desired location. +// The Client can be either an HTTP Client or a Kafka Producer.
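Since `Client` mirrors `http.Client`'s `Do` method, the standard client satisfies it directly, and the `doerFunc` adapter in httpClient.go covers arbitrary functions. A minimal sketch — the `kafkaClient` shape is an illustration only, not part of this PR:

```go
package main

import "net/http"

// Compile-time assertions: *http.Client satisfies Client, and so does the
// doerFunc adapter defined in httpClient.go.
var (
	_ Client = (*http.Client)(nil)
	_ Client = doerFunc(nil)
)

// kafkaClient is a hypothetical sketch of the "Kafka Producer" case.
type kafkaClient struct{}

func (k kafkaClient) Do(req *http.Request) (*http.Response, error) {
	// Sketch only: a real implementation would produce req.Body to a topic
	// and synthesize an *http.Response from the producer's result.
	return &http.Response{StatusCode: http.StatusAccepted, Body: http.NoBody, Request: req}, nil
}
```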
+type Client interface { + Do(*http.Request) (*http.Response, error) +} diff --git a/config.go b/config.go index 31fba38d..182bacc3 100644 --- a/config.go +++ b/config.go @@ -26,7 +26,7 @@ type Config struct { Servers Servers ArgusClientTimeout HttpClientTimeout JWTValidator JWTValidator - Sender SenderConfig + Sink SinkConfig Service Service AuthHeader []string Server string diff --git a/go.mod b/go.mod index 36314c58..02c5b94d 100644 --- a/go.mod +++ b/go.mod @@ -22,8 +22,10 @@ require ( github.com/xmidt-org/candlelight v0.0.21 github.com/xmidt-org/clortho v0.0.4 github.com/xmidt-org/httpaux v0.4.0 + github.com/xmidt-org/retry v0.0.3 github.com/xmidt-org/sallust v0.2.2 github.com/xmidt-org/touchstone v0.1.3 + github.com/xmidt-org/webhook-schema v0.1.0 github.com/xmidt-org/webpa-common/v2 v2.2.2 github.com/xmidt-org/wrp-go/v3 v3.2.3 go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1 @@ -86,6 +88,7 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/xmidt-org/chronon v0.1.1 // indirect + github.com/xmidt-org/urlegit v0.1.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect diff --git a/go.sum b/go.sum index 678d69eb..90865caa 100644 --- a/go.sum +++ b/go.sum @@ -1690,6 +1690,8 @@ github.com/xmidt-org/httpaux v0.2.1/go.mod h1:mviIlg5fHGb3lAv3l0sbiwVG/q9rqvXaud github.com/xmidt-org/httpaux v0.3.2/go.mod h1:qmlPisXf80FTi3y4gX43eYbCVruSQyvu+FPx1jzvQG8= github.com/xmidt-org/httpaux v0.4.0 h1:cAL/MzIBpSsv4xZZeq/Eu1J5M3vfNe49xr41mP3COKU= github.com/xmidt-org/httpaux v0.4.0/go.mod h1:UypqZwuZV1nn8D6+K1JDb+im9IZrLNg/2oO/Bgiybxc= +github.com/xmidt-org/retry v0.0.3 h1:wvmBnEEn1OKwSZaQtr1RZ2Vey8JIvP72mGTgR+3wPiM= +github.com/xmidt-org/retry v0.0.3/go.mod h1:I7FO3VVrxPckNuotwGYZIxfBnmjMSyOTitTKNL0VkIA= github.com/xmidt-org/sallust v0.1.5/go.mod h1:azcKBypudADIeZ3Em8zGjVq3yQ7n4ueSvM/degHMIxo= github.com/xmidt-org/sallust v0.2.0/go.mod h1:HCQQn7po8czynjxtNVyZ5vzWuTqJyJwnPWkQoqBX67s= github.com/xmidt-org/sallust v0.2.1/go.mod h1:68C0DLwD5xlhRznXTWmfUhx0etyrFpSOzYGU7jzmpzs= @@ -1703,6 +1705,10 @@ github.com/xmidt-org/touchstone v0.1.1/go.mod h1:7Rgqs44l1VndkvFUZewr8WpItzxfJSx github.com/xmidt-org/touchstone v0.1.2/go.mod h1:2xVJVO8FE393Aofw/FD8Cu9wXES4n1AlJP109Nk7/gg= github.com/xmidt-org/touchstone v0.1.3 h1:AtqUBa8U4lyVQj6eRZ9Uo8NShkWWq3StObRRkrv7N0Q= github.com/xmidt-org/touchstone v0.1.3/go.mod h1:vzFD11v5urS3RKVP0jDva/j9aHEjwVZCg3nNMdRSk6E= +github.com/xmidt-org/urlegit v0.1.0 h1:WZLlWo0e5JNZabLEi7/1+sK/np9qrH9XnoB+ZdsHieM= +github.com/xmidt-org/urlegit v0.1.0/go.mod h1:ih/VtgW3xfpV7FNIrHUpNdP0GapcfLOND8y0JwH51vA= +github.com/xmidt-org/webhook-schema v0.1.0 h1:QYutPymtGd6OvukVHTDwpQIEvmk5uNnN8CgbppS089A= +github.com/xmidt-org/webhook-schema v0.1.0/go.mod h1:x3G1lmhryIbr6QXLyzagVkcfY1ZhBxGlP0CdvRD3zZI= github.com/xmidt-org/webpa-common v1.1.0/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI= github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI= github.com/xmidt-org/webpa-common v1.10.2-0.20200604164000-f07406b4eb63/go.mod h1:Fmt3wIxBzwJY0KeRHX6RaLZx2xpKTbXCLEA3Xtd6kq8= diff --git a/http.go b/http.go index 272338d7..5cbe98b2 100644 --- a/http.go +++ b/http.go @@ -20,9 +20,9 @@ import ( type ServerHandlerIn struct { fx.In - CaduceusSenderWrapper *CaduceusSenderWrapper - Logger *zap.Logger - Telemetry *HandlerTelemetry + 
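go.mod above pulls in github.com/xmidt-org/retry for delivery retries. Below is a plain sketch of the semantics that `SinkConfig.DeliveryRetries` and `SinkConfig.DeliveryInterval` describe; it deliberately avoids assumptions about that library's API:

```go
package main

import (
	"net/http"
	"time"
)

// deliverWithRetries re-sends a request up to retries additional times,
// sleeping interval between attempts. Real code must rebuild req.Body before
// each resend and may also retry on 5xx responses; this only retries errors.
func deliverWithRetries(client Client, req *http.Request, retries int, interval time.Duration) (*http.Response, error) {
	resp, err := client.Do(req)
	for attempt := 0; attempt < retries && err != nil; attempt++ {
		time.Sleep(interval)
		resp, err = client.Do(req)
	}
	return resp, err
}
```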
SinkWrapper *SinkWrapper + Logger *zap.Logger + Telemetry *HandlerTelemetry } type ServerHandlerOut struct { @@ -40,20 +40,20 @@ type ServerHandler struct { } type HandlerTelemetryIn struct { fx.In - ErrorRequests prometheus.Counter `name:"error_request_body_counter"` - EmptyRequests prometheus.Counter `name:"empty_request_boyd_counter"` - InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"` - IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"` - ModifiedWRPCount prometheus.CounterVec `name:"modified_wrp_count"` - IncomingQueueLatency prometheus.HistogramVec `name:"incoming_queue_latency_histogram_seconds"` + ErrorRequests prometheus.Counter `name:"error_request_body_count"` + EmptyRequests prometheus.Counter `name:"empty_request_body_count"` + InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"` + IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"` + ModifiedWRPCount *prometheus.CounterVec `name:"modified_wrp_count"` + IncomingQueueLatency prometheus.ObserverVec `name:"incoming_queue_latency_histogram_seconds"` } type HandlerTelemetry struct { errorRequests prometheus.Counter emptyRequests prometheus.Counter invalidCount prometheus.Counter incomingQueueDepthMetric prometheus.Gauge - modifiedWRPCount prometheus.CounterVec - incomingQueueLatency prometheus.HistogramVec + modifiedWRPCount *prometheus.CounterVec + incomingQueueLatency prometheus.ObserverVec } func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { @@ -144,7 +144,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) { endTime := sh.now() - sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(float64(endTime.Sub(startTime).Seconds())) + sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) } func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { @@ -189,18 +189,18 @@ func ProvideHandler() fx.Option { }, func(in ServerHandlerIn) (ServerHandlerOut, error) { //Hard coding maxOutstanding and incomingQueueDepth for now - handler, err := New(in.CaduceusSenderWrapper, in.Logger, in.Telemetry, 0.0, 0.0) + handler, err := New(in.SinkWrapper, in.Logger, in.Telemetry, 0.0, 0.0) return ServerHandlerOut{ Handler: handler, }, err }, ) } -func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { +func New(sw *SinkWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { return &ServerHandler{ caduceusHandler: &CaduceusHandler{ - senderWrapper: senderWrapper, - Logger: log, + wrapper: sw, + Logger: log, }, telemetry: t, maxOutstanding: maxOutstanding, diff --git a/httpClient.go b/httpClient.go index e478bb44..c420108c 100644 --- a/httpClient.go +++ b/httpClient.go @@ -16,11 +16,7 @@ var ( errNilHistogram = errors.New("histogram cannot be nil") ) -type httpClient interface { - Do(*http.Request) (*http.Response, error) -} - -func nopHTTPClient(next httpClient) httpClient { +func nopClient(next Client) Client { return next } @@ -33,15 +29,15 @@ func (d doerFunc) Do(req *http.Request) (*http.Response, error) { type metricWrapper struct { now func() time.Time - queryLatency prometheus.HistogramVec + queryLatency prometheus.ObserverVec id string } -func 
newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec, id string) (*metricWrapper, error) { +func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) { if now == nil { now = time.Now } - if queryLatency.MetricVec == nil { + if queryLatency == nil { return nil, errNilHistogram } return &metricWrapper{ @@ -51,7 +47,7 @@ func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec }, nil } -func (m *metricWrapper) roundTripper(next httpClient) httpClient { +func (m *metricWrapper) roundTripper(next Client) Client { return doerFunc(func(req *http.Request) (*http.Response, error) { startTime := m.now() resp, err := next.Do(req) diff --git a/http_test.go b/http_test.go index 755d8637..ac91c1c2 100644 --- a/http_test.go +++ b/http_test.go @@ -1,474 +1,474 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 +// // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// // SPDX-License-Identifier: Apache-2.0 package main -// import ( -// "bytes" -// "io" -// "net/http" -// "net/http/httptest" -// "testing" -// "testing/iotest" -// "time" - -// "github.com/stretchr/testify/assert" -// "github.com/stretchr/testify/mock" - -// "github.com/xmidt-org/webpa-common/v2/adapter" -// "github.com/xmidt-org/wrp-go/v3" -// ) - -// func exampleRequest(msgType int, list ...string) *http.Request { -// var buffer bytes.Buffer - -// trans := "1234" -// ct := wrp.MimeTypeMsgpack -// url := "localhost:8080" - -// for i := range list { -// switch { -// case i == 0: -// trans = list[i] -// case i == 1: -// ct = list[i] -// case i == 2: -// url = list[i] -// } - -// } -// wrp.NewEncoder(&buffer, wrp.Msgpack).Encode( -// &wrp.Message{ -// Type: wrp.MessageType(msgType), -// Source: "mac:112233445566/lmlite", -// TransactionUUID: trans, -// ContentType: ct, -// Destination: "event:bob/magic/dog", -// Payload: []byte("Hello, world."), -// }) - -// r := bytes.NewReader(buffer.Bytes()) -// req := httptest.NewRequest("POST", url, r) -// req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - -// return req -// } - -// func TestServerHandler(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// tcs := []struct { -// desc string -// expectedResponse int -// request *http.Request -// throwStatusBadRequest bool -// expectedEventType string -// startTime time.Time -// endTime time.Time -// }{ -// { -// desc: "TestServeHTTPHappyPath", -// expectedResponse: http.StatusAccepted, -// request: exampleRequest(4), -// expectedEventType: "bob", -// startTime: date1, -// endTime: date2, -// }, -// { -// desc: "TestServeHTTPInvalidMessageType", -// expectedResponse: http.StatusBadRequest, -// request: exampleRequest(1), -// throwStatusBadRequest: true, -// expectedEventType: unknownEventType, -// startTime: date1, -// endTime: date2, -// }, -// } - -// for _, tc := range tcs { -// assert := assert.New(t) - -// logger := adapter.DefaultLogger().Logger -// fakeHandler := new(mockHandler) -// if !tc.throwStatusBadRequest { -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).Return().Times(1) -// } - -// fakeEmptyRequests := new(mockCounter) -// fakeErrorRequests := new(mockCounter) -// fakeInvalidCount := new(mockCounter) -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", 
mock.AnythingOfType("float64")).Return().Times(2) -// if tc.throwStatusBadRequest { -// fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() -// } - -// fakeTime := mockTime(tc.startTime, tc.endTime) -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", tc.expectedEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// Logger: logger, -// caduceusHandler: fakeHandler, -// errorRequests: fakeErrorRequests, -// emptyRequests: fakeEmptyRequests, -// invalidCount: fakeInvalidCount, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: fakeTime, -// } -// t.Run(tc.desc, func(t *testing.T) { -// w := httptest.NewRecorder() - -// serverWrapper.ServeHTTP(w, tc.request) -// resp := w.Result() - -// assert.Equal(tc.expectedResponse, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// fakeHandler.AssertExpectations(t) -// fakeHist.AssertExpectations(t) -// }) -// } -// } - -// func TestServerHandlerFixWrp(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// logger := adapter.DefaultLogger().Logger -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).Return().Once() - -// fakeEmptyRequests := new(mockCounter) -// fakeErrorRequests := new(mockCounter) -// fakeInvalidCount := new(mockCounter) -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) - -// fakeIncomingContentTypeCount := new(mockCounter) -// fakeIncomingContentTypeCount.On("With", []string{"content_type", wrp.MimeTypeMsgpack}).Return(fakeIncomingContentTypeCount) -// fakeIncomingContentTypeCount.On("With", []string{"content_type", ""}).Return(fakeIncomingContentTypeCount) -// fakeIncomingContentTypeCount.On("Add", 1.0).Return() - -// fakeModifiedWRPCount := new(mockCounter) -// fakeModifiedWRPCount.On("With", []string{"reason", bothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() -// fakeModifiedWRPCount.On("Add", 1.0).Return().Once() - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", "bob"} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// Logger: logger, -// caduceusHandler: fakeHandler, -// errorRequests: fakeErrorRequests, -// emptyRequests: fakeEmptyRequests, -// invalidCount: fakeInvalidCount, -// modifiedWRPCount: fakeModifiedWRPCount, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPHappyPath", func(t *testing.T) { -// w := httptest.NewRecorder() - -// serverWrapper.ServeHTTP(w, exampleRequest(4, "", "")) -// resp := w.Result() - -// assert.Equal(http.StatusAccepted, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// fakeHandler.AssertExpectations(t) -// fakeHist.AssertExpectations(t) -// }) -// } - -// func TestServerHandlerFull(t *testing.T) { 
-// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// logger := adapter.DefaultLogger().Logger -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) - -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// Logger: logger, -// caduceusHandler: fakeHandler, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPTooMany", func(t *testing.T) { -// w := httptest.NewRecorder() - -// /* Act like we have 1 in flight */ -// serverWrapper.incomingQueueDepth = 1 - -// /* Make the call that goes over the limit */ -// serverWrapper.ServeHTTP(w, exampleRequest(4)) -// resp := w.Result() - -// assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// fakeHist.AssertExpectations(t) -// }) -// } - -// func TestServerEmptyPayload(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// var buffer bytes.Buffer -// r := bytes.NewReader(buffer.Bytes()) -// req := httptest.NewRequest("POST", "localhost:8080", r) -// req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - -// logger := adapter.DefaultLogger().Logger -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) - -// fakeEmptyRequests := new(mockCounter) -// fakeEmptyRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// Logger: logger, -// caduceusHandler: fakeHandler, -// emptyRequests: fakeEmptyRequests, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPTooMany", func(t *testing.T) { -// w := httptest.NewRecorder() - -// /* Make the call that goes over the limit */ -// serverWrapper.ServeHTTP(w, req) -// resp := w.Result() - -// assert.Equal(http.StatusBadRequest, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// fakeHist.AssertExpectations(t) -// }) -// } - -// func TestServerUnableToReadBody(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// 
assert := assert.New(t) - -// var buffer bytes.Buffer -// r := iotest.TimeoutReader(bytes.NewReader(buffer.Bytes())) - -// _, _ = r.Read(nil) -// req := httptest.NewRequest("POST", "localhost:8080", r) -// req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - -// logger := adapter.DefaultLogger().Logger -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() - -// fakeErrorRequests := new(mockCounter) -// fakeErrorRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// Logger: logger, -// caduceusHandler: fakeHandler, -// errorRequests: fakeErrorRequests, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPTooMany", func(t *testing.T) { -// w := httptest.NewRecorder() - -// /* Make the call that goes over the limit */ -// serverWrapper.ServeHTTP(w, req) -// resp := w.Result() - -// assert.Equal(http.StatusBadRequest, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// }) -// fakeHist.AssertExpectations(t) -// } - -// func TestServerInvalidBody(t *testing.T) { -// date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// assert := assert.New(t) - -// r := bytes.NewReader([]byte("Invalid payload.")) - -// _, _ = r.Read(nil) -// req := httptest.NewRequest("POST", "localhost:8080", r) -// req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - -// logger := adapter.DefaultLogger().Logger -// fakeHandler := new(mockHandler) -// fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), -// mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() - -// fakeQueueDepth := new(mockGauge) -// fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - -// fakeInvalidCount := new(mockCounter) -// fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() - -// fakeHist := new(mockHistogram) -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// serverWrapper := &ServerHandler{ -// Logger: logger, -// caduceusHandler: fakeHandler, -// invalidCount: fakeInvalidCount, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// incomingQueueLatency: fakeHist, -// now: mockTime(date1, date2), -// } - -// t.Run("TestServeHTTPTooMany", func(t *testing.T) { -// w := httptest.NewRecorder() - -// /* Make the call that goes over the limit */ -// serverWrapper.ServeHTTP(w, req) -// resp := w.Result() - -// assert.Equal(http.StatusBadRequest, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// }) -// fakeHist.AssertExpectations(t) -// } - -// func TestHandlerUnsupportedMediaType(t *testing.T) { -// 
date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) -// date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - -// histogramFunctionCall := []string{"event", unknownEventType} -// fakeLatency := date2.Sub(date1) - -// assert := assert.New(t) - -// logger := adapter.DefaultLogger().Logger -// fakeHandler := new(mockHandler) - -// fakeQueueDepth := new(mockGauge) - -// serverWrapper := &ServerHandler{ -// Logger: logger, -// caduceusHandler: fakeHandler, -// incomingQueueDepthMetric: fakeQueueDepth, -// maxOutstanding: 1, -// } -// testCases := []struct { -// name string -// headers []string -// }{ -// { -// name: "No Content Type Header", -// }, { -// name: "Wrong Content Type Header", -// headers: []string{"application/json"}, -// }, { -// name: "Multiple Content Type Headers", -// headers: []string{"application/msgpack", "application/msgpack", "application/msgpack"}, -// }, -// } - -// for _, testCase := range testCases { -// t.Run(testCase.name, func(t *testing.T) { -// fakeHist := new(mockHistogram) -// serverWrapper.incomingQueueLatency = fakeHist -// serverWrapper.now = mockTime(date1, date2) -// fakeHist.On("With", histogramFunctionCall).Return().Once() -// fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - -// w := httptest.NewRecorder() -// req := exampleRequest(4) -// req.Header.Del("Content-Type") -// for _, h := range testCase.headers { -// req.Header.Add("Content-Type", h) -// } -// serverWrapper.ServeHTTP(w, req) -// resp := w.Result() - -// assert.Equal(http.StatusUnsupportedMediaType, resp.StatusCode) -// if nil != resp.Body { -// io.Copy(io.Discard, resp.Body) -// resp.Body.Close() -// } -// }) -// } - -// } +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "testing" + "testing/iotest" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap/zaptest" + + "github.com/xmidt-org/wrp-go/v3" +) + +func exampleRequest(msgType int, list ...string) *http.Request { + var buffer bytes.Buffer + + trans := "1234" + ct := wrp.MimeTypeMsgpack + url := "localhost:8080" + + for i := range list { + switch { + case i == 0: + trans = list[i] + case i == 1: + ct = list[i] + case i == 2: + url = list[i] + } + + } + wrp.NewEncoder(&buffer, wrp.Msgpack).Encode( + &wrp.Message{ + Type: wrp.MessageType(msgType), + Source: "mac:112233445566/lmlite", + TransactionUUID: trans, + ContentType: ct, + Destination: "event:bob/magic/dog", + Payload: []byte("Hello, world."), + }) + + r := bytes.NewReader(buffer.Bytes()) + req := httptest.NewRequest("POST", url, r) + req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) + + return req +} + +func TestServerHandler(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + tcs := []struct { + desc string + expectedResponse int + request *http.Request + throwStatusBadRequest bool + expectedEventType string + startTime time.Time + endTime time.Time + }{ + { + desc: "TestServeHTTPHappyPath", + expectedResponse: http.StatusAccepted, + request: exampleRequest(4), + expectedEventType: "bob", + startTime: date1, + endTime: date2, + }, + { + desc: "TestServeHTTPInvalidMessageType", + expectedResponse: http.StatusBadRequest, + request: exampleRequest(1), + throwStatusBadRequest: true, + expectedEventType: unknownEventType, + startTime: date1, + endTime: date2, + }, + } + + for _, tc := range tcs { + assert := assert.New(t) + + logger := zaptest.NewLogger(t) + 
fakeHandler := new(mockHandler) + if !tc.throwStatusBadRequest { + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).Return().Times(1) + } + + fakeEmptyRequests := new(mockCounter) + fakeErrorRequests := new(mockCounter) + fakeInvalidCount := new(mockCounter) + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) + if tc.throwStatusBadRequest { + fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() + } + + fakeTime := mockTime(tc.startTime, tc.endTime) + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", tc.expectedEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + errorRequests: fakeErrorRequests, + emptyRequests: fakeEmptyRequests, + invalidCount: fakeInvalidCount, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: fakeTime, + } + t.Run(tc.desc, func(t *testing.T) { + w := httptest.NewRecorder() + + serverWrapper.ServeHTTP(w, tc.request) + resp := w.Result() + + assert.Equal(tc.expectedResponse, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + fakeHandler.AssertExpectations(t) + fakeHist.AssertExpectations(t) + }) + } +} + +func TestServerHandlerFixWrp(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).Return().Once() + + fakeEmptyRequests := new(mockCounter) + fakeErrorRequests := new(mockCounter) + fakeInvalidCount := new(mockCounter) + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) + + fakeIncomingContentTypeCount := new(mockCounter) + fakeIncomingContentTypeCount.On("With", []string{"content_type", wrp.MimeTypeMsgpack}).Return(fakeIncomingContentTypeCount) + fakeIncomingContentTypeCount.On("With", []string{"content_type", ""}).Return(fakeIncomingContentTypeCount) + fakeIncomingContentTypeCount.On("Add", 1.0).Return() + + fakeModifiedWRPCount := new(mockCounter) + fakeModifiedWRPCount.On("With", []string{"reason", bothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() + fakeModifiedWRPCount.On("Add", 1.0).Return().Once() + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", "bob"} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + errorRequests: fakeErrorRequests, + emptyRequests: fakeEmptyRequests, + invalidCount: fakeInvalidCount, + modifiedWRPCount: fakeModifiedWRPCount, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPHappyPath", func(t *testing.T) { + w := httptest.NewRecorder() + + serverWrapper.ServeHTTP(w, exampleRequest(4, "", "")) + resp := w.Result() + + assert.Equal(http.StatusAccepted, resp.StatusCode) + if nil != resp.Body { + 
io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + fakeHandler.AssertExpectations(t) + fakeHist.AssertExpectations(t) + }) +} + +func TestServerHandlerFull(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) + + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPTooMany", func(t *testing.T) { + w := httptest.NewRecorder() + + /* Act like we have 1 in flight */ + serverWrapper.incomingQueueDepth = 1 + + /* Make the call that goes over the limit */ + serverWrapper.ServeHTTP(w, exampleRequest(4)) + resp := w.Result() + + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + fakeHist.AssertExpectations(t) + }) +} + +func TestServerEmptyPayload(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + var buffer bytes.Buffer + r := bytes.NewReader(buffer.Bytes()) + req := httptest.NewRequest("POST", "localhost:8080", r) + req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) + + fakeEmptyRequests := new(mockCounter) + fakeEmptyRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + emptyRequests: fakeEmptyRequests, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPTooMany", func(t *testing.T) { + w := httptest.NewRecorder() + + /* Make the call that goes over the limit */ + serverWrapper.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(http.StatusBadRequest, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + fakeHist.AssertExpectations(t) + }) +} + +func TestServerUnableToReadBody(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := 
assert.New(t) + + var buffer bytes.Buffer + r := iotest.TimeoutReader(bytes.NewReader(buffer.Bytes())) + + _, _ = r.Read(nil) + req := httptest.NewRequest("POST", "localhost:8080", r) + req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() + + fakeErrorRequests := new(mockCounter) + fakeErrorRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + errorRequests: fakeErrorRequests, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPTooMany", func(t *testing.T) { + w := httptest.NewRecorder() + + /* Make the call that goes over the limit */ + serverWrapper.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(http.StatusBadRequest, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + }) + fakeHist.AssertExpectations(t) +} + +func TestServerInvalidBody(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + assert := assert.New(t) + + r := bytes.NewReader([]byte("Invalid payload.")) + + _, _ = r.Read(nil) + req := httptest.NewRequest("POST", "localhost:8080", r) + req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), + mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() + + fakeQueueDepth := new(mockGauge) + fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) + + fakeInvalidCount := new(mockCounter) + fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() + + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"event", unknownEventType} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + invalidCount: fakeInvalidCount, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + incomingQueueLatency: fakeHist, + now: mockTime(date1, date2), + } + + t.Run("TestServeHTTPTooMany", func(t *testing.T) { + w := httptest.NewRecorder() + + /* Make the call that goes over the limit */ + serverWrapper.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(http.StatusBadRequest, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + }) + fakeHist.AssertExpectations(t) +} + +func TestHandlerUnsupportedMediaType(t *testing.T) { + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + histogramFunctionCall := []string{"event", unknownEventType} + 
fakeLatency := date2.Sub(date1) + + assert := assert.New(t) + + logger := zaptest.NewLogger(t) + fakeHandler := new(mockHandler) + + fakeQueueDepth := new(mockGauge) + + serverWrapper := &ServerHandler{ + log: logger, + caduceusHandler: fakeHandler, + incomingQueueDepthMetric: fakeQueueDepth, + maxOutstanding: 1, + } + testCases := []struct { + name string + headers []string + }{ + { + name: "No Content Type Header", + }, { + name: "Wrong Content Type Header", + headers: []string{"application/json"}, + }, { + name: "Multiple Content Type Headers", + headers: []string{"application/msgpack", "application/msgpack", "application/msgpack"}, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + fakeHist := new(mockHistogram) + serverWrapper.incomingQueueLatency = fakeHist + serverWrapper.now = mockTime(date1, date2) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + w := httptest.NewRecorder() + req := exampleRequest(4) + req.Header.Del("Content-Type") + for _, h := range testCase.headers { + req.Header.Add("Content-Type", h) + } + serverWrapper.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(http.StatusUnsupportedMediaType, resp.StatusCode) + if nil != resp.Body { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + }) + } + +} diff --git a/listenerStub.go b/listenerStub.go index f259eb56..a91c84a8 100644 --- a/listenerStub.go +++ b/listenerStub.go @@ -2,57 +2,140 @@ // SPDX-License-Identifier: Apache-2.0 package main -import "time" +import ( + "time" -//This is a stub for the ancla listener. This will be removed once we can add ancla back into caduceus + webhook "github.com/xmidt-org/webhook-schema" +) + +// This is a stub for the webhook and kafka listeners. This will be removed once the webhook-schema configuration is approved. type ListenerStub struct { PartnerIds []string - Webhook Webhook + Webhook webhook.Registration } +// Webhook is a substructure with data related to event delivery. type Webhook struct { - // Address is the subscription request origin HTTP Address. - Address string `json:"registered_from_address"` + // Accept is the content type of outgoing events. The following content types are supported, otherwise + // a 406 response code is returned: application/octet-stream, application/json, application/jsonl, application/msgpack. + // Note: An `Accept` of application/octet-stream or application/json will result in a single response for batch sizes of 0 or 1, + // while batch sizes greater than 1 will result in a multipart response. An `Accept` of application/jsonl or application/msgpack + // will always result in a single response with a list of batched events for any batch size. + Accept string `json:"accept"` - // Config contains data to inform how events are delivered. - Config DeliveryConfig `json:"config"` + // AcceptEncoding is the content encoding of outgoing events. The following encodings are supported, otherwise + // a 406 response code is returned: gzip. + AcceptEncoding string `json:"accept_encoding"` - // FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow. - // Optional, set to "" to disable notifications. - FailureURL string `json:"failure_url"` + // Secret is the string value used for the HMAC signature. + // (Optional, set to "" to disable behavior). + Secret string `json:"secret,omitempty"` - // Events is the list of regular expressions to match an event type against.
- Events []string `json:"events"` + // SecretHash is the hash algorithm to be used. Only sha256 HMAC and sha512 HMAC are supported. + // (Optional). + // The default value is the largest sha HMAC supported, sha512 HMAC. + SecretHash string `json:"secret_hash"` - // Matcher type contains values to match against the metadata. - Matcher MetadataMatcherConfig `json:"matcher,omitempty"` + // If true, the response will use the device content-type and wrp payload as its body. + // Otherwise, the response will use Accept as the content-type and the wrp message as its body. + // Default: false (the entire wrp message is sent). + PayloadOnly bool `json:"payload_only"` - // Duration describes how long the subscription lasts once added. - Duration time.Duration `json:"duration"` + // ReceiverURLs is the ordered list of receiver URLs: if the first URL fails, + // the second is used, and so on. + // Note: either `ReceiverURLs` or `DNSSrvRecord` must be used but not both. + ReceiverURLs []string `json:"receiver_urls"` - // Until describes the time this subscription expires. - Until time.Time `json:"until"` + // DNSSrvRecord is the substructure for configuration related to load balancing. + // Note: either `ReceiverURLs` or `DNSSrvRecord` must be used but not both. + DNSSrvRecord struct { + // FQDNs is a list of FQDNs pointing to DNS SRV records. + FQDNs []string `json:"fqdns"` + + // LoadBalancingScheme is the scheme to use for load balancing. Either the + // srv record attribute `weight` or `priority` can be used. + LoadBalancingScheme string `json:"load_balancing_scheme"` + } `json:"dns_srv_record"` } -// DeliveryConfig is a Webhook substructure with data related to event delivery. -type DeliveryConfig struct { - // URL is the HTTP URL to deliver messages to. - URL string `json:"url"` +// Kafka is a substructure with data related to event delivery. +type Kafka struct { + // Accept is the content type value to set WRP messages to (unless already specified in the WRP). + Accept string `json:"accept"` - // ContentType is content type value to set WRP messages to (unless already specified in the WRP). - ContentType string `json:"content_type"` + // BootstrapServers is a list of kafka broker addresses. + BootstrapServers []string `json:"bootstrap_servers"` - // Secret is the string value for the SHA1 HMAC. - // (Optional, set to "" to disable behavior). - Secret string `json:"secret,omitempty"` + // TODO: figure out which kafka configuration substructures we want to expose to users (to be set by users), + // going to be based on https://pkg.go.dev/github.com/IBM/sarama#Config + // this substructure also includes auth-related secrets; note `MaxOpenRequests` will be excluded since it's already exposed + KafkaProducer struct{} `json:"kafka_producer"` +} - // AlternativeURLs is a list of explicit URLs that should be round robin through on failure cases to the main URL. - AlternativeURLs []string `json:"alt_urls,omitempty"` +type BatchHint struct { + // MaxLingerDuration is the maximum delay for batching if MaxMessages has not been reached. + // Default value will set no maximum value. + MaxLingerDuration time.Duration `json:"max_linger_duration"` + // MaxMessages is the maximum number of events that will be sent in a single batch. + // Default value will set no maximum value. + MaxMessages int `json:"max_messages"` } -// MetadataMatcherConfig is Webhook substructure with config to match event metadata.
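The `ReceiverURLs`/`DNSSrvRecord` comments above describe a mutual-exclusion rule; here is a hedged sketch of how a validator might enforce it (the helper name and error text are assumptions, not part of webhook-schema):

```go
package main

import "errors"

// validateDelivery enforces the documented rule that exactly one of
// ReceiverURLs or DNSSrvRecord must be populated on a Webhook.
func validateDelivery(w Webhook) error {
	hasURLs := len(w.ReceiverURLs) > 0
	hasSRV := len(w.DNSSrvRecord.FQDNs) > 0
	switch {
	case hasURLs && hasSRV:
		return errors.New("receiver_urls and dns_srv_record are mutually exclusive")
	case !hasURLs && !hasSRV:
		return errors.New("one of receiver_urls or dns_srv_record is required")
	}
	return nil
}
```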
-type MetadataMatcherConfig struct { - // DeviceID is the list of regular expressions to match device id type against. - DeviceID []string `json:"device_id"` +// FieldRegex is a substructure with data related to regular expressions. +type FieldRegex struct { + // Field is the wrp field to be used for regex. + // All wrp fields can be used; refer to the schema for examples. + Field string `json:"field"` + + // Regex is the regular expression to match `Field` against. + Regex string `json:"regex"` +} + +type ContactInfo struct { + Name string `json:"name"` + Phone string `json:"phone"` + Email string `json:"email"` +} + +// RegistrationV2 is a special struct for unmarshaling sink information as part of a sink registration request. +type RegistrationV2 struct { + // ContactInfo contains contact information used to reach the owner of the registration. + // (Optional). + ContactInfo ContactInfo `json:"contact_info,omitempty"` + + // CanonicalName is the canonical name of the registration request. + // Reusing a CanonicalName will override the configurations set in that previous + // registration request with the same CanonicalName. + CanonicalName string `json:"canonical_name"` + + // Address is the subscription request origin HTTP Address. + Address string `json:"registered_from_address"` + + // Webhooks contains data to inform how events are delivered to multiple URLs. + Webhooks []Webhook `json:"webhooks"` + + // Kafkas contains data to inform how events are delivered to multiple kafkas. + Kafkas []Kafka `json:"kafkas"` + + // Hash is a substructure for configuration related to distributing events among sinks. + // Note: any failure due to a bad regex field or regex expression will result in a silent failure. + Hash FieldRegex `json:"hash"` + + // BatchHint is the substructure for configuration related to event batching. + // (Optional, if omitted then batches of single events will be sent.) + // Default value will disable batch. All zeros will also disable batch. + BatchHint BatchHint `json:"batch_hints"` + + // FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow. + // Optional, set to "" to disable notifications. + FailureURL string `json:"failure_url"` + + // Matcher is the list of regular expressions to match incoming events against. + // Note: any failure due to a bad regex field or regex expression will result in a silent failure. + Matcher []FieldRegex `json:"matcher,omitempty"` + + // Expires describes the time this subscription expires.
+ // TODO: list of supported formats + Expires time.Time `json:"expires"` } diff --git a/main.go b/main.go index 185554a1..f8f08ed8 100644 --- a/main.go +++ b/main.go @@ -74,7 +74,7 @@ func caduceus(arguments []string, run bool) error { goschtalt.UnmarshalFunc[sallust.Config]("logging"), goschtalt.UnmarshalFunc[candlelight.Config]("tracing"), goschtalt.UnmarshalFunc[touchstone.Config]("prometheus"), - goschtalt.UnmarshalFunc[SenderConfig]("sender"), + goschtalt.UnmarshalFunc[SinkConfig]("sender"), goschtalt.UnmarshalFunc[Service]("service"), goschtalt.UnmarshalFunc[[]string]("authHeader"), goschtalt.UnmarshalFunc[bool]("previousVersionSupport"), @@ -136,7 +136,7 @@ func caduceus(arguments []string, run bool) error { arrangehttp.ProvideServer("servers.alternate"), ProvideHandler(), - ProvideSenderWrapper(), + ProvideWrapper(), touchstone.Provide(), touchhttp.Provide(), ProvideMetrics(), diff --git a/main_test.go b/main_test.go index 2fc15712..2e138e31 100644 --- a/main_test.go +++ b/main_test.go @@ -1,83 +1,132 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 + package main import ( - "os" "testing" + + _ "github.com/goschtalt/goschtalt/pkg/typical" + _ "github.com/goschtalt/yaml-decoder" + _ "github.com/goschtalt/yaml-encoder" + "github.com/stretchr/testify/assert" + "github.com/xmidt-org/sallust" ) -func TestMain(m *testing.M) { - os.Exit(m.Run()) +func Test_provideCLI(t *testing.T) { + tests := []struct { + description string + args cliArgs + want CLI + exits bool + expectedErr error + }{ + { + description: "no arguments, everything works", + }, { + description: "dev mode", + args: cliArgs{"-d"}, + want: CLI{Dev: true}, + }, { + description: "invalid argument", + args: cliArgs{"-w"}, + exits: true, + }, { + description: "invalid argument", + args: cliArgs{"-d", "-w"}, + exits: true, + }, { + description: "help", + args: cliArgs{"-h"}, + exits: true, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) + + if tc.exits { + assert.Panics(func() { + _, _ = provideCLIWithOpts(tc.args, true) + }) + } else { + got, err := provideCLI(tc.args) + + assert.ErrorIs(err, tc.expectedErr) + want := tc.want + assert.Equal(&want, got) + } + }) + } +} + +func Test_caduceus(t *testing.T) { + tests := []struct { + description string + args []string + panic bool + expectedErr error + }{ + { + description: "show config and exit", + args: []string{"-s"}, + panic: true, + }, { + description: "show help and exit", + args: []string{"-h"}, + panic: true, + }, { + description: "do everything but run", + args: []string{"-f", "caduceus.yaml"}, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) + + if tc.panic { + assert.Panics(func() { + _ = caduceus(tc.args, false) + }) + } else { + err := caduceus(tc.args, false) + + assert.ErrorIs(err, tc.expectedErr) + } + }) + } } -// func TestPrintVersionInfo(t *testing.T) { -// testCases := []struct { -// name string -// expectedOutput []string -// overrideValues func() -// lineCount int -// }{ -// { -// "default", -// []string{ -// "caduceus:", -// "version: \tundefined", -// "go version: \tgo", -// "built time: \tundefined", -// "git commit: \tundefined", -// "os/arch: \t", -// }, -// func() {}, -// 6, -// }, -// { -// "set values", -// []string{ -// "caduceus:", -// "version: \t1.0.0\n", -// "go version: \tgo", -// 
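For orientation, an illustrative `RegistrationV2` value tying the listenerStub.go fields above together; every value below is invented for illustration only:

```go
package main

import "time"

// exampleRegistration sketches a plausible V2 registration. The names, URLs,
// and regexes are made up; they are not part of this PR.
var exampleRegistration = RegistrationV2{
	CanonicalName: "example-listener",
	Address:       "https://listener.example.com",
	Webhooks: []Webhook{{
		Accept:       "application/json",
		ReceiverURLs: []string{"https://listener.example.com/events"},
	}},
	Hash:    FieldRegex{Field: "destination", Regex: ".*"},
	Matcher: []FieldRegex{{Field: "destination", Regex: "^event:.*"}},
	Expires: time.Now().Add(24 * time.Hour),
}
```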
"built time: \tsome time\n", -// "git commit: \tgit sha\n", -// "os/arch: \t", -// }, -// func() { -// Version = "1.0.0" -// BuildTime = "some time" -// GitCommit = "git sha" -// }, -// 6, -// }, -// } -// for _, tc := range testCases { -// t.Run(tc.name, func(t *testing.T) { -// resetGlobals() -// tc.overrideValues() -// buf := &bytes.Buffer{} -// printVersionInfo(buf) -// count := 0 -// for { -// line, err := buf.ReadString(byte('\n')) -// if err != nil { -// break -// } -// assert.Contains(t, line, tc.expectedOutput[count]) -// if strings.Contains(line, "\t") { -// keyAndValue := strings.Split(line, "\t") -// // The value after the tab should have more than 2 characters -// // 1) the first character of the value and the new line -// assert.True(t, len(keyAndValue[1]) > 2) -// } -// count++ -// } -// assert.Equal(t, tc.lineCount, count) -// resetGlobals() -// }) -// } -// } +func Test_provideLogger(t *testing.T) { + tests := []struct { + description string + cli *CLI + cfg sallust.Config + expectedErr error + }{ + { + description: "validate empty config", + cfg: sallust.Config{}, + cli: &CLI{}, + }, { + description: "validate dev config", + cfg: sallust.Config{}, + cli: &CLI{Dev: true}, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) -// func resetGlobals() { -// Version = "undefined" -// BuildTime = "undefined" -// GitCommit = "undefined" -// } + got, err := provideLogger(tc.cli, tc.cfg) + + if tc.expectedErr == nil { + assert.NotNil(got) + assert.NoError(err) + return + } + assert.ErrorIs(err, tc.expectedErr) + assert.Nil(got) + }) + } +} diff --git a/metrics.go b/metrics.go index e822ce50..5e563205 100644 --- a/metrics.go +++ b/metrics.go @@ -6,7 +6,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/fx" - // nolint:staticcheck "github.com/xmidt-org/touchstone" ) @@ -50,20 +49,19 @@ const ( type SenderMetricsIn struct { fx.In - QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"` - EventType prometheus.CounterVec `name:"incoming_event_type_count"` - DeliveryCounter prometheus.CounterVec `name:"delivery_count"` - DeliveryRetryCounter prometheus.CounterVec `name:"DeliveryRetryCounter"` - DeliveryRetryMaxGauge prometheus.GaugeVec `name:"delivery_retry_max"` - CutOffCounter prometheus.CounterVec `name:"slow_consumer_cut_off_count"` - SlowConsumerDroppedMsgCounter prometheus.CounterVec `name:"slow_consumer_dropped_message_count"` - DropsDueToPanic prometheus.CounterVec `name:"drops_due_to_panic"` - ConsumerDeliverUntilGauge prometheus.GaugeVec `name:"consumer_deliver_until"` - ConsumerDropUntilGauge prometheus.GaugeVec `name:"consumer_drop_until"` - ConsumerDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers"` - ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers_max"` - OutgoingQueueDepth prometheus.GaugeVec `name:"outgoing_queue_depths"` - ConsumerRenewalTimeGauge prometheus.GaugeVec `name:"consumer_renewal_time"` + QueryLatency prometheus.ObserverVec `name:"query_duration_histogram_seconds"` + DeliveryCounter *prometheus.CounterVec `name:"delivery_count"` + DeliveryRetryCounter *prometheus.CounterVec `name:"delivery_retry_count"` + DeliveryRetryMaxGauge *prometheus.GaugeVec `name:"delivery_retry_max"` + CutOffCounter *prometheus.CounterVec `name:"slow_consumer_cut_off_count"` + SlowConsumerDroppedMsgCounter *prometheus.CounterVec `name:"slow_consumer_dropped_message_count"` + DropsDueToPanic *prometheus.CounterVec 
`name:"drops_due_to_panic"` + ConsumerDeliverUntilGauge *prometheus.GaugeVec `name:"consumer_deliver_until"` + ConsumerDropUntilGauge *prometheus.GaugeVec `name:"consumer_drop_until"` + ConsumerDeliveryWorkersGauge *prometheus.GaugeVec `name:"consumer_delivery_workers"` + ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec `name:"consumer_delivery_workers_max"` + OutgoingQueueDepth *prometheus.GaugeVec `name:"outgoing_queue_depths"` + ConsumerRenewalTimeGauge *prometheus.GaugeVec `name:"consumer_renewal_time"` } // TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called @@ -153,30 +151,3 @@ func ProvideMetrics() fx.Option { }, EventLabel), ) } - -func ProvideSenderMetrics() fx.Option { - return fx.Provide( - func(in SenderMetricsIn) (SenderWrapperMetrics, OutboundSenderMetrics) { - outbounderMetrics := OutboundSenderMetrics{ - DeliveryCounter: in.DeliveryCounter, - DeliveryRetryCounter: in.DeliveryRetryCounter, - DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge, - CutOffCounter: in.CutOffCounter, - SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter, - DropsDueToPanic: in.DropsDueToPanic, - ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge, - ConsumerDropUntilGauge: in.ConsumerDropUntilGauge, - ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge, - ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge, - OutgoingQueueDepth: in.OutgoingQueueDepth, - ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge, - } - wrapperMetrics := SenderWrapperMetrics{ - QueryLatency: in.QueryLatency, - EventType: in.EventType, - } - - return wrapperMetrics, outbounderMetrics - }, - ) -} diff --git a/mocks_test.go b/mocks_test.go index 6f69a97a..5a2c19b8 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -4,7 +4,9 @@ package main import ( "time" + "unicode/utf8" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" "github.com/xmidt-org/wrp-go/v3" ) @@ -46,3 +48,24 @@ func mockTime(one, two time.Time) func() time.Time { return one } } + +type mockCounter struct { + mock.Mock +} + +func (m *mockCounter) Add(delta float64) { + m.Called(delta) +} + +func (m *mockCounter) Inc (){ + m.Called(1) +} +func (m *mockCounter) With(labelValues ...string) prometheus.Counter { + for _, v := range labelValues { + if !utf8.ValidString(v) { + panic("not UTF-8") + } + } + args := m.Called(labelValues) + return args.Get(0).(prometheus.Counter) +} diff --git a/routes.go b/routes.go index 9bea1d3e..03369b81 100644 --- a/routes.go +++ b/routes.go @@ -68,18 +68,17 @@ func provideCoreOption(server string, in RoutesIn) arrangehttp.Option[http.Serve mux := chi.NewMux() - // TODO: should probably customize things a bit - mux.Use(recovery.Middleware(recovery.WithStatusCode(555))) - options := []otelmux.Option{ otelmux.WithTracerProvider(in.Tracing.TracerProvider()), otelmux.WithPropagators(in.Tracing.Propagator()), } + // TODO: should probably customize things a bit + mux.Use(recovery.Middleware(recovery.WithStatusCode(555)), otelmux.Middleware("server_primary", options...), + candlelight.EchoFirstTraceNodeInfo(in.Tracing.Propagator(), true)) + mux.Method("POST", urlPrefix+"/notify", in.Handler) if server == "primary" { - mux.Use(otelmux.Middleware("server_primary", options...), - candlelight.EchoFirstTraceNodeInfo(in.Tracing.Propagator(), true)) s.Handler = in.PrimaryMetrics.Then(mux) } else { s.Handler = in.AlternateMetrics.Then(mux) diff --git a/senderWrapper.go b/senderWrapper.go deleted file mode 100644 index 
9dfa5e95..00000000 --- a/senderWrapper.go +++ /dev/null @@ -1,265 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 -package main - -import ( - "crypto/tls" - "errors" - "fmt" - "net/http" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/xmidt-org/candlelight" - "github.com/xmidt-org/wrp-go/v3" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - "go.uber.org/fx" - "go.uber.org/zap" -) - -// SenderWrapperFactory configures the CaduceusSenderWrapper for creation -type CaduceusSenderWrapperIn struct { - fx.In - - Tracing candlelight.Tracing - SenderConfig SenderConfig - WrapperMetrics SenderWrapperMetrics - OutbounderMetrics OutboundSenderMetrics - Logger *zap.Logger - OutbounderFactory OutboundSenderFactory -} - -type SenderWrapperMetrics struct { - QueryLatency prometheus.HistogramVec - EventType prometheus.CounterVec -} - -type SenderWrapper interface { - // Update([]ancla.InternalWebhook) - Queue(*wrp.Message) - Shutdown(bool) -} - -// CaduceusSenderWrapper contains no external parameters. -type CaduceusSenderWrapper struct { - // The http client Do() function to share with OutboundSenders. - sender httpClient - // The number of workers to assign to each OutboundSender created. - numWorkersPerSender int - - // The queue size to assign to each OutboundSender created. - queueSizePerSender int - - // Number of delivery retries before giving up - deliveryRetries int - - // Time in between delivery retries - deliveryInterval time.Duration - - // The cut off time to assign to each OutboundSender created. - cutOffPeriod time.Duration - - // The amount of time to let expired OutboundSenders linger before - // shutting them down and cleaning up the resources associated with them. - linger time.Duration - - // The logger implementation to share with OutboundSenders. - logger *zap.Logger - - mutex *sync.RWMutex - senders map[string]OutboundSender - eventType prometheus.CounterVec - queryLatency prometheus.HistogramVec - wg sync.WaitGroup - shutdown chan struct{} - - // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message - // has no partner IDs. - customPIDs []string - - // DisablePartnerIDs dictates whether or not to enforce the partner ID check. 
- disablePartnerIDs bool - outbounderSetUp *OutboundSenderFactory -} - -func ProvideSenderWrapper() fx.Option { - return fx.Provide( - func(in CaduceusSenderWrapperIn) http.RoundTripper { - return NewRoundTripper(in.SenderConfig, in.Tracing) - }, - func(tr http.RoundTripper, in CaduceusSenderWrapperIn) (*CaduceusSenderWrapper, error) { - csw, err := NewSenderWrapper(tr, in) - return csw, err - }, - ) -} - -// New produces a new CaduceusSenderWrapper -// based on the SenderConfig -func NewSenderWrapper(tr http.RoundTripper, in CaduceusSenderWrapperIn) (csw *CaduceusSenderWrapper, err error) { - csw = &CaduceusSenderWrapper{ - numWorkersPerSender: in.SenderConfig.NumWorkersPerSender, - queueSizePerSender: in.SenderConfig.QueueSizePerSender, - deliveryRetries: in.SenderConfig.DeliveryRetries, - deliveryInterval: in.SenderConfig.DeliveryInterval, - cutOffPeriod: in.SenderConfig.CutOffPeriod, - linger: in.SenderConfig.Linger, - logger: in.Logger, - customPIDs: in.SenderConfig.CustomPIDs, - disablePartnerIDs: in.SenderConfig.DisablePartnerIDs, - eventType: in.WrapperMetrics.EventType, - queryLatency: in.WrapperMetrics.QueryLatency, - } - - csw.outbounderSetUp.Config = in.SenderConfig - csw.outbounderSetUp.Logger = in.Logger - csw.outbounderSetUp.Metrics = in.OutbounderMetrics - csw.sender = doerFunc((&http.Client{ - Transport: tr, - Timeout: in.SenderConfig.ClientTimeout, - }).Do) - - if in.SenderConfig.Linger <= 0 { - err = errors.New("linger must be positive") - csw = nil - return - } - - csw.senders = make(map[string]OutboundSender) - csw.shutdown = make(chan struct{}) - - csw.wg.Add(1) - go undertaker(csw) - - return -} - -func NewRoundTripper(config SenderConfig, tracing candlelight.Tracing) (tr http.RoundTripper) { - tr = &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation}, - MaxIdleConnsPerHost: config.NumWorkersPerSender, - ResponseHeaderTimeout: config.ResponseHeaderTimeout, - IdleConnTimeout: config.IdleConnTimeout, - } - - tr = otelhttp.NewTransport(tr, - otelhttp.WithPropagators(tracing.Propagator()), - otelhttp.WithTracerProvider(tracing.TracerProvider()), - ) - return -} - -// Commenting out while until ancla/argus dependency issue is fixed. -// Update is called when we get changes to our webhook listeners with either -// additions, or updates. This code takes care of building new OutboundSenders -// and maintaining the existing OutboundSenders. -func (sw *CaduceusSenderWrapper) Update(list []ListenerStub) { - - ids := make([]struct { - Listener ListenerStub - ID string - }, len(list)) - - for i, v := range list { - ids[i].Listener = v - ids[i].ID = v.Webhook.Config.URL - } - - sw.mutex.Lock() - defer sw.mutex.Unlock() - - for _, inValue := range ids { - sender, ok := sw.senders[inValue.ID] - if !ok { - osf := sw.outbounderSetUp - osf.Sender = sw.sender - osf.Listener = inValue.Listener - metricWrapper, err := newMetricWrapper(time.Now, sw.queryLatency, inValue.ID) - - if err != nil { - continue - } - osf.ClientMiddleware = metricWrapper.roundTripper - obs, err := osf.New() - if nil == err { - sw.senders[inValue.ID] = obs - } - continue - } - fmt.Println(sender) - // sender.Update(inValue.Listener) //commenting out until argus/ancla fix - } -} - -// Queue is used to send all the possible outbound senders a request. This -// function performs the fan-out and filtering to multiple possible endpoints. 
-func (sw *CaduceusSenderWrapper) Queue(msg *wrp.Message) { - sw.mutex.RLock() - defer sw.mutex.RUnlock() - - sw.eventType.With(prometheus.Labels{"event": msg.FindEventStringSubMatch()}).Add(1) - - for _, v := range sw.senders { - v.Queue(msg) - } -} - -// Shutdown closes down the delivery mechanisms and cleans up the underlying -// OutboundSenders either gently (waiting for delivery queues to empty) or not -// (dropping enqueued messages) -func (sw *CaduceusSenderWrapper) Shutdown(gentle bool) { - sw.mutex.Lock() - defer sw.mutex.Unlock() - for k, v := range sw.senders { - v.Shutdown(gentle) - delete(sw.senders, k) - } - close(sw.shutdown) -} - -// undertaker looks at the OutboundSenders periodically and prunes the ones -// that have been retired for too long, freeing up resources. -func undertaker(sw *CaduceusSenderWrapper) { - defer sw.wg.Done() - // Collecting unused OutboundSenders isn't a huge priority, so do it - // slowly. - ticker := time.NewTicker(2 * sw.linger) - for { - select { - case <-ticker.C: - threshold := time.Now().Add(-1 * sw.linger) - - // Actually shutting these down could take longer then we - // want to lock the mutex, so just remove them from the active - // list & shut them down afterwards. - deadList := createDeadlist(sw, threshold) - - // Shut them down - for _, v := range deadList { - v.Shutdown(false) - } - case <-sw.shutdown: - ticker.Stop() - return - } - } -} - -func createDeadlist(sw *CaduceusSenderWrapper, threshold time.Time) map[string]OutboundSender { - if sw == nil || threshold.IsZero() { - return nil - } - - deadList := make(map[string]OutboundSender) - sw.mutex.Lock() - defer sw.mutex.Unlock() - for k, v := range sw.senders { - retired := v.RetiredSince() - if threshold.After(retired) { - deadList[k] = v - delete(sw.senders, k) - } - } - return deadList -} diff --git a/outboundSender.go b/sinkSender.go similarity index 50% rename from outboundSender.go rename to sinkSender.go index c4ccc6ff..1ba81360 100644 --- a/outboundSender.go +++ b/sinkSender.go @@ -12,7 +12,9 @@ import ( "errors" "fmt" "io" + "math/rand" "net/http" + "net/url" "regexp" "strconv" "strings" @@ -20,11 +22,12 @@ import ( "sync/atomic" "time" - "github.com/xmidt-org/httpaux/retry" "go.uber.org/zap" "github.com/prometheus/client_golang/prometheus" + "github.com/xmidt-org/retry" + "github.com/xmidt-org/retry/retryhttp" "github.com/xmidt-org/webpa-common/v2/semaphore" "github.com/xmidt-org/wrp-go/v3" "github.com/xmidt-org/wrp-go/v3/wrphttp" @@ -40,59 +43,51 @@ const failureText = `Unfortunately, your endpoint is not able to keep up with th // FailureMessage is a helper that lets us easily create a json struct to send // when we have to cut and endpoint off. type FailureMessage struct { - Text string `json:"text"` - // Original ancla.InternalWebhook `json:"webhook_registration"` //TODO: add back in once ancla/argus dependency issue is fixed. 
- CutOffPeriod string `json:"cut_off_period"` - QueueSize int `json:"queue_size"` - Workers int `json:"worker_count"` + Text string `json:"text"` + Original ListenerStub `json:"webhook_registration"` //TODO: remove listener stub once ancla/argus issues fixed + CutOffPeriod string `json:"cut_off_period"` + QueueSize int `json:"queue_size"` + Workers int `json:"worker_count"` } -type OutboundSender interface { +type Sender interface { // Update(ancla.InternalWebhook) error Shutdown(bool) RetiredSince() time.Time Queue(*wrp.Message) } -type OutboundSenderFactory struct { - Config SenderConfig - Logger *zap.Logger - Metrics OutboundSenderMetrics - Sender httpClient - Listener ListenerStub - ClientMiddleware func(httpClient) httpClient -} - -type OutboundSenderMetrics struct { - DeliveryCounter prometheus.CounterVec - DeliveryRetryCounter prometheus.CounterVec - DeliveryRetryMaxGauge prometheus.GaugeVec - CutOffCounter prometheus.CounterVec - SlowConsumerDroppedMsgCounter prometheus.CounterVec - DropsDueToPanic prometheus.CounterVec - ConsumerDeliverUntilGauge prometheus.GaugeVec - ConsumerDropUntilGauge prometheus.GaugeVec - ConsumerDeliveryWorkersGauge prometheus.GaugeVec - ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec - OutgoingQueueDepth prometheus.GaugeVec - ConsumerRenewalTimeGauge prometheus.GaugeVec +type SinkSenderMetrics struct { + DeliveryCounter *prometheus.CounterVec + DeliveryRetryCounter *prometheus.CounterVec + DeliveryRetryMaxGauge *prometheus.GaugeVec + CutOffCounter *prometheus.CounterVec + SlowConsumerDroppedMsgCounter *prometheus.CounterVec + DropsDueToPanic *prometheus.CounterVec + ConsumerDeliverUntilGauge *prometheus.GaugeVec + ConsumerDropUntilGauge *prometheus.GaugeVec + ConsumerDeliveryWorkersGauge *prometheus.GaugeVec + ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec + OutgoingQueueDepth *prometheus.GaugeVec + ConsumerRenewalTimeGauge *prometheus.GaugeVec + QueryLatency prometheus.ObserverVec } // CaduceusOutboundSender is the outbound sender object. -type CaduceusOutboundSender struct { - id string - urls *ring.Ring - // listener ancla.InternalWebhook //TODO: add back in once ancla/argus dependency issue is fixed +type SinkSender struct { + id string + urls *ring.Ring + listener ListenerStub deliverUntil time.Time dropUntil time.Time - sender httpClient + client Client events []*regexp.Regexp matcher []*regexp.Regexp queueSize int deliveryRetries int deliveryInterval time.Duration deliveryCounter prometheus.CounterVec - deliveryRetryCounter prometheus.Counter + deliveryRetryCounter *prometheus.CounterVec droppedQueueFullCounter prometheus.Counter droppedCutoffCounter prometheus.Counter droppedExpiredCounter prometheus.Counter @@ -118,217 +113,209 @@ type CaduceusOutboundSender struct { queue atomic.Value customPIDs []string disablePartnerIDs bool - clientMiddleware func(httpClient) httpClient + clientMiddleware func(Client) Client } -// New creates a new OutboundSender object from the factory, or returns an error. 
-func (osf *OutboundSenderFactory) New() (obs OutboundSender, err error) { - // if _, err = url.ParseRequestURI(osf.Listener.Webhook.Config.URL); nil != err { - // return - // } - - if osf.ClientMiddleware == nil { - osf.ClientMiddleware = nopHTTPClient +func newSinkSender(sw *SinkWrapper, listener ListenerStub) (s Sender, err error) { + if sw.clientMiddleware == nil { + sw.clientMiddleware = nopClient } - - if osf.Sender == nil { - err = errors.New("nil Sender()") + if sw.client == nil { + err = errors.New("nil Client") return } - if osf.Config.CutOffPeriod.Nanoseconds() == 0 { + if sw.config.CutOffPeriod.Nanoseconds() == 0 { err = errors.New("invalid CutOffPeriod") return } - if osf.Logger == nil { + if sw.logger == nil { err = errors.New("logger required") return } - // decoratedLogger := osf.Logger.With(zap.String("webhook.address", osf.Listener.Webhook.Address)) - - caduceusOutboundSender := &CaduceusOutboundSender{ - // id: osf.Listener.Webhook.Config.URL, - // listener: osf.Listener, - sender: osf.Sender, - queueSize: osf.Config.QueueSizePerSender, - cutOffPeriod: osf.Config.CutOffPeriod, - // deliverUntil: osf.Listener.Webhook.Until, - // logger: decoratedLogger, - deliveryRetries: osf.Config.DeliveryRetries, - deliveryInterval: osf.Config.DeliveryInterval, - maxWorkers: osf.Config.NumWorkersPerSender, + decoratedLogger := sw.logger.With(zap.String("webhook.address", listener.Webhook.Address)) + + sinkSender := &SinkSender{ + client: sw.client, + queueSize: sw.config.QueueSizePerSender, + cutOffPeriod: sw.config.CutOffPeriod, + deliverUntil: listener.Webhook.Until, + logger: decoratedLogger, + deliveryRetries: sw.config.DeliveryRetries, + deliveryInterval: sw.config.DeliveryInterval, + maxWorkers: sw.config.NumWorkersPerSender, failureMsg: FailureMessage{ - // Original: osf.Listener, + Original: listener, Text: failureText, - CutOffPeriod: osf.Config.CutOffPeriod.String(), - QueueSize: osf.Config.QueueSizePerSender, - Workers: osf.Config.NumWorkersPerSender, + CutOffPeriod: sw.config.CutOffPeriod.String(), + QueueSize: sw.config.QueueSizePerSender, + Workers: sw.config.NumWorkersPerSender, }, - customPIDs: osf.Config.CustomPIDs, - disablePartnerIDs: osf.Config.DisablePartnerIDs, - clientMiddleware: osf.ClientMiddleware, + customPIDs: sw.config.CustomPIDs, + disablePartnerIDs: sw.config.DisablePartnerIDs, + clientMiddleware: sw.clientMiddleware, } // Don't share the secret with others when there is an error. 
- // caduceusOutboundSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" + sinkSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" - CreateOutbounderMetrics(osf.Metrics, caduceusOutboundSender) + CreateOutbounderMetrics(sw.metrics, sinkSender) // update queue depth and current workers gauge to make sure they start at 0 - caduceusOutboundSender.queueDepthGauge.Set(0) - caduceusOutboundSender.currentWorkersGauge.Set(0) + sinkSender.queueDepthGauge.Set(0) + sinkSender.currentWorkersGauge.Set(0) - caduceusOutboundSender.queue.Store(make(chan *wrp.Message, osf.Config.QueueSizePerSender)) + sinkSender.queue.Store(make(chan *wrp.Message, sw.config.QueueSizePerSender)) - // if err = caduceusOutboundSender.Update(osf.Listener); nil != err { - // return - // } + if err = sinkSender.Update(listener); nil != err { + return + } - caduceusOutboundSender.workers = semaphore.New(caduceusOutboundSender.maxWorkers) - caduceusOutboundSender.wg.Add(1) - go caduceusOutboundSender.dispatcher() + sinkSender.workers = semaphore.New(sinkSender.maxWorkers) + sinkSender.wg.Add(1) + go sinkSender.dispatcher() - obs = caduceusOutboundSender + s = sinkSender return } // Update applies user configurable values for the outbound sender when a // webhook is registered -//TODO: commenting out for now until argus/ancla dependency issue is fixed -// func (obs *CaduceusOutboundSender) Update(wh ancla.InternalWebhook) (err error) { - -// // Validate the failure URL, if present -// if "" != wh.Webhook.FailureURL { -// if _, err = url.ParseRequestURI(wh.Webhook.FailureURL); nil != err { -// return -// } -// } - -// // Create and validate the event regex objects -// // nolint:prealloc -// var events []*regexp.Regexp -// for _, event := range wh.Webhook.Events { -// var re *regexp.Regexp -// if re, err = regexp.Compile(event); nil != err { -// return -// } - -// events = append(events, re) -// } -// if len(events) < 1 { -// err = errors.New("events must not be empty.") -// return -// } - -// // Create the matcher regex objects -// matcher := []*regexp.Regexp{} -// for _, item := range wh.Webhook.Matcher.DeviceID { -// if ".*" == item { -// // Match everything - skip the filtering -// matcher = []*regexp.Regexp{} -// break -// } - -// var re *regexp.Regexp -// if re, err = regexp.Compile(item); nil != err { -// err = fmt.Errorf("invalid matcher item: '%s'", item) -// return -// } -// matcher = append(matcher, re) -// } - -// // Validate the various urls -// urlCount := len(wh.Webhook.Config.AlternativeURLs) -// for i := 0; i < urlCount; i++ { -// _, err = url.Parse(wh.Webhook.Config.AlternativeURLs[i]) -// if err != nil { -// obs.logger.Error("failed to update url", zap.Any("url", wh.Webhook.Config.AlternativeURLs[i]), zap.Error(err)) -// return -// } -// } - -// obs.renewalTimeGauge.Set(float64(time.Now().Unix())) - -// // write/update obs -// obs.mutex.Lock() - -// obs.listener = wh - -// obs.failureMsg.Original = wh -// // Don't share the secret with others when there is an error. 
-// obs.failureMsg.Original.Webhook.Config.Secret = "XxxxxX"
-
-// obs.listener.Webhook.FailureURL = wh.Webhook.FailureURL
-// obs.deliverUntil = wh.Webhook.Until
-// obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix()))
-
-// obs.events = events
-
-// obs.deliveryRetryMaxGauge.Set(float64(obs.deliveryRetries))
-
-// // if matcher list is empty set it nil for Queue() logic
-// obs.matcher = nil
-// if 0 < len(matcher) {
-// obs.matcher = matcher
-// }
-
-// if 0 == urlCount {
-// obs.urls = ring.New(1)
-// obs.urls.Value = obs.id
-// } else {
-// r := ring.New(urlCount)
-// for i := 0; i < urlCount; i++ {
-// r.Value = wh.Webhook.Config.AlternativeURLs[i]
-// r = r.Next()
-// }
-// obs.urls = r
-// }
-
-// // Randomize where we start so all the instances don't synchronize
-// r := rand.New(rand.NewSource(time.Now().UnixNano()))
-// offset := r.Intn(obs.urls.Len())
-// for 0 < offset {
-// obs.urls = obs.urls.Next()
-// offset--
-// }
-
-// // Update this here in case we make this configurable later
-// obs.maxWorkersGauge.Set(float64(obs.maxWorkers))
-
-// obs.mutex.Unlock()
-
-// return
-// }
+// TODO: restore the ancla.InternalWebhook-based signature once the argus/ancla dependency issue is fixed
+func (s *SinkSender) Update(wh ListenerStub) (err error) {
+
+ // Validate the failure URL, if present
+ if wh.Webhook.FailureURL != "" {
+ if _, err = url.ParseRequestURI(wh.Webhook.FailureURL); nil != err {
+ return
+ }
+ }
+
+ // Create and validate the event regex objects
+ // nolint:prealloc
+ var events []*regexp.Regexp
+ for _, event := range wh.Webhook.Events {
+ var re *regexp.Regexp
+ if re, err = regexp.Compile(event); nil != err {
+ return
+ }
+
+ events = append(events, re)
+ }
+ if len(events) < 1 {
+ err = errors.New("events must not be empty")
+ return
+ }
+
+ // Create the matcher regex objects
+ matcher := []*regexp.Regexp{}
+ for _, item := range wh.Webhook.Matcher.DeviceID {
+ if item == ".*" {
+ // Match everything - skip the filtering
+ matcher = []*regexp.Regexp{}
+ break
+ }
+
+ var re *regexp.Regexp
+ if re, err = regexp.Compile(item); nil != err {
+ err = fmt.Errorf("invalid matcher item: '%s'", item)
+ return
+ }
+ matcher = append(matcher, re)
+ }
+
+ // Validate the various urls
+ urlCount := len(wh.Webhook.Config.AlternativeURLs)
+ for i := 0; i < urlCount; i++ {
+ _, err = url.Parse(wh.Webhook.Config.AlternativeURLs[i])
+ if err != nil {
+ s.logger.Error("failed to update url", zap.Any("url", wh.Webhook.Config.AlternativeURLs[i]), zap.Error(err))
+ return
+ }
+ }
+
+ s.renewalTimeGauge.Set(float64(time.Now().Unix()))
+
+ // write/update the sink sender state under the write lock
+ s.mutex.Lock()
+
+ s.listener = wh
+
+ s.failureMsg.Original = wh
+ // Don't share the secret with others when there is an error. 
+ s.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" + + s.listener.Webhook.FailureURL = wh.Webhook.FailureURL + s.deliverUntil = wh.Webhook.Until + s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix())) + + s.events = events + + s.deliveryRetryMaxGauge.Set(float64(s.deliveryRetries)) + + // if matcher list is empty set it nil for Queue() logic + s.matcher = nil + if 0 < len(matcher) { + s.matcher = matcher + } + + if urlCount == 0 { + s.urls = ring.New(1) + s.urls.Value = s.id + } else { + r := ring.New(urlCount) + for i := 0; i < urlCount; i++ { + r.Value = wh.Webhook.Config.AlternativeURLs[i] + r = r.Next() + } + s.urls = r + } + + // Randomize where we start so all the instances don't synchronize + r := rand.New(rand.NewSource(time.Now().UnixNano())) + offset := r.Intn(s.urls.Len()) + for 0 < offset { + s.urls = s.urls.Next() + offset-- + } + + // Update this here in case we make this configurable later + s.maxWorkersGauge.Set(float64(s.maxWorkers)) + + s.mutex.Unlock() + + return +} // Shutdown causes the CaduceusOutboundSender to stop its activities either gently or // abruptly based on the gentle parameter. If gentle is false, all queued // messages will be dropped without an attempt to send made. -func (obs *CaduceusOutboundSender) Shutdown(gentle bool) { +func (s *SinkSender) Shutdown(gentle bool) { if !gentle { // need to close the channel we're going to replace, in case it doesn't // have any events in it. - close(obs.queue.Load().(chan *wrp.Message)) - obs.Empty(obs.droppedExpiredCounter) + close(s.queue.Load().(chan *wrp.Message)) + s.Empty(s.droppedExpiredCounter) } - close(obs.queue.Load().(chan *wrp.Message)) - obs.wg.Wait() - - obs.mutex.Lock() - obs.deliverUntil = time.Time{} - obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix())) - obs.queueDepthGauge.Set(0) //just in case - obs.mutex.Unlock() + close(s.queue.Load().(chan *wrp.Message)) + s.wg.Wait() + + s.mutex.Lock() + s.deliverUntil = time.Time{} + s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix())) + s.queueDepthGauge.Set(0) //just in case + s.mutex.Unlock() } // RetiredSince returns the time the CaduceusOutboundSender retired (which could be in // the future). -func (obs *CaduceusOutboundSender) RetiredSince() time.Time { - obs.mutex.RLock() - deliverUntil := obs.deliverUntil - obs.mutex.RUnlock() +func (s *SinkSender) RetiredSince() time.Time { + s.mutex.RLock() + deliverUntil := s.deliverUntil + s.mutex.RUnlock() return deliverUntil } @@ -346,28 +333,28 @@ func overlaps(sl1 []string, sl2 []string) bool { // Queue is given a request to evaluate and optionally enqueue in the list // of messages to deliver. The request is checked to see if it matches the // criteria before being accepted or silently dropped. 
-func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {
- obs.mutex.RLock()
- deliverUntil := obs.deliverUntil
- dropUntil := obs.dropUntil
- events := obs.events
- matcher := obs.matcher
- obs.mutex.RUnlock()
+func (s *SinkSender) Queue(msg *wrp.Message) {
+ s.mutex.RLock()
+ deliverUntil := s.deliverUntil
+ dropUntil := s.dropUntil
+ events := s.events
+ matcher := s.matcher
+ s.mutex.RUnlock()

 now := time.Now()

- if !obs.isValidTimeWindow(now, dropUntil, deliverUntil) {
- obs.logger.Debug("invalid time window for event", zap.Any("now", now), zap.Any("dropUntil", dropUntil), zap.Any("deliverUntil", deliverUntil))
+ if !s.isValidTimeWindow(now, dropUntil, deliverUntil) {
+ s.logger.Debug("invalid time window for event", zap.Any("now", now), zap.Any("dropUntil", dropUntil), zap.Any("deliverUntil", deliverUntil))
 return
 }

 //check the partnerIDs
- if !obs.disablePartnerIDs {
+ if !s.disablePartnerIDs {
 if len(msg.PartnerIDs) == 0 {
- msg.PartnerIDs = obs.customPIDs
+ msg.PartnerIDs = s.customPIDs
 }
- // if !overlaps(obs.listener.PartnerIDs, msg.PartnerIDs) {
- // obs.logger.Debug("parter id check failed", zap.Strings("webhook.partnerIDs", obs.listener.PartnerIDs), zap.Strings("event.partnerIDs", msg.PartnerIDs))
+ // if !overlaps(s.listener.PartnerIDs, msg.PartnerIDs) {
+ // s.logger.Debug("partner id check failed", zap.Strings("webhook.partnerIDs", s.listener.PartnerIDs), zap.Strings("event.partnerIDs", msg.PartnerIDs))
 // return
 // }
 }
@@ -383,7 +370,7 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {
 }
 }
 if !matchEvent {
- obs.logger.Debug("destination regex doesn't match", zap.String("event.dest", msg.Destination))
+ s.logger.Debug("destination regex doesn't match", zap.String("event.dest", msg.Destination))
 return
 }

@@ -398,31 +385,31 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {
 }

 if !matchDevice {
- obs.logger.Debug("device regex doesn't match", zap.String("event.source", msg.Source))
+ s.logger.Debug("device regex doesn't match", zap.String("event.source", msg.Source))
 return
 }

 select {
- case obs.queue.Load().(chan *wrp.Message) <- msg:
- obs.queueDepthGauge.Add(1.0)
- obs.logger.Debug("event added to outbound queue", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
+ case s.queue.Load().(chan *wrp.Message) <- msg:
+ s.queueDepthGauge.Add(1.0)
+ s.logger.Debug("event added to outbound queue", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
 default:
- obs.logger.Debug("queue full. event dropped", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
- obs.queueOverflow()
- obs.droppedQueueFullCounter.Add(1.0)
+ s.logger.Debug("queue full. 
event dropped", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) + s.queueOverflow() + s.droppedQueueFullCounter.Add(1.0) } } -func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { +func (s *SinkSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { if !now.After(dropUntil) { // client was cut off - obs.droppedCutoffCounter.Add(1.0) + s.droppedCutoffCounter.Add(1.0) return false } if !now.Before(deliverUntil) { // outside delivery window - obs.droppedExpiredBeforeQueueCounter.Add(1.0) + s.droppedExpiredBeforeQueueCounter.Add(1.0) return false } @@ -433,15 +420,15 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti // a fresh one, counting any current messages in the queue as dropped. // It should never close a queue, as a queue not referenced anywhere will be // cleaned up by the garbage collector without needing to be closed. -func (obs *CaduceusOutboundSender) Empty(droppedCounter prometheus.Counter) { - droppedMsgs := obs.queue.Load().(chan *wrp.Message) - obs.queue.Store(make(chan *wrp.Message, obs.queueSize)) +func (s *SinkSender) Empty(droppedCounter prometheus.Counter) { + droppedMsgs := s.queue.Load().(chan *wrp.Message) + s.queue.Store(make(chan *wrp.Message, s.queueSize)) droppedCounter.Add(float64(len(droppedMsgs))) - obs.queueDepthGauge.Set(0.0) + s.queueDepthGauge.Set(0.0) } -func (obs *CaduceusOutboundSender) dispatcher() { - defer obs.wg.Done() +func (s *SinkSender) dispatcher() { + defer s.wg.Done() var ( msg *wrp.Message urls *ring.Ring @@ -453,7 +440,7 @@ Loop: for { // Always pull a new queue in case we have been cutoff or are shutting // down. - msgQueue := obs.queue.Load().(chan *wrp.Message) + msgQueue := s.queue.Load().(chan *wrp.Message) // nolint:gosimple select { // The dispatcher cannot get stuck blocking here forever (caused by an @@ -480,50 +467,50 @@ Loop: if !ok { break Loop } - obs.queueDepthGauge.Add(-1.0) - obs.mutex.RLock() - urls = obs.urls + s.queueDepthGauge.Add(-1.0) + s.mutex.RLock() + urls = s.urls // Move to the next URL to try 1st the next time. // This is okay because we run a single dispatcher and it's the // only one updating this field. 
- obs.urls = obs.urls.Next() - deliverUntil := obs.deliverUntil - dropUntil := obs.dropUntil - // secret = obs.listener.Webhook.Config.Secret - // accept = obs.listener.Webhook.Config.ContentType - obs.mutex.RUnlock() + s.urls = s.urls.Next() + deliverUntil := s.deliverUntil + dropUntil := s.dropUntil + // secret = s.listener.Webhook.Config.Secret + // accept = s.listener.Webhook.Config.ContentType + s.mutex.RUnlock() now := time.Now() if now.Before(dropUntil) { - obs.droppedCutoffCounter.Add(1.0) + s.droppedCutoffCounter.Add(1.0) continue } if now.After(deliverUntil) { - obs.Empty(obs.droppedExpiredCounter) + s.Empty(s.droppedExpiredCounter) continue } - obs.workers.Acquire() - obs.currentWorkersGauge.Add(1.0) + s.workers.Acquire() + s.currentWorkersGauge.Add(1.0) - go obs.send(urls, secret, accept, msg) + go s.send(urls, secret, accept, msg) } } - for i := 0; i < obs.maxWorkers; i++ { - obs.workers.Acquire() + for i := 0; i < s.maxWorkers; i++ { + s.workers.Acquire() } } // worker is the routine that actually takes the queued messages and delivers // them to the listeners outside webpa -func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType string, msg *wrp.Message) { +func (s *SinkSender) send(urls *ring.Ring, secret, acceptType string, msg *wrp.Message) { defer func() { if r := recover(); nil != r { - obs.droppedPanic.Add(1.0) - obs.logger.Error("goroutine send() panicked", zap.String("id", obs.id), zap.Any("panic", r)) + s.droppedPanic.Add(1.0) + s.logger.Error("goroutine send() panicked", zap.String("id", s.id), zap.Any("panic", r)) } - obs.workers.Release() - obs.currentWorkersGauge.Add(-1.0) + s.workers.Release() + s.currentWorkersGauge.Add(-1.0) }() payload := msg.Payload @@ -547,8 +534,8 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri req, err := http.NewRequest("POST", urls.Value.(string), payloadReader) if nil != err { // Report drop - obs.droppedInvalidConfig.Add(1.0) - obs.logger.Error("Invalid URL", zap.String("url", urls.Value.(string)), zap.String("id", obs.id), zap.Error(err)) + s.droppedInvalidConfig.Add(1.0) + s.logger.Error("Invalid URL", zap.String("url", urls.Value.(string)), zap.String("id", s.id), zap.Error(err)) return } @@ -578,28 +565,21 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri // find the event "short name" event := msg.FindEventStringSubMatch() - /*TODO: need middleware for: - Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event) - Logger - Update Request - */ - retryConfig := retry.Config{ - Retries: obs.deliveryRetries, - Interval: obs.deliveryInterval, - } - // Send it - obs.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) - - client := retry.New(retryConfig, obs.clientMiddleware(obs.sender)) + s.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) + client, _ := retryhttp.NewClient( + retryhttp.WithHTTPClient(s.clientMiddleware(s.client)), + retryhttp.WithRunner(s.addRunner(req, event)), + retryhttp.WithRequesters(s.updateRequest(urls)), + ) resp, err := client.Do(req) code := "failure" - l := obs.logger + l := s.logger if nil != err { // Report failure - obs.droppedNetworkErrCounter.Add(1.0) - l = obs.logger.With(zap.Error(err)) + s.droppedNetworkErrCounter.Add(1.0) + l = s.logger.With(zap.Error(err)) } else { // Report Result code = strconv.Itoa(resp.StatusCode) @@ -611,37 +591,37 @@ 
func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri resp.Body.Close() } } - obs.deliveryCounter.With(prometheus.Labels{UrlLabel: obs.id, CodeLabel: code, EventLabel: event}).Add(1.0) + s.deliveryCounter.With(prometheus.Labels{UrlLabel: s.id, CodeLabel: code, EventLabel: event}).Add(1.0) l.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String("code", code), zap.String("url", req.URL.String())) } // queueOverflow handles the logic of what to do when a queue overflows: // cutting off the webhook for a time and sending a cut off notification // to the failure URL. -func (obs *CaduceusOutboundSender) queueOverflow() { - obs.mutex.Lock() - if time.Now().Before(obs.dropUntil) { - obs.mutex.Unlock() +func (s *SinkSender) queueOverflow() { + s.mutex.Lock() + if time.Now().Before(s.dropUntil) { + s.mutex.Unlock() return } - obs.dropUntil = time.Now().Add(obs.cutOffPeriod) - obs.dropUntilGauge.Set(float64(obs.dropUntil.Unix())) - // secret := obs.listener.Webhook.Config.Secret + s.dropUntil = time.Now().Add(s.cutOffPeriod) + s.dropUntilGauge.Set(float64(s.dropUntil.Unix())) + // secret := s.listener.Webhook.Config.Secret secret := "placeholderSecret" - failureMsg := obs.failureMsg - // failureURL := obs.listener.Webhook.FailureURL + failureMsg := s.failureMsg + // failureURL := s.listener.Webhook.FailureURL failureURL := "placeholderURL" - obs.mutex.Unlock() + s.mutex.Unlock() - obs.cutOffCounter.Add(1.0) + s.cutOffCounter.Add(1.0) // We empty the queue but don't close the channel, because we're not // shutting down. - obs.Empty(obs.droppedCutoffCounter) + s.Empty(s.droppedCutoffCounter) msg, err := json.Marshal(failureMsg) if nil != err { - obs.logger.Error("Cut-off notification json.Marshal failed", zap.Any("failureMessage", obs.failureMsg), zap.String("for", obs.id), zap.Error(err)) + s.logger.Error("Cut-off notification json.Marshal failed", zap.Any("failureMessage", s.failureMsg), zap.String("for", s.id), zap.Error(err)) return } @@ -655,8 +635,8 @@ func (obs *CaduceusOutboundSender) queueOverflow() { req, err := http.NewRequest("POST", failureURL, payload) if nil != err { // Failure - obs.logger.Error("Unable to send cut-off notification", zap.String("notification", - failureURL), zap.String("for", obs.id), zap.Error(err)) + s.logger.Error("Unable to send cut-off notification", zap.String("notification", + failureURL), zap.String("for", s.id), zap.Error(err)) return } req.Header.Set("Content-Type", wrp.MimeTypeJson) @@ -668,16 +648,16 @@ func (obs *CaduceusOutboundSender) queueOverflow() { req.Header.Set("X-Webpa-Signature", sig) } - resp, err := obs.sender.Do(req) + resp, err := s.client.Do(req) if nil != err { // Failure - obs.logger.Error("Unable to send cut-off notification", zap.String("notification", failureURL), zap.String("for", obs.id), zap.Error(err)) + s.logger.Error("Unable to send cut-off notification", zap.String("notification", failureURL), zap.String("for", s.id), zap.Error(err)) return } if nil == resp { // Failure - obs.logger.Error("Unable to send cut-off notification, nil response", zap.String("notification", failureURL)) + s.logger.Error("Unable to send cut-off notification, nil response", zap.String("notification", failureURL)) return } @@ -690,8 +670,45 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } } -func CreateOutbounderMetrics(m OutboundSenderMetrics, c *CaduceusOutboundSender) { - c.deliveryRetryCounter = 
m.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: c.id})
+func (s *SinkSender) addRunner(request *http.Request, event string) retry.Runner[*http.Response] {
+ // TODO: need to handle error
+ runner, _ := retry.NewRunner[*http.Response](
+ retry.WithPolicyFactory[*http.Response](retry.Config{
+ Interval: s.deliveryInterval,
+ MaxRetries: s.deliveryRetries,
+ }),
+ retry.WithOnAttempt[*http.Response](s.onAttempt(request, event)),
+ )
+ return runner
+}
+
+func (s *SinkSender) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request {
+ return func(request *http.Request) *http.Request {
+ urls = urls.Next()
+ tmp, err := url.Parse(urls.Value.(string))
+ if err != nil {
+ s.logger.Error("failed to update url", zap.String(UrlLabel, urls.Value.(string)), zap.Error(err))
+ }
+ request.URL = tmp
+ return request
+ }
+}
+
+func (s *SinkSender) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] {
+ return func(attempt retry.Attempt[*http.Response]) {
+ if attempt.Retries > 0 {
+ s.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: s.id, EventLabel: event}).Add(1.0)
+ // attempt.Result is nil when the previous attempt failed with a
+ // transport error, so guard before reading the status code.
+ statusCode := 0
+ if attempt.Result != nil {
+ statusCode = attempt.Result.StatusCode
+ }
+ s.logger.Debug("retrying HTTP transaction", zap.String("url", request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", statusCode))
+ }
+ }
+}
+
+func CreateOutbounderMetrics(m SinkSenderMetrics, c *SinkSender) {
+ c.deliveryRetryCounter = m.DeliveryRetryCounter
 c.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{UrlLabel: c.id})
 c.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{UrlLabel: c.id})
 c.droppedQueueFullCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: c.id, ReasonLabel: "queue_full"})
diff --git a/outboundSender_test.go b/sinkSender_test.go
similarity index 100%
rename from outboundSender_test.go
rename to sinkSender_test.go
diff --git a/sinkWrapper.go b/sinkWrapper.go
new file mode 100644
index 00000000..a8676bab
--- /dev/null
+++ b/sinkWrapper.go
@@ -0,0 +1,238 @@
+// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC
+// SPDX-License-Identifier: Apache-2.0
+package main
+
+import (
+ "crypto/tls"
+ "errors"
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/xmidt-org/candlelight"
+ "github.com/xmidt-org/wrp-go/v3"
+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
+ "go.uber.org/fx"
+ "go.uber.org/zap"
+)
+
+// SinkWrapperIn configures the SinkWrapper for creation
+type SinkWrapperIn struct {
+ fx.In
+
+ Tracing candlelight.Tracing
+ SinkConfig SinkConfig
+ SenderMetrics SinkSenderMetrics
+ EventType *prometheus.CounterVec
+ Logger *zap.Logger
+}
+
+// Wrapper is the interface SinkWrapper implements; it is needed for unit testing.
+type Wrapper interface {
+ // Update([]ancla.InternalWebhook)
+ Queue(*wrp.Message)
+ Shutdown(bool)
+}
+
+// SinkWrapper contains the configuration that will be shared with each sink sender. It contains no external parameters.
+type SinkWrapper struct {
+ // The amount of time to let expired SinkSenders linger before
+ // shutting them down and cleaning up the resources associated with them.
+ linger time.Duration
+
+ // The logger implementation to share with sinkSenders. 
+ logger *zap.Logger
+
+ // the configuration needed for each sinkSender
+ config SinkConfig
+
+ mutex *sync.RWMutex
+ senders map[string]Sender
+ eventType *prometheus.CounterVec
+ wg sync.WaitGroup
+ shutdown chan struct{}
+ metrics SinkSenderMetrics
+ client Client // TODO: keeping here for now - but might move to SinkSender in a later PR
+ clientMiddleware func(Client) Client // TODO: keeping here for now - but might move to SinkSender in a later PR
+}
+
+func ProvideWrapper() fx.Option {
+ return fx.Provide(
+ func(in SenderMetricsIn) SinkSenderMetrics {
+ senderMetrics := SinkSenderMetrics{
+ DeliveryCounter: in.DeliveryCounter,
+ DeliveryRetryCounter: in.DeliveryRetryCounter,
+ DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge,
+ CutOffCounter: in.CutOffCounter,
+ SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter,
+ DropsDueToPanic: in.DropsDueToPanic,
+ ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge,
+ ConsumerDropUntilGauge: in.ConsumerDropUntilGauge,
+ ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge,
+ ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge,
+ OutgoingQueueDepth: in.OutgoingQueueDepth,
+ ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge,
+ QueryLatency: in.QueryLatency,
+ }
+ return senderMetrics
+ },
+ func(in SinkWrapperIn) (*SinkWrapper, error) {
+ csw, err := NewSinkWrapper(in)
+ return csw, err
+ },
+ )
+}
+
+func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) {
+ sw = &SinkWrapper{
+ linger: in.SinkConfig.Linger,
+ logger: in.Logger,
+ eventType: in.EventType,
+ config: in.SinkConfig,
+ metrics: in.SenderMetrics,
+ }
+
+ if in.SinkConfig.Linger <= 0 {
+ linger := fmt.Sprintf("linger not positive: %v", in.SinkConfig.Linger)
+ err = errors.New(linger)
+ sw = nil
+ return
+ }
+ sw.senders = make(map[string]Sender)
+ sw.shutdown = make(chan struct{})
+
+ sw.wg.Add(1)
+ go undertaker(sw)
+
+ return
+}
+
+// NewRoundTripper is no longer initialized at startup; it is now created when the sink sender is built.
+func NewRoundTripper(config SinkConfig, tracing candlelight.Tracing) (tr http.RoundTripper) {
+ tr = &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation},
+ MaxIdleConnsPerHost: config.NumWorkersPerSender,
+ ResponseHeaderTimeout: config.ResponseHeaderTimeout,
+ IdleConnTimeout: config.IdleConnTimeout,
+ }
+
+ tr = otelhttp.NewTransport(tr,
+ otelhttp.WithPropagators(tracing.Propagator()),
+ otelhttp.WithTracerProvider(tracing.TracerProvider()),
+ )
+ return
+}
+
+// Partially commented out until the ancla/argus dependency issue is fixed.
+// Update is called when we get changes to our webhook listeners with either
+// additions, or updates. This code takes care of building new SinkSenders
+// and maintaining the existing ones. 
+func (sw *SinkWrapper) Update(list []ListenerStub) {
+
+ ids := make([]struct {
+ Listener ListenerStub
+ ID string
+ }, len(list))
+
+ for i, v := range list {
+ ids[i].Listener = v
+ ids[i].ID = v.Webhook.Config.ReceiverURL
+ }
+
+ sw.mutex.Lock()
+ defer sw.mutex.Unlock()
+
+ for _, inValue := range ids {
+ sender, ok := sw.senders[inValue.ID]
+ if !ok {
+ listener := inValue.Listener
+ metricWrapper, err := newMetricWrapper(time.Now, sw.metrics.QueryLatency, inValue.ID)
+
+ if err != nil {
+ continue
+ }
+ sw.clientMiddleware = metricWrapper.roundTripper
+ ss, err := newSinkSender(sw, listener)
+ if nil == err {
+ sw.senders[inValue.ID] = ss
+ }
+ continue
+ }
+ fmt.Println(sender)
+ // sender.Update(inValue.Listener) //commenting out until argus/ancla fix
+ }
+}
+
+// Queue is used to send all the possible sink senders a request. This
+// function performs the fan-out and filtering to multiple possible endpoints.
+func (sw *SinkWrapper) Queue(msg *wrp.Message) {
+ sw.mutex.RLock()
+ defer sw.mutex.RUnlock()
+
+ sw.eventType.With(prometheus.Labels{"event": msg.FindEventStringSubMatch()}).Add(1)
+
+ for _, v := range sw.senders {
+ v.Queue(msg)
+ }
+}
+
+// Shutdown closes down the delivery mechanisms and cleans up the underlying
+// SinkSenders either gently (waiting for delivery queues to empty) or not
+// (dropping enqueued messages)
+func (sw *SinkWrapper) Shutdown(gentle bool) {
+ sw.mutex.Lock()
+ defer sw.mutex.Unlock()
+ for k, v := range sw.senders {
+ v.Shutdown(gentle)
+ delete(sw.senders, k)
+ }
+ close(sw.shutdown)
+}
+
+// undertaker looks at the SinkSenders periodically and prunes the ones
+// that have been retired for too long, freeing up resources.
+func undertaker(sw *SinkWrapper) {
+ defer sw.wg.Done()
+ // Collecting unused SinkSenders isn't a huge priority, so do it
+ // slowly.
+ ticker := time.NewTicker(2 * sw.linger)
+ for {
+ select {
+ case <-ticker.C:
+ threshold := time.Now().Add(-1 * sw.linger)
+
+ // Actually shutting these down could take longer than we
+ // want to hold the mutex, so just remove them from the active
+ // list & shut them down afterwards.
+ deadList := createDeadlist(sw, threshold)
+
+ // Shut them down
+ for _, v := range deadList {
+ v.Shutdown(false)
+ }
+ case <-sw.shutdown:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func createDeadlist(sw *SinkWrapper, threshold time.Time) map[string]Sender {
+ if sw == nil || threshold.IsZero() {
+ return nil
+ }
+
+ deadList := make(map[string]Sender)
+ sw.mutex.Lock()
+ defer sw.mutex.Unlock()
+ for k, v := range sw.senders {
+ retired := v.RetiredSince()
+ if threshold.After(retired) {
+ deadList[k] = v
+ delete(sw.senders, k)
+ }
+ }
+ return deadList
+}
diff --git a/senderWrapper_test.go b/sinkWrapper_test.go
similarity index 100%
rename from senderWrapper_test.go
rename to sinkWrapper_test.go
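
The sketches below are editorial additions for reviewers who want to exercise the patterns this PR leans on in isolation; none of this code ships with the PR. First, SinkSender.Queue enqueues onto a buffered channel with a select/default, so a full queue falls through to the overflow and cut-off path instead of blocking the producer. A minimal Go illustration of that idiom (the event type and names here are invented for the example):

package main

import "fmt"

type event struct{ dest string }

// tryEnqueue mirrors the select/default idiom in SinkSender.Queue: it
// either buffers the event or reports overflow without ever blocking.
func tryEnqueue(queue chan event, e event) bool {
	select {
	case queue <- e:
		return true // queued; the real code bumps queueDepthGauge here
	default:
		return false // queue full; the real code starts the cut-off path
	}
}

func main() {
	q := make(chan event, 1)
	fmt.Println(tryEnqueue(q, event{dest: "event:device-status"})) // true
	fmt.Println(tryEnqueue(q, event{dest: "event:device-status"})) // false: buffer full
}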
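
Second, each webhook's delivery URLs live in a container/ring that only the single dispatcher goroutine advances, which is why the rotation itself needs no extra locking. A standalone sketch of the round-robin walk (URLs are placeholders):

package main

import (
	"container/ring"
	"fmt"
)

func main() {
	// SinkSender stores a webhook's alternative URLs in a container/ring;
	// the dispatcher advances it after every dequeue.
	urls := []string{"http://a.example", "http://b.example", "http://c.example"}
	r := ring.New(len(urls))
	for _, u := range urls {
		r.Value = u
		r = r.Next()
	}

	// Five dispatches walk the ring round-robin and wrap around.
	for i := 0; i < 5; i++ {
		fmt.Println(r.Value.(string))
		r = r.Next()
	}
}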
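
Third, the swap from httpaux/retry to xmidt-org/retry is the main behavioral change in sinkSender.go. The sketch below wires together the same options the diff uses (NewRunner, WithPolicyFactory, WithOnAttempt, retryhttp.NewClient, WithHTTPClient, WithRunner); it assumes the retry v0.0.3 module pinned in go.mod exposes these exactly as the diff calls them, and the target URL and retry counts are placeholders:

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/xmidt-org/retry"
	"github.com/xmidt-org/retry/retryhttp"
)

func main() {
	// A fixed-interval, bounded-retry policy, mirroring SinkSender.addRunner.
	runner, err := retry.NewRunner[*http.Response](
		retry.WithPolicyFactory[*http.Response](retry.Config{
			Interval:   time.Second,
			MaxRetries: 2,
		}),
		retry.WithOnAttempt[*http.Response](func(attempt retry.Attempt[*http.Response]) {
			// Mirrors SinkSender.onAttempt: only retries are reported.
			if attempt.Retries > 0 {
				fmt.Println("retrying, attempt:", attempt.Retries+1)
			}
		}),
	)
	if err != nil {
		panic(err)
	}

	client, err := retryhttp.NewClient(
		retryhttp.WithHTTPClient(http.DefaultClient),
		retryhttp.WithRunner(runner),
	)
	if err != nil {
		panic(err)
	}

	req, err := http.NewRequest(http.MethodPost, "http://localhost:9999/notify", nil) // placeholder URL
	if err != nil {
		panic(err)
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("all attempts failed:", err)
		return
	}
	fmt.Println("status:", resp.StatusCode)
}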
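
Fourth, SinkWrapper.Update keys senders by receiver URL and builds them lazily on first sight; existing entries are left in place to be updated. A trimmed model of that create-if-missing flow (stubSender and wrapper are invented stand-ins, not the PR's types):

package main

import (
	"fmt"
	"sync"
)

// stubSender stands in for the PR's Sender interface.
type stubSender struct{ id string }

type wrapper struct {
	mutex   sync.Mutex
	senders map[string]*stubSender
}

// update mirrors SinkWrapper.Update's create-if-missing flow: listeners
// are keyed by receiver URL and senders are only built on first sight.
func (w *wrapper) update(receiverURLs []string) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	for _, u := range receiverURLs {
		if _, ok := w.senders[u]; ok {
			continue // an existing sender would be updated in place
		}
		w.senders[u] = &stubSender{id: u}
	}
}

func main() {
	w := &wrapper{senders: map[string]*stubSender{}}
	w.update([]string{"http://a.example", "http://a.example", "http://b.example"})
	fmt.Println(len(w.senders)) // 2: the duplicate URL reuses its sender
}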
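
Finally, the new mockCounter in mocks_test.go records Inc as m.Called(1), which shapes how expectations must be registered. A hypothetical _test.go sketch using the same testify idiom (fakeCounter is illustrative, not the PR's mock):

package main

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

type fakeCounter struct{ mock.Mock }

func (f *fakeCounter) Add(delta float64) { f.Called(delta) }
func (f *fakeCounter) Inc()              { f.Called(1) }

// TestFakeCounter shows the expectation style mockCounter implies: because
// Inc records m.Called(1), the test must register On("Inc", 1) rather than
// a bare On("Inc").
func TestFakeCounter(t *testing.T) {
	c := new(fakeCounter)
	c.On("Add", 2.5).Return().Once()
	c.On("Inc", 1).Return().Once()

	c.Add(2.5)
	c.Inc()

	c.AssertExpectations(t)
}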