From 4317f2ab2af35c24605f2a623af8f4113564d45f Mon Sep 17 00:00:00 2001 From: basti Date: Sun, 7 Dec 2025 13:29:37 +0100 Subject: [PATCH 1/3] feat: enhance metrics and logging in QueueManager --- lib/metrics.go | 2 +- lib/queue_manager.go | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/lib/metrics.go b/lib/metrics.go index 417ff5d..14a3dc8 100644 --- a/lib/metrics.go +++ b/lib/metrics.go @@ -22,7 +22,7 @@ var ( ConnectionsOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "nirn_proxy_open_connections", Help: "Gauge for client connections currently open with the proxy", - }, []string{"method", "route"}) + }, []string{"method", "route", "clientId"}) RequestsRoutedSent = promauto.NewCounter(prometheus.CounterOpts{ Name: "nirn_proxy_requests_routed_sent", diff --git a/lib/queue_manager.go b/lib/queue_manager.go index e5346dd..3e3841e 100644 --- a/lib/queue_manager.go +++ b/lib/queue_manager.go @@ -45,6 +45,10 @@ type QueueManager struct { } func onEvictLruItem(key interface{}, value interface{}) { + // Log evictions so we can observe if bearer queues are being evicted unexpectedly + logger.WithFields(logrus.Fields{ + "evicted_key": key, + }).Warn("Evicting bearer queue from LRU; destroying queue") go value.(*RequestQueue).destroy() } @@ -63,6 +67,8 @@ func NewQueueManager(bufferSize int, maxBearerLruSize int) *QueueManager { clusterGlobalRateLimiter: NewClusterGlobalRateLimiter(), } + logger.WithFields(logrus.Fields{"bufferSize": bufferSize, "maxBearerLruSize": maxBearerLruSize}).Trace("Created QueueManager") + return q } @@ -123,14 +129,17 @@ func (m *QueueManager) SetCluster(cluster *memberlist.Memberlist, proxyPort stri func (m *QueueManager) calculateRoute(pathHash uint64) string { if m.cluster == nil { // Route to self, proxy in stand-alone mode + logger.WithField("pathHash", pathHash).Trace("Cluster nil; routing to self") return "" } if pathHash == 0 { + logger.WithField("pathHash", pathHash).Trace("Path hash 0; routing to self") return "" } if _, ok := pathsToRouteLocally[pathHash]; ok { + logger.WithField("pathHash", pathHash).Trace("Path is in pathsToRouteLocally; routing to self") return "" } @@ -143,11 +152,13 @@ func (m *QueueManager) calculateRoute(pathHash uint64) string { count := uint64(len(members)) if count == 0 { + logger.Trace("No cluster members available; routing to self") return "" } chosenIndex := pathHash % count addr := m.nameToAddressMap[members[chosenIndex]] + logger.WithFields(logrus.Fields{"chosenIndex": chosenIndex, "addr": addr, "local": m.localNodeProxyListenAddr}).Trace("Calculated route for pathHash") if addr == m.localNodeProxyListenAddr { return "" } @@ -177,6 +188,7 @@ func (m *QueueManager) routeRequest(addr string, req *http.Request) (*http.Respo RequestsRoutedSent.Inc() } else { RequestsRoutedError.Inc() + logger.WithFields(logrus.Fields{"to": addr, "path": req.URL.Path}).WithError(err).Warn("Error while routing request to node") } return resp, err @@ -215,6 +227,7 @@ func (m *QueueManager) getOrCreateBotQueue(token string) (*RequestQueue, error) } m.queues[token] = q + logger.WithFields(logrus.Fields{"queue": token, "bufferSize": m.bufferSize}).Trace("Created new bot queue") } } @@ -240,6 +253,7 @@ func (m *QueueManager) getOrCreateBearerQueue(token string) (*RequestQueue, erro } m.bearerQueues.Add(token, q) + logger.WithFields(logrus.Fields{"bearer": "***REDACTED***", "queue_present": true}).Trace("Created new bearer queue") } } @@ -249,10 +263,16 @@ func (m *QueueManager) getOrCreateBearerQueue(token string) (*RequestQueue, erro func (m *QueueManager) DiscordRequestHandler(resp http.ResponseWriter, req *http.Request) { reqStart := time.Now() metricsPath := GetMetricsPath(req.URL.Path) - ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method}).Inc() - defer ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method}).Dec() token := req.Header.Get("Authorization") + clientId := req.Header.Get("X-Client-Id") + if clientId == "" { + clientId = GetBotId(token) + } + + ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Inc() + defer ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Dec() + routingHash, path, queueType := m.GetRequestRoutingInfo(req, token) m.fulfillRequest(&resp, req, queueType, path, routingHash, token, reqStart) @@ -284,6 +304,9 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque routeToHeader := req.Header.Get("nirn-routed-to") req.Header.Del("nirn-routed-to") + logEntry = logEntry.WithFields(logrus.Fields{"routeTo": routeTo, "routeToHeader": routeToHeader, "queueType": queueType, "path": path, "pathHash": pathHash}) + logEntry.Trace("fulfillRequest start") + if routeToHeader != "" { RequestsRoutedRecv.Inc() } @@ -320,8 +343,12 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque botLimit := q.botLimit globalRouteTo := m.calculateRoute(botHash) + logEntry = logEntry.WithFields(logrus.Fields{"botHash": botHash, "botLimit": botLimit, "globalRouteTo": globalRouteTo}) + logEntry.Trace("Queue has identifier; checking cluster global limiter") + if globalRouteTo == "" || queueType == Bearer { m.clusterGlobalRateLimiter.Take(botHash, botLimit) + logEntry.Trace("Local cluster global limiter taken") } else { err = m.clusterGlobalRateLimiter.FireGlobalRequest(req.Context(), globalRouteTo, botHash, botLimit) if err != nil { @@ -330,6 +357,7 @@ func (m *QueueManager) fulfillRequest(resp *http.ResponseWriter, req *http.Reque Generate429(resp) return } + logEntry.Trace("Dispatched global request to remote node via clusterGlobalRateLimiter") } } err = q.Queue(req, resp, path, pathHash) From 56dcf38e7fcfaa070ebec456f6068af9ea24773a Mon Sep 17 00:00:00 2001 From: basti Date: Sun, 7 Dec 2025 13:38:09 +0100 Subject: [PATCH 2/3] fix: correct clientId initialization in QueueManager --- lib/queue_manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/queue_manager.go b/lib/queue_manager.go index 3e3841e..2fd0633 100644 --- a/lib/queue_manager.go +++ b/lib/queue_manager.go @@ -265,10 +265,10 @@ func (m *QueueManager) DiscordRequestHandler(resp http.ResponseWriter, req *http metricsPath := GetMetricsPath(req.URL.Path) token := req.Header.Get("Authorization") - clientId := req.Header.Get("X-Client-Id") - if clientId == "" { - clientId = GetBotId(token) - } + clientId := "" + if clientId == "" { + clientId = GetBotId(token) + } ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Inc() defer ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Dec() From f1b79a495fec5a54bc8b72368d41656d76e827b1 Mon Sep 17 00:00:00 2001 From: basti Date: Sun, 7 Dec 2025 13:43:40 +0100 Subject: [PATCH 3/3] fix: simplify clientId initialization in QueueManager --- lib/queue_manager.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/queue_manager.go b/lib/queue_manager.go index 2fd0633..8564193 100644 --- a/lib/queue_manager.go +++ b/lib/queue_manager.go @@ -265,10 +265,7 @@ func (m *QueueManager) DiscordRequestHandler(resp http.ResponseWriter, req *http metricsPath := GetMetricsPath(req.URL.Path) token := req.Header.Get("Authorization") - clientId := "" - if clientId == "" { - clientId = GetBotId(token) - } + clientId := GetBotId(token) ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Inc() defer ConnectionsOpen.With(map[string]string{"route": metricsPath, "method": req.Method, "clientId": clientId}).Dec()