From 0ef761cc8bff62639f35dbde1dde29a1df310d82 Mon Sep 17 00:00:00 2001 From: Derek Buitenhuis Date: Sun, 2 Jun 2024 20:32:54 +0100 Subject: [PATCH] gcp/saver: Only return errors.KindAlreadyExists if all three exist (#1957) * gcp/saver: Only return errors.KindAlreadyExists if all three exist In #1124, a GCP lock type was added as a singleflight backend. As part of this work, the GCP backend's Save() was made serial, likely because moduploader.Upload requires a call to Exists() before it, rendering the GCP lock less useful, by doubling the calls to GCS. However, by doing this, the existence check was now only checking the existence of the mod file, and not the info or zip. This meant that if during a Save, the zip or info uploads failed, on subsequent requests, that when using the GCP singleflight backend, Athens would assume everything had been stashed and saved properly, and then fail to serve up the info or zip that had failed upload, meaning the cache was in an unhealable broken state, requiring a manual intervention. To fix this, without breaking the singleflight behavior, introduce a metadata key that is set on the mod file during its initial upload, indicating that a Stash is still in progress on subsequent files, which gets removed once all three files are uploaded successfully, which can be checked if it is determined that the mod file already exists. That way we can return an errors.KindAlreadyExists if a Stash is in progress, but also properly return it when a Stash is *not* currently in progress if and only if all three files exist on GCS, which prevents the cache from becoming permanently poisoned. One note is that it is possible the GCS call to remove the metadata key fails, which would mean it is left on the mod object forever. To avoid this, consider it stale after 2 minutes. 
--------- Signed-off-by: Derek Buitenhuis Co-authored-by: Matt --- cmd/proxy/actions/app_proxy.go | 6 +- config.dev.toml | 4 + docs/content/configuration/storage.md | 11 +++ pkg/config/config.go | 1 + pkg/config/config_test.go | 3 + pkg/config/singleflight.go | 13 +++ pkg/stash/with_gcs.go | 21 ++++- pkg/stash/with_gcs_test.go | 79 +++++++++++++++- pkg/storage/gcp/gcp.go | 5 +- pkg/storage/gcp/saver.go | 129 ++++++++++++++++++++++++-- 10 files changed, 255 insertions(+), 17 deletions(-) diff --git a/cmd/proxy/actions/app_proxy.go b/cmd/proxy/actions/app_proxy.go index 777ad4ca6..0117bc9db 100644 --- a/cmd/proxy/actions/app_proxy.go +++ b/cmd/proxy/actions/app_proxy.go @@ -101,7 +101,7 @@ func addProxyRoutes( lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs) checker := storage.WithChecker(s) - withSingleFlight, err := getSingleFlight(l, c, checker) + withSingleFlight, err := getSingleFlight(l, c, s, checker) if err != nil { return err } @@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a l.logger.WithContext(ctx).Printf(format, v...) 
} -func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) { +func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) { switch c.SingleFlightType { case "", "memory": return stash.WithSingleflight, nil @@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) ( if c.StorageType != "gcp" { return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType) } - return stash.WithGCSLock, nil + return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s) case "azureblob": if c.StorageType != "azureblob" { return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType) diff --git a/config.dev.toml b/config.dev.toml index d04df2fea..aab468dfc 100755 --- a/config.dev.toml +++ b/config.dev.toml @@ -377,6 +377,10 @@ ShutdownTimeout = 60 # Max retries while acquiring the lock. Defaults to 10. # Env override: ATHENS_REDIS_LOCK_MAX_RETRIES MaxRetries = 10 + [SingleFlight.GCP] + # Threshold for how long to wait in seconds for an in-progress GCP upload to + # be considered to have failed to unlock. + StaleThreshold = 120 [Storage] # Only storage backends that are specified in Proxy.StorageType are required here [Storage.CDN] diff --git a/docs/content/configuration/storage.md b/docs/content/configuration/storage.md index ab63ea653..81ef957d2 100644 --- a/docs/content/configuration/storage.md +++ b/docs/content/configuration/storage.md @@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red SentinelPassword = "sekret" Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis. + + +### Using GCP as a singleflight mechanism + +The GCP singleflight mechanism does not require configuration, and works out of the box. 
It has a +single option with which it can be customized: + + [SingleFlight.GCP] + # Threshold for how long to wait in seconds for an in-progress GCP upload to + # be considered to have failed to unlock. + StaleThreshold = 120 diff --git a/pkg/config/config.go b/pkg/config/config.go index 9caed7fee..b591b949a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -181,6 +181,7 @@ func defaultConfig() *Config { SentinelPassword: "sekret", LockConfig: DefaultRedisLockConfig(), }, + GCP: DefaultGCPConfig(), }, Index: &Index{ MySQL: &MySQL{ diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 6c741a04a..b57445e7d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) { LockConfig: DefaultRedisLockConfig(), }, Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"}, + GCP: DefaultGCPConfig(), } expConf := &Config{ @@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string { } else if singleFlight.Etcd != nil { envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd" envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints + } else if singleFlight.GCP != nil { + envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold) } } return envVars diff --git a/pkg/config/singleflight.go b/pkg/config/singleflight.go index 69049b10a..b3b6fe048 100644 --- a/pkg/config/singleflight.go +++ b/pkg/config/singleflight.go @@ -7,6 +7,7 @@ type SingleFlight struct { Etcd *Etcd Redis *Redis RedisSentinel *RedisSentinel + GCP *GCP } // Etcd holds client side configuration @@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig { MaxRetries: 10, } } + +// GCP is the configuration for GCP locking. +type GCP struct { + StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"` +} + +// DefaultGCPConfig returns the default GCP locking configuration. 
+func DefaultGCPConfig() *GCP { + return &GCP{ + StaleThreshold: 120, + } +} diff --git a/pkg/stash/with_gcs.go b/pkg/stash/with_gcs.go index 3e2386e5c..788251f1c 100644 --- a/pkg/stash/with_gcs.go +++ b/pkg/stash/with_gcs.go @@ -2,15 +2,32 @@ package stash import ( "context" + "fmt" + "time" "github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/observ" + "github.com/gomods/athens/pkg/storage" + "github.com/gomods/athens/pkg/storage/gcp" ) // WithGCSLock returns a distributed singleflight // using a GCS backend. See the config.toml documentation for details. -func WithGCSLock(s Stasher) Stasher { - return &gcsLock{s} +func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) { + if staleThreshold <= 0 { + return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold")) + } + // Since we *must* be using a GCP storage backend, we can abuse this + // fact to mutate it, so that we can get our threshold into Save(). + // Your instincts are correct, this is kind of gross. + gs, ok := s.(*gcp.Storage) + if !ok { + return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage")) + } + gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second) + return func(s Stasher) Stasher { + return &gcsLock{s} + }, nil } type gcsLock struct { diff --git a/pkg/stash/with_gcs_test.go b/pkg/stash/with_gcs_test.go index 3a309cac0..d738a26d0 100644 --- a/pkg/stash/with_gcs_test.go +++ b/pkg/stash/with_gcs_test.go @@ -3,6 +3,7 @@ package stash import ( "bytes" "context" + "fmt" "io" "os" "strings" @@ -17,6 +18,12 @@ import ( "golang.org/x/sync/errgroup" ) +type failReader int + +func (f *failReader) Read([]byte) (int, error) { + return 0, fmt.Errorf("failure") +} + // TestWithGCS requires a real GCP backend implementation // and it will ensure that saving to modules at the same time // is done synchronously so that only the first module gets saved. 
@@ -41,7 +48,11 @@ func TestWithGCS(t *testing.T) { for i := 0; i < 5; i++ { content := uuid.New().String() ms := &mockGCPStasher{strg, content} - s := WithGCSLock(ms) + gs, err := WithGCSLock(120, strg) + if err != nil { + t.Fatal(err) + } + s := gs(ms) eg.Go(func() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -79,6 +90,72 @@ func TestWithGCS(t *testing.T) { } } +// TestWithGCSPartialFailure requires a real GCP backend implementation +// and ensures that if one of the non-singleflight-lock files fails to +// upload, that the cache does not remain poisoned. +func TestWithGCSPartialFailure(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + const ( + mod = "stashmod" + ver = "v1.0.0" + ) + strg := getStorage(t) + strg.Delete(ctx, mod, ver) + defer strg.Delete(ctx, mod, ver) + + // sanity check + _, err := strg.GoMod(ctx, mod, ver) + if !errors.Is(err, errors.KindNotFound) { + t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err) + } + + content := uuid.New().String() + ms := &mockGCPStasher{strg, content} + fr := new(failReader) + gs, err := WithGCSLock(120, strg) + if err != nil { + t.Fatal(err) + } + s := gs(ms) + // We simulate a failure by manually passing an io.Reader that will fail. + err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content)) + if err == nil { + // We *want* to fail. + t.Fatal(err) + } + + // Now try a Stash. This should upload the missing files. 
+ _, err = s.Stash(ctx, "stashmod", "v1.0.0") + if err != nil { + t.Fatal(err) + } + + info, err := strg.Info(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + modContent, err := strg.GoMod(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + zip, err := strg.Zip(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + defer zip.Close() + zipContent, err := io.ReadAll(zip) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(info, modContent) { + t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent)) + } + if !bytes.Equal(info, zipContent) { + t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent)) + } +} + // mockGCPStasher is like mockStasher // but leverages in memory storage // so that redis can determine diff --git a/pkg/storage/gcp/gcp.go b/pkg/storage/gcp/gcp.go index a15d25d1d..b502fe588 100644 --- a/pkg/storage/gcp/gcp.go +++ b/pkg/storage/gcp/gcp.go @@ -15,8 +15,9 @@ import ( // Storage implements the (./pkg/storage).Backend interface. type Storage struct { - bucket *storage.BucketHandle - timeout time.Duration + bucket *storage.BucketHandle + timeout time.Duration + staleThreshold time.Duration } // New returns a new Storage instance backed by a Google Cloud Storage bucket. diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go index 4298aaa17..b430d64d7 100644 --- a/pkg/storage/gcp/saver.go +++ b/pkg/storage/gcp/saver.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "time" "cloud.google.com/go/storage" "github.com/gomods/athens/pkg/config" @@ -12,6 +13,10 @@ import ( googleapi "google.golang.org/api/googleapi" ) +// Fallback for how long we consider an "in_progress" metadata key stale, +// due to failure to remove it. 
+const fallbackInProgressStaleThreshold = 2 * time.Minute + // Save uploads the module's .mod, .zip and .info files for a given version // It expects a context, which can be provided using context.Background // from the standard library until context has been threaded down the stack. @@ -20,40 +25,146 @@ import ( // Uploaded files are publicly accessible in the storage bucket as per // an ACL rule. func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { - const op errors.Op = "gcp.Save" + const op errors.Op = "gcp.save" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() gomodPath := config.PackageVersionedName(module, version, "mod") - err := s.upload(ctx, gomodPath, bytes.NewReader(mod)) - if err != nil { + innerErr := s.save(ctx, module, version, mod, zip, info) + if errors.Is(innerErr, errors.KindAlreadyExists) { + // Cache hit. + return errors.E(op, innerErr) + } + // No cache hit. Remove the metadata lock if it is there. + inProgress, outerErr := s.checkUploadInProgress(ctx, gomodPath) + if outerErr != nil { + return errors.E(op, outerErr) + } + if inProgress { + outerErr = s.removeInProgressMetadata(ctx, gomodPath) + if outerErr != nil { + return errors.E(op, outerErr) + } + } + return innerErr +} + +// SetStaleThreshold sets the threshold of how long we consider +// a lock metadata stale after. 
+func (s *Storage) SetStaleThreshold(threshold time.Duration) { + s.staleThreshold = threshold +} + +func (s *Storage) save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { + const op errors.Op = "gcp.save" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + gomodPath := config.PackageVersionedName(module, version, "mod") + seenAlreadyExists := 0 + err := s.upload(ctx, gomodPath, bytes.NewReader(mod), true) + // If it already exists, check the object metadata to see if the + // other two are still uploading in progress somewhere else. If they + // are, return a cache hit. If not, continue on to the other two, + // and only return a cache hit if all three exist. + if errors.Is(err, errors.KindAlreadyExists) { + inProgress, progressErr := s.checkUploadInProgress(ctx, gomodPath) + if progressErr != nil { + return errors.E(op, progressErr) + } + if inProgress { + // err is known to be errors.KindAlreadyExists at this point, so + // this is a cache hit return. + return errors.E(op, err) + } + seenAlreadyExists++ + } else if err != nil { + // Other errors return errors.E(op, err) } zipPath := config.PackageVersionedName(module, version, "zip") - err = s.upload(ctx, zipPath, zip) - if err != nil { + err = s.upload(ctx, zipPath, zip, false) + if errors.Is(err, errors.KindAlreadyExists) { + seenAlreadyExists++ + } else if err != nil { return errors.E(op, err) } infoPath := config.PackageVersionedName(module, version, "info") - err = s.upload(ctx, infoPath, bytes.NewReader(info)) + err = s.upload(ctx, infoPath, bytes.NewReader(info), false) + // Have all three returned errors.KindAlreadyExists? 
+ if errors.Is(err, errors.KindAlreadyExists) { + if seenAlreadyExists == 2 { + return errors.E(op, err) + } + } else if err != nil { + return errors.E(op, err) + } + return nil +} + +func (s *Storage) removeInProgressMetadata(ctx context.Context, gomodPath string) error { + const op errors.Op = "gcp.removeInProgressMetadata" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + _, err := s.bucket.Object(gomodPath).Update(ctx, storage.ObjectAttrsToUpdate{ + Metadata: map[string]string{}, + }) if err != nil { return errors.E(op, err) } return nil } -func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) error { +func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (bool, error) { + const op errors.Op = "gcp.checkUploadInProgress" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + attrs, err := s.bucket.Object(gomodPath).Attrs(ctx) + if err != nil { + return false, errors.E(op, err) + } + // If we have a config-set lock threshold, i.e. we are using the GCP + // singleflight backend, use it. Otherwise, use the fallback, which + // is arguably irrelevant when not using GCP for singleflighting. + threshold := fallbackInProgressStaleThreshold + if s.staleThreshold > 0 { + threshold = s.staleThreshold + } + if attrs.Metadata != nil { + _, ok := attrs.Metadata["in_progress"] + if ok { + // In case the final call to remove the metadata fails for some reason, + // we have a threshold after which we consider this to be stale. 
+ if time.Since(attrs.Created) > threshold { + return false, nil + } + return true, nil + } + } + return false, nil +} + +func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, first bool) error { const op errors.Op = "gcp.upload" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + wc := s.bucket.Object(path).If(storage.Conditions{ DoesNotExist: true, - }).NewWriter(ctx) + }).NewWriter(cancelCtx) + + // We set this metadata only for the first of the three files uploaded, + // for use as a singleflight lock. + if first { + wc.ObjectAttrs.Metadata = make(map[string]string) + wc.ObjectAttrs.Metadata["in_progress"] = "true" + } // NOTE: content type is auto detected on GCP side and ACL defaults to public // Once we support private storage buckets this may need refactoring // unless there is a way to set the default perms in the project. if _, err := io.Copy(wc, stream); err != nil { - _ = wc.Close() + // Purposely do not close it to avoid creating a partial file. return err }