diff --git a/main.go b/caduceus.go similarity index 92% rename from main.go rename to caduceus.go index f8f08ed8..bdaa5431 100644 --- a/main.go +++ b/caduceus.go @@ -1,11 +1,10 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "fmt" "os" - "runtime/debug" "github.com/alecthomas/kong" "github.com/goschtalt/goschtalt" @@ -14,6 +13,10 @@ import ( _ "github.com/goschtalt/yaml-decoder" _ "github.com/goschtalt/yaml-encoder" "github.com/xmidt-org/arrange/arrangehttp" + "github.com/xmidt-org/caduceus/internal/handler" + "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/caduceus/internal/sink" + "github.com/xmidt-org/candlelight" "github.com/xmidt-org/sallust" "github.com/xmidt-org/touchstone" @@ -47,7 +50,7 @@ type CLI struct { // Provides a named type so it's a bit easier to flow through & use in fx. type cliArgs []string -func caduceus(arguments []string, run bool) error { +func Caduceus(arguments []string, run bool) error { var ( gscfg *goschtalt.Config @@ -74,7 +77,7 @@ func caduceus(arguments []string, run bool) error { goschtalt.UnmarshalFunc[sallust.Config]("logging"), goschtalt.UnmarshalFunc[candlelight.Config]("tracing"), goschtalt.UnmarshalFunc[touchstone.Config]("prometheus"), - goschtalt.UnmarshalFunc[SinkConfig]("sender"), + goschtalt.UnmarshalFunc[sink.Config]("sender"), goschtalt.UnmarshalFunc[Service]("service"), goschtalt.UnmarshalFunc[[]string]("authHeader"), goschtalt.UnmarshalFunc[bool]("previousVersionSupport"), @@ -135,11 +138,11 @@ func caduceus(arguments []string, run bool) error { arrangehttp.ProvideServer("servers.primary"), arrangehttp.ProvideServer("servers.alternate"), - ProvideHandler(), - ProvideWrapper(), + handler.Provide(), + sink.Provide(), touchstone.Provide(), touchhttp.Provide(), - ProvideMetrics(), + metrics.Provide(), // ancla.ProvideMetrics(), //TODO: need to add back in once we fix the ancla/argus dependency issue ) @@ -211,20 +214,3 @@ func provideCLIWithOpts(args cliArgs, testOpts bool) (*CLI, error) { return &cli, nil } - -func main() { - defer func() { - if r := recover(); r != nil { - fmt.Println("stacktrace from panic: \n" + string(debug.Stack())) - } - }() - - err := caduceus(os.Args[1:], true) - - if err == nil { - return - } - - fmt.Fprintln(os.Stderr, err) - os.Exit(-1) -} diff --git a/main_test.go b/caduceus_test.go similarity index 96% rename from main_test.go rename to caduceus_test.go index 2e138e31..3d61631b 100644 --- a/main_test.go +++ b/caduceus_test.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "testing" @@ -86,10 +86,10 @@ func Test_caduceus(t *testing.T) { if tc.panic { assert.Panics(func() { - _ = caduceus(tc.args, false) + _ = Caduceus(tc.args, false) }) } else { - err := caduceus(tc.args, false) + err := Caduceus(tc.args, false) assert.ErrorIs(err, tc.expectedErr) } diff --git a/caduceus_type_test.go b/caduceus_type_test.go deleted file mode 100644 index 73ea159a..00000000 --- a/caduceus_type_test.go +++ /dev/null @@ -1,30 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 -package main - -import ( - "testing" - - "github.com/stretchr/testify/mock" - "go.uber.org/zap/zaptest" - - "github.com/xmidt-org/wrp-go/v3" -) - -func TestCaduceusHandler(t *testing.T) { - logger := zaptest.NewLogger(t) - - fakeSenderWrapper := new(mockSenderWrapper) - fakeSenderWrapper.On("Queue", mock.AnythingOfType("*wrp.Message")).Return().Once() - - testHandler := CaduceusHandler{ - wrapper: fakeSenderWrapper, - Logger: logger, - } - - t.Run("TestHandleRequest", func(t *testing.T) { - testHandler.HandleRequest(0, &wrp.Message{}) - - fakeSenderWrapper.AssertExpectations(t) - }) -} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 00000000..47ea8c3e --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,29 @@ +// SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: LicenseRef-COMCAST + +package main + +import ( + "fmt" + "os" + "runtime/debug" + + "github.com/xmidt-org/caduceus" +) + +func main() { + defer func() { + if r := recover(); r != nil { + fmt.Println("stacktrace from panic: \n" + string(debug.Stack())) + } + }() + + err := caduceus.Caduceus(os.Args[1:], true) + + if err == nil { + return + } + + fmt.Fprintln(os.Stderr, err) + os.Exit(-1) +} diff --git a/config.go b/config.go index 97d46c57..91ca4429 100644 --- a/config.go +++ b/config.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "fmt" @@ -11,7 +11,10 @@ import ( "github.com/goschtalt/goschtalt" "github.com/xmidt-org/arrange/arrangehttp" "github.com/xmidt-org/arrange/arrangepprof" + "github.com/xmidt-org/bascule" + "github.com/xmidt-org/caduceus/internal/sink" "github.com/xmidt-org/candlelight" + "github.com/xmidt-org/clortho" "github.com/xmidt-org/sallust" "github.com/xmidt-org/touchstone" "gopkg.in/dealancer/validate.v2" @@ -26,7 +29,7 @@ type Config struct { Servers Servers ArgusClientTimeout HttpClientTimeout JWTValidator JWTValidator - Sink SinkConfig + SinkConfig sink.Config Service Service AuthHeader []string Server string @@ -114,6 +117,16 @@ type MetricsOption struct { Subsystem string } +// JWTValidator provides a convenient way to define jwt validator through config files +type JWTValidator struct { + // Config is used to create the clortho Resolver & Refresher for JWT verification keys + Config clortho.Config `json:"config"` + + // Leeway is used to set the amount of time buffer should be given to JWT + // time values, such as nbf + Leeway bascule.Leeway +} + // Collect and process the configuration files and env vars and // produce a configuration object. func provideConfig(cli *CLI) (*goschtalt.Config, error) { diff --git a/client.go b/internal/client/client.go similarity index 91% rename from client.go rename to internal/client/client.go index c6a6b6be..ac9b03df 100644 --- a/client.go +++ b/internal/client/client.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package client import "net/http" @@ -10,7 +10,7 @@ type Client interface { Do(*http.Request) (*http.Response, error) } -func nopClient(next Client) Client { +func NopClient(next Client) Client { return next } diff --git a/httpClient.go b/internal/client/httpClient.go similarity index 74% rename from httpClient.go rename to internal/client/httpClient.go index 0f6bccb2..43d02d6b 100644 --- a/httpClient.go +++ b/internal/client/httpClient.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package client import ( "errors" @@ -10,6 +10,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/xmidt-org/caduceus/internal/metrics" ) var ( @@ -22,7 +23,7 @@ type metricWrapper struct { id string } -func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) { +func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) { if now == nil { now = time.Now } @@ -36,12 +37,12 @@ func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, }, nil } -func (m *metricWrapper) roundTripper(next Client) Client { +func (m *metricWrapper) RoundTripper(next Client) Client { return doerFunc(func(req *http.Request) (*http.Response, error) { startTime := m.now() resp, err := next.Do(req) endTime := m.now() - code := networkError + code := metrics.NetworkError if err == nil { code = strconv.Itoa(resp.StatusCode) @@ -49,7 +50,7 @@ func (m *metricWrapper) roundTripper(next Client) Client { // find time difference, add to metric var latency = endTime.Sub(startTime) - m.queryLatency.With(prometheus.Labels{UrlLabel: m.id, CodeLabel: code}).Observe(latency.Seconds()) + m.queryLatency.With(prometheus.Labels{metrics.UrlLabel: m.id, metrics.CodeLabel: code}).Observe(latency.Seconds()) return resp, err }) diff --git a/httpClient_test.go b/internal/client/httpClient_test.go similarity index 99% rename from httpClient_test.go rename to internal/client/httpClient_test.go index 9802023c..ce8e9b8e 100644 --- a/httpClient_test.go +++ b/internal/client/httpClient_test.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package client // import ( // "errors" diff --git a/http.go b/internal/handler/http.go similarity index 70% rename from http.go rename to internal/handler/http.go index 3712b9af..13621c9e 100644 --- a/http.go +++ b/internal/handler/http.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package handler import ( "io" @@ -13,16 +13,20 @@ import ( "github.com/prometheus/client_golang/prometheus" uuid "github.com/satori/go.uuid" - + "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/caduceus/internal/sink" "github.com/xmidt-org/sallust" "github.com/xmidt-org/wrp-go/v3" ) +type Handler interface { + ServeHTTP(http.ResponseWriter, *http.Request) +} type ServerHandlerIn struct { fx.In - SinkWrapper *SinkWrapper + SinkWrapper sink.Wrapper Logger *zap.Logger - Telemetry *HandlerTelemetry + Telemetry *Telemetry } type ServerHandlerOut struct { @@ -32,13 +36,14 @@ type ServerHandlerOut struct { // Below is the struct that will implement our ServeHTTP method type ServerHandler struct { - caduceusHandler RequestHandler - telemetry *HandlerTelemetry + sinkWrapper sink.Wrapper + logger *zap.Logger + telemetry *Telemetry incomingQueueDepth int64 maxOutstanding int64 now func() time.Time } -type HandlerTelemetryIn struct { +type TelemetryIn struct { fx.In ErrorRequests prometheus.Counter `name:"error_request_body_count"` EmptyRequests prometheus.Counter `name:"empty_request_body_count"` @@ -47,17 +52,17 @@ type HandlerTelemetryIn struct { ModifiedWRPCount *prometheus.CounterVec `name:"modified_wrp_count"` IncomingQueueLatency prometheus.ObserverVec `name:"incoming_queue_latency_histogram_seconds"` } -type HandlerTelemetry struct { - errorRequests prometheus.Counter - emptyRequests prometheus.Counter - invalidCount prometheus.Counter - incomingQueueDepthMetric prometheus.Gauge - modifiedWRPCount *prometheus.CounterVec - incomingQueueLatency prometheus.ObserverVec +type Telemetry struct { + ErrorRequests prometheus.Counter + EmptyRequests prometheus.Counter + InvalidCount prometheus.Counter + IncomingQueueDepthMetric prometheus.Gauge + ModifiedWRPCount *prometheus.CounterVec + IncomingQueueLatency prometheus.ObserverVec } func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { - eventType := unknownEventType + eventType := metrics.UnknownEventType logger := sallust.Get(request.Context()) // find time difference, add to metric after function finishes defer func(s time.Time) { @@ -85,19 +90,19 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R return } - sh.telemetry.incomingQueueDepthMetric.Add(1.0) - defer sh.telemetry.incomingQueueDepthMetric.Add(-1.0) + sh.telemetry.IncomingQueueDepthMetric.Add(1.0) + defer sh.telemetry.IncomingQueueDepthMetric.Add(-1.0) payload, err := io.ReadAll(request.Body) if err != nil { - sh.telemetry.errorRequests.Add(1.0) + sh.telemetry.ErrorRequests.Add(1.0) logger.Error("Unable to retrieve the request body.", zap.Error(err)) response.WriteHeader(http.StatusBadRequest) return } if len(payload) == 0 { - sh.telemetry.emptyRequests.Add(1.0) + sh.telemetry.EmptyRequests.Add(1.0) logger.Error("Empty payload.") response.WriteHeader(http.StatusBadRequest) response.Write([]byte("Empty payload.\n")) @@ -110,7 +115,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R err = decoder.Decode(msg) if err != nil || msg.MessageType() != 4 { // return a 400 - sh.telemetry.invalidCount.Add(1.0) + sh.telemetry.InvalidCount.Add(1.0) response.WriteHeader(http.StatusBadRequest) if err != nil { response.Write([]byte("Invalid payload format.\n")) @@ -125,7 +130,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R err = wrp.UTF8(msg) if err != nil { // return a 400 - sh.telemetry.invalidCount.Add(1.0) + sh.telemetry.InvalidCount.Add(1.0) response.WriteHeader(http.StatusBadRequest) response.Write([]byte("Strings must be UTF-8.\n")) logger.Debug("Strings must be UTF-8.") @@ -133,7 +138,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R } eventType = msg.FindEventStringSubMatch() - sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg)) + sh.handleRequest(sh.fixWrp(msg)) // return a 202 response.WriteHeader(http.StatusAccepted) @@ -142,9 +147,14 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R logger.Debug("event passed to senders.", zap.Any("event", msg)) } +func (sh *ServerHandler) handleRequest(msg *wrp.Message) { + sh.logger.Info("Worker received a request, now passing to sender") + sh.sinkWrapper.Queue(msg) +} + func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) { endTime := sh.now() - sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) + sh.telemetry.IncomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) } func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { @@ -155,36 +165,36 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { // use the one the source specified. if msg.ContentType == "" { msg.ContentType = wrp.MimeTypeJson - reason = emptyContentTypeReason + reason = metrics.EmptyContentTypeReason } // Ensure there is a transaction id even if we make one up if msg.TransactionUUID == "" { msg.TransactionUUID = uuid.NewV4().String() if reason == "" { - reason = emptyUUIDReason + reason = metrics.EmptyUUIDReason } else { - reason = bothEmptyReason + reason = metrics.BothEmptyReason } } if reason != "" { - sh.telemetry.modifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0) + sh.telemetry.ModifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0) } return msg } -func ProvideHandler() fx.Option { +func Provide() fx.Option { return fx.Provide( - func(in HandlerTelemetryIn) *HandlerTelemetry { - return &HandlerTelemetry{ - errorRequests: in.ErrorRequests, - emptyRequests: in.EmptyRequests, - invalidCount: in.InvalidCount, - incomingQueueDepthMetric: in.IncomingQueueDepthMetric, - modifiedWRPCount: in.ModifiedWRPCount, - incomingQueueLatency: in.IncomingQueueLatency, + func(in TelemetryIn) *Telemetry { + return &Telemetry{ + ErrorRequests: in.ErrorRequests, + EmptyRequests: in.EmptyRequests, + InvalidCount: in.InvalidCount, + IncomingQueueDepthMetric: in.IncomingQueueDepthMetric, + ModifiedWRPCount: in.ModifiedWRPCount, + IncomingQueueLatency: in.IncomingQueueLatency, } }, func(in ServerHandlerIn) (ServerHandlerOut, error) { @@ -196,12 +206,10 @@ func ProvideHandler() fx.Option { }, ) } -func New(sw *SinkWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { +func New(w sink.Wrapper, logger *zap.Logger, t *Telemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { return &ServerHandler{ - caduceusHandler: &CaduceusHandler{ - wrapper: sw, - Logger: log, - }, + sinkWrapper: w, + logger: logger, telemetry: t, maxOutstanding: maxOutstanding, incomingQueueDepth: incomingQueueDepth, diff --git a/http_test.go b/internal/handler/http_test.go similarity index 66% rename from http_test.go rename to internal/handler/http_test.go index ac91c1c2..901db7ee 100644 --- a/http_test.go +++ b/internal/handler/http_test.go @@ -1,6 +1,6 @@ // // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // // SPDX-License-Identifier: Apache-2.0 -package main +package handler_test import ( "bytes" @@ -13,6 +13,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/xmidt-org/caduceus/internal/handler" + "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/caduceus/internal/mocks" "go.uber.org/zap/zaptest" "github.com/xmidt-org/wrp-go/v3" @@ -79,7 +82,7 @@ func TestServerHandler(t *testing.T) { expectedResponse: http.StatusBadRequest, request: exampleRequest(1), throwStatusBadRequest: true, - expectedEventType: unknownEventType, + expectedEventType: metrics.UnknownEventType, startTime: date1, endTime: date2, }, @@ -87,45 +90,47 @@ func TestServerHandler(t *testing.T) { for _, tc := range tcs { assert := assert.New(t) - logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + + fakeHandler := new(mocks.Handler) if !tc.throwStatusBadRequest { fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).Return().Times(1) } - fakeEmptyRequests := new(mockCounter) - fakeErrorRequests := new(mockCounter) - fakeInvalidCount := new(mockCounter) - fakeQueueDepth := new(mockGauge) + fakeEmptyRequests := new(mocks.Counter) + fakeErrorRequests := new(mocks.Counter) + fakeInvalidCount := new(mocks.Counter) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) if tc.throwStatusBadRequest { fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() } - fakeTime := mockTime(tc.startTime, tc.endTime) - fakeHist := new(mockHistogram) + fakeTime := mocks.Time(tc.startTime, tc.endTime) + fakeHist := new(mocks.Histogram) histogramFunctionCall := []string{"event", tc.expectedEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, - errorRequests: fakeErrorRequests, - emptyRequests: fakeEmptyRequests, - invalidCount: fakeInvalidCount, - incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, - incomingQueueLatency: fakeHist, - now: fakeTime, + fakeTel := &handler.Telemetry{ + ErrorRequests: fakeErrorRequests, + EmptyRequests: fakeEmptyRequests, + InvalidCount: fakeInvalidCount, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = fakeTime + t.Run(tc.desc, func(t *testing.T) { w := httptest.NewRecorder() - serverWrapper.ServeHTTP(w, tc.request) + fakeHandler.ServeHTTP(w, tc.request) resp := w.Result() assert.Equal(tc.expectedResponse, resp.StatusCode) @@ -144,50 +149,50 @@ func TestServerHandlerFixWrp(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) - logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + + fakeHandler := new(mocks.Handler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).Return().Once() - fakeEmptyRequests := new(mockCounter) - fakeErrorRequests := new(mockCounter) - fakeInvalidCount := new(mockCounter) - fakeQueueDepth := new(mockGauge) + fakeEmptyRequests := new(mocks.Counter) + fakeErrorRequests := new(mocks.Counter) + fakeInvalidCount := new(mocks.Counter) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) - fakeIncomingContentTypeCount := new(mockCounter) + fakeIncomingContentTypeCount := new(mocks.Counter) fakeIncomingContentTypeCount.On("With", []string{"content_type", wrp.MimeTypeMsgpack}).Return(fakeIncomingContentTypeCount) fakeIncomingContentTypeCount.On("With", []string{"content_type", ""}).Return(fakeIncomingContentTypeCount) fakeIncomingContentTypeCount.On("Add", 1.0).Return() - fakeModifiedWRPCount := new(mockCounter) - fakeModifiedWRPCount.On("With", []string{"reason", bothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() + fakeModifiedWRPCount := new(mocks.Counter) + fakeModifiedWRPCount.On("With", []string{"reason", metrics.BothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() fakeModifiedWRPCount.On("Add", 1.0).Return().Once() - fakeHist := new(mockHistogram) + fakeHist := new(mocks.Histogram) histogramFunctionCall := []string{"event", "bob"} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, - errorRequests: fakeErrorRequests, - emptyRequests: fakeEmptyRequests, - invalidCount: fakeInvalidCount, - modifiedWRPCount: fakeModifiedWRPCount, - incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, - incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + fakeTel := &handler.Telemetry{ + ErrorRequests: fakeErrorRequests, + EmptyRequests: fakeEmptyRequests, + InvalidCount: fakeInvalidCount, + IncomingQueueDepthMetric: fakeQueueDepth, + ModifiedWRPCount: fakeModifiedWRPCount, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) t.Run("TestServeHTTPHappyPath", func(t *testing.T) { w := httptest.NewRecorder() - serverWrapper.ServeHTTP(w, exampleRequest(4, "", "")) + fakeHandler.ServeHTTP(w, exampleRequest(4, "", "")) resp := w.Result() assert.Equal(http.StatusAccepted, resp.StatusCode) @@ -205,38 +210,39 @@ func TestServerHandlerFull(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) - logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + + fakeHandler := new(mocks.Handler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) - fakeQueueDepth := new(mockGauge) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + fakeHist := new(mocks.Histogram) + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, - incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, - incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + fakeTel := &handler.Telemetry{ + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + t.Run("TestServeHTTPTooMany", func(t *testing.T) { w := httptest.NewRecorder() /* Act like we have 1 in flight */ - serverWrapper.incomingQueueDepth = 1 + fakeHandler.IncomingQueueDepth = 1 /* Make the call that goes over the limit */ - serverWrapper.ServeHTTP(w, exampleRequest(4)) + fakeHandler.ServeHTTP(w, exampleRequest(4)) resp := w.Result() assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) @@ -253,43 +259,43 @@ func TestServerEmptyPayload(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) + logger := zaptest.NewLogger(t) var buffer bytes.Buffer r := bytes.NewReader(buffer.Bytes()) req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) - fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), - mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) + fakeHandler := new(mocks.Handler) + fakeHandler.On("handleRequest", mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) - fakeEmptyRequests := new(mockCounter) + fakeEmptyRequests := new(mocks.Counter) fakeEmptyRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() - fakeQueueDepth := new(mockGauge) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + fakeHist := new(mocks.Histogram) + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, - emptyRequests: fakeEmptyRequests, - incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, - incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + fakeTel := &handler.Telemetry{ + EmptyRequests: fakeEmptyRequests, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + t.Run("TestServeHTTPTooMany", func(t *testing.T) { w := httptest.NewRecorder() /* Make the call that goes over the limit */ - serverWrapper.ServeHTTP(w, req) + fakeHandler.ServeHTTP(w, req) resp := w.Result() assert.Equal(http.StatusBadRequest, resp.StatusCode) @@ -306,6 +312,7 @@ func TestServerUnableToReadBody(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) + logger := zaptest.NewLogger(t) var buffer bytes.Buffer r := iotest.TimeoutReader(bytes.NewReader(buffer.Bytes())) @@ -314,37 +321,37 @@ func TestServerUnableToReadBody(t *testing.T) { req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + fakeHandler := new(mocks.Handler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() - fakeErrorRequests := new(mockCounter) + fakeErrorRequests := new(mocks.Counter) fakeErrorRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() - fakeQueueDepth := new(mockGauge) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + fakeHist := new(mocks.Histogram) + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, - errorRequests: fakeErrorRequests, - incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, - incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + fakeTel := &handler.Telemetry{ + ErrorRequests: fakeErrorRequests, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + t.Run("TestServeHTTPTooMany", func(t *testing.T) { w := httptest.NewRecorder() /* Make the call that goes over the limit */ - serverWrapper.ServeHTTP(w, req) + fakeHandler.ServeHTTP(w, req) resp := w.Result() assert.Equal(http.StatusBadRequest, resp.StatusCode) @@ -361,6 +368,7 @@ func TestServerInvalidBody(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) + logger := zaptest.NewLogger(t) r := bytes.NewReader([]byte("Invalid payload.")) @@ -368,38 +376,38 @@ func TestServerInvalidBody(t *testing.T) { req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + fakeHandler := new(mocks.Handler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() - fakeQueueDepth := new(mockGauge) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - fakeInvalidCount := new(mockCounter) + fakeInvalidCount := new(mocks.Counter) fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() - fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + fakeHist := new(mocks.Histogram) + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, - invalidCount: fakeInvalidCount, - incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, - incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + fakeTel := &handler.Telemetry{ + InvalidCount: fakeInvalidCount, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + t.Run("TestServeHTTPTooMany", func(t *testing.T) { w := httptest.NewRecorder() /* Make the call that goes over the limit */ - serverWrapper.ServeHTTP(w, req) + fakeHandler.ServeHTTP(w, req) resp := w.Result() assert.Equal(http.StatusBadRequest, resp.StatusCode) @@ -415,22 +423,25 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) assert := assert.New(t) - logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) - fakeQueueDepth := new(mockGauge) + fakeHandler := new(mocks.Handler) - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, - incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, + fakeQueueDepth := new(mocks.Gauge) + fakeTel := &handler.Telemetry{ + IncomingQueueDepthMetric: fakeQueueDepth, } + + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + testCases := []struct { name string headers []string @@ -448,9 +459,9 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - fakeHist := new(mockHistogram) - serverWrapper.incomingQueueLatency = fakeHist - serverWrapper.now = mockTime(date1, date2) + fakeHist := new(mocks.Histogram) + fakeHandler.Telemetry.IncomingQueueLatency = fakeHist + fakeHandler.Now = mocks.Time(date1, date2) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -460,7 +471,7 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { for _, h := range testCase.headers { req.Header.Add("Content-Type", h) } - serverWrapper.ServeHTTP(w, req) + fakeHandler.ServeHTTP(w, req) resp := w.Result() assert.Equal(http.StatusUnsupportedMediaType, resp.StatusCode) diff --git a/primaryHandler.go b/internal/handler/primaryHandler.go similarity index 90% rename from primaryHandler.go rename to internal/handler/primaryHandler.go index 3c68c5bf..0de55d61 100644 --- a/primaryHandler.go +++ b/internal/handler/primaryHandler.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package handler import ( "bytes" @@ -22,6 +22,7 @@ import ( "github.com/xmidt-org/bascule/basculechecks" "github.com/xmidt-org/bascule/basculehelper" "github.com/xmidt-org/bascule/basculehttp" + "github.com/xmidt-org/caduceus/internal/logging" "github.com/xmidt-org/clortho" "github.com/xmidt-org/clortho/clorthozap" "github.com/xmidt-org/sallust" @@ -44,16 +45,6 @@ type CapabilityConfig struct { EndpointBuckets []string } -// JWTValidator provides a convenient way to define jwt validator through config files -type JWTValidator struct { - // Config is used to create the clortho Resolver & Refresher for JWT verification keys - Config clortho.Config `json:"config"` - - // Leeway is used to set the amount of time buffer should be given to JWT - // time values, such as nbf - Leeway bascule.Leeway -} - func NewPrimaryHandler(l *zap.Logger, v *viper.Viper, sw *ServerHandler, router *mux.Router, prevVersionSupport bool) (*mux.Router, error) { auth, err := authenticationMiddleware(v, l) if err != nil { @@ -98,7 +89,7 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, logger.Debug("Created list of allowed basic auths", zap.Any("allowed", basicAllowed), zap.Any("config", basicAuth)) options := []basculehttp.COption{ - basculehttp.WithCLogger(getLogger), + basculehttp.WithCLogger(logging.GetLogger), // basculehttp.WithCErrorResponseFunc(listener.OnErrorResponse), } if len(basicAllowed) > 0 { @@ -219,7 +210,7 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, } authEnforcer := basculehttp.NewEnforcer( - basculehttp.WithELogger(getLogger), + basculehttp.WithELogger(logging.GetLogger), basculehttp.WithRules("Basic", bascule.Validators{ basculechecks.AllowAll(), }), @@ -227,8 +218,8 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, // basculehttp.WithEErrorResponseFunc(listener.OnErrorResponse), ) - authChain := alice.New(setLogger(logger), authConstructor, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener). commenting for now in case needed later - authChainLegacy := alice.New(setLogger(logger), authConstructorLegacy, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener) commenting for now in case needed later + authChain := alice.New(logging.SetLogger(logger), authConstructor, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener). commenting for now in case needed later + authChainLegacy := alice.New(logging.SetLogger(logger), authConstructorLegacy, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener) commenting for now in case needed later versionCompatibleAuth := alice.New(func(next http.Handler) http.Handler { return http.HandlerFunc(func(r http.ResponseWriter, req *http.Request) { diff --git a/basculeLogging.go b/internal/logging/basculeLogging.go similarity index 91% rename from basculeLogging.go rename to internal/logging/basculeLogging.go index d21dc17e..7a78d171 100644 --- a/basculeLogging.go +++ b/internal/logging/basculeLogging.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package logging import ( "context" @@ -12,6 +12,7 @@ import ( "github.com/xmidt-org/candlelight" "github.com/xmidt-org/sallust" + "go.uber.org/zap" ) @@ -27,7 +28,7 @@ func sanitizeHeaders(headers http.Header) (filtered http.Header) { return } -func setLogger(logger *zap.Logger) func(delegate http.Handler) http.Handler { +func SetLogger(logger *zap.Logger) func(delegate http.Handler) http.Handler { return func(delegate http.Handler) http.Handler { return http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { @@ -40,7 +41,7 @@ func setLogger(logger *zap.Logger) func(delegate http.Handler) http.Handler { } } -func getLogger(ctx context.Context) *zap.Logger { +func GetLogger(ctx context.Context) *zap.Logger { logger := sallust.Get(ctx).With(zap.Time("ts", time.Now().UTC()), zap.Any("caller", zap.WithCaller(true))) return logger } diff --git a/basculeLogging_test.go b/internal/logging/basculeLogging_test.go similarity index 98% rename from basculeLogging_test.go rename to internal/logging/basculeLogging_test.go index 98c2583f..6f6ce657 100644 --- a/basculeLogging_test.go +++ b/internal/logging/basculeLogging_test.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package logging import ( "net/http" diff --git a/metrics.go b/internal/metrics/metrics.go similarity index 96% rename from metrics.go rename to internal/metrics/metrics.go index b8e5d8ed..353c415e 100644 --- a/metrics.go +++ b/internal/metrics/metrics.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package metrics import ( "github.com/prometheus/client_golang/prometheus" @@ -33,11 +33,11 @@ const ( ) const ( - emptyContentTypeReason = "empty_content_type" - emptyUUIDReason = "empty_uuid" - bothEmptyReason = "empty_uuid_and_content_type" - networkError = "network_err" - unknownEventType = "unknown" + EmptyContentTypeReason = "empty_content_type" + EmptyUUIDReason = "empty_uuid" + BothEmptyReason = "empty_uuid_and_content_type" + NetworkError = "network_err" + UnknownEventType = "unknown" ) const ( @@ -84,7 +84,7 @@ type Metrics struct { } // TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called -func ProvideMetrics() fx.Option { +func Provide() fx.Option { return fx.Options( touchstone.Gauge(prometheus.GaugeOpts{ Name: IncomingQueueDepth, diff --git a/metrics_test.go b/internal/metrics/metrics_test.go similarity index 96% rename from metrics_test.go rename to internal/metrics/metrics_test.go index 8092f9f0..6dd46e9a 100644 --- a/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package metrics // Using Caduceus's test suite: // @@ -28,7 +28,7 @@ import ( func TestMetrics(t *testing.T) { assert := assert.New(t) - m := ProvideMetrics() + m := Provide() assert.NotNil(m) } diff --git a/mocks_test.go b/internal/mocks/mocks.go similarity index 51% rename from mocks_test.go rename to internal/mocks/mocks.go index 5a2c19b8..7d777e38 100644 --- a/mocks_test.go +++ b/internal/mocks/mocks.go @@ -1,44 +1,55 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package mocks import ( + "net/http" "time" "unicode/utf8" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" + "github.com/xmidt-org/caduceus/internal/handler" + "github.com/xmidt-org/caduceus/internal/sink" "github.com/xmidt-org/wrp-go/v3" + "go.uber.org/zap" ) // mockHandler only needs to mock the `HandleRequest` method -type mockHandler struct { +type Handler struct { mock.Mock + + SinkWrapper sink.Wrapper + Logger *zap.Logger + Telemetry *handler.Telemetry + IncomingQueueDepth int64 + MaxOutstanding int64 + Now func() time.Time } -func (m *mockHandler) HandleRequest(workerID int, msg *wrp.Message) { - m.Called(workerID, msg) +func (m *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + m.Called(r) } // mockSenderWrapper needs to mock things that the `SenderWrapper` does -type mockSenderWrapper struct { +type Wrapper struct { mock.Mock } -// func (m *mockSenderWrapper) Update(list []ancla.InternalWebhook) { +// func (m *MockSinkWrapper) Update(list []ancla.InternalWebhook) { // m.Called(list) // } -func (m *mockSenderWrapper) Queue(msg *wrp.Message) { +func (m *Wrapper) Queue(msg *wrp.Message) { m.Called(msg) } -func (m *mockSenderWrapper) Shutdown(gentle bool) { +func (m *Wrapper) Shutdown(gentle bool) { m.Called(gentle) } // mockTime provides two mock time values -func mockTime(one, two time.Time) func() time.Time { +func Time(one, two time.Time) func() time.Time { var called bool return func() time.Time { if called { @@ -49,18 +60,18 @@ func mockTime(one, two time.Time) func() time.Time { } } -type mockCounter struct { +type Counter struct { mock.Mock } -func (m *mockCounter) Add(delta float64) { +func (m *Counter) Add(delta float64) { m.Called(delta) } -func (m *mockCounter) Inc (){ +func (m *Counter) Inc() { m.Called(1) } -func (m *mockCounter) With(labelValues ...string) prometheus.Counter { +func (m *Counter) With(labelValues ...string) prometheus.Counter { for _, v := range labelValues { if !utf8.ValidString(v) { panic("not UTF-8") diff --git a/listenerStub.go b/internal/sink/listenerStub.go similarity index 99% rename from listenerStub.go rename to internal/sink/listenerStub.go index bd7cb723..ff2cdffa 100644 --- a/listenerStub.go +++ b/internal/sink/listenerStub.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink import ( "time" diff --git a/matcher.go b/internal/sink/matcher.go similarity index 96% rename from matcher.go rename to internal/sink/matcher.go index 59fbf098..15d15133 100644 --- a/matcher.go +++ b/internal/sink/matcher.go @@ -1,4 +1,6 @@ -package main +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 +package sink import ( "container/ring" diff --git a/sink.go b/internal/sink/sink.go similarity index 94% rename from sink.go rename to internal/sink/sink.go index 4921c336..6f9128b9 100644 --- a/sink.go +++ b/internal/sink/sink.go @@ -1,4 +1,6 @@ -package main +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 +package sink import ( "bytes" @@ -14,6 +16,7 @@ import ( "strings" "time" + "github.com/xmidt-org/caduceus/internal/metrics" "github.com/xmidt-org/retry" "github.com/xmidt-org/retry/retryhttp" "github.com/xmidt-org/wrp-go/v3" @@ -36,7 +39,7 @@ type WebhookV1 struct { clientMiddleware func(http.Client) http.Client } -func NewWebhookV1(s *SinkSender) { +func NewWebhookV1(s *sender) { v1 := &WebhookV1{ id: s.id, deliveryInterval: s.deliveryInterval, @@ -161,7 +164,7 @@ func (v1 *WebhookV1) updateRequest(urls *ring.Ring) func(*http.Request) *http.Re urls = urls.Next() tmp, err := url.Parse(urls.Value.(string)) if err != nil { - v1.logger.Error("failed to update url", zap.String(UrlLabel, urls.Value.(string)), zap.Error(err)) + v1.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err)) } request.URL = tmp return request diff --git a/caduceus_type.go b/internal/sink/sinkConfig.go similarity index 74% rename from caduceus_type.go rename to internal/sink/sinkConfig.go index e2806df5..6c009a0b 100644 --- a/caduceus_type.go +++ b/internal/sink/sinkConfig.go @@ -1,18 +1,12 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink -import ( - "time" - - "go.uber.org/zap" - - "github.com/xmidt-org/wrp-go/v3" -) +import "time" // Below is the struct we're using to contain the data from a provided config file // TODO: Try to figure out how to make bucket ranges configurable -type SinkConfig struct { +type Config struct { // The number of workers to assign to each SinkSender created. NumWorkersPerSender int @@ -54,17 +48,3 @@ type SinkConfig struct { // itself. IdleConnTimeout time.Duration } - -type RequestHandler interface { - HandleRequest(workerID int, msg *wrp.Message) -} - -type CaduceusHandler struct { - wrapper Wrapper - *zap.Logger -} - -func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) { - ch.Logger.Info("Worker received a request, now passing to sender", zap.Int("workerId", workerID)) - ch.wrapper.Queue(msg) -} diff --git a/sinkSender.go b/internal/sink/sinkSender.go similarity index 77% rename from sinkSender.go rename to internal/sink/sinkSender.go index 18de7753..e7e91668 100644 --- a/sinkSender.go +++ b/internal/sink/sinkSender.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink import ( "bytes" @@ -20,6 +20,8 @@ import ( "go.uber.org/zap" "github.com/prometheus/client_golang/prometheus" + "github.com/xmidt-org/caduceus/internal/client" + "github.com/xmidt-org/caduceus/internal/metrics" "github.com/xmidt-org/webpa-common/v2/semaphore" "github.com/xmidt-org/wrp-go/v3" ) @@ -47,7 +49,7 @@ type Sender interface { RetiredSince() time.Time Queue(*wrp.Message) } -type SinkSender struct { +type sender struct { id string queueSize int deliveryRetries int @@ -91,69 +93,69 @@ type SinkMetrics struct { currentWorkersGauge prometheus.Gauge } -func NewSinkSender(sw *SinkWrapper, l Listener) (sender *SinkSender, err error) { +func NewSender(w *wrapper, l Listener) (s *sender, err error) { - if sw.clientMiddleware == nil { - sw.clientMiddleware = nopClient + if w.clientMiddleware == nil { + w.clientMiddleware = client.NopClient } - if sw.client == nil { + if w.client == nil { err = errors.New("nil Client") return } - if sw.config.CutOffPeriod.Nanoseconds() == 0 { + if w.config.CutOffPeriod.Nanoseconds() == 0 { err = errors.New("invalid CutOffPeriod") return } - if sw.logger == nil { + if w.logger == nil { err = errors.New("logger required") return } id := l.GetId() - sender = &SinkSender{ + s = &sender{ id: id, listener: l, - queueSize: sw.config.QueueSizePerSender, + queueSize: w.config.QueueSizePerSender, deliverUntil: l.GetUntil(), // dropUntil: where is this being set in old caduceus?, - cutOffPeriod: sw.config.CutOffPeriod, - deliveryRetries: sw.config.DeliveryRetries, - deliveryInterval: sw.config.DeliveryInterval, - maxWorkers: sw.config.NumWorkersPerSender, + cutOffPeriod: w.config.CutOffPeriod, + deliveryRetries: w.config.DeliveryRetries, + deliveryInterval: w.config.DeliveryInterval, + maxWorkers: w.config.NumWorkersPerSender, failureMessage: FailureMessage{ Original: l, Text: failureText, - CutOffPeriod: sw.config.CutOffPeriod.String(), - QueueSize: sw.config.QueueSizePerSender, - Workers: sw.config.NumWorkersPerSender, + CutOffPeriod: w.config.CutOffPeriod.String(), + QueueSize: w.config.QueueSizePerSender, + Workers: w.config.NumWorkersPerSender, }, - customPIDs: sw.config.CustomPIDs, - disablePartnerIDs: sw.config.DisablePartnerIDs, + customPIDs: w.config.CustomPIDs, + disablePartnerIDs: w.config.DisablePartnerIDs, } - sender.CreateMetrics(sw.metrics) - sender.queueDepthGauge.Set(0) - sender.currentWorkersGauge.Set(0) + s.CreateMetrics(w.metrics) + s.queueDepthGauge.Set(0) + s.currentWorkersGauge.Set(0) //TODO: need to figure out how to set this up // Don't share the secret with others when there is an error. // sinkSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" - sender.queue.Store(make(chan *wrp.Message, sw.config.QueueSizePerSender)) + s.queue.Store(make(chan *wrp.Message, w.config.QueueSizePerSender)) - if err = sender.Update(l); nil != err { + if err = s.Update(l); nil != err { return } - sender.workers = semaphore.New(sender.maxWorkers) - sender.wg.Add(1) - go sender.dispatcher() + s.workers = semaphore.New(s.maxWorkers) + s.wg.Add(1) + go s.dispatcher() return } -func (s *SinkSender) Update(l Listener) (err error) { +func (s *sender) Update(l Listener) (err error) { switch v := l.(type) { case *ListenerV1: m := &MatcherV1{} @@ -189,7 +191,7 @@ func (s *SinkSender) Update(l Listener) (err error) { // of messages to deliver. The request is checked to see if it matches the // criteria before being accepted or silently dropped. // TODO: can pass in message along with webhook information -func (s *SinkSender) Queue(msg *wrp.Message) { +func (s *sender) Queue(msg *wrp.Message) { s.mutex.RLock() deliverUntil := s.deliverUntil dropUntil := s.dropUntil @@ -231,7 +233,7 @@ func (s *SinkSender) Queue(msg *wrp.Message) { // Shutdown causes the CaduceusOutboundSender to stop its activities either gently or // abruptly based on the gentle parameter. If gentle is false, all queued // messages will be dropped without an attempt to send made. -func (s *SinkSender) Shutdown(gentle bool) { +func (s *sender) Shutdown(gentle bool) { if !gentle { // need to close the channel we're going to replace, in case it doesn't // have any events in it. @@ -250,7 +252,7 @@ func (s *SinkSender) Shutdown(gentle bool) { // RetiredSince returns the time the CaduceusOutboundSender retired (which could be in // the future). -func (s *SinkSender) RetiredSince() time.Time { +func (s *sender) RetiredSince() time.Time { s.mutex.RLock() deliverUntil := s.deliverUntil s.mutex.RUnlock() @@ -268,7 +270,7 @@ func overlaps(sl1 []string, sl2 []string) bool { return false } -func (s *SinkSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { +func (s *sender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { if !now.After(dropUntil) { // client was cut off s.droppedCutoffCounter.Add(1.0) @@ -288,7 +290,7 @@ func (s *SinkSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) b // a fresh one, counting any current messages in the queue as dropped. // It should never close a queue, as a queue not referenced anywhere will be // cleaned up by the garbage collector without needing to be closed. -func (s *SinkSender) Empty(droppedCounter prometheus.Counter) { +func (s *sender) Empty(droppedCounter prometheus.Counter) { droppedMsgs := s.queue.Load().(chan *wrp.Message) s.queue.Store(make(chan *wrp.Message, s.queueSize)) droppedCounter.Add(float64(len(droppedMsgs))) @@ -298,7 +300,7 @@ func (s *SinkSender) Empty(droppedCounter prometheus.Counter) { // queueOverflow handles the logic of what to do when a queue overflows: // cutting off the webhook for a time and sending a cut off notification // to the failure URL. -func (s *SinkSender) queueOverflow() { +func (s *sender) queueOverflow() { s.mutex.Lock() if time.Now().Before(s.dropUntil) { s.mutex.Unlock() @@ -373,7 +375,7 @@ func (s *SinkSender) queueOverflow() { } } -func (s *SinkSender) dispatcher() { +func (s *sender) dispatcher() { defer s.wg.Done() var ( msg *wrp.Message @@ -443,22 +445,22 @@ Loop: } } -func (s *SinkSender) CreateMetrics(metrics Metrics) { - s.deliveryRetryCounter = metrics.DeliveryRetryCounter - s.deliveryRetryMaxGauge = metrics.DeliveryRetryMaxGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.cutOffCounter = metrics.CutOffCounter.With(prometheus.Labels{UrlLabel: s.id}) - s.droppedQueueFullCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "queue_full"}) - s.droppedExpiredCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "expired"}) - s.droppedExpiredBeforeQueueCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "expired_before_queueing"}) - s.droppedCutoffCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "cut_off"}) - s.droppedInvalidConfig = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "invalid_config"}) - s.droppedNetworkErrCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: networkError}) - s.droppedPanic = metrics.DropsDueToPanic.With(prometheus.Labels{UrlLabel: s.id}) - s.queueDepthGauge = metrics.OutgoingQueueDepth.With(prometheus.Labels{UrlLabel: s.id}) - s.renewalTimeGauge = metrics.ConsumerRenewalTimeGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.deliverUntilGauge = metrics.ConsumerDeliverUntilGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.dropUntilGauge = metrics.ConsumerDropUntilGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.currentWorkersGauge = metrics.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.maxWorkersGauge = metrics.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{UrlLabel: s.id}) +func (s *sender) CreateMetrics(m metrics.Metrics) { + s.deliveryRetryCounter = m.DeliveryRetryCounter + s.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.droppedQueueFullCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "queue_full"}) + s.droppedExpiredCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"}) + s.droppedExpiredBeforeQueueCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"}) + s.droppedCutoffCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}) + s.droppedInvalidConfig = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}) + s.droppedNetworkErrCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) + s.droppedPanic = m.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.queueDepthGauge = m.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.renewalTimeGauge = m.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.deliverUntilGauge = m.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.dropUntilGauge = m.ConsumerDropUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.currentWorkersGauge = m.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.maxWorkersGauge = m.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) } diff --git a/sinkSender_test.go b/internal/sink/sinkSender_test.go similarity index 99% rename from sinkSender_test.go rename to internal/sink/sinkSender_test.go index a45176c0..7dd524f2 100644 --- a/sinkSender_test.go +++ b/internal/sink/sinkSender_test.go @@ -1,6 +1,6 @@ -// // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// // SPDX-License-Identifier: Apache-2.0 -package main +// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 +package sink // import ( // "bytes" diff --git a/sinkWrapper.go b/internal/sink/sinkWrapper.go similarity index 66% rename from sinkWrapper.go rename to internal/sink/sinkWrapper.go index 991cd6a9..37fd1419 100644 --- a/sinkWrapper.go +++ b/internal/sink/sinkWrapper.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink import ( "crypto/tls" @@ -11,6 +11,9 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/xmidt-org/caduceus/internal/client" + "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/candlelight" "github.com/xmidt-org/wrp-go/v3" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" @@ -19,14 +22,14 @@ import ( ) // WrapperIn configures the Wrapper for creation -type SinkWrapperIn struct { +type WrapperIn struct { fx.In - Tracing candlelight.Tracing - SinkConfig SinkConfig - Metrics Metrics - EventType *prometheus.CounterVec - Logger *zap.Logger + Tracing candlelight.Tracing + Config Config + Metrics metrics.Metrics + EventType *prometheus.CounterVec + Logger *zap.Logger } // SinkWrapper interface is needed for unit testing. @@ -37,7 +40,7 @@ type Wrapper interface { } // Wrapper contains the configuration that will be shared with each outbound sender. It contains no external parameters. -type SinkWrapper struct { +type wrapper struct { // The amount of time to let expired SinkSenders linger before // shutting them down and cleaning up the resources associated with them. linger time.Duration @@ -46,23 +49,23 @@ type SinkWrapper struct { logger *zap.Logger //the configuration needed for eash sinkSender - config SinkConfig + config Config mutex *sync.RWMutex senders map[string]Sender eventType *prometheus.CounterVec wg sync.WaitGroup shutdown chan struct{} - metrics Metrics - client Client //TODO: keeping here for now - but might move to SinkSender in a later PR - clientMiddleware func(Client) Client //TODO: keeping here for now - but might move to SinkSender in a later PR + metrics metrics.Metrics + client client.Client //TODO: keeping here for now - but might move to SinkSender in a later PR + clientMiddleware func(client.Client) client.Client //TODO: keeping here for now - but might move to SinkSender in a later PR } -func ProvideWrapper() fx.Option { +func Provide() fx.Option { return fx.Provide( - func(in MetricsIn) Metrics { - senderMetrics := Metrics{ + func(in metrics.MetricsIn) metrics.Metrics { + senderMetrics := metrics.Metrics{ DeliveryCounter: in.DeliveryCounter, DeliveryRetryCounter: in.DeliveryRetryCounter, DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge, @@ -79,39 +82,39 @@ func ProvideWrapper() fx.Option { } return senderMetrics }, - func(in SinkWrapperIn) (*SinkWrapper, error) { - csw, err := NewSinkWrapper(in) - return csw, err + func(in WrapperIn) (Wrapper, error) { + w, err := NewWrapper(in) + return w, err }, ) } -func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) { - sw = &SinkWrapper{ - linger: in.SinkConfig.Linger, +func NewWrapper(in WrapperIn) (wr Wrapper, err error) { + w := &wrapper{ + linger: in.Config.Linger, logger: in.Logger, eventType: in.EventType, - config: in.SinkConfig, + config: in.Config, metrics: in.Metrics, } - if in.SinkConfig.Linger <= 0 { - linger := fmt.Sprintf("linger not positive: %v", in.SinkConfig.Linger) + if in.Config.Linger <= 0 { + linger := fmt.Sprintf("linger not positive: %v", in.Config.Linger) err = errors.New(linger) - sw = nil + w = nil return } - sw.senders = make(map[string]Sender) - sw.shutdown = make(chan struct{}) + w.senders = make(map[string]Sender) + w.shutdown = make(chan struct{}) - sw.wg.Add(1) - go undertaker(sw) + w.wg.Add(1) + go undertaker(w) return } // no longer being initialized at start up - needs to be initialized by the creation of the outbound sender -func NewRoundTripper(config SinkConfig, tracing candlelight.Tracing) (tr http.RoundTripper) { +func NewRoundTripper(config Config, tracing candlelight.Tracing) (tr http.RoundTripper) { tr = &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation}, MaxIdleConnsPerHost: config.NumWorkersPerSender, @@ -130,7 +133,7 @@ func NewRoundTripper(config SinkConfig, tracing candlelight.Tracing) (tr http.Ro // Update is called when we get changes to our webhook listeners with either // additions, or updates. This code takes care of building new OutboundSenders // and maintaining the existing OutboundSenders. -func (sw *SinkWrapper) Update(list []Listener) { +func (w *wrapper) Update(list []Listener) { ids := make([]struct { Listener Listener @@ -142,31 +145,31 @@ func (sw *SinkWrapper) Update(list []Listener) { ids[i].ID = v.GetId() } - sw.mutex.Lock() - defer sw.mutex.Unlock() + w.mutex.Lock() + defer w.mutex.Unlock() for _, inValue := range ids { - sender, ok := sw.senders[inValue.ID] + sender, ok := w.senders[inValue.ID] if !ok { var ss Sender var err error listener := inValue.Listener - metricWrapper, err := newMetricWrapper(time.Now, sw.metrics.QueryLatency, inValue.ID) + metricWrapper, err := client.NewMetricWrapper(time.Now, w.metrics.QueryLatency, inValue.ID) if err != nil { continue } - ss, err = NewSinkSender(sw, listener) - sw.clientMiddleware = metricWrapper.roundTripper + ss, err = NewSender(w, listener) + w.clientMiddleware = metricWrapper.RoundTripper // { // ss, err = newSinkSender(sw, r1) // } if err == nil { - sw.senders[inValue.ID] = ss + w.senders[inValue.ID] = ss } continue } @@ -177,13 +180,13 @@ func (sw *SinkWrapper) Update(list []Listener) { // Queue is used to send all the possible outbound senders a request. This // function performs the fan-out and filtering to multiple possible endpoints. -func (sw *SinkWrapper) Queue(msg *wrp.Message) { - sw.mutex.RLock() - defer sw.mutex.RUnlock() +func (w *wrapper) Queue(msg *wrp.Message) { + w.mutex.RLock() + defer w.mutex.RUnlock() - sw.eventType.With(prometheus.Labels{"event": msg.FindEventStringSubMatch()}).Add(1) + w.eventType.With(prometheus.Labels{"event": msg.FindEventStringSubMatch()}).Add(1) - for _, v := range sw.senders { + for _, v := range w.senders { v.Queue(msg) } } @@ -191,57 +194,57 @@ func (sw *SinkWrapper) Queue(msg *wrp.Message) { // Shutdown closes down the delivery mechanisms and cleans up the underlying // OutboundSenders either gently (waiting for delivery queues to empty) or not // (dropping enqueued messages) -func (sw *SinkWrapper) Shutdown(gentle bool) { - sw.mutex.Lock() - defer sw.mutex.Unlock() - for k, v := range sw.senders { +func (w *wrapper) Shutdown(gentle bool) { + w.mutex.Lock() + defer w.mutex.Unlock() + for k, v := range w.senders { v.Shutdown(gentle) - delete(sw.senders, k) + delete(w.senders, k) } - close(sw.shutdown) + close(w.shutdown) } // undertaker looks at the OutboundSenders periodically and prunes the ones // that have been retired for too long, freeing up resources. -func undertaker(sw *SinkWrapper) { - defer sw.wg.Done() +func undertaker(w *wrapper) { + defer w.wg.Done() // Collecting unused OutboundSenders isn't a huge priority, so do it // slowly. - ticker := time.NewTicker(2 * sw.linger) + ticker := time.NewTicker(2 * w.linger) for { select { case <-ticker.C: - threshold := time.Now().Add(-1 * sw.linger) + threshold := time.Now().Add(-1 * w.linger) // Actually shutting these down could take longer then we // want to lock the mutex, so just remove them from the active // list & shut them down afterwards. - deadList := createDeadlist(sw, threshold) + deadList := createDeadlist(w, threshold) // Shut them down for _, v := range deadList { v.Shutdown(false) } - case <-sw.shutdown: + case <-w.shutdown: ticker.Stop() return } } } -func createDeadlist(sw *SinkWrapper, threshold time.Time) map[string]Sender { - if sw == nil || threshold.IsZero() { +func createDeadlist(w *wrapper, threshold time.Time) map[string]Sender { + if w == nil || threshold.IsZero() { return nil } deadList := make(map[string]Sender) - sw.mutex.Lock() - defer sw.mutex.Unlock() - for k, v := range sw.senders { + w.mutex.Lock() + defer w.mutex.Unlock() + for k, v := range w.senders { retired := v.RetiredSince() if threshold.After(retired) { deadList[k] = v - delete(sw.senders, k) + delete(w.senders, k) } } return deadList diff --git a/sinkWrapper_test.go b/internal/sink/sinkWrapper_test.go similarity index 99% rename from sinkWrapper_test.go rename to internal/sink/sinkWrapper_test.go index 3b588744..5bceb2cf 100644 --- a/sinkWrapper_test.go +++ b/internal/sink/sinkWrapper_test.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink import ( "net/http" diff --git a/logger.go b/logger.go index d58804d7..6150c376 100644 --- a/logger.go +++ b/logger.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "github.com/xmidt-org/sallust" diff --git a/routes.go b/routes.go index 03369b81..1159d7ab 100644 --- a/routes.go +++ b/routes.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "fmt" @@ -10,6 +10,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/xmidt-org/arrange/arrangehttp" "github.com/xmidt-org/arrange/arrangepprof" + "github.com/xmidt-org/caduceus/internal/handler" "github.com/xmidt-org/candlelight" "github.com/xmidt-org/httpaux" "github.com/xmidt-org/httpaux/recovery" @@ -19,11 +20,18 @@ import ( "go.uber.org/fx" ) +const ( + apiVersion = "v4" + prevAPIVersion = "v3" + apiBase = "api/" + apiVersion + apiBaseDualVersion = "api/{version:" + apiVersion + "|" + prevAPIVersion + "}" +) + type RoutesIn struct { fx.In PrimaryMetrics touchhttp.ServerInstrumenter `name:"servers.primary.metrics"` AlternateMetrics touchhttp.ServerInstrumenter `name:"servers.alternate.metrics"` - Handler *ServerHandler + Handler *handler.ServerHandler Tracing candlelight.Tracing PreviousVersionSupport bool }