Skip to content

Commit

Permalink
Add telemetry to VNet in Connect (#41587)
Browse files Browse the repository at this point in the history
* Add OnNewConnection to AppProvider

* Cache cluster IDs

* Pass installation ID to tsh daemon

It will be needed to report usage events straight from tsh daemon.
It used to be available only in the Electron app which sent this ID
with every ReportUsageEvent RPC.

* apiserver.New: Create listener after services

This way if initializing a service fails, we don't create a listener
unnecessarily.

* GetCachedClient: Rename argument to clarify usage

* Report usage event on VNet connection

* Remove debug log when app was already reported

* Reuse context in tests

* Extract fake web proxy setup to separate function

* usageReporter.ReportApp: Use background ctx instead of TCP conn ctx
  • Loading branch information
ravicious authored Jun 4, 2024
1 parent d5cd4a1 commit 2d14f0b
Show file tree
Hide file tree
Showing 18 changed files with 700 additions and 109 deletions.
17 changes: 14 additions & 3 deletions integration/teleterm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/teleterm/api/uri"
"github.com/gravitational/teleport/lib/teleterm/apiserver/handler"
"github.com/gravitational/teleport/lib/teleterm/clusteridcache"
"github.com/gravitational/teleport/lib/teleterm/clusters"
"github.com/gravitational/teleport/lib/teleterm/daemon"
libutils "github.com/gravitational/teleport/lib/utils"
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestTeleterm(t *testing.T) {
testGetClusterReturnsPropertiesFromAuthServer(t, pack)
})

t.Run("Test headless watcher", func(t *testing.T) {
t.Run("headless watcher", func(t *testing.T) {
t.Parallel()

testHeadlessWatcher(t, pack, creds)
Expand All @@ -124,7 +125,7 @@ func TestTeleterm(t *testing.T) {
testDeleteConnectMyComputerNode(t, pack)
})

t.Run("TestClientCache", func(t *testing.T) {
t.Run("client cache", func(t *testing.T) {
t.Parallel()

testClientCache(t, pack, creds)
Expand Down Expand Up @@ -362,10 +363,13 @@ func testGetClusterReturnsPropertiesFromAuthServer(t *testing.T, pack *dbhelpers
})
require.NoError(t, err)

clusterIDCache := clusteridcache.Cache{}

daemonService, err := daemon.New(daemon.Config{
Storage: storage,
KubeconfigsDir: t.TempDir(),
AgentsDir: t.TempDir(),
ClusterIDCache: &clusterIDCache,
})
require.NoError(t, err)
t.Cleanup(func() {
Expand All @@ -381,15 +385,22 @@ func testGetClusterReturnsPropertiesFromAuthServer(t *testing.T, pack *dbhelpers

rootClusterName, _, err := net.SplitHostPort(pack.Root.Cluster.Web)
require.NoError(t, err)
clusterURI := uri.NewClusterURI(rootClusterName)

response, err := handler.GetCluster(context.Background(), &api.GetClusterRequest{
ClusterUri: uri.NewClusterURI(rootClusterName).String(),
ClusterUri: clusterURI.String(),
})
require.NoError(t, err)

require.Equal(t, userName, response.LoggedInUser.Name)
require.ElementsMatch(t, []string{requestableRoleName}, response.LoggedInUser.RequestableRoles)
require.ElementsMatch(t, []string{suggestedReviewer}, response.LoggedInUser.SuggestedReviewers)

// Verify that cluster ID cache gets updated.
clusterIDFromCache, ok := clusterIDCache.Load(clusterURI)
require.True(t, ok, "ID for cluster %q was not found in the cache", clusterURI)
require.NotEmpty(t, clusterIDFromCache)
require.Equal(t, response.AuthClusterId, clusterIDFromCache)
}

func testHeadlessWatcher(t *testing.T, pack *dbhelpers.DatabasePack, creds *helpers.UserCreds) {
Expand Down
26 changes: 14 additions & 12 deletions lib/teleterm/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,6 @@ func New(cfg Config) (*APIServer, error) {
return nil, trace.Wrap(err)
}

// Create the listener, set up the server.

ls, err := newListener(cfg.HostAddr, cfg.ListeningC)
if err != nil {
return nil, trace.Wrap(err)
}

grpcServer := grpc.NewServer(cfg.TshdServerCreds,
grpc.ChainUnaryInterceptor(withErrorHandling(cfg.Log)),
grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams),
)

// Create Terminal and VNet services.

serviceHandler, err := handler.New(
Expand All @@ -66,11 +54,25 @@ func New(cfg Config) (*APIServer, error) {
vnetService, err := vnet.New(vnet.Config{
DaemonService: cfg.Daemon,
InsecureSkipVerify: cfg.InsecureSkipVerify,
ClusterIDCache: cfg.ClusterIDCache,
InstallationID: cfg.InstallationID,
})
if err != nil {
return nil, trace.Wrap(err)
}

// Create the listener, set up the server.

ls, err := newListener(cfg.HostAddr, cfg.ListeningC)
if err != nil {
return nil, trace.Wrap(err)
}

grpcServer := grpc.NewServer(cfg.TshdServerCreds,
grpc.ChainUnaryInterceptor(withErrorHandling(cfg.Log)),
grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams),
)

api.RegisterTerminalServiceServer(grpcServer, serviceHandler)
vnetapi.RegisterVnetServiceServer(grpcServer, vnetService)

Expand Down
13 changes: 12 additions & 1 deletion lib/teleterm/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/teleterm/clusteridcache"
"github.com/gravitational/teleport/lib/teleterm/daemon"
"github.com/gravitational/teleport/lib/utils"
)
Expand All @@ -34,7 +35,9 @@ type Config struct {
HostAddr string
InsecureSkipVerify bool
// Daemon is the terminal daemon service
Daemon *daemon.Service
Daemon *daemon.Service
ClusterIDCache *clusteridcache.Cache
InstallationID string
// Log is a component logger
Log logrus.FieldLogger
TshdServerCreds grpc.ServerOption
Expand Down Expand Up @@ -65,5 +68,13 @@ func (c *Config) CheckAndSetDefaults() error {
c.Log = logrus.WithField(teleport.ComponentKey, "conn:apiserver")
}

if c.InstallationID == "" {
return trace.BadParameter("missing installation ID")
}

if c.ClusterIDCache == nil {
c.ClusterIDCache = &clusteridcache.Cache{}
}

return nil
}
61 changes: 61 additions & 0 deletions lib/teleterm/clusteridcache/clusteridcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package clusteridcache

import (
"sync"

"github.com/gravitational/teleport/lib/teleterm/api/uri"
)

// Cache stores cluster IDs indexed by their cluster URIs.
//
// Cluster IDs are required when reporting usage events, but they are not publicly known and can be
// fetched only after logging in to a cluster. Today, most events are sent from the Electron app.
// The Electron app caches cluster IDs on its own. However, sometimes we want to send events
// straight from the tsh daemon, in which case we need to know the ID of a cluster.
//
// Whenever the user logs in and fetches full details of a cluster, the cluster ID gets saved to the
// cache. Later on when tsh daemon wants to send a usage event, it can load the cluster ID from the
// cache.
//
// This cache is never cleared since cluster IDs are saved only for root clusters. Logging in to a
// root cluster overwrites existing ID under the same URI.
//
// TODO(ravicious): Refactor usage reporting to operate on cluster URIs instead of cluster IDs and
// keep the cache only on the side of tsh daemon. Fetch a cluster ID whenever it's first requested
// to avoid an issue with trying to send a usage event before the cluster ID is known.
// https://github.com/gravitational/teleport/issues/23030
type Cache struct {
m sync.Map
}

// Store stores the cluster ID for the given uri of a root cluster.
func (c *Cache) Store(uri uri.ResourceURI, clusterID string) {
c.m.Store(uri.String(), clusterID)
}

// Load returns the cluster ID for the given uri of a root cluster.
func (c *Cache) Load(uri uri.ResourceURI) (string, bool) {
id, ok := c.m.Load(uri.String())

if !ok {
return "", false
}

return id.(string), true
}
4 changes: 3 additions & 1 deletion lib/teleterm/clusters/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/teleterm/api/uri"
"github.com/gravitational/teleport/lib/teleterm/clusteridcache"
)

// Cluster describes user settings and access to various resources.
Expand Down Expand Up @@ -88,7 +89,7 @@ func (c *Cluster) Connected() bool {
// GetWithDetails makes requests to the auth server to return details of the current
// Cluster that cannot be found on the disk only, including details about the user
// and enabled enterprise features. This method requires a valid cert.
func (c *Cluster) GetWithDetails(ctx context.Context, authClient authclient.ClientI) (*ClusterWithDetails, error) {
func (c *Cluster) GetWithDetails(ctx context.Context, authClient authclient.ClientI, clusterIDCache *clusteridcache.Cache) (*ClusterWithDetails, error) {
var (
clusterPingResponse *webclient.PingResponse
webConfig *webclient.WebConfig
Expand Down Expand Up @@ -142,6 +143,7 @@ func (c *Cluster) GetWithDetails(ctx context.Context, authClient authclient.Clie
return trace.Wrap(err)
}
authClusterID = clusterName.GetClusterID()
clusterIDCache.Store(c.URI, authClusterID)
return nil
})
return trace.Wrap(err)
Expand Down
6 changes: 6 additions & 0 deletions lib/teleterm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Config struct {
KubeconfigsDir string
// AgentsDir contains agent config files and data directories for Connect My Computer.
AgentsDir string
// InstallationID is a unique ID identifying a specific Teleport Connect installation.
InstallationID string
}

// CheckAndSetDefaults checks and sets default config values.
Expand Down Expand Up @@ -77,5 +79,9 @@ func (c *Config) CheckAndSetDefaults() error {
return trace.BadParameter("missing agents directory")
}

if c.InstallationID == "" {
return trace.BadParameter("missing installation ID")
}

return nil
}
9 changes: 9 additions & 0 deletions lib/teleterm/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/client/clientcache"
"github.com/gravitational/teleport/lib/teleterm/api/uri"
"github.com/gravitational/teleport/lib/teleterm/clusteridcache"
"github.com/gravitational/teleport/lib/teleterm/clusters"
"github.com/gravitational/teleport/lib/teleterm/services/connectmycomputer"
)
Expand Down Expand Up @@ -74,6 +75,10 @@ type Config struct {
ConnectMyComputerNodeName *connectmycomputer.NodeName

CreateClientCacheFunc func(resolver clientcache.NewClientFunc) (ClientCache, error)
// ClusterIDCache gets updated whenever daemon.Service.ResolveClusterWithDetails gets called.
// Since that method is called by the Electron app only for root clusters and typically only once
// after a successful login, this cache doesn't have to be cleared.
ClusterIDCache *clusteridcache.Cache
}

// ResolveClusterFunc returns a cluster by URI.
Expand Down Expand Up @@ -174,5 +179,9 @@ func (c *Config) CheckAndSetDefaults() error {
}
}

if c.ClusterIDCache == nil {
c.ClusterIDCache = &clusteridcache.Cache{}
}

return nil
}
8 changes: 4 additions & 4 deletions lib/teleterm/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s *Service) ResolveClusterWithDetails(ctx context.Context, uri string) (*c
return nil, nil, trace.Wrap(err)
}

withDetails, err := cluster.GetWithDetails(ctx, proxyClient.CurrentCluster())
withDetails, err := cluster.GetWithDetails(ctx, proxyClient.CurrentCluster(), s.cfg.ClusterIDCache)
if err != nil {
return nil, nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1133,9 +1133,9 @@ func (s *Service) findGatewayByTargetURI(targetURI uri.ResourceURI) (gateway.Gat

// GetCachedClient returns a client from the cache if it exists,
// otherwise it dials the remote server.
func (s *Service) GetCachedClient(ctx context.Context, clusterURI uri.ResourceURI) (*client.ClusterClient, error) {
profileName := clusterURI.GetProfileName()
leafClusterName := clusterURI.GetLeafClusterName()
func (s *Service) GetCachedClient(ctx context.Context, resourceURI uri.ResourceURI) (*client.ClusterClient, error) {
profileName := resourceURI.GetProfileName()
leafClusterName := resourceURI.GetLeafClusterName()
clt, err := s.clientCache.Get(ctx, profileName, leafClusterName)
return clt, trace.Wrap(err)
}
Expand Down
6 changes: 6 additions & 0 deletions lib/teleterm/teleterm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/gravitational/teleport/lib/teleterm/apiserver"
"github.com/gravitational/teleport/lib/teleterm/clusteridcache"
"github.com/gravitational/teleport/lib/teleterm/clusters"
"github.com/gravitational/teleport/lib/teleterm/daemon"
)
Expand All @@ -55,12 +56,15 @@ func Serve(ctx context.Context, cfg Config) error {
return trace.Wrap(err)
}

clusterIDCache := &clusteridcache.Cache{}

daemonService, err := daemon.New(daemon.Config{
Storage: storage,
CreateTshdEventsClientCredsFunc: grpcCredentials.tshdEvents,
PrehogAddr: cfg.PrehogAddr,
KubeconfigsDir: cfg.KubeconfigsDir,
AgentsDir: cfg.AgentsDir,
ClusterIDCache: clusterIDCache,
})
if err != nil {
return trace.Wrap(err)
Expand All @@ -72,6 +76,8 @@ func Serve(ctx context.Context, cfg Config) error {
Daemon: daemonService,
TshdServerCreds: grpcCredentials.tshd,
ListeningC: cfg.ListeningC,
ClusterIDCache: clusterIDCache,
InstallationID: cfg.InstallationID,
})
if err != nil {
return trace.Wrap(err)
Expand Down
1 change: 1 addition & 0 deletions lib/teleterm/teleterm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestStart(t *testing.T) {
ListeningC: listeningC,
KubeconfigsDir: t.TempDir(),
AgentsDir: t.TempDir(),
InstallationID: "foo",
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
Loading

0 comments on commit 2d14f0b

Please sign in to comment.