Handle resource cleanup on termination within the inventory control stream (#43644)

Historically, each component responsible for managing a particular
resource has also been responsible for deleting those resources when
Teleport was shutting down for good. This provides a better user
experience by reducing the chance of a stale resource remaining
present for the full TTL of the backend item. However, the mechanism
requires the process responsible for the resources to delete each
resource individually. For dynamic resources this could require
several thousand Delete RPCs during shutdown. Since the shutdown
process is time bound, some deletions might never complete, leading
to the same stale resource issues. The deletions also race with any
inbound heartbeats being sent via the inventory control stream: all
resource deletion mechanisms are unary RPCs processed outside of the
inventory control stream, so without careful coordination the
deletions could all occur before any in-flight heartbeats are
processed by the stream.

To simplify the deletion process, a new UpstreamInventoryGoodbye
message has been added to the inventory control stream so that
deletion can occur in-band. Instead of sending individual delete
RPCs when a process is terminating, it can now send a single
UpstreamInventoryGoodbye via the inventory control stream. Any
control stream that receives a Goodbye prior to being terminated
indicates to the auth server that it should remove all resources
that were represented by that stream. Additionally, the
DownstreamInventoryHello was updated to include the capabilities
supported by the auth server, to better coordinate backward
compatibility and feature support. This lets the agent determine
what the auth server supports without performing a series of
version checks.
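To illustrate the intended agent-side flow, here is a minimal, self-contained Go sketch. It is not code from this commit: the interface and capability names below are stand-ins for the real types in lib/inventory and api/client/proto, and only the Goodbye message and the AppCleanup capability map directly onto what this change adds.

    package main

    import "context"

    // capabilities mirrors the idea of DownstreamInventoryHello.SupportedCapabilities;
    // only the fields needed for this sketch are included.
    type capabilities struct {
        AppHeartbeats bool
        AppCleanup    bool
    }

    // downstreamHandle is a hypothetical, trimmed-down view of the agent's
    // side of the inventory control stream.
    type downstreamHandle interface {
        // AuthCapabilities returns what the connected auth server advertised
        // in its hello (illustrative name, not the real accessor).
        AuthCapabilities() capabilities
        // SendGoodbye sends a single UpstreamInventoryGoodbye so auth deletes
        // everything this stream heartbeated once the stream closes.
        SendGoodbye(context.Context) error
    }

    // gracefulShutdown shows the new flow: one in-band Goodbye instead of
    // thousands of unary Delete RPCs, gated on auth supporting cleanup.
    func gracefulShutdown(ctx context.Context, h downstreamHandle) error {
        if !h.AuthCapabilities().AppCleanup {
            // Auth is too old for in-band cleanup; fall back to the legacy
            // per-resource Delete RPCs or let the backend TTL expire entries.
            return nil
        }
        return h.SendGoodbye(ctx)
    }

    func main() {}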
rosstimothy authored Jul 13, 2024
1 parent e22c004 commit 5205396
Showing 12 changed files with 2,565 additions and 1,004 deletions.
8 changes: 7 additions & 1 deletion api/client/inventory.go
@@ -339,6 +339,10 @@ func (i *downstreamICS) runSendLoop(stream proto.AuthService_InventoryControlStr
oneOf.Msg = &proto.UpstreamInventoryOneOf_AgentMetadata{
AgentMetadata: &msg,
}
case proto.UpstreamInventoryGoodbye:
oneOf.Msg = &proto.UpstreamInventoryOneOf_Goodbye{
Goodbye: &msg,
}
default:
sendMsg.errC <- trace.BadParameter("cannot send unexpected upstream msg type: %T", msg)
continue
@@ -478,6 +482,8 @@ func (i *upstreamICS) runRecvLoop(stream proto.AuthService_InventoryControlStrea
msg = *oneOf.GetPong()
case oneOf.GetAgentMetadata() != nil:
msg = *oneOf.GetAgentMetadata()
case oneOf.GetGoodbye() != nil:
msg = *oneOf.GetGoodbye()
default:
slog.WarnContext(stream.Context(), "received unknown upstream message", "message", oneOf)
continue
@@ -513,7 +519,7 @@ func (i *upstreamICS) runSendLoop(stream proto.AuthService_InventoryControlStrea
UpdateLabels: &msg,
}
default:
sendMsg.errC <- trace.BadParameter("cannot send unexpected upstream msg type: %T", msg)
sendMsg.errC <- trace.BadParameter("cannot send unexpected downstream msg type: %T", msg)
continue
}
err := stream.Send(&oneOf)
3,095 changes: 2,166 additions & 929 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/client/proto/types.go
@@ -102,6 +102,8 @@ func (p UpstreamInventoryPong) sealedUpstreamInventoryMessage() {}

func (a UpstreamInventoryAgentMetadata) sealedUpstreamInventoryMessage() {}

func (h UpstreamInventoryGoodbye) sealedUpstreamInventoryMessage() {}

// DownstreamInventoryMessage is a sealed interface representing the possible
// downstream messages of the inventory control stream after initial hello.
type DownstreamInventoryMessage interface {
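For readers unfamiliar with the sealed-interface pattern shown above, a small self-contained sketch with illustrative names (not the real ones): the unexported method means only types declared in the owning package can satisfy the interface, which is why adding UpstreamInventoryGoodbye requires a one-line change in this file.

    package main

    import "fmt"

    // upstreamMessage is a sealed interface: the unexported method means only
    // types declared in this package can implement it.
    type upstreamMessage interface {
        sealedUpstreamMessage()
    }

    type helloMsg struct{ serverID string }
    type goodbyeMsg struct{ deleteResources bool }

    func (helloMsg) sealedUpstreamMessage()   {}
    func (goodbyeMsg) sealedUpstreamMessage() {}

    // send accepts only the sealed set of message types.
    func send(m upstreamMessage) {
        switch msg := m.(type) {
        case helloMsg:
            fmt.Println("hello from", msg.serverID)
        case goodbyeMsg:
            fmt.Println("goodbye, delete resources:", msg.deleteResources)
        }
    }

    func main() {
        send(helloMsg{serverID: "llama"})
        send(goodbyeMsg{deleteResources: true})
    }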
55 changes: 55 additions & 0 deletions api/proto/teleport/legacy/client/proto/authservice.proto
@@ -2267,6 +2267,8 @@ message UpstreamInventoryOneOf {
UpstreamInventoryPong Pong = 3;
// UpstreamInventoryAgentMetadata advertises instance metadata.
UpstreamInventoryAgentMetadata AgentMetadata = 4;
// UpstreamInventoryGoodbye advertises that the instance is terminating.
UpstreamInventoryGoodbye Goodbye = 5;
}
}

@@ -2347,6 +2349,51 @@ message DownstreamInventoryHello {
string Version = 1;
// ServerID advertises the server ID of the auth server.
string ServerID = 2;

// SupportedCapabilities indicates which features of the ICS the
// connected auth server supports. This allows agents to determine
// how they should interact with the auth server to maintain compatibility.
message SupportedCapabilities {
// ProxyHeartbeats indicates the ICS supports heartbeating proxy servers.
bool ProxyHeartbeats = 1;
// ProxyCleanup indicates the ICS supports deleting proxies when UpstreamInventoryGoodbye.DeleteResources is set.
bool ProxyCleanup = 2;
// AuthHeartbeats indicates the ICS supports heartbeating auth servers.
bool AuthHeartbeats = 3;
// AuthCleanup indicates the ICS supports deleting auth servers when UpstreamInventoryGoodbye.DeleteResources is set.
bool AuthCleanup = 4;
// NodeHeartbeats indicates the ICS supports heartbeating ssh servers.
bool NodeHeartbeats = 5;
// NodeCleanup indicates the ICS supports deleting nodes when UpstreamInventoryGoodbye.DeleteResources is set.
bool NodeCleanup = 6;
// AppHeartbeats indicates the ICS supports heartbeating app servers.
bool AppHeartbeats = 7;
// AppCleanup indicates the ICS supports deleting apps when UpstreamInventoryGoodbye.DeleteResources is set.
bool AppCleanup = 8;
// DatabaseHeartbeats indicates the ICS supports heartbeating databases.
bool DatabaseHeartbeats = 9;
// DatabaseCleanup indicates the ICS supports deleting databases when UpstreamInventoryGoodbye.DeleteResources is set.
bool DatabaseCleanup = 10;
// DatabaseServiceHeartbeats indicates the ICS supports heartbeating database services.
bool DatabaseServiceHeartbeats = 11;
// DatabaseServiceCleanup indicates the ICS supports deleting database services when UpstreamInventoryGoodbye.DeleteResources is set.
bool DatabaseServiceCleanup = 12;
// WindowsDesktopHeartbeats indicates the ICS supports heartbeating windows desktop servers.
bool WindowsDesktopHeartbeats = 13;
// WindowsDesktopCleanup indicates the ICS supports deleting windows desktops when UpstreamInventoryGoodbye.DeleteResources is set.
bool WindowsDesktopCleanup = 14;
// WindowsDesktopServiceHeartbeats indicates the ICS supports heartbeating windows desktop services.
bool WindowsDesktopServiceHeartbeats = 15;
// WindowsDesktopServiceCleanup indicates the ICS supports deleting windows desktop services when UpstreamInventoryGoodbye.DeleteResources is set.
bool WindowsDesktopServiceCleanup = 16;
// KubernetesHeartbeats indicates the ICS supports heartbeating kubernetes clusters.
bool KubernetesHeartbeats = 17;
// KubernetesCleanup indicates the ICS supports deleting kubernetes clusters when UpstreamInventoryGoodbye.DeleteResources is set.
bool KubernetesCleanup = 18;
}

// SupportedCapabilities advertises the supported features of the auth server.
SupportedCapabilities Capabilities = 3;
}

// LabelUpdateKind is the type of service to update labels for.
@@ -2389,6 +2436,14 @@ message InventoryHeartbeat {
types.AppServerV3 AppServer = 2;
}

// UpstreamInventoryGoodbye informs the upstream service that the instance
// is terminating.
message UpstreamInventoryGoodbye {
// DeleteResources indicates that any resources heartbeated by
// the instance should be deleted when the stream is closed.
bool DeleteResources = 1;
}

// InventoryStatusRequest requests inventory status info.
message InventoryStatusRequest {
// Connected requests summary of the inventory control streams registered with
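As a quick illustration of how the new message rides the existing envelope, the following sketch constructs a Goodbye using the generated Go types whose names appear in the diffs above. The program scaffolding is assumed, and it presumes the teleport api module is available on the import path.

    package main

    import (
        "fmt"

        "github.com/gravitational/teleport/api/client/proto"
    )

    func main() {
        // The goodbye travels on the existing control stream wrapped in the
        // same oneof envelope as heartbeats and pings (see inventory.go above).
        msg := proto.UpstreamInventoryOneOf{
            Msg: &proto.UpstreamInventoryOneOf_Goodbye{
                Goodbye: &proto.UpstreamInventoryGoodbye{DeleteResources: true},
            },
        }
        fmt.Println(msg.GetGoodbye().GetDeleteResources())
    }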
5 changes: 5 additions & 0 deletions lib/auth/auth.go
@@ -4537,6 +4537,11 @@ func (a *Server) RegisterInventoryControlStream(ics client.UpstreamInventoryCont
downstreamHello := proto.DownstreamInventoryHello{
Version: teleport.Version,
ServerID: a.ServerID,
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
NodeHeartbeats: true,
AppHeartbeats: true,
AppCleanup: true,
},
}
if err := ics.Send(a.CloseContext(), downstreamHello); err != nil {
return trace.Wrap(err)
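Only node and app heartbeats, plus app cleanup, are advertised at this point; the remaining capability fields defined in the proto above are left unset, so other resource kinds presumably continue to rely on their existing cleanup paths for now.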
12 changes: 12 additions & 0 deletions lib/inventory/controller.go
@@ -42,6 +42,7 @@ import (
type Auth interface {
UpsertNode(context.Context, types.Server) (*types.KeepAlive, error)
UpsertApplicationServer(context.Context, types.AppServer) (*types.KeepAlive, error)
DeleteApplicationServer(ctx context.Context, namespace, hostID, name string) error

KeepAliveServer(context.Context, types.KeepAlive) error

@@ -319,6 +320,15 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
}

defer func() {
if handle.goodbye.GetDeleteResources() {
log.WithField("apps", len(handle.appServers)).Debug("Cleaning up resources in response to instance termination")
for _, app := range handle.appServers {
if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove app server %q on termination: %v.", handle.Hello().ServerID, err)
}
}
}

c.instanceHBVariableDuration.Dec()
for _, service := range handle.hello.Services {
c.serviceCounter.decrement(service)
@@ -371,6 +381,8 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
}
case proto.UpstreamInventoryPong:
c.handlePong(handle, m)
case proto.UpstreamInventoryGoodbye:
handle.goodbye = m
default:
log.Warnf("Unexpected upstream message type %T on control stream of server %q.", m, handle.Hello().ServerID)
handle.CloseWithError(trace.BadParameter("unexpected upstream message type %T", m))
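Because the deletions happen in the defer that runs only after the control stream's handler loop exits, they are ordered after any heartbeats the stream already delivered, which is the in-band coordination described in the commit message. Note that only app servers are cleaned up here, matching the AppCleanup capability advertised by the auth server above.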
184 changes: 180 additions & 4 deletions lib/inventory/controller_test.go
@@ -22,19 +22,20 @@ import (
"context"
"fmt"
"os"
"runtime"
"slices"
"sync"
"testing"
"time"

"github.com/gravitational/trace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/inventory/metadata"
usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport"
"github.com/gravitational/teleport/lib/utils"
)
@@ -90,6 +91,10 @@ func (a *fakeAuth) UpsertApplicationServer(_ context.Context, server types.AppSe
return &types.KeepAlive{}, a.err
}

func (a *fakeAuth) DeleteApplicationServer(ctx context.Context, namespace, hostID, name string) error {
return nil
}

func (a *fakeAuth) KeepAliveServer(_ context.Context, _ types.KeepAlive) error {
a.mu.Lock()
defer a.mu.Unlock()
@@ -723,6 +728,19 @@ func TestUpdateLabels(t *testing.T) {
func TestAgentMetadata(t *testing.T) {
// set the install method to validate it was returned as agent metadata
t.Setenv("TELEPORT_INSTALL_METHOD_AWSOIDC_DEPLOYSERVICE", "true")
metadataGetter = func(ctx context.Context) (*metadata.Metadata, error) {
return &metadata.Metadata{
OS: "llamaOS",
OSVersion: "1.2.3",
HostArchitecture: "llama",
GlibcVersion: "llama.5.6.7",
InstallMethods: []string{"llama", "alpaca"},
ContainerRuntime: "test",
ContainerOrchestrator: "test",
CloudEnvironment: "llama-cloud",
}, nil
}

const serverID = "test-instance"
const peerAddr = "1.2.3.4:456"

@@ -772,9 +790,167 @@

// Validate that the agent's metadata ends up in the auth server.
require.Eventually(t, func() bool {
return slices.Contains(upstreamHandle.AgentMetadata().InstallMethods, "awsoidc_deployservice") &&
upstreamHandle.AgentMetadata().OS == runtime.GOOS
}, 5*time.Second, 200*time.Millisecond)
return slices.Equal([]string{"llama", "alpaca"}, upstreamHandle.AgentMetadata().InstallMethods) &&
upstreamHandle.AgentMetadata().OS == "llamaOS"
}, 10*time.Second, 200*time.Millisecond)
}

func TestGoodbye(t *testing.T) {
t.Parallel()

tests := []struct {
name string
supportsGoodbye bool
}{
{
name: "no goodbye",
},
{
name: "goodbye",
supportsGoodbye: true,
},
}

upstreamHello := proto.UpstreamInventoryHello{
ServerID: "llama",
Version: teleport.Version,
Services: []types.SystemRole{types.RoleNode, types.RoleApp},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
controller := NewController(
&fakeAuth{},
usagereporter.DiscardUsageReporter{},
withInstanceHBInterval(time.Millisecond*200),
)
defer controller.Close()

// Set up fake in-memory control stream.
upstream, downstream := client.InventoryControlStreamPipe(client.ICSPipePeerAddr("127.0.0.1:8090"))

downstreamHello := proto.DownstreamInventoryHello{
Version: teleport.Version,
ServerID: "auth",
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
AppCleanup: test.supportsGoodbye,
AppHeartbeats: true,
NodeHeartbeats: true,
},
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
handle := NewDownstreamHandle(func(ctx context.Context) (client.DownstreamInventoryControlStream, error) {
return downstream, nil
}, upstreamHello)

// Wait for upstream hello.
select {
case msg := <-upstream.Recv():
require.Equal(t, upstreamHello, msg)
case <-ctx.Done():
require.Fail(t, "never got upstream hello")
}
require.NoError(t, upstream.Send(ctx, downstreamHello))

// Attempt to send a goodbye.
go func() {
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
assert.NoError(t, handle.SendGoodbye(ctx))
// Close the handle to unblock receive below.
assert.NoError(t, handle.Close())
}()

// Wait to see if a goodbye is received.
timeoutC := time.After(10 * time.Second)
for {
select {
case msg := <-upstream.Recv():
switch msg.(type) {
case proto.UpstreamInventoryHello, proto.InventoryHeartbeat,
proto.UpstreamInventoryPong, proto.UpstreamInventoryAgentMetadata:
case proto.UpstreamInventoryGoodbye:
if test.supportsGoodbye {
require.Equal(t, proto.UpstreamInventoryGoodbye{DeleteResources: true}, msg)
} else {
t.Fatalf("received an unexpected message %v", msg)
}
return
}
case <-upstream.Done():
return
case <-timeoutC:
if test.supportsGoodbye {
require.FailNow(t, "timeout waiting for goodbye message")
} else {
return
}
}
}
})
}
}

func TestGetSender(t *testing.T) {

controller := NewController(
&fakeAuth{},
usagereporter.DiscardUsageReporter{},
withInstanceHBInterval(time.Millisecond*200),
)
defer controller.Close()

// Set up fake in-memory control stream.
upstream, downstream := client.InventoryControlStreamPipe(client.ICSPipePeerAddr("127.0.0.1:8090"))

downstreamHello := proto.DownstreamInventoryHello{
Version: teleport.Version,
ServerID: "auth",
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
AppCleanup: true,
AppHeartbeats: true,
NodeHeartbeats: true,
},
}

upstreamHello := proto.UpstreamInventoryHello{
ServerID: "llama",
Version: teleport.Version,
Services: []types.SystemRole{types.RoleNode, types.RoleApp},
}

handle := NewDownstreamHandle(func(ctx context.Context) (client.DownstreamInventoryControlStream, error) {
return downstream, nil
}, upstreamHello)

// Validate that the sender is not present prior to
// the stream becoming healthy.
s, ok := handle.GetSender()
require.False(t, ok)
require.Nil(t, s)

// Wait for upstream hello.
select {
case msg := <-upstream.Recv():
require.Equal(t, upstreamHello, msg)
case <-time.After(5 * time.Second):
require.Fail(t, "never got upstream hello")
}
// Send the downstream hello so that the
// sender becomes available.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
require.NoError(t, upstream.Send(ctx, downstreamHello))

// Validate that once healthy the sender is provided.
require.EventuallyWithT(t, func(t *assert.CollectT) {
s, ok = handle.GetSender()
assert.True(t, ok)
assert.NotNil(t, s)
}, 10*time.Second, 100*time.Millisecond)
}

type eventOpts struct {