Skip to content

Commit

Permalink
migrate lib/kube/proxy eks to aws sdk v2
Browse files Browse the repository at this point in the history
  • Loading branch information
creack committed Dec 30, 2024
1 parent 03fd997 commit e662c31
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 49 deletions.
57 changes: 40 additions & 17 deletions lib/kube/proxy/cluster_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/eks"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/eks"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -39,6 +39,7 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/cloud"
"github.com/gravitational/teleport/lib/cloud/awsconfig"
"github.com/gravitational/teleport/lib/cloud/azure"
"github.com/gravitational/teleport/lib/cloud/gcp"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
Expand Down Expand Up @@ -85,7 +86,8 @@ type kubeDetails struct {
// clusterDetailsConfig contains the configuration for creating a proxied cluster.
type clusterDetailsConfig struct {
// cloudClients is the cloud clients to use for dynamic clusters.
cloudClients cloud.Clients
awsCloudClients ClientGetter
cloudClients cloud.Clients
// kubeCreds is the credentials to use for the cluster.
kubeCreds kubeCreds
// cluster is the cluster to create a proxied cluster for.
Expand All @@ -103,8 +105,10 @@ type clusterDetailsConfig struct {
component KubeServiceType
}

const defaultRefreshPeriod = 5 * time.Minute
const backoffRefreshStep = 10 * time.Second
const (
defaultRefreshPeriod = 5 * time.Minute
backoffRefreshStep = 10 * time.Second
)

// newClusterDetails creates a proxied kubeDetails structure given a dynamic cluster.
func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDetails, err error) {
Expand Down Expand Up @@ -270,7 +274,7 @@ func getKubeClusterCredentials(ctx context.Context, cfg clusterDetailsConfig) (k
case cfg.cluster.IsAzure():
return getAzureCredentials(ctx, cfg.cloudClients, dynCredsCfg)
case cfg.cluster.IsAWS():
return getAWSCredentials(ctx, cfg.cloudClients, dynCredsCfg)
return getAWSCredentials(ctx, cfg.awsCloudClients, dynCredsCfg)
case cfg.cluster.IsGCP():
return getGCPCredentials(ctx, cfg.cloudClients, dynCredsCfg)
default:
Expand Down Expand Up @@ -308,7 +312,7 @@ func azureRestConfigClient(cloudClients cloud.Clients) dynamicCredsClient {
}

// getAWSCredentials creates a dynamicKubeCreds that generates and updates the access credentials to a EKS kubernetes cluster.
func getAWSCredentials(ctx context.Context, cloudClients cloud.Clients, cfg dynamicCredsConfig) (*dynamicKubeCreds, error) {
func getAWSCredentials(ctx context.Context, cloudClients ClientGetter, cfg dynamicCredsConfig) (*dynamicKubeCreds, error) {
// create a client that returns the credentials for kubeCluster
cfg.client = getAWSClientRestConfig(cloudClients, cfg.clock, cfg.resourceMatchers)
creds, err := newDynamicKubeCreds(ctx, cfg)
Expand All @@ -334,45 +338,64 @@ func getAWSResourceMatcherToCluster(kubeCluster types.KubeCluster, resourceMatch
return nil
}

// STSPresignClient is the subset of the STS presign interface we use in fetchers.
type STSPresignClient = kubeutils.STSPresignClient

type EKSClient interface {
eks.DescribeClusterAPIClient
}

type STSClient interface{}

// ClientGetter is an interface for getting an EKS client and an STS client.
type ClientGetter interface {
// GetAWSEKSClient returns AWS EKS client for the specified region.
GetAWSEKSClient(ctx context.Context, region string, opts ...awsconfig.OptionsFn) (EKSClient, error)
// GetAWSSTSClient returns AWS STS client for the specified region.
GetAWSSTSClient(ctx context.Context, region string, opts ...awsconfig.OptionsFn) (STSClient, error)
// GetAWSSTSPresignClient returns AWS STS presign client for the specified region.
GetAWSSTSPresignClient(ctx context.Context, region string, opts ...awsconfig.OptionsFn) (STSPresignClient, error)
}

// getAWSClientRestConfig creates a dynamicCredsClient that generates returns credentials to EKS clusters.
func getAWSClientRestConfig(cloudClients cloud.Clients, clock clockwork.Clock, resourceMatchers []services.ResourceMatcher) dynamicCredsClient {
func getAWSClientRestConfig(cloudClients ClientGetter, clock clockwork.Clock, resourceMatchers []services.ResourceMatcher) dynamicCredsClient {
return func(ctx context.Context, cluster types.KubeCluster) (*rest.Config, time.Time, error) {
region := cluster.GetAWSConfig().Region
opts := []cloud.AWSOptionsFn{
cloud.WithAmbientCredentials(),
cloud.WithoutSessionCache(),
opts := []awsconfig.OptionsFn{
awsconfig.WithAmbientCredentials(),
// awsconfig.WithoutSessionCache(),
}
if awsAssume := getAWSResourceMatcherToCluster(cluster, resourceMatchers); awsAssume != nil {
opts = append(opts, cloud.WithAssumeRole(awsAssume.AssumeRoleARN, awsAssume.ExternalID))
opts = append(opts, awsconfig.WithAssumeRole(awsAssume.AssumeRoleARN, awsAssume.ExternalID))
}
regionalClient, err := cloudClients.GetAWSEKSClient(ctx, region, opts...)
if err != nil {
return nil, time.Time{}, trace.Wrap(err)
}

eksCfg, err := regionalClient.DescribeClusterWithContext(ctx, &eks.DescribeClusterInput{
eksCfg, err := regionalClient.DescribeCluster(ctx, &eks.DescribeClusterInput{
Name: aws.String(cluster.GetAWSConfig().Name),
})
if err != nil {
return nil, time.Time{}, trace.Wrap(err)
}

ca, err := base64.StdEncoding.DecodeString(aws.StringValue(eksCfg.Cluster.CertificateAuthority.Data))
ca, err := base64.StdEncoding.DecodeString(aws.ToString(eksCfg.Cluster.CertificateAuthority.Data))
if err != nil {
return nil, time.Time{}, trace.Wrap(err)
}

apiEndpoint := aws.StringValue(eksCfg.Cluster.Endpoint)
apiEndpoint := aws.ToString(eksCfg.Cluster.Endpoint)
if len(apiEndpoint) == 0 {
return nil, time.Time{}, trace.BadParameter("invalid api endpoint for cluster %q", cluster.GetAWSConfig().Name)
}

stsClient, err := cloudClients.GetAWSSTSClient(ctx, region, opts...)
stsPresignClient, err := cloudClients.GetAWSSTSPresignClient(ctx, region, opts...)
if err != nil {
return nil, time.Time{}, trace.Wrap(err)
}

token, exp, err := kubeutils.GenAWSEKSToken(stsClient, cluster.GetAWSConfig().Name, clock)
token, exp, err := kubeutils.GenAWSEKSToken(ctx, stsPresignClient, cluster.GetAWSConfig().Name, clock)
if err != nil {
return nil, time.Time{}, trace.Wrap(err)
}
Expand Down
171 changes: 153 additions & 18 deletions lib/kube/proxy/kube_creds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@ package proxy
import (
"context"
"encoding/base64"
"errors"
"net/url"
"slices"
"strings"
"sync"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/eks"
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/service/eks"
ekstypes "github.com/aws/aws-sdk-go-v2/service/eks/types"
"github.com/aws/aws-sdk-go-v2/service/sts"
ststypes "github.com/aws/aws-sdk-go-v2/service/sts/types"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
Expand All @@ -37,14 +44,138 @@ import (
"github.com/gravitational/teleport/api/types"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/cloud"
"github.com/gravitational/teleport/lib/cloud/awsconfig"
"github.com/gravitational/teleport/lib/cloud/azure"
"github.com/gravitational/teleport/lib/cloud/gcp"
"github.com/gravitational/teleport/lib/cloud/mocks"
"github.com/gravitational/teleport/lib/fixtures"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
)

type mockEKSClientGetter struct {
stsClient *mockSTSAPI
stsPresignClient *mockSTSPresignAPI
eksClient *mockEKSAPI
}

func (e *mockEKSClientGetter) GetAWSEKSClient(ctx context.Context, region string, opts ...awsconfig.OptionsFn) (EKSClient, error) {
_, _ = awsconfig.GetConfig(ctx, region, opts...)
return e.eksClient, nil
}

func (e *mockEKSClientGetter) GetAWSSTSClient(ctx context.Context, region string, opts ...awsconfig.OptionsFn) (STSClient, error) {
_, _ = awsconfig.GetConfig(ctx, region, opts...)
return e.stsClient, nil
}

func (e *mockEKSClientGetter) GetAWSSTSPresignClient(ctx context.Context, region string, opts ...awsconfig.OptionsFn) (kubeutils.STSPresignClient, error) {
_, _ = awsconfig.GetConfig(ctx, region, opts...)
return e.stsPresignClient, nil
}

type mockSTSPresignAPI struct {
url *url.URL
}

func (a *mockSTSPresignAPI) PresignGetCallerIdentity(ctx context.Context, params *sts.GetCallerIdentityInput, optFns ...func(*sts.PresignOptions)) (*v4.PresignedHTTPRequest, error) {
return &v4.PresignedHTTPRequest{
URL: a.url.String(),
}, nil
}

type mockSTSAPI struct {
mu sync.Mutex

ARN string

assumedRoleARNs []string
assumedRoleExternalIDs []string
}

func (m *mockSTSAPI) GetAssumedRoleARNs() []string {
m.mu.Lock()
defer m.mu.Unlock()
return m.assumedRoleARNs
}

func (m *mockSTSAPI) GetAssumedRoleExternalIDs() []string {
m.mu.Lock()
defer m.mu.Unlock()
return m.assumedRoleExternalIDs
}

func (m *mockSTSAPI) ResetAssumeRoleHistory() {
m.mu.Lock()
defer m.mu.Unlock()
m.assumedRoleARNs = nil
m.assumedRoleExternalIDs = nil
}

func (m *mockSTSAPI) AssumeRole(ctx context.Context, params *sts.AssumeRoleInput, optFns ...func(*sts.Options)) (*sts.AssumeRoleOutput, error) {
m.mu.Lock()
defer m.mu.Unlock()

if !slices.Contains(m.assumedRoleARNs, aws.ToString(params.RoleArn)) {
m.assumedRoleARNs = append(m.assumedRoleARNs, aws.ToString(params.RoleArn))
m.assumedRoleExternalIDs = append(m.assumedRoleExternalIDs, aws.ToString(params.ExternalId))
}
expiry := time.Now().Add(60 * time.Minute)
return &sts.AssumeRoleOutput{
Credentials: &ststypes.Credentials{
AccessKeyId: params.RoleArn,
SecretAccessKey: aws.String("secret"),
SessionToken: aws.String("token"),
Expiration: &expiry,
},
}, nil
}

func (m *mockSTSAPI) GetCallerIdentity(ctx context.Context, params *sts.GetCallerIdentityInput, optFns ...func(*sts.Options)) (*sts.GetCallerIdentityOutput, error) {
return &sts.GetCallerIdentityOutput{
Arn: aws.String(m.ARN),
}, nil
}

type mockEKSAPI struct {
EKSClient

notify chan struct{}
clusters []*ekstypes.Cluster
}

func (m *mockEKSAPI) ListClusters(ctx context.Context, req *eks.ListClustersInput, _ ...func(*eks.Options)) (*eks.ListClustersOutput, error) {
defer func() {
if m.notify != nil {
m.notify <- struct{}{}
}
}()
var names []string
for _, cluster := range m.clusters {
names = append(names, aws.ToString(cluster.Name))
}
return &eks.ListClustersOutput{
Clusters: names,
}, nil
}

func (m *mockEKSAPI) DescribeCluster(_ context.Context, req *eks.DescribeClusterInput, _ ...func(*eks.Options)) (*eks.DescribeClusterOutput, error) {
defer func() {
if m.notify != nil {
m.notify <- struct{}{}
}
}()
for _, cluster := range m.clusters {
if aws.ToString(cluster.Name) == aws.ToString(req.Name) {
return &eks.DescribeClusterOutput{
Cluster: cluster,
}, nil
}
}
return nil, errors.New("cluster not found")
}

// Test_DynamicKubeCreds tests the dynamic kube credrentials generator for
// AWS, GCP, and Azure clusters accessed using their respective IAM credentials.
// This test mocks the cloud provider clients and the STS client to generate
Expand Down Expand Up @@ -99,32 +230,36 @@ func Test_DynamicKubeCreds(t *testing.T) {
)
require.NoError(t, err)

// mock sts client
// Mock sts client.
u := &url.URL{
Scheme: "https",
Host: "sts.amazonaws.com",
Path: "/?Action=GetCallerIdentity&Version=2011-06-15",
}
sts := &mocks.STSClientV1{
// u is used to presign the request
// here we just verify the pre-signed request includes this url.
URL: u,
}
// mock clients
cloudclients := &cloud.TestCloudClients{
STS: sts,
EKS: &mocks.EKSMock{
Notify: notify,
Clusters: []*eks.Cluster{
sts := &mockSTSAPI{}
// EKS clients.
eksClients := &mockEKSClientGetter{
stsClient: sts,
stsPresignClient: &mockSTSPresignAPI{
// u is used to presign the request
// here we just verify the pre-signed request includes this url.
url: u,
},
eksClient: &mockEKSAPI{
notify: notify,
clusters: []*ekstypes.Cluster{
{
Endpoint: aws.String("https://api.eks.us-west-2.amazonaws.com"),
Name: aws.String(awsKube.GetAWSConfig().Name),
CertificateAuthority: &eks.Certificate{
CertificateAuthority: &ekstypes.Certificate{
Data: aws.String(base64.RawStdEncoding.EncodeToString([]byte(fixtures.TLSCACertPEM))),
},
},
},
},
}
// Mock clients.
cloudclients := &cloud.TestCloudClients{
GCPGKE: &mocks.GKEMock{
Notify: notify,
Clock: fakeClock,
Expand Down Expand Up @@ -204,7 +339,7 @@ func Test_DynamicKubeCreds(t *testing.T) {
name: "aws eks cluster without assume role",
args: args{
cluster: awsKube,
client: getAWSClientRestConfig(cloudclients, fakeClock, nil),
client: getAWSClientRestConfig(eksClients, fakeClock, nil),
validateBearerToken: validateEKSToken,
},
wantAddr: "api.eks.us-west-2.amazonaws.com:443",
Expand All @@ -213,7 +348,7 @@ func Test_DynamicKubeCreds(t *testing.T) {
name: "aws eks cluster with unmatched assume role",
args: args{
cluster: awsKube,
client: getAWSClientRestConfig(cloudclients, fakeClock, []services.ResourceMatcher{
client: getAWSClientRestConfig(eksClients, fakeClock, []services.ResourceMatcher{
{
Labels: types.Labels{
"rand": []string{"value"},
Expand All @@ -233,7 +368,7 @@ func Test_DynamicKubeCreds(t *testing.T) {
args: args{
cluster: awsKube,
client: getAWSClientRestConfig(
cloudclients,
eksClients,
fakeClock,
[]services.ResourceMatcher{
{
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/utils/eks_token_signed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func GenAWSEKSToken(ctx context.Context, stsClient STSPresignClient, clusterID s
clusterIDHeader = "x-k8s-aws-id"
)

presignedReq, err := sts.NewPresignClient(nil).PresignGetCallerIdentity(ctx, nil, func(po *sts.PresignOptions) {
presignedReq, err := stsClient.PresignGetCallerIdentity(ctx, nil, func(po *sts.PresignOptions) {
po.ClientOptions = append(po.ClientOptions, func(o *sts.Options) {
o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error {
return stack.Finalize.Add(middleware.FinalizeMiddlewareFunc("ClusterIDHeaderMW", func(
Expand Down
Loading

0 comments on commit e662c31

Please sign in to comment.