From 0f469ec0d0f39b370d19d70712350beb044244de Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Fri, 25 Oct 2024 18:32:46 +0200 Subject: [PATCH] Fix connected resource counts after keepalive errors (#47931) (#47951) * Fix connected resource counts after keepalive errors * Log server_id when cleaning up resources --- lib/auth/auth.go | 4 +-- lib/inventory/controller.go | 32 ++++++++++++---------- lib/inventory/controller_test.go | 47 +++++++++++++++++++++++++++++++- 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 5cf485c64c09a..c8a53890645e8 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -410,9 +410,9 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) { log.Warnf("missing connected resources gauge for keep alive %s (this is a bug)", s) } }), - inventory.WithOnDisconnect(func(s string) { + inventory.WithOnDisconnect(func(s string, c int) { if g, ok := connectedResourceGauges[s]; ok { - g.Dec() + g.Sub(float64(c)) } else { log.Warnf("missing connected resources gauge for keep alive %s (this is a bug)", s) } diff --git a/lib/inventory/controller.go b/lib/inventory/controller.go index 953ab8e9fcfc7..5b501cf33e032 100644 --- a/lib/inventory/controller.go +++ b/lib/inventory/controller.go @@ -98,7 +98,7 @@ type controllerOptions struct { authID string instanceHeartbeats bool onConnectFunc func(string) - onDisconnectFunc func(string) + onDisconnectFunc func(string, int) } func (options *controllerOptions) SetDefaults() { @@ -125,11 +125,11 @@ func (options *controllerOptions) SetDefaults() { } if options.onConnectFunc == nil { - options.onConnectFunc = func(s string) {} + options.onConnectFunc = func(string) {} } if options.onDisconnectFunc == nil { - options.onDisconnectFunc = func(s string) {} + options.onDisconnectFunc = func(string, int) {} } } @@ -159,12 +159,12 @@ func WithOnConnect(f func(heartbeatKind string)) ControllerOption { } } -// WithOnDisconnect sets a function to be called every time an existing -// instance disconnects from the inventory control stream. The value -// provided to the callback is the keep alive type of the disconnected -// resource. The callback should return quickly so as not to prevent -// processing of heartbeats. -func WithOnDisconnect(f func(heartbeatKind string)) ControllerOption { +// WithOnDisconnect sets a function to be called every time an existing instance +// disconnects from the inventory control stream. The values provided to the +// callback are the keep alive type of the disconnected resource, as well as a +// count of how many resources disconnected at once. The callback should return +// quickly so as not to prevent processing of heartbeats. +func WithOnDisconnect(f func(heartbeatKind string, amount int)) ControllerOption { return func(opts *controllerOptions) { opts.onDisconnectFunc = f } @@ -204,7 +204,7 @@ type Controller struct { usageReporter usagereporter.UsageReporter testEvents chan testEvent onConnectFunc func(string) - onDisconnectFunc func(string) + onDisconnectFunc func(string, int) closeContext context.Context cancel context.CancelFunc @@ -312,7 +312,10 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { defer func() { if handle.goodbye.GetDeleteResources() { - log.WithField("apps", len(handle.appServers)).Debug("Cleaning up resources in response to instance termination") + log.WithFields(log.Fields{ + "apps": len(handle.appServers), + "server_id": handle.Hello().ServerID, + }).Debug("Cleaning up resources in response to instance termination") for _, app := range handle.appServers { if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) { log.Warnf("Failed to remove app server %q on termination: %v.", handle.Hello().ServerID, err) @@ -328,11 +331,11 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { handle.ticker.Stop() if handle.sshServer != nil { - c.onDisconnectFunc(constants.KeepAliveNode) + c.onDisconnectFunc(constants.KeepAliveNode, 1) } - for range handle.appServers { - c.onDisconnectFunc(constants.KeepAliveApp) + if len(handle.appServers) > 0 { + c.onDisconnectFunc(constants.KeepAliveApp, len(handle.appServers)) } clear(handle.appServers) @@ -671,6 +674,7 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e if shouldRemove { c.testEvent(appKeepAliveDel) + c.onDisconnectFunc(constants.KeepAliveApp, 1) delete(handle.appServers, name) } } else { diff --git a/lib/inventory/controller_test.go b/lib/inventory/controller_test.go index d5eab3c9b1efb..03b8b9e67c00a 100644 --- a/lib/inventory/controller_test.go +++ b/lib/inventory/controller_test.go @@ -142,11 +142,14 @@ func TestSSHServerBasics(t *testing.T) { expectAddr: wantAddr, } + rc := &resourceCounter{} controller := NewController( auth, usagereporter.DiscardUsageReporter{}, withServerKeepAlive(time.Millisecond*200), withTestEventsChannel(events), + WithOnConnect(rc.onConnect), + WithOnDisconnect(rc.onDisconnect), ) defer controller.Close() @@ -272,6 +275,9 @@ func TestSSHServerBasics(t *testing.T) { t.Fatal("timeout waiting for handle closure") } + // verify that metrics have been updated correctly + require.Zero(t, 0, rc.count()) + // verify that the peer address of the control stream was used to override // zero-value IPs for heartbeats. auth.mu.Lock() @@ -295,11 +301,14 @@ func TestAppServerBasics(t *testing.T) { auth := &fakeAuth{} + rc := &resourceCounter{} controller := NewController( auth, usagereporter.DiscardUsageReporter{}, withServerKeepAlive(time.Millisecond*200), withTestEventsChannel(events), + WithOnConnect(rc.onConnect), + WithOnDisconnect(rc.onDisconnect), ) defer controller.Close() @@ -482,6 +491,9 @@ func TestAppServerBasics(t *testing.T) { case <-closeTimeout: t.Fatal("timeout waiting for handle closure") } + + // verify that metrics have been updated correctly + require.Zero(t, rc.count()) } // TestInstanceHeartbeat verifies basic expected behaviors for instance heartbeat. @@ -857,7 +869,6 @@ func TestGoodbye(t *testing.T) { } func TestGetSender(t *testing.T) { - controller := NewController( &fakeAuth{}, usagereporter.DiscardUsageReporter{}, @@ -968,3 +979,37 @@ func awaitEvents(t *testing.T, ch <-chan testEvent, opts ...eventOption) { } } } + +type resourceCounter struct { + mu sync.Mutex + c map[string]int +} + +func (r *resourceCounter) onConnect(typ string) { + r.mu.Lock() + defer r.mu.Unlock() + if r.c == nil { + r.c = make(map[string]int) + } + r.c[typ]++ +} + +func (r *resourceCounter) onDisconnect(typ string, amount int) { + r.mu.Lock() + defer r.mu.Unlock() + if r.c == nil { + r.c = make(map[string]int) + } + r.c[typ] -= amount +} + +func (r *resourceCounter) count() int { + r.mu.Lock() + defer r.mu.Unlock() + + var count int + for _, v := range r.c { + count += v + } + return count +}