Skip to content

Commit

Permalink
Merge branch 'tests' into feat/senders-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Jan 29, 2024
2 parents e480530 + 49f2c98 commit 1dc32f3
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 35 deletions.
22 changes: 11 additions & 11 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
uuid "github.com/satori/go.uuid"

"github.com/xmidt-org/sallust"
"github.com/xmidt-org/wrp-go/v3"
)

Expand All @@ -31,7 +32,6 @@ type ServerHandlerOut struct {

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
log *zap.Logger
caduceusHandler RequestHandler
telemetry *HandlerTelemetry
incomingQueueDepth int64
Expand All @@ -58,18 +58,19 @@ type HandlerTelemetry struct {

func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
eventType := unknownEventType
log := sallust.Get(request.Context())
// find time difference, add to metric after function finishes
defer func(s time.Time) {
sh.recordQueueLatencyToHistogram(s, eventType)
}(sh.now())

sh.log.Info("Receiving incoming request...")
log.Info("Receiving incoming request...")

if len(request.Header["Content-Type"]) != 1 || request.Header["Content-Type"][0] != "application/msgpack" {
//return a 415
response.WriteHeader(http.StatusUnsupportedMediaType)
response.Write([]byte("Invalid Content-Type header(s). Expected application/msgpack. \n"))
sh.log.Debug("Invalid Content-Type header(s). Expected application/msgpack. \n")
log.Debug("Invalid Content-Type header(s). Expected application/msgpack. \n")
return
}

Expand All @@ -80,7 +81,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
// return a 503
response.WriteHeader(http.StatusServiceUnavailable)
response.Write([]byte("Incoming queue is full.\n"))
sh.log.Debug("Incoming queue is full.\n")
log.Debug("Incoming queue is full.\n")
return
}

Expand All @@ -90,14 +91,14 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
payload, err := io.ReadAll(request.Body)
if err != nil {
sh.telemetry.errorRequests.Add(1.0)
sh.log.Error("Unable to retrieve the request body.", zap.Error(err))
log.Error("Unable to retrieve the request body.", zap.Error(err))
response.WriteHeader(http.StatusBadRequest)
return
}

if len(payload) == 0 {
sh.telemetry.emptyRequests.Add(1.0)
sh.log.Error("Empty payload.")
log.Error("Empty payload.")
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Empty payload.\n"))
return
Expand All @@ -113,10 +114,10 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
response.WriteHeader(http.StatusBadRequest)
if err != nil {
response.Write([]byte("Invalid payload format.\n"))
sh.log.Debug("Invalid payload format.")
log.Debug("Invalid payload format.")
} else {
response.Write([]byte("Invalid MessageType.\n"))
sh.log.Debug("Invalid MessageType.")
log.Debug("Invalid MessageType.")
}
return
}
Expand All @@ -127,7 +128,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
sh.telemetry.invalidCount.Add(1.0)
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Strings must be UTF-8.\n"))
sh.log.Debug("Strings must be UTF-8.")
log.Debug("Strings must be UTF-8.")
return
}
eventType = msg.FindEventStringSubMatch()
Expand All @@ -138,7 +139,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
response.WriteHeader(http.StatusAccepted)
response.Write([]byte("Request placed on to queue.\n"))

sh.log.Debug("event passed to senders.", zap.Any("event", msg))
log.Debug("event passed to senders.", zap.Any("event", msg))
}

func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) {
Expand Down Expand Up @@ -201,7 +202,6 @@ func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTeleme
senderWrapper: senderWrapper,
Logger: log,
},
log: log,
telemetry: t,
maxOutstanding: maxOutstanding,
incomingQueueDepth: incomingQueueDepth,
Expand Down
30 changes: 8 additions & 22 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/xmidt-org/httpaux/retry"
"go.uber.org/zap"

"github.com/prometheus/client_golang/prometheus"

"github.com/xmidt-org/webpa-common/v2/semaphore"
"github.com/xmidt-org/webpa-common/v2/xhttp"
"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/wrp-go/v3/wrphttp"
)
Expand Down Expand Up @@ -579,33 +578,20 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri
// find the event "short name"
event := msg.FindEventStringSubMatch()

//TODO: do this need to be replaced by the retry repo?
retryOptions := xhttp.RetryOptions{
Logger: obs.logger,
/*TODO: need middleware for:
Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event)
Logger
Update Request
*/
retryConfig := retry.Config{
Retries: obs.deliveryRetries,
Interval: obs.deliveryInterval,
// Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), //webpa retry does not accept prometheus metrics
// Always retry on failures up to the max count.
ShouldRetry: xhttp.ShouldRetry,
ShouldRetryStatus: xhttp.RetryCodes,
}

// update subsequent requests with the next url in the list upon failure
retryOptions.UpdateRequest = func(request *http.Request) {
urls = urls.Next()
tmp, err := url.Parse(urls.Value.(string))
if err != nil {
obs.logger.Error("failed to update url", zap.String("url", urls.Value.(string)), zap.Error(err))
return
}
request.URL = tmp
}

// Send it
obs.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))

retryer := xhttp.RetryTransactor(retryOptions, obs.sender.Do)
client := obs.clientMiddleware(doerFunc(retryer))
client := retry.New(retryConfig, obs.clientMiddleware(obs.sender))
resp, err := client.Do(req)

code := "failure"
Expand Down
4 changes: 2 additions & 2 deletions primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain,
endpoints = append(endpoints, r)
}
m := basculehelper.MetricValidator{
C: basculehelper.CapabilitiesValidator{Checker: c},
// Measures: capabilityCheckMeasures,
C: basculehelper.CapabilitiesValidator{Checker: c},
Endpoints: endpoints,
// Measures: capabilityCheckMeasures,
}
bearerRules = append(bearerRules, m.CreateValidator(capabilityCheck.Type == "enforce"))
}
Expand Down

0 comments on commit 1dc32f3

Please sign in to comment.