Skip to content

Commit

Permalink
updates from review
Browse files Browse the repository at this point in the history
  • Loading branch information
gregns1 committed Oct 7, 2024
1 parent 94c5a71 commit 273bedd
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 31 deletions.
20 changes: 2 additions & 18 deletions db/background_mgr_attachment_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,23 +102,6 @@ func (a *AttachmentCompactionManager) Init(ctx context.Context, options map[stri
return newRunInit()
}

func (a *AttachmentCompactionManager) PurgeDCPMetadata(ctx context.Context, datastore base.DataStore, database *Database, metadataKeyPrefix string) error {

bucket, err := base.AsGocbV2Bucket(database.Bucket)
if err != nil {
return err
}
numVbuckets, err := bucket.GetMaxVbno()
if err != nil {
return err
}

metadata := base.NewDCPMetadataCS(ctx, datastore, numVbuckets, base.DefaultNumWorkers, metadataKeyPrefix)
base.InfofCtx(ctx, base.KeyDCP, "purging persisted dcp metadata for attachment compaction run %s", a.CompactID)
metadata.Purge(ctx, base.DefaultNumWorkers)
return nil
}

func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[string]interface{}, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error {
database := options["database"].(*Database)

Expand Down Expand Up @@ -204,7 +187,8 @@ func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ct
if errors.As(err, &rollbackErr) || errors.Is(err, base.ErrVbUUIDMismatch) {
base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase)
// to rollback any phase for attachment compaction we need to purge all persisted dcp metadata
err = PurgeDCPMetadata(ctx, database.DatabaseContext, keyPrefix, a.CompactID)
base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID)
err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, keyPrefix, a.CompactID)
if err != nil {
base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err)
return false, err
Expand Down
10 changes: 6 additions & 4 deletions db/background_mgr_attachment_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string
a.DocsProcessed.Add(1)
syncData, _, _, err := UnmarshalDocumentSyncDataFromFeed(event.Value, event.DataType, collection.userXattrKey(), false)
if err != nil {
failProcess(err, "error unmarshaling document %s with migrationID %s: %v", base.UD(docID), a.MigrationID, err)
failProcess(err, "[%s] error unmarshaling document %s: %v, stopping attachment migration.", migrationLoggingID, base.UD(docID), err)
}

if syncData == nil || syncData.Attachments == nil {
Expand All @@ -140,7 +140,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string
// xattr migration to take place
err = collWithUser.MigrateAttachmentMetadata(collCtx, docID, event.Cas, syncData)
if err != nil {
failProcess(err, "error migrating document attachment metadata for doc: %s with migrationID %s: %v", base.UD(docID), a.MigrationID, err)
failProcess(err, "[%s] error migrating document attachment metadata for doc: %s: %v", migrationLoggingID, base.UD(docID), err)
}
a.DocsChanged.Add(1)
return true
Expand Down Expand Up @@ -307,7 +307,8 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex
return nil
}
if len(a.CollectionIDs) != len(collectionIDs) {
err := PurgeDCPMetadata(ctx, database, metadataKeyPrefix, a.MigrationID)
base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID)
err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID)
if err != nil {
return err
}
Expand All @@ -316,7 +317,8 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex
slices.Sort(a.CollectionIDs)
purgeNeeded := slices.Compare(collectionIDs, a.CollectionIDs)
if purgeNeeded != 0 {
err := PurgeDCPMetadata(ctx, database, metadataKeyPrefix, a.MigrationID)
base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID)
err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID)
if err != nil {
return err
}
Expand Down
14 changes: 5 additions & 9 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/channels"
pkgerrors "github.com/pkg/errors"
"golang.org/x/exp/maps"
)

const (
Expand Down Expand Up @@ -2571,15 +2572,11 @@ func (db *Database) DataStoreNames() base.ScopeAndCollectionNames {

// GetCollectionIDs will return all collection IDs for all collections configured on the database
func (db *DatabaseContext) GetCollectionIDs() []uint32 {
collIDs := make([]uint32, 0)
for id, _ := range db.CollectionByID {
collIDs = append(collIDs, id)
}
return collIDs
return maps.Keys(db.CollectionByID)
}

// PurgeDCPMetadata will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0
func PurgeDCPMetadata(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, taskID string) error {
// PurgeDCPCheckpoints will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0
func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, taskID string) error {

bucket, err := base.AsGocbV2Bucket(database.Bucket)
if err != nil {
Expand All @@ -2591,8 +2588,7 @@ func PurgeDCPMetadata(ctx context.Context, database *DatabaseContext, metadataKe
}

datastore := database.MetadataStore
metadata := base.NewDCPMetadataCS(ctx, datastore, numVbuckets, base.DefaultNumWorkers, metadataKeyPrefix)
base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", taskID)
metadata := base.NewDCPMetadataCS(ctx, datastore, numVbuckets, base.DefaultNumWorkers, checkpointPrefix)
metadata.Purge(ctx, base.DefaultNumWorkers)
return nil
}

0 comments on commit 273bedd

Please sign in to comment.