Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
ConnectionsOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "nirn_proxy_open_connections",
Help: "Gauge for client connections currently open with the proxy",
}, []string{"method", "route"})
}, []string{"method", "route", "clientId"})

RequestsRoutedSent = promauto.NewCounter(prometheus.CounterOpts{
Name: "nirn_proxy_requests_routed_sent",
Expand Down
29 changes: 27 additions & 2 deletions lib/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type QueueManager struct {
}

func onEvictLruItem(key interface{}, value interface{}) {
// Log evictions so we can observe if bearer queues are being evicted unexpectedly
logger.WithFields(logrus.Fields{
"evicted_key": key,
}).Warn("Evicting bearer queue from LRU; destroying queue")
go value.(*RequestQueue).destroy()
}

Expand All @@ -63,6 +67,8 @@ func NewQueueManager(bufferSize int, maxBearerLruSize int) *QueueManager {
clusterGlobalRateLimiter: NewClusterGlobalRateLimiter(),
}

logger.WithFields(logrus.Fields{"bufferSize": bufferSize, "maxBearerLruSize": maxBearerLruSize}).Trace("Created QueueManager")

return q
}

Expand Down Expand Up @@ -123,14 +129,17 @@ func (m *QueueManager) SetCluster(cluster *memberlist.Memberlist, proxyPort stri
func (m *QueueManager) calculateRoute(pathHash uint64) string {
if m.cluster == nil {
// Route to self, proxy in stand-alone mode
logger.WithField("pathHash", pathHash).Trace("Cluster nil; routing to self")
return ""
}

if pathHash == 0 {
logger.WithField("pathHash", pathHash).Trace("Path hash 0; routing to self")
return ""
}

if _, ok := pathsToRouteLocally[pathHash]; ok {
logger.WithField("pathHash", pathHash).Trace("Path is in pathsToRouteLocally; routing to self")
return ""
}

Expand All @@ -143,11 +152,13 @@ func (m *QueueManager) calculateRoute(pathHash uint64) string {
count := uint64(len(members))

if count == 0 {
logger.Trace("No cluster members available; routing to self")
return ""
}

chosenIndex := pathHash % count
addr := m.nameToAddressMap[members[chosenIndex]]
logger.WithFields(logrus.Fields{"chosenIndex": chosenIndex, "addr": addr, "local": m.localNodeProxyListenAddr}).Trace("Calculated route for pathHash")
if addr == m.localNodeProxyListenAddr {
return ""
}
Expand Down Expand Up @@ -177,6 +188,7 @@ func (m *QueueManager) routeRequest(addr string, req *http.Request) (*http.Respo
RequestsRoutedSent.Inc()
} else {
RequestsRoutedError.Inc()
logger.WithFields(logrus.Fields{"to": addr, "path": req.URL.Path}).WithError(err).Warn("Error while routing request to node")
}

return resp, err
Expand Down Expand Up @@ -215,6 +227,7 @@ func (m *QueueManager) getOrCreateBotQueue(token string) (*RequestQueue, error)
}

m.queues[token] = q
logger.WithFields(logrus.Fields{"queue": token, "bufferSize": m.bufferSize}).Trace("Created new bot queue")
}
}

Expand All @@ -240,6 +253,7 @@ func (m *QueueManager) getOrCreateBearerQueue(token string) (*RequestQueue, erro
}

m.bearerQueues.Add(token, q)
logger.WithFields(logrus.Fields{"bearer": "***REDACTED***", "queue_present": true}).Trace("Created new bearer queue")
}
}

Expand All @@ -249,10 +263,13 @@ func (m *QueueManager) getOrCreateBearerQueue(token string) (*RequestQueue, erro
func (m *QueueManager) DiscordRequestHandler(resp http.ResponseWriter, req *http.Request) {
reqStart := time.Now()
metricsPath := GetMetricsPath(req.URL.Path)
ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method}).Inc()
defer ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method}).Dec()

token := req.Header.Get("Authorization")
clientId := GetBotId(token)

ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Inc()
defer ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Dec()

routingHash, path, queueType := m.GetRequestRoutingInfo(req, token)

m.fulfillRequest(&resp, req, queueType, path, routingHash, token, reqStart)
Expand Down Expand Up @@ -284,6 +301,9 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque
routeToHeader := req.Header.Get("nirn-routed-to")
req.Header.Del("nirn-routed-to")

logEntry = logEntry.WithFields(logrus.Fields{"routeTo": routeTo, "routeToHeader": routeToHeader, "queueType": queueType, "path": path, "pathHash": pathHash})
logEntry.Trace("fulfillRequest start")

if routeToHeader != "" {
RequestsRoutedRecv.Inc()
}
Expand Down Expand Up @@ -320,8 +340,12 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque
botLimit := q.botLimit
globalRouteTo := m.calculateRoute(botHash)

logEntry = logEntry.WithFields(logrus.Fields{"botHash": botHash, "botLimit": botLimit, "globalRouteTo": globalRouteTo})
logEntry.Trace("Queue has identifier; checking cluster global limiter")

if globalRouteTo == "" || queueType == Bearer {
m.clusterGlobalRateLimiter.Take(botHash, botLimit)
logEntry.Trace("Local cluster global limiter taken")
} else {
err = m.clusterGlobalRateLimiter.FireGlobalRequest(req.Context(), globalRouteTo, botHash, botLimit)
if err != nil {
Expand All @@ -330,6 +354,7 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque
Generate429(resp)
return
}
logEntry.Trace("Dispatched global request to remote node via clusterGlobalRateLimiter")
}
}
err = q.Queue(req, resp, path, pathHash)
Expand Down
Loading