Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/senders v2 #451

Merged
merged 12 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 43 additions & 15 deletions caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,64 @@ type CaduceusConfig struct {
AuthHeader []string
NumWorkerThreads int
JobQueueSize int
Sender SenderConfig
Sink SinkConfig
JWTValidators []JWTValidator
AllowInsecureTLS bool
}

type SenderConfig struct {
NumWorkersPerSender int
QueueSizePerSender int
CutOffPeriod time.Duration
Linger time.Duration
ClientTimeout time.Duration
type SinkConfig struct {
// The number of workers to assign to each SinkSender created.
NumWorkersPerSender int

// The queue size to assign to each SinkSender created.
QueueSizePerSender int

// The cut off time to assign to each SinkSender created.
CutOffPeriod time.Duration

// The amount of time to let expired SinkSenders linger before
// shutting them down and cleaning up the resources associated with them.
Linger time.Duration

// Number of delivery retries before giving up
DeliveryRetries int

// Time in between delivery retries
DeliveryInterval time.Duration

// CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message
// has no partner IDs.
CustomPIDs []string

// DisablePartnerIDs dictates whether or not to enforce the partner ID check.
DisablePartnerIDs bool

// ClientTimeout specifies a time limit for requests made by the SinkSender's Client
ClientTimeout time.Duration

//DisableClientHostnameValidation used in HTTP Client creation
DisableClientHostnameValidation bool
ResponseHeaderTimeout time.Duration
IdleConnTimeout time.Duration
DeliveryRetries int
DeliveryInterval time.Duration
CustomPIDs []string
DisablePartnerIDs bool

// ResponseHeaderTimeout specifies the amount of time to wait for a server's response headers after fully
// writing the request
ResponseHeaderTimeout time.Duration

//IdleConnTimeout is the maximum amount of time an idle
// (keep-alive) connection will remain idle before closing
// itself.
IdleConnTimeout time.Duration
}

type RequestHandler interface {
HandleRequest(workerID int, msg *wrp.Message)
}

type CaduceusHandler struct {
senderWrapper SenderWrapper
wrapper Wrapper
*zap.Logger
}

func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) {
ch.Logger.Info("Worker received a request, now passing to sender", zap.Int("workerId", workerID))
ch.senderWrapper.Queue(msg)
ch.wrapper.Queue(msg)
}
2 changes: 1 addition & 1 deletion caduceus_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestCaduceusHandler(t *testing.T) {
fakeSenderWrapper.On("Queue", mock.AnythingOfType("*wrp.Message")).Return().Once()

testHandler := CaduceusHandler{
senderWrapper: fakeSenderWrapper,
wrapper: fakeSenderWrapper,
Logger: logger,
}

Expand Down
9 changes: 9 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import "net/http"

// Client is the interface used to requests messages to the desired location.
// The Client can be either and HTTP Client or a Kafka Producer.
type Client interface {
Do(*http.Request) (*http.Response, error)
}
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
Servers Servers
ArgusClientTimeout HttpClientTimeout
JWTValidator JWTValidator
Sender SenderConfig
Sink SinkConfig
Service Service
AuthHeader []string
Server string
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ require (
github.com/xmidt-org/candlelight v0.0.21
github.com/xmidt-org/clortho v0.0.4
github.com/xmidt-org/httpaux v0.4.0
github.com/xmidt-org/retry v0.0.3
github.com/xmidt-org/sallust v0.2.2
github.com/xmidt-org/touchstone v0.1.3
github.com/xmidt-org/webhook-schema v0.1.0
github.com/xmidt-org/webpa-common/v2 v2.2.2
github.com/xmidt-org/wrp-go/v3 v3.2.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1
Expand Down Expand Up @@ -86,6 +88,7 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/xmidt-org/chronon v0.1.1 // indirect
github.com/xmidt-org/urlegit v0.1.0 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,8 @@ github.com/xmidt-org/httpaux v0.2.1/go.mod h1:mviIlg5fHGb3lAv3l0sbiwVG/q9rqvXaud
github.com/xmidt-org/httpaux v0.3.2/go.mod h1:qmlPisXf80FTi3y4gX43eYbCVruSQyvu+FPx1jzvQG8=
github.com/xmidt-org/httpaux v0.4.0 h1:cAL/MzIBpSsv4xZZeq/Eu1J5M3vfNe49xr41mP3COKU=
github.com/xmidt-org/httpaux v0.4.0/go.mod h1:UypqZwuZV1nn8D6+K1JDb+im9IZrLNg/2oO/Bgiybxc=
github.com/xmidt-org/retry v0.0.3 h1:wvmBnEEn1OKwSZaQtr1RZ2Vey8JIvP72mGTgR+3wPiM=
github.com/xmidt-org/retry v0.0.3/go.mod h1:I7FO3VVrxPckNuotwGYZIxfBnmjMSyOTitTKNL0VkIA=
github.com/xmidt-org/sallust v0.1.5/go.mod h1:azcKBypudADIeZ3Em8zGjVq3yQ7n4ueSvM/degHMIxo=
github.com/xmidt-org/sallust v0.2.0/go.mod h1:HCQQn7po8czynjxtNVyZ5vzWuTqJyJwnPWkQoqBX67s=
github.com/xmidt-org/sallust v0.2.1/go.mod h1:68C0DLwD5xlhRznXTWmfUhx0etyrFpSOzYGU7jzmpzs=
Expand All @@ -1703,6 +1705,10 @@ github.com/xmidt-org/touchstone v0.1.1/go.mod h1:7Rgqs44l1VndkvFUZewr8WpItzxfJSx
github.com/xmidt-org/touchstone v0.1.2/go.mod h1:2xVJVO8FE393Aofw/FD8Cu9wXES4n1AlJP109Nk7/gg=
github.com/xmidt-org/touchstone v0.1.3 h1:AtqUBa8U4lyVQj6eRZ9Uo8NShkWWq3StObRRkrv7N0Q=
github.com/xmidt-org/touchstone v0.1.3/go.mod h1:vzFD11v5urS3RKVP0jDva/j9aHEjwVZCg3nNMdRSk6E=
github.com/xmidt-org/urlegit v0.1.0 h1:WZLlWo0e5JNZabLEi7/1+sK/np9qrH9XnoB+ZdsHieM=
github.com/xmidt-org/urlegit v0.1.0/go.mod h1:ih/VtgW3xfpV7FNIrHUpNdP0GapcfLOND8y0JwH51vA=
github.com/xmidt-org/webhook-schema v0.1.0 h1:QYutPymtGd6OvukVHTDwpQIEvmk5uNnN8CgbppS089A=
github.com/xmidt-org/webhook-schema v0.1.0/go.mod h1:x3G1lmhryIbr6QXLyzagVkcfY1ZhBxGlP0CdvRD3zZI=
github.com/xmidt-org/webpa-common v1.1.0/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.10.2-0.20200604164000-f07406b4eb63/go.mod h1:Fmt3wIxBzwJY0KeRHX6RaLZx2xpKTbXCLEA3Xtd6kq8=
Expand Down
32 changes: 16 additions & 16 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (

type ServerHandlerIn struct {
fx.In
CaduceusSenderWrapper *CaduceusSenderWrapper
Logger *zap.Logger
Telemetry *HandlerTelemetry
SinkWrapper *SinkWrapper
Logger *zap.Logger
Telemetry *HandlerTelemetry
}

type ServerHandlerOut struct {
Expand All @@ -40,20 +40,20 @@ type ServerHandler struct {
}
type HandlerTelemetryIn struct {
fx.In
ErrorRequests prometheus.Counter `name:"error_request_body_counter"`
EmptyRequests prometheus.Counter `name:"empty_request_boyd_counter"`
InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"`
IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"`
ModifiedWRPCount prometheus.CounterVec `name:"modified_wrp_count"`
IncomingQueueLatency prometheus.HistogramVec `name:"incoming_queue_latency_histogram_seconds"`
ErrorRequests prometheus.Counter `name:"error_request_body_count"`
EmptyRequests prometheus.Counter `name:"empty_request_body_count"`
InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"`
IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"`
ModifiedWRPCount *prometheus.CounterVec `name:"modified_wrp_count"`
IncomingQueueLatency prometheus.ObserverVec `name:"incoming_queue_latency_histogram_seconds"`
}
type HandlerTelemetry struct {
errorRequests prometheus.Counter
emptyRequests prometheus.Counter
invalidCount prometheus.Counter
incomingQueueDepthMetric prometheus.Gauge
modifiedWRPCount prometheus.CounterVec
incomingQueueLatency prometheus.HistogramVec
modifiedWRPCount *prometheus.CounterVec
incomingQueueLatency prometheus.ObserverVec
}

func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R

func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) {
endTime := sh.now()
sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(float64(endTime.Sub(startTime).Seconds()))
sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds())
}

func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
Expand Down Expand Up @@ -189,18 +189,18 @@ func ProvideHandler() fx.Option {
},
func(in ServerHandlerIn) (ServerHandlerOut, error) {
//Hard coding maxOutstanding and incomingQueueDepth for now
handler, err := New(in.CaduceusSenderWrapper, in.Logger, in.Telemetry, 0.0, 0.0)
handler, err := New(in.SinkWrapper, in.Logger, in.Telemetry, 0.0, 0.0)
return ServerHandlerOut{
Handler: handler,
}, err
},
)
}
func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {
func New(sw *SinkWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {
return &ServerHandler{
caduceusHandler: &CaduceusHandler{
senderWrapper: senderWrapper,
Logger: log,
wrapper: sw,
Logger: log,
},
telemetry: t,
maxOutstanding: maxOutstanding,
Expand Down
14 changes: 5 additions & 9 deletions httpClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ var (
errNilHistogram = errors.New("histogram cannot be nil")
)

type httpClient interface {
Do(*http.Request) (*http.Response, error)
}

func nopHTTPClient(next httpClient) httpClient {
func nopClient(next Client) Client {
return next
}

Expand All @@ -33,15 +29,15 @@ func (d doerFunc) Do(req *http.Request) (*http.Response, error) {

type metricWrapper struct {
now func() time.Time
queryLatency prometheus.HistogramVec
queryLatency prometheus.ObserverVec
id string
}

func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec, id string) (*metricWrapper, error) {
func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) {
if now == nil {
now = time.Now
}
if queryLatency.MetricVec == nil {
if queryLatency == nil {
return nil, errNilHistogram
}
return &metricWrapper{
Expand All @@ -51,7 +47,7 @@ func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec
}, nil
}

func (m *metricWrapper) roundTripper(next httpClient) httpClient {
func (m *metricWrapper) roundTripper(next Client) Client {
return doerFunc(func(req *http.Request) (*http.Response, error) {
startTime := m.now()
resp, err := next.Do(req)
Expand Down
Loading
Loading