From e19e6f6e6ff9bbedbdb24aac63970ab4e7802b07 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Tue, 23 Jan 2024 09:52:03 -0500 Subject: [PATCH] updated the caduceus outbound sender --- caduceus_type.go | 8 --- http.go | 2 +- httpClient.go | 12 ++-- listenerStub.go | 56 +++++++++++++++ metrics.go | 69 +++++++++++------- outboundSender.go | 175 +++++++++++++++++++++++----------------------- senderWrapper.go | 136 +++++++++++++++-------------------- 7 files changed, 253 insertions(+), 205 deletions(-) create mode 100644 listenerStub.go diff --git a/caduceus_type.go b/caduceus_type.go index 2696c404..cd2b6bfc 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -7,8 +7,6 @@ import ( "go.uber.org/zap" - "github.com/go-kit/kit/metrics" - "github.com/xmidt-org/wrp-go/v3" ) @@ -38,12 +36,6 @@ type SenderConfig struct { DisablePartnerIDs bool } -type CaduceusMetricsRegistry interface { - NewCounter(name string) metrics.Counter - NewGauge(name string) metrics.Gauge - NewHistogram(name string, buckets int) metrics.Histogram -} - type RequestHandler interface { HandleRequest(workerID int, msg *wrp.Message) } diff --git a/http.go b/http.go index 29e20e8f..b89d4bec 100644 --- a/http.go +++ b/http.go @@ -139,7 +139,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R } eventType = msg.FindEventStringSubMatch() - // sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg)) + sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg)) // return a 202 response.WriteHeader(http.StatusAccepted) diff --git a/httpClient.go b/httpClient.go index 07033f5a..e478bb44 100644 --- a/httpClient.go +++ b/httpClient.go @@ -9,7 +9,7 @@ import ( "strconv" "time" - "github.com/go-kit/kit/metrics" + "github.com/prometheus/client_golang/prometheus" ) var ( @@ -33,19 +33,21 @@ func (d doerFunc) Do(req *http.Request) (*http.Response, error) { type metricWrapper struct { now func() time.Time - queryLatency metrics.Histogram + queryLatency prometheus.HistogramVec + id string } 
-func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*metricWrapper, error) { +func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec, id string) (*metricWrapper, error) { if now == nil { now = time.Now } - if queryLatency == nil { + if queryLatency.MetricVec == nil { return nil, errNilHistogram } return &metricWrapper{ now: now, queryLatency: queryLatency, + id: id, }, nil } @@ -62,7 +64,7 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient { // find time difference, add to metric var latency = endTime.Sub(startTime) - m.queryLatency.With("code", code).Observe(latency.Seconds()) + m.queryLatency.With(prometheus.Labels{UrlLabel: m.id, CodeLabel: code}).Observe(latency.Seconds()) return resp, err }) diff --git a/listenerStub.go b/listenerStub.go new file mode 100644 index 00000000..4803f681 --- /dev/null +++ b/listenerStub.go @@ -0,0 +1,56 @@ +package main + +import "time" + +//This is a stub for the ancla listener. This will be removed once we can add ancla back into caduceus + +type ListenerStub struct { + PartnerIds []string + Webhook Webhook +} + +type Webhook struct { + // Address is the subscription request origin HTTP Address. + Address string `json:"registered_from_address"` + + // Config contains data to inform how events are delivered. + Config DeliveryConfig `json:"config"` + + // FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow. + // Optional, set to "" to disable notifications. + FailureURL string `json:"failure_url"` + + // Events is the list of regular expressions to match an event type against. + Events []string `json:"events"` + + // Matcher type contains values to match against the metadata. + Matcher MetadataMatcherConfig `json:"matcher,omitempty"` + + // Duration describes how long the subscription lasts once added. + Duration time.Duration `json:"duration"` + + // Until describes the time this subscription expires. 
+ Until time.Time `json:"until"` +} + +// DeliveryConfig is a Webhook substructure with data related to event delivery. +type DeliveryConfig struct { + // URL is the HTTP URL to deliver messages to. + URL string `json:"url"` + + // ContentType is content type value to set WRP messages to (unless already specified in the WRP). + ContentType string `json:"content_type"` + + // Secret is the string value for the SHA1 HMAC. + // (Optional, set to "" to disable behavior). + Secret string `json:"secret,omitempty"` + + // AlternativeURLs is a list of explicit URLs that should be round robin through on failure cases to the main URL. + AlternativeURLs []string `json:"alt_urls,omitempty"` +} + +// MetadataMatcherConfig is Webhook substructure with config to match event metadata. +type MetadataMatcherConfig struct { + // DeviceID is the list of regular expressions to match device id type against. + DeviceID []string `json:"device_id"` +} diff --git a/metrics.go b/metrics.go index 2d10a5a3..0e6e0a48 100644 --- a/metrics.go +++ b/metrics.go @@ -3,7 +3,6 @@ package main import ( - "github.com/go-kit/kit/metrics" "github.com/prometheus/client_golang/prometheus" "go.uber.org/fx" @@ -49,29 +48,22 @@ const ( CodeLabel = "code" ) -func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) { - c.deliveryCounter = m.NewCounter(DeliveryCounter) - c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter) - c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id) - c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id) - c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full") - c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired") - c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing") - - c.droppedCutoffCounter = 
m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off") - c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config") - c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError) - c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id) - c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id) - c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id) - c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id) - c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id) - c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) - c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) -} - -func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram { - return m.NewHistogram(QueryDurationHistogram, 11) +type SenderMetricsIn struct { + fx.In + QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"` + EventType prometheus.CounterVec `name:"incoming_event_type_count"` + DeliveryCounter prometheus.CounterVec `name:"delivery_count"` + DeliveryRetryCounter prometheus.CounterVec `name:"delivery_retry_count"` + DeliveryRetryMaxGauge prometheus.GaugeVec `name:"delivery_retry_max"` + CutOffCounter prometheus.CounterVec `name:"slow_consumer_cut_off_count"` + SlowConsumerDroppedMsgCounter prometheus.CounterVec `name:"slow_consumer_dropped_message_count"` + DropsDueToPanic prometheus.CounterVec `name:"drops_due_to_panic"` + ConsumerDeliverUntilGauge prometheus.GaugeVec `name:"consumer_deliver_until"` + ConsumerDropUntilGauge prometheus.GaugeVec `name:"consumer_drop_until"` + ConsumerDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers"` + ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers_max"` + OutgoingQueueDepth
prometheus.GaugeVec `name:"outgoing_queue_depths"` + ConsumerRenewalTimeGauge prometheus.GaugeVec `name:"consumer_renewal_time"` } // TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called @@ -161,3 +153,32 @@ func ProvideMetrics() fx.Option { }, EventLabel), ) } + +func ProvideSenderMetrics() fx.Option { + return fx.Provide( + func(in SenderMetricsIn) (SenderWrapperMetrics, OutboundSenderMetrics) { + outbounderMetrics := OutboundSenderMetrics{ + DeliveryCounter: in.DeliveryCounter, + DeliveryRetryCounter: in.DeliveryRetryCounter, + DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge, + CutOffCounter: in.CutOffCounter, + SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter, + DropsDueToPanic: in.DropsDueToPanic, + ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge, + ConsumerDropUntilGauge: in.ConsumerDropUntilGauge, + ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge, + ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge, + OutgoingQueueDepth: in.OutgoingQueueDepth, + ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge, + } + wrapperMetrics := SenderWrapperMetrics{ + QueryLatency: in.QueryLatency, + EventType: in.EventType, + } + + return wrapperMetrics, outbounderMetrics + }, + ) +} + + diff --git a/outboundSender.go b/outboundSender.go index ea9a8496..1bacf956 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" - "github.com/go-kit/kit/metrics" + "github.com/prometheus/client_golang/prometheus" "github.com/xmidt-org/webpa-common/v2/device" "github.com/xmidt-org/webpa-common/v2/semaphore" @@ -49,51 +49,6 @@ type FailureMessage struct { Workers int `json:"worker_count"` } -// OutboundSenderFactory is a configurable factory for OutboundSender objects. -type OutboundSenderFactory struct { - // The WebHookListener to service - // Listener ancla.InternalWebhook //TODO: add back in once ancla/argus dependency issue is fixed. 
- - // The http client Do() function to use for outbound requests. - // Sender func(*http.Request) (*http.Response, error) - Sender httpClient - - // - ClientMiddleware func(httpClient) httpClient - - // The number of delivery workers to create and use. - NumWorkers int - - // The queue depth to buffer events before we declare overflow, shut - // off the message delivery, and basically put the endpoint in "timeout." - QueueSize int - - // The amount of time to cut off the consumer if they don't keep up. - // Must be greater then 0 seconds - CutOffPeriod time.Duration - - // Number of delivery retries before giving up - DeliveryRetries int - - // Time in between delivery retries - DeliveryInterval time.Duration - - // Metrics registry. - MetricsRegistry CaduceusMetricsRegistry - - // The logger to use. - Logger *zap.Logger - - // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message - // has no partner IDs. - CustomPIDs []string - - // DisablePartnerIDs dictates whether or not to enforce the partner ID check. 
- DisablePartnerIDs bool - - QueryLatency metrics.Histogram -} - type OutboundSender interface { // Update(ancla.InternalWebhook) error Shutdown(bool) @@ -101,6 +56,30 @@ type OutboundSender interface { Queue(*wrp.Message) } +type OutboundSenderFactory struct { + Config SenderConfig + Logger *zap.Logger + Metrics OutboundSenderMetrics + Sender httpClient + Listener ListenerStub + ClientMiddleware func(httpClient) httpClient +} + +type OutboundSenderMetrics struct { + DeliveryCounter prometheus.CounterVec + DeliveryRetryCounter prometheus.CounterVec + DeliveryRetryMaxGauge prometheus.GaugeVec + CutOffCounter prometheus.CounterVec + SlowConsumerDroppedMsgCounter prometheus.CounterVec + DropsDueToPanic prometheus.CounterVec + ConsumerDeliverUntilGauge prometheus.GaugeVec + ConsumerDropUntilGauge prometheus.GaugeVec + ConsumerDeliveryWorkersGauge prometheus.GaugeVec + ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec + OutgoingQueueDepth prometheus.GaugeVec + ConsumerRenewalTimeGauge prometheus.GaugeVec +} + // CaduceusOutboundSender is the outbound sender object. 
type CaduceusOutboundSender struct { id string @@ -114,23 +93,23 @@ type CaduceusOutboundSender struct { queueSize int deliveryRetries int deliveryInterval time.Duration - deliveryCounter metrics.Counter - deliveryRetryCounter metrics.Counter - droppedQueueFullCounter metrics.Counter - droppedCutoffCounter metrics.Counter - droppedExpiredCounter metrics.Counter - droppedExpiredBeforeQueueCounter metrics.Counter - droppedNetworkErrCounter metrics.Counter - droppedInvalidConfig metrics.Counter - droppedPanic metrics.Counter - cutOffCounter metrics.Counter - queueDepthGauge metrics.Gauge - renewalTimeGauge metrics.Gauge - deliverUntilGauge metrics.Gauge - dropUntilGauge metrics.Gauge - maxWorkersGauge metrics.Gauge - currentWorkersGauge metrics.Gauge - deliveryRetryMaxGauge metrics.Gauge + deliveryCounter prometheus.CounterVec + deliveryRetryCounter prometheus.Counter + droppedQueueFullCounter prometheus.Counter + droppedCutoffCounter prometheus.Counter + droppedExpiredCounter prometheus.Counter + droppedExpiredBeforeQueueCounter prometheus.Counter + droppedNetworkErrCounter prometheus.Counter + droppedInvalidConfig prometheus.Counter + droppedPanic prometheus.Counter + cutOffCounter prometheus.Counter + queueDepthGauge prometheus.Gauge + renewalTimeGauge prometheus.Gauge + deliverUntilGauge prometheus.Gauge + dropUntilGauge prometheus.Gauge + maxWorkersGauge prometheus.Gauge + currentWorkersGauge prometheus.Gauge + deliveryRetryMaxGauge prometheus.Gauge wg sync.WaitGroup cutOffPeriod time.Duration workers semaphore.Interface @@ -145,27 +124,27 @@ type CaduceusOutboundSender struct { } // New creates a new OutboundSender object from the factory, or returns an error. 
-func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { +func (osf *OutboundSenderFactory) New() (obs OutboundSender, err error) { // if _, err = url.ParseRequestURI(osf.Listener.Webhook.Config.URL); nil != err { // return // } - if nil == osf.ClientMiddleware { + if osf.ClientMiddleware == nil { osf.ClientMiddleware = nopHTTPClient } - if nil == osf.Sender { + if osf.Sender == nil { err = errors.New("nil Sender()") return } - if 0 == osf.CutOffPeriod.Nanoseconds() { - err = errors.New("Invalid CutOffPeriod") + if osf.Config.CutOffPeriod.Nanoseconds() == 0 { + err = errors.New("invalid CutOffPeriod") return } - if nil == osf.Logger { - err = errors.New("Logger required") + if osf.Logger == nil { + err = errors.New("logger required") return } @@ -175,35 +154,35 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { // id: osf.Listener.Webhook.Config.URL, // listener: osf.Listener, sender: osf.Sender, - queueSize: osf.QueueSize, - cutOffPeriod: osf.CutOffPeriod, + queueSize: osf.Config.QueueSizePerSender, + cutOffPeriod: osf.Config.CutOffPeriod, // deliverUntil: osf.Listener.Webhook.Until, // logger: decoratedLogger, - deliveryRetries: osf.DeliveryRetries, - deliveryInterval: osf.DeliveryInterval, - maxWorkers: osf.NumWorkers, + deliveryRetries: osf.Config.DeliveryRetries, + deliveryInterval: osf.Config.DeliveryInterval, + maxWorkers: osf.Config.NumWorkersPerSender, failureMsg: FailureMessage{ // Original: osf.Listener, Text: failureText, - CutOffPeriod: osf.CutOffPeriod.String(), - QueueSize: osf.QueueSize, - Workers: osf.NumWorkers, + CutOffPeriod: osf.Config.CutOffPeriod.String(), + QueueSize: osf.Config.QueueSizePerSender, + Workers: osf.Config.NumWorkersPerSender, }, - customPIDs: osf.CustomPIDs, - disablePartnerIDs: osf.DisablePartnerIDs, + customPIDs: osf.Config.CustomPIDs, + disablePartnerIDs: osf.Config.DisablePartnerIDs, clientMiddleware: osf.ClientMiddleware, } // Don't share the secret with others when there is an 
error. // caduceusOutboundSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" - CreateOutbounderMetrics(osf.MetricsRegistry, caduceusOutboundSender) + CreateOutbounderMetrics(osf.Metrics, caduceusOutboundSender) // update queue depth and current workers gauge to make sure they start at 0 caduceusOutboundSender.queueDepthGauge.Set(0) caduceusOutboundSender.currentWorkersGauge.Set(0) - caduceusOutboundSender.queue.Store(make(chan *wrp.Message, osf.QueueSize)) + caduceusOutboundSender.queue.Store(make(chan *wrp.Message, osf.Config.QueueSizePerSender)) // if err = caduceusOutboundSender.Update(osf.Listener); nil != err { // return @@ -456,7 +435,7 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti // a fresh one, counting any current messages in the queue as dropped. // It should never close a queue, as a queue not referenced anywhere will be // cleaned up by the garbage collector without needing to be closed. -func (obs *CaduceusOutboundSender) Empty(droppedCounter metrics.Counter) { +func (obs *CaduceusOutboundSender) Empty(droppedCounter prometheus.Counter) { droppedMsgs := obs.queue.Load().(chan *wrp.Message) obs.queue.Store(make(chan *wrp.Message, obs.queueSize)) droppedCounter.Add(float64(len(droppedMsgs))) @@ -591,7 +570,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri // Apply the secret - if "" != secret { + if secret != "" { s := hmac.New(sha1.New, []byte(secret)) s.Write(body) sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil))) @@ -601,11 +580,12 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri // find the event "short name" event := msg.FindEventStringSubMatch() + //TODO: do this need to be replaced by the retry repo? 
retryOptions := xhttp.RetryOptions{ Logger: obs.logger, Retries: obs.deliveryRetries, Interval: obs.deliveryInterval, - Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), + // Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), //webpa retry does not accept prometheus metrics // Always retry on failures up to the max count. ShouldRetry: xhttp.ShouldRetry, ShouldRetryStatus: xhttp.RetryCodes, @@ -646,7 +626,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri resp.Body.Close() } } - obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) + obs.deliveryCounter.With(prometheus.Labels{UrlLabel: obs.id, CodeLabel: code, EventLabel: event}).Add(1.0) l.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String("code", code), zap.String("url", req.URL.String())) } @@ -681,7 +661,7 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } // if no URL to send cut off notification to, do nothing - if "" == failureURL { + if failureURL == "" { return } @@ -696,7 +676,7 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } req.Header.Set("Content-Type", wrp.MimeTypeJson) - if "" != secret { + if secret != "" { h := hmac.New(sha1.New, []byte(secret)) h.Write(msg) sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(h.Sum(nil))) @@ -724,3 +704,22 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } } + +func CreateOutbounderMetrics(m OutboundSenderMetrics, c *CaduceusOutboundSender) { + c.deliveryRetryCounter = m.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: c.id}) + c.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{UrlLabel: c.id}) + c.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{UrlLabel: c.id}) + c.droppedQueueFullCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: c.id, ReasonLabel: "queue_full"}) + c.droppedExpiredCounter = 
m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: c.id, ReasonLabel: "expired"}) + c.droppedExpiredBeforeQueueCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: c.id, ReasonLabel: "expired_before_queueing"}) + c.droppedCutoffCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: c.id, ReasonLabel: "cut_off"}) + c.droppedInvalidConfig = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: c.id, ReasonLabel: "invalid_config"}) + c.droppedNetworkErrCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: c.id, ReasonLabel: networkError}) + c.droppedPanic = m.DropsDueToPanic.With(prometheus.Labels{UrlLabel: c.id}) + c.queueDepthGauge = m.OutgoingQueueDepth.With(prometheus.Labels{UrlLabel: c.id}) + c.renewalTimeGauge = m.ConsumerRenewalTimeGauge.With(prometheus.Labels{UrlLabel: c.id}) + c.deliverUntilGauge = m.ConsumerDeliverUntilGauge.With(prometheus.Labels{UrlLabel: c.id}) + c.dropUntilGauge = m.ConsumerDropUntilGauge.With(prometheus.Labels{UrlLabel: c.id}) + c.currentWorkersGauge = m.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{UrlLabel: c.id}) + c.maxWorkersGauge = m.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{UrlLabel: c.id}) +} diff --git a/senderWrapper.go b/senderWrapper.go index 2fc269ef..76dcf95c 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -5,6 +5,7 @@ package main import ( "crypto/tls" "errors" + "fmt" "net/http" "sync" "time" @@ -21,26 +22,19 @@ import ( type CaduceusSenderWrapperIn struct { fx.In - Tracing candlelight.Tracing - SenderConfig SenderConfig - Metrics SenderMetrics - Logger *zap.Logger + Tracing candlelight.Tracing + SenderConfig SenderConfig + WrapperMetrics SenderWrapperMetrics + OutbounderMetrics OutboundSenderMetrics + Logger *zap.Logger + OutbounderFactory OutboundSenderFactory } -type CaduceusSenderWrapperOut struct { - fx.Out - CaduceusSenderWrapper *CaduceusSenderWrapper -} -type SenderMetricsIn struct { - fx.In - 
QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"` - EventType prometheus.CounterVec `name:"incoming_event_type_count"` -} - -type SenderMetrics struct { +type SenderWrapperMetrics struct { QueryLatency prometheus.HistogramVec EventType prometheus.CounterVec } + type SenderWrapper interface { // Update([]ancla.InternalWebhook) Queue(*wrp.Message) @@ -86,28 +80,19 @@ type CaduceusSenderWrapper struct { // DisablePartnerIDs dictates whether or not to enforce the partner ID check. disablePartnerIDs bool + outbounderSetUp *OutboundSenderFactory } var SenderWrapperModule = fx.Module("caduceusSenderWrapper", - fx.Provide( - func(in SenderMetricsIn) SenderMetrics { - return SenderMetrics{ - QueryLatency: in.QueryLatency, - EventType: in.EventType, - } - }, - ), fx.Provide( func(in CaduceusSenderWrapperIn) http.RoundTripper { return NewRoundTripper(in.SenderConfig, in.Tracing) }, ), fx.Provide( - func(tr http.RoundTripper, in CaduceusSenderWrapperIn) (CaduceusSenderWrapperOut, error) { + func(tr http.RoundTripper, in CaduceusSenderWrapperIn) (*CaduceusSenderWrapper, error) { csw, err := NewSenderWrapper(tr, in) - return CaduceusSenderWrapperOut{ - CaduceusSenderWrapper: csw, - }, err + return csw, err }, ), ) @@ -125,16 +110,20 @@ func NewSenderWrapper(tr http.RoundTripper, in CaduceusSenderWrapperIn) (csw *Ca logger: in.Logger, customPIDs: in.SenderConfig.CustomPIDs, disablePartnerIDs: in.SenderConfig.DisablePartnerIDs, - eventType: in.Metrics.EventType, - queryLatency: in.Metrics.QueryLatency, + eventType: in.WrapperMetrics.EventType, + queryLatency: in.WrapperMetrics.QueryLatency, } + + csw.outbounderSetUp.Config = in.SenderConfig + csw.outbounderSetUp.Logger = in.Logger + csw.outbounderSetUp.Metrics = in.OutbounderMetrics csw.sender = doerFunc((&http.Client{ Transport: tr, Timeout: in.SenderConfig.ClientTimeout, }).Do) if in.SenderConfig.Linger <= 0 { - err = errors.New("Linger must be positive.") + err = errors.New("linger must be 
positive") csw = nil return } @@ -163,58 +152,47 @@ func NewRoundTripper(config SenderConfig, tracing candlelight.Tracing) (tr http. return } -//Commenting out while until ancla/argus dependency issue is fixed. +// Commenting out until the ancla/argus dependency issue is fixed. // Update is called when we get changes to our webhook listeners with either // additions, or updates. This code takes care of building new OutboundSenders // and maintaining the existing OutboundSenders. -// func (sw *CaduceusSenderWrapper) Update(list []ancla.InternalWebhook) { -// // We'll like need this, so let's get one ready -// osf := OutboundSenderFactory{ -// Sender: sw.sender, -// CutOffPeriod: sw.cutOffPeriod, -// NumWorkers: sw.numWorkersPerSender, -// QueueSize: sw.queueSizePerSender, -// MetricsRegistry: sw.metricsRegistry, -// DeliveryRetries: sw.deliveryRetries, -// DeliveryInterval: sw.deliveryInterval, -// Logger: sw.logger, -// CustomPIDs: sw.customPIDs, -// DisablePartnerIDs: sw.disablePartnerIDs, -// QueryLatency: sw.queryLatency, -// } - -// ids := make([]struct { -// Listener ancla.InternalWebhook -// ID string -// }, len(list)) - -// for i, v := range list { -// ids[i].Listener = v -// ids[i].ID = v.Webhook.Config.URL -// } - -// sw.mutex.Lock() -// defer sw.mutex.Unlock() - -// for _, inValue := range ids { -// sender, ok := sw.senders[inValue.ID] -// if !ok { -// osf.Listener = inValue.Listener -// metricWrapper, err := newMetricWrapper(time.Now, osf.QueryLatency.With("url", inValue.ID)) - -// if err != nil { -// continue -// } -// osf.ClientMiddleware = metricWrapper.roundTripper -// obs, err := osf.New() -// if nil == err { -// sw.senders[inValue.ID] = obs -// } -// continue -// } -// sender.Update(inValue.Listener) -// } -// } +func (sw *CaduceusSenderWrapper) Update(list []ListenerStub) { + + ids := make([]struct { + Listener ListenerStub + ID string + }, len(list)) + + for i, v := range list { + ids[i].Listener = v + ids[i].ID = v.Webhook.Config.URL + } + + 
sw.mutex.Lock() + defer sw.mutex.Unlock() + + for _, inValue := range ids { + sender, ok := sw.senders[inValue.ID] + if !ok { + osf := sw.outbounderSetUp + osf.Sender = sw.sender + osf.Listener = inValue.Listener + metricWrapper, err := newMetricWrapper(time.Now, sw.queryLatency, inValue.ID) + + if err != nil { + continue + } + osf.ClientMiddleware = metricWrapper.roundTripper + obs, err := osf.New() + if nil == err { + sw.senders[inValue.ID] = obs + } + continue + } + fmt.Println(sender) + // sender.Update(inValue.Listener) //commenting out until argus/ancla fix + } +} // Queue is used to send all the possible outbound senders a request. This // function performs the fan-out and filtering to multiple possible endpoints.