diff --git a/lib/metrics.go b/lib/metrics.go index 417ff5d..14a3dc8 100644 --- a/lib/metrics.go +++ b/lib/metrics.go @@ -22,7 +22,7 @@ var ( ConnectionsOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "nirn_proxy_open_connections", Help: "Gauge for client connections currently open with the proxy", - }, []string{"method", "route"}) + }, []string{"method", "route", "clientId"}) RequestsRoutedSent = promauto.NewCounter(prometheus.CounterOpts{ Name: "nirn_proxy_requests_routed_sent", diff --git a/lib/queue_manager.go b/lib/queue_manager.go index e5346dd..8564193 100644 --- a/lib/queue_manager.go +++ b/lib/queue_manager.go @@ -45,6 +45,10 @@ type QueueManager struct { } func onEvictLruItem(key interface{}, value interface{}) { + // Log evictions so we can observe if bearer queues are being evicted unexpectedly + logger.WithFields(logrus.Fields{ + "evicted_key": key, + }).Warn("Evicting bearer queue from LRU; destroying queue") go value.(*RequestQueue).destroy() } @@ -63,6 +67,8 @@ func NewQueueManager(bufferSize int, maxBearerLruSize int) *QueueManager { clusterGlobalRateLimiter: NewClusterGlobalRateLimiter(), } + logger.WithFields(logrus.Fields{"bufferSize": bufferSize, "maxBearerLruSize": maxBearerLruSize}).Trace("Created QueueManager") + return q } @@ -123,14 +129,17 @@ func (m *QueueManager) SetCluster(cluster *memberlist.Memberlist, proxyPort stri func (m *QueueManager) calculateRoute(pathHash uint64) string { if m.cluster == nil { // Route to self, proxy in stand-alone mode + logger.WithField("pathHash", pathHash).Trace("Cluster nil; routing to self") return "" } if pathHash == 0 { + logger.WithField("pathHash", pathHash).Trace("Path hash 0; routing to self") return "" } if _, ok := pathsToRouteLocally[pathHash]; ok { + logger.WithField("pathHash", pathHash).Trace("Path is in pathsToRouteLocally; routing to self") return "" } @@ -143,11 +152,13 @@ func (m *QueueManager) calculateRoute(pathHash uint64) string { count := uint64(len(members)) if count == 0 { + logger.Trace("No cluster members available; routing to self") return "" } chosenIndex := pathHash % count addr := m.nameToAddressMap[members[chosenIndex]] + logger.WithFields(logrus.Fields{"chosenIndex": chosenIndex, "addr": addr, "local": m.localNodeProxyListenAddr}).Trace("Calculated route for pathHash") if addr == m.localNodeProxyListenAddr { return "" } @@ -177,6 +188,7 @@ func (m *QueueManager) routeRequest(addr string, req *http.Request) (*http.Respo RequestsRoutedSent.Inc() } else { RequestsRoutedError.Inc() + logger.WithFields(logrus.Fields{"to": addr, "path": req.URL.Path}).WithError(err).Warn("Error while routing request to node") } return resp, err @@ -215,6 +227,7 @@ func (m *QueueManager) getOrCreateBotQueue(token string) (*RequestQueue, error) } m.queues[token] = q + logger.WithFields(logrus.Fields{"queue": token, "bufferSize": m.bufferSize}).Trace("Created new bot queue") } } @@ -240,6 +253,7 @@ func (m *QueueManager) getOrCreateBearerQueue(token string) (*RequestQueue, erro } m.bearerQueues.Add(token, q) + logger.WithFields(logrus.Fields{"bearer": "***REDACTED***", "queue_present": true}).Trace("Created new bearer queue") } } @@ -249,10 +263,13 @@ func (m *QueueManager) getOrCreateBearerQueue(token string) (*RequestQueue, erro func (m *QueueManager) DiscordRequestHandler(resp http.ResponseWriter, req *http.Request) { reqStart := time.Now() metricsPath := GetMetricsPath(req.URL.Path) - ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method}).Inc() - defer ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method}).Dec() token := req.Header.Get("Authorization") + clientId := GetBotId(token) + + ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Inc() + defer ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Dec() + routingHash, path, queueType := m.GetRequestRoutingInfo(req, token) m.fulfillRequest(&resp, req, queueType, path, routingHash, token, reqStart) @@ -284,6 +301,9 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque routeToHeader := req.Header.Get("nirn-routed-to") req.Header.Del("nirn-routed-to") + logEntry = logEntry.WithFields(logrus.Fields{"routeTo": routeTo, "routeToHeader": routeToHeader, "queueType": queueType, "path": path, "pathHash": pathHash}) + logEntry.Trace("fulfillRequest start") + if routeToHeader != "" { RequestsRoutedRecv.Inc() } @@ -320,8 +340,12 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque botLimit := q.botLimit globalRouteTo := m.calculateRoute(botHash) + logEntry = logEntry.WithFields(logrus.Fields{"botHash": botHash, "botLimit": botLimit, "globalRouteTo": globalRouteTo}) + logEntry.Trace("Queue has identifier; checking cluster global limiter") + if globalRouteTo == "" || queueType == Bearer { m.clusterGlobalRateLimiter.Take(botHash, botLimit) + logEntry.Trace("Local cluster global limiter taken") } else { err = m.clusterGlobalRateLimiter.FireGlobalRequest(req.Context(), globalRouteTo, botHash, botLimit) if err != nil { @@ -330,6 +354,7 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque Generate429(resp) return } + logEntry.Trace("Dispatched global request to remote node via clusterGlobalRateLimiter") } } err = q.Queue(req, resp, path, pathHash)