From d950b58137950efa873829f392282fcbf8259130 Mon Sep 17 00:00:00 2001 From: Dave Henderson Date: Sun, 24 Nov 2024 14:24:05 -0500 Subject: [PATCH] feat(blobfs): Use AWS SDK v2 for blobfs (#892) Signed-off-by: Dave Henderson --- awssmfs/awssm_test.go | 2 +- blobfs/blob.go | 65 +++-------- blobfs/blob_test.go | 33 ++++-- blobfs/s3blob.go | 101 +++++++++++++++++ blobfs/s3opener.go | 247 ++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- url_schemes.md | 34 ++++-- 7 files changed, 415 insertions(+), 69 deletions(-) create mode 100644 blobfs/s3blob.go create mode 100644 blobfs/s3opener.go diff --git a/awssmfs/awssm_test.go b/awssmfs/awssm_test.go index 3c6c46a0..26d5bba7 100644 --- a/awssmfs/awssm_test.go +++ b/awssmfs/awssm_test.go @@ -6,8 +6,8 @@ import ( "testing" "testing/fstest" + "github.com/aws/aws-sdk-go-v2/aws" smtypes "github.com/aws/aws-sdk-go-v2/service/secretsmanager/types" - "github.com/aws/aws-sdk-go/aws" "github.com/hairyhenderson/go-fsimpl/internal/tests" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/blobfs/blob.go b/blobfs/blob.go index ab428241..6ffbf07b 100644 --- a/blobfs/blob.go +++ b/blobfs/blob.go @@ -15,9 +15,6 @@ import ( azblobblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" "github.com/hairyhenderson/go-fsimpl" "github.com/hairyhenderson/go-fsimpl/internal" "github.com/hairyhenderson/go-fsimpl/internal/env" @@ -144,7 +141,7 @@ func (f *blobFS) Open(name string) (fs.File, error) { if f.bucket == nil { bucket, err := f.openBucket() if err != nil { - return nil, fmt.Errorf("open bucket: %w", err) + return nil, fmt.Errorf("open: %w", err) } f.bucket = bucket @@ -185,7 +182,7 @@ func (f *blobFS) ReadFile(name string) ([]byte, error) { if f.bucket == nil { bucket, err := f.openBucket() if err != nil { 
- return nil, fmt.Errorf("open bucket: %w", err) + return nil, fmt.Errorf("readFile: %w", err) } f.bucket = bucket @@ -198,10 +195,8 @@ func (f *blobFS) ReadFile(name string) ([]byte, error) { func (f *blobFS) newOpener(ctx context.Context, scheme string) (opener blob.BucketURLOpener, err error) { switch scheme { case s3blob.Scheme: - sess := f.initS3Session() - // see https://gocloud.dev/concepts/urls/#muxes - return &s3blob.URLOpener{ConfigProvider: sess}, nil + return &s3v2URLOpener{}, nil case gcsblob.Scheme: if env.GetenvFS(f.envfs, "GOOGLE_ANON") == "true" { return &gcsblob.URLOpener{ @@ -230,21 +225,22 @@ func (f *blobFS) newOpener(ctx context.Context, scheme string) (opener blob.Buck } // initS3Session - -func (f *blobFS) initS3Session() *session.Session { - config := aws.NewConfig() - config = config.WithHTTPClient(f.hclient) +// Deprecated: this is for v1, but kept here for posterity +// func (f *blobFS) initS3Sessionv1() *session.Session { +// config := aws.NewConfig() +// config = config.WithHTTPClient(f.hclient) - if env.GetenvFS(f.envfs, "AWS_ANON") == "true" { - config = config.WithCredentials(credentials.AnonymousCredentials) - } +// if env.GetenvFS(f.envfs, "AWS_ANON") == "true" { +// config = config.WithCredentials(credentials.AnonymousCredentials) +// } - config = config.WithCredentialsChainVerboseErrors(true) +// config = config.WithCredentialsChainVerboseErrors(true) - return session.Must(session.NewSessionWithOptions(session.Options{ - Config: *config, - SharedConfigState: session.SharedConfigEnable, - })) -} +// return session.Must(session.NewSessionWithOptions(session.Options{ +// Config: *config, +// SharedConfigState: session.SharedConfigEnable, +// })) +// } // copy/sanitize the URL for the Go CDK - it doesn't like params it can't parse func (f *blobFS) cleanCdkURL(u url.URL) url.URL { @@ -290,35 +286,6 @@ func (f *blobFS) cleanGSURL(u url.URL) url.URL { return u } -func (f *blobFS) cleanS3URL(u url.URL) url.URL { - q := u.Query() - for 
param := range q { - switch param { - case "region", "endpoint", "disableSSL", "s3ForcePathStyle": - default: - q.Del(param) - } - } - - if q.Get("endpoint") == "" { - endpoint := env.GetenvFS(f.envfs, "AWS_S3_ENDPOINT") - if endpoint != "" { - q.Set("endpoint", endpoint) - } - } - - if q.Get("region") == "" { - region := env.GetenvFS(f.envfs, "AWS_REGION", env.GetenvFS(f.envfs, "AWS_DEFAULT_REGION")) - if region != "" { - q.Set("region", region) - } - } - - u.RawQuery = q.Encode() - - return u -} - type blobFile struct { ctx context.Context reader *blob.Reader diff --git a/blobfs/blob_test.go b/blobfs/blob_test.go index fa67de3b..4dd6a192 100644 --- a/blobfs/blob_test.go +++ b/blobfs/blob_test.go @@ -17,6 +17,7 @@ import ( "github.com/johannesboyne/gofakes3" "github.com/johannesboyne/gofakes3/backend/s3mem" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func setupTestS3Bucket(t *testing.T) *url.URL { @@ -206,35 +207,35 @@ func TestBlobFS_ReadDir(t *testing.T) { t.Setenv("AWS_ANON", "true") fsys, err := New(tests.MustURL("s3://mybucket/?region=us-east-1&disableSSL=true&s3ForcePathStyle=true&endpoint=" + srvURL.Host)) - assert.NoError(t, err) + require.NoError(t, err) de, err := fs.ReadDir(fsys, "dir1") - assert.NoError(t, err) + require.NoError(t, err) assert.Len(t, de, 2) de, err = fs.ReadDir(fsys, ".") - assert.NoError(t, err) + require.NoError(t, err) assert.Len(t, de, 5) fi, err := de[0].Info() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "dir1", fi.Name()) f, err := fsys.Open("dir1") - assert.NoError(t, err) + require.NoError(t, err) assert.IsType(t, &blobFile{}, f) fi, err = f.Stat() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "dir1", fi.Name()) f, err = fsys.Open("file1") - assert.NoError(t, err) + require.NoError(t, err) defer f.Close() fi, err = f.Stat() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, fs.FileMode(0o444), fi.Mode()) } @@ -247,8 +248,20 @@ func 
TestBlobFS_CleanCdkURL(t *testing.T) { {"s3://foo/bar/baz", "s3://foo/bar/baz"}, {"s3://foo/bar/baz?type=hello/world", "s3://foo/bar/baz"}, {"s3://foo/bar/baz?region=us-east-1", "s3://foo/bar/baz?region=us-east-1"}, - {"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disableSSL=true"}, - {"s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4", "s3://foo/bar/baz?endpoint=1.2.3.4&s3ForcePathStyle=true"}, + {"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disable_https=true"}, + { + "s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4", + "s3://foo/bar/baz?endpoint=https%3A%2F%2F1.2.3.4&use_path_style=true", + }, + {"s3://foo/bar/baz?disable_https=true&type=text/csv", "s3://foo/bar/baz?disable_https=true"}, + { + "s3://foo/bar/baz?type=text/csv&use_path_style=true&endpoint=1.2.3.4", + "s3://foo/bar/baz?endpoint=https%3A%2F%2F1.2.3.4&use_path_style=true", + }, + { + "s3://foo/bar/baz?disable_https=true&type=text/csv&use_path_style=true&endpoint=1.2.3.4:1234", + "s3://foo/bar/baz?disable_https=true&endpoint=http%3A%2F%2F1.2.3.4%3A1234&use_path_style=true", + }, {"gs://foo/bar/baz", "gs://foo/bar/baz"}, {"gs://foo/bar/baz?type=foo/bar", "gs://foo/bar/baz"}, {"gs://foo/bar/baz?access_id=123", "gs://foo/bar/baz?access_id=123"}, diff --git a/blobfs/s3blob.go b/blobfs/s3blob.go new file mode 100644 index 00000000..de9dd331 --- /dev/null +++ b/blobfs/s3blob.go @@ -0,0 +1,101 @@ +package blobfs + +import ( + "net/url" + + "github.com/hairyhenderson/go-fsimpl/internal/env" +) + +// s3-specific blobfs methods + +func (f *blobFS) cleanS3URL(u url.URL) url.URL { + q := u.Query() + translateV1Params(q) + + // allow known query parameters, remove unknown ones + for param := range q { + switch param { + case "accelerate", + "anonymous", + "disable_https", + "dualstack", + "endpoint", + "fips", + "hostname_immutable", + "profile", + "rate_limiter_capacity", + "region", + "use_path_style": + // not relevant for 
read operations, but are passed through to the + // Go CDK + case "kmskeyid", "ssetype": + default: + q.Del(param) + } + } + + f.setParamsFromEnv(q) + + ensureValidEndpointURL(q) + + u.RawQuery = q.Encode() + + return u +} + +// translateV1Params translates v1 query parameters to v2 query parameters. +func translateV1Params(q url.Values) { + for param := range q { + switch param { + // changed to 'disable_https' in s3v2 + case "disableSSL": + q.Set("disable_https", q.Get(param)) + q.Del(param) + // changed to 'use_path_style' in s3v2 + case "s3ForcePathStyle": + q.Set("use_path_style", q.Get(param)) + q.Del(param) + } + } +} + +func ensureValidEndpointURL(q url.Values) { + // if we have an endpoint, make sure it's a parseable URL with a scheme + if endpoint := q.Get("endpoint"); endpoint != "" { + u, err := url.Parse(endpoint) + if err != nil || u.Scheme == "" { + // try adding a scheme - if disable_https is set, use http, otherwise https + if q.Get("disable_https") == "true" { + q.Del("endpoint") + q.Set("endpoint", "http://"+endpoint) + } else { + q.Del("endpoint") + q.Set("endpoint", "https://"+endpoint) + } + } + } +} + +// setParamsFromEnv sets query parameters based on env vars +func (f *blobFS) setParamsFromEnv(q url.Values) { + if q.Get("endpoint") == "" { + endpoint := env.GetenvFS(f.envfs, "AWS_S3_ENDPOINT") + if endpoint != "" { + q.Set("endpoint", endpoint) + } + } + + if q.Get("region") == "" { + region := env.GetenvFS(f.envfs, "AWS_REGION", env.GetenvFS(f.envfs, "AWS_DEFAULT_REGION")) + if region != "" { + q.Set("region", region) + } + } + + if q.Get("anonymous") == "" { + anon := env.GetenvFS(f.envfs, "AWS_ANON") + if anon != "" { + q.Set("anonymous", anon) + } + } +} diff --git a/blobfs/s3opener.go b/blobfs/s3opener.go new file mode 100644 index 00000000..36a06451 --- /dev/null +++ b/blobfs/s3opener.go @@ -0,0 +1,247 @@ +package blobfs + +import ( + "context" + "fmt" + "net/url" + "strconv" + "strings" + + awsv2 "github.com/aws/aws-sdk-go-v2/aws" 
+ "github.com/aws/aws-sdk-go-v2/aws/ratelimit" + "github.com/aws/aws-sdk-go-v2/aws/retry" + awsv2cfg "github.com/aws/aws-sdk-go-v2/config" + s3v2 "github.com/aws/aws-sdk-go-v2/service/s3" + typesv2 "github.com/aws/aws-sdk-go-v2/service/s3/types" + "gocloud.dev/blob" + "gocloud.dev/blob/s3blob" +) + +// Note: most of the code in this file is taken from the Go CDK and modified +// to support anonymous access to S3 buckets. +// See https://github.com/google/go-cloud/issues/3512 + +var _ blob.BucketURLOpener = (*s3v2URLOpener)(nil) + +type s3v2URLOpener struct { + // Options specifies the options to pass to OpenBucket. + Options s3blob.Options +} + +const ( + sseTypeParamKey = "ssetype" + kmsKeyIDParamKey = "kmskeyid" + accelerateParamKey = "accelerate" + usePathStyleParamkey = "use_path_style" + disableHTTPSParamKey = "disable_https" +) + +func toServerSideEncryptionType(value string) (typesv2.ServerSideEncryption, error) { + for _, sseType := range typesv2.ServerSideEncryptionAes256.Values() { + if strings.EqualFold(string(sseType), value) { + return sseType, nil + } + } + + return "", fmt.Errorf("%q is not a valid value for %q", value, sseTypeParamKey) +} + +// OpenBucketURL opens an s3blob.Bucket based on u. 
+// Taken from the Go CDK's s3blob package (gocloud.dev/blob/s3blob). +// +//nolint:funlen,gocyclo +func (o *s3v2URLOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) { + q := u.Query() + + if sseTypeParam := q.Get(sseTypeParamKey); sseTypeParam != "" { + q.Del(sseTypeParamKey) + + sseType, err := toServerSideEncryptionType(sseTypeParam) + if err != nil { + return nil, err + } + + o.Options.EncryptionType = sseType + } + + if kmsKeyID := q.Get(kmsKeyIDParamKey); kmsKeyID != "" { + q.Del(kmsKeyIDParamKey) + + o.Options.KMSEncryptionID = kmsKeyID + } + + accelerate := false + + if accelerateParam := q.Get(accelerateParamKey); accelerateParam != "" { + q.Del(accelerateParamKey) + + var err error + + accelerate, err = strconv.ParseBool(accelerateParam) + if err != nil { + return nil, fmt.Errorf("invalid value for %q: %w", accelerateParamKey, err) + } + } + + opts := []func(*s3v2.Options){ + func(o *s3v2.Options) { + o.UseAccelerate = accelerate + }, + } + + if disableHTTPSParam := q.Get(disableHTTPSParamKey); disableHTTPSParam != "" { + q.Del(disableHTTPSParamKey) + + value, err := strconv.ParseBool(disableHTTPSParam) + if err != nil { + return nil, fmt.Errorf("invalid value for %q: %w", disableHTTPSParamKey, err) + } + + opts = append(opts, func(o *s3v2.Options) { + o.EndpointOptions.DisableHTTPS = value + }) + } + + if usePathStyleParam := q.Get(usePathStyleParamkey); usePathStyleParam != "" { + q.Del(usePathStyleParamkey) + + value, err := strconv.ParseBool(usePathStyleParam) + if err != nil { + return nil, fmt.Errorf("invalid value for %q: %w", usePathStyleParamkey, err) + } + + opts = append(opts, func(o *s3v2.Options) { + o.UsePathStyle = value + }) + } + + cfg, err := V2ConfigFromURLParams(ctx, q) + if err != nil { + return nil, fmt.Errorf("open bucket %v: %w", u, err) + } + + clientV2 := s3v2.NewFromConfig(cfg, opts...) 
+ + return s3blob.OpenBucketV2(ctx, clientV2, u.Host, &o.Options) +} + +// V2ConfigFromURLParams returns an aws.Config for AWS SDK v2 initialized based on the URL +// parameters in q. It is intended to be used by URLOpeners for AWS services if +// UseV2 returns true. +// +// https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/aws#Config +// +// It returns an error if q contains any unknown query parameters; callers +// should remove any query parameters they know about from q before calling +// V2ConfigFromURLParams. +// +// The following query options are supported: +// - region: The AWS region for requests; sets WithRegion. +// - profile: The shared config profile to use; sets SharedConfigProfile. +// - endpoint: The AWS service endpoint to send HTTP requests to. +// - hostname_immutable: Make the hostname immutable, only works if endpoint is also set. +// - dualstack: A value of "true" enables dual stack (IPv4 and IPv6) endpoints. +// - fips: A value of "true" enables the use of FIPS endpoints. +// - rate_limiter_capacity: An integer value configures the capacity of a token bucket used +// in client-side rate limits. If no value is set, the client-side rate limiting is disabled. +// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/retries-timeouts/#client-side-rate-limiting. 
+// +//nolint:funlen,gocyclo +func V2ConfigFromURLParams(ctx context.Context, q url.Values) (awsv2.Config, error) { + var ( + endpoint string + hostnameImmutable bool + rateLimitCapacity int64 + opts []func(*awsv2cfg.LoadOptions) error + ) + + for param, values := range q { + value := values[0] + + switch param { + // See https://github.com/google/go-cloud/issues/3512 + case "anonymous": + enableAnon, err := strconv.ParseBool(value) + if err != nil { + return awsv2.Config{}, fmt.Errorf("invalid value for anonymous: %w", err) + } + + if enableAnon { + opts = append(opts, awsv2cfg.WithCredentialsProvider(awsv2.AnonymousCredentials{})) + } + case "hostname_immutable": + var err error + + hostnameImmutable, err = strconv.ParseBool(value) + if err != nil { + return awsv2.Config{}, fmt.Errorf("invalid value for hostname_immutable: %w", err) + } + case "region": + opts = append(opts, awsv2cfg.WithRegion(value)) + case "endpoint": + endpoint = value + case "profile": + opts = append(opts, awsv2cfg.WithSharedConfigProfile(value)) + case "dualstack": + dualStack, err := strconv.ParseBool(value) + if err != nil { + return awsv2.Config{}, fmt.Errorf("invalid value for dualstack: %w", err) + } + + if dualStack { + opts = append(opts, awsv2cfg.WithUseDualStackEndpoint(awsv2.DualStackEndpointStateEnabled)) + } + case "fips": + fips, err := strconv.ParseBool(value) + if err != nil { + return awsv2.Config{}, fmt.Errorf("invalid value for fips: %w", err) + } + + if fips { + opts = append(opts, awsv2cfg.WithUseFIPSEndpoint(awsv2.FIPSEndpointStateEnabled)) + } + case "rate_limiter_capacity": + var err error + + rateLimitCapacity, err = strconv.ParseInt(value, 10, 32) + if err != nil { + return awsv2.Config{}, fmt.Errorf("invalid value for capacity: %w", err) + } + case "awssdk": + // ignore, should be handled before this + default: + return awsv2.Config{}, fmt.Errorf("unknown query parameter %q", param) + } + } + + if endpoint != "" { + //nolint:staticcheck + customResolver := 
awsv2.EndpointResolverWithOptionsFunc( + func(_, region string, _ ...interface{}) (awsv2.Endpoint, error) { + //nolint:staticcheck + return awsv2.Endpoint{ + PartitionID: "aws", + URL: endpoint, + SigningRegion: region, + HostnameImmutable: hostnameImmutable, + }, nil + }) + //nolint:staticcheck + opts = append(opts, awsv2cfg.WithEndpointResolverWithOptions(customResolver)) + } + + var rateLimiter retry.RateLimiter + + rateLimiter = ratelimit.None + if rateLimitCapacity > 0 { + rateLimiter = ratelimit.NewTokenRateLimit(uint(rateLimitCapacity)) + } + + opts = append(opts, awsv2cfg.WithRetryer(func() awsv2.Retryer { + return retry.NewStandard(func(so *retry.StandardOptions) { + so.RateLimiter = rateLimiter + }) + })) + + return awsv2cfg.LoadDefaultConfig(ctx, opts...) +} diff --git a/go.mod b/go.mod index 3429071a..07d2e1cf 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.22.4 require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.5.0 - github.com/aws/aws-sdk-go v1.55.5 github.com/aws/aws-sdk-go-v2 v1.32.5 github.com/aws/aws-sdk-go-v2/config v1.28.5 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20 @@ -57,6 +56,7 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/ProtonMail/go-crypto v1.0.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect + github.com/aws/aws-sdk-go v1.55.5 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.46 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 // indirect diff --git a/url_schemes.md b/url_schemes.md index f0bccfd5..196bb4ec 100644 --- a/url_schemes.md +++ b/url_schemes.md @@ -358,14 +358,32 @@ filesystem. - `region`: The AWS region for requests. Defaults to the value from the `AWS_REGION` or `AWS_DEFAULT_REGION` environment variables, or the EC2 region if used in AWS EC2. - - `endpoint`: The endpoint (`hostname`, `hostname:port`, or fully qualified - URI). 
Useful for using a different S3-compatible object storage server. You - can also set the `AWS_S3_ENDPOINT` environment variable. - - `s3ForcePathStyle`: A value of `true` forces use of the deprecated - "path-style" access. This is necessary for some S3-compatible object storage - servers. - - `disableSSL`: A value of `true` disables SSL when sending requests. Use only - for test scenarios! + - `profile`: The shared config profile name to load from the shared + AWS configuration files. Defaults to the value from the `AWS_PROFILE` or + `AWS_DEFAULT_PROFILE` environment variables, or "default" if none are set. + - `accelerate`: A value of `true` uses the [S3 Transfer Accleration](https://aws.amazon.com/s3/transfer-acceleration/) endpoints. + - `disable_https`: A value of `true` disables the use of HTTPS when sending + requests. Use only for test scenarios! + - `use_path_style`: Allows you to enable the client to use path-style + addressing, i.e., `https://s3.amazonaws.com/BUCKET/KEY`. By default, the S3 + client will use virtual hosted bucket addressing when possible + (`https://BUCKET.s3.amazonaws.com/KEY`). This is necessary for some S3 + compatible object storage servers. + - `anonymous`: _Experimental: May be renamed in future releases._ A value of + `true` configures the client to not sign the request with AWS credentials. + This is necessary for accessing public S3 buckets. + - `dualstack`: A value of `true` configures the use of dualstack endpoint for + a bucket. See the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/API/dual-stack-endpoints.html) + for more information. + - `endpoint`: The endpoint (fully qualified URI). Useful for using a different + S3-compatible object storage server. You can also set the `AWS_S3_ENDPOINT` + environment variable. + - `fips`: A value of `true` configures the use of the FIPS endpoint for a + bucket. See the [AWS documentation](https://aws.amazon.com/compliance/fips/) + for more information. 
+ - `rate_limiter_capacity`: An integer value configures the capacity of a token + bucket used in client-side rate limits. If no value is set, client-side rate + limiting is disabled. See the [AWS documentation](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/retries-timeouts/#client-side-rate-limiting). #### Examples