[PSL-1240] Implement batching, Fix bugs in migration execution worker #920

Merged (4 commits, Aug 13, 2024)
Changes from 3 commits
38 changes: 38 additions & 0 deletions p2p/kademlia/store/cloud.go/cloud.go
@@ -235,3 +235,41 @@ func (r *RcloneStorage) CheckCloudConnection() error {

return nil
}

// DeleteBatch deletes a batch of files from the cloud identified by their keys.
func (r *RcloneStorage) DeleteBatch(keys []string) error {
var wg sync.WaitGroup
semaphore := make(chan struct{}, maxConcurrentUploads) // Use same max concurrency as uploads

var mu sync.Mutex
var lastError error

for _, key := range keys {
wg.Add(1)
semaphore <- struct{}{} // Acquire semaphore to limit concurrency

go func(key string) {
defer wg.Done()
defer func() { <-semaphore }() // Ensure to release semaphore

// Construct the remote path where the file is stored
remotePath := fmt.Sprintf("%s:%s/%s", r.specName, r.bucketName, key)

// Use rclone to delete the file from the remote
cmd := exec.Command("rclone", "deletefile", remotePath)

if err := cmd.Run(); err != nil {
mu.Lock()
if lastError == nil {
lastError = fmt.Errorf("failed to delete some files")
}
lastError = fmt.Errorf("%v; key %s error: %v", lastError, key, err)
mu.Unlock()
}
}(key)
}

wg.Wait() // Wait for all goroutines to finish

return lastError // Return the last error encountered, if any
}
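
For orientation, here is a minimal caller of the new DeleteBatch method. This sketch is not part of the diff, and cleanupCloudCopies is a hypothetical helper name; it only illustrates that DeleteBatch aggregates per-key failures into a single error instead of aborting on the first one.

// Hypothetical caller (not in this PR): remove cloud copies of keys that are
// no longer needed remotely. Assumes storage is an already-configured *RcloneStorage.
func cleanupCloudCopies(storage *RcloneStorage, keys []string) error {
	// DeleteBatch deletes the keys concurrently, bounded by the same
	// semaphore size used for uploads, and folds every per-key failure
	// into one aggregate error.
	if err := storage.DeleteBatch(keys); err != nil {
		return fmt.Errorf("cloud cleanup completed with errors: %w", err)
	}
	return nil
}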
219 changes: 187 additions & 32 deletions p2p/kademlia/store/sqlite/meta_worker.go
@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"strings"

"os"
"path"
@@ -113,6 +114,7 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
go handler.startLastAccessedUpdateWorker(ctx)
go handler.startInsertWorker(ctx)
go handler.startMigrationExecutionWorker(ctx)
log.WithContext(ctx).Info("MigrationMetaStore workers started")

return handler, nil
}
@@ -192,16 +194,21 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
return err
}

log.WithContext(ctx).WithField("offset", offset).Info("Syncing meta with p2p data store")
var batchUpdates []UpdateMessage
found := false
for rows.Next() {
found = true
var r Record
var t *time.Time

if err := rows.Scan(&r.Key, &r.Data, &r.UpdatedAt); err != nil {
if err := rows.Scan(&r.Key, &r.Data, &t); err != nil {
log.WithContext(ctx).WithError(err).Error("Error scanning row from p2p data store")
continue
}
if t != nil {
r.UpdatedAt = *t
}

dataSize := len(r.Data)
batchUpdates = append(batchUpdates, UpdateMessage{
@@ -272,7 +279,6 @@ func (d *MigrationMetaStore) startLastAccessedUpdateWorker(ctx context.Context)
d.commitLastAccessedUpdates(ctx)
case <-ctx.Done():
log.WithContext(ctx).Info("Shutting down last accessed update worker")
d.commitLastAccessedUpdates(ctx) // Commit any remaining updates before shutdown
return
}
}
@@ -419,63 +425,135 @@ func (d *MigrationMetaStore) startMigrationExecutionWorker(ctx context.Context)
}
}

type Migration struct {
ID int `db:"id"`
MigrationStartedAt sql.NullTime `db:"migration_started_at"`
MigrationFinishedAt sql.NullTime `db:"migration_finished_at"`
}
type Migrations []Migration

type MigrationKey struct {
Key string `db:"key"`
MigrationID int `db:"migration_id"`
IsMigrated bool `db:"is_migrated"`
}

type MigrationKeys []MigrationKey

func (d *MigrationMetaStore) checkAndExecuteMigration(ctx context.Context) {
// Check the available disk space
isLow, err := utils.CheckDiskSpace(lowSpaceThresholdGB)
if err != nil {
log.WithContext(ctx).WithError(err).Error("migration worker: check disk space failed")
}

if !isLow {
// Disk space is sufficient, stop migration
return
}
//if !isLow {
// Disk space is sufficient, stop migration
//return
//}

log.WithContext(ctx).WithField("islow", isLow).Info("Starting data migration")
// Step 1: Fetch pending migrations
var migrations []struct {
ID int `db:"id"`
}
var migrations Migrations

err = d.db.Select(&migrations, `SELECT id FROM migration WHERE migration_started_at IS NULL`)
err = d.db.Select(&migrations, `SELECT id FROM migration WHERE migration_started_at IS NULL OR migration_finished_at IS NULL`)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Failed to fetch pending migrations")
return
}
log.WithContext(ctx).WithField("count", len(migrations)).Info("Fetched pending migrations")

// Step 2: Iterate over each migration
// Iterate over each migration
for _, migration := range migrations {
var keys []string
err := d.db.Select(&keys, `SELECT key FROM meta_migration WHERE migration_id = ?`, migration.ID)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Failed to fetch keys for migration")
log.WithContext(ctx).WithField("migration_id", migration.ID).Info("Processing migration")

if err := d.processMigrationInBatches(ctx, migration); err != nil {
log.WithContext(ctx).WithError(err).WithField("migration_id", migration.ID).Error("Failed to process migration")
continue
}
}
}

// Step 2.1: Check if there are at least 100 records
if len(keys) < minKeysToMigrate {
continue
func (d *MigrationMetaStore) processMigrationInBatches(ctx context.Context, migration Migration) error {
var migKeys MigrationKeys
err := d.db.Select(&migKeys, `SELECT key FROM meta_migration WHERE migration_id = ?`, migration.ID)
if err != nil {
return fmt.Errorf("failed to fetch keys for migration %d: %w", migration.ID, err)
}

totalKeys := len(migKeys)
if totalKeys == 0 {
return nil
}

if totalKeys < minKeysToMigrate {
log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("keys-count", totalKeys).Info("Skipping migration due to insufficient keys")
return nil
}

migratedKeys := 0
var keys []string
for _, key := range migKeys {
if key.IsMigrated {
migratedKeys++
} else {
keys = append(keys, key.Key)
}
}

// Step 2.2: Update migration_started_at to current timestamp
_, err = d.db.Exec(`UPDATE migration SET migration_started_at = ? WHERE id = ?`, time.Now(), migration.ID)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Failed to update migration start time")
continue
nonMigratedKeys := len(keys)
maxBatchSize := 5000
for start := 0; start < nonMigratedKeys; start += maxBatchSize {
end := start + maxBatchSize
if end > nonMigratedKeys {
end = nonMigratedKeys
}

// Step 2.3: Retrieve data for keys
values, _, err := retrieveBatchValues(ctx, d.p2pDataStore, keys, false, Store{})
if err != nil {
log.WithContext(ctx).WithError(err).Error("Failed to retrieve batch values")
continue
batchKeys := keys[start:end]

// Mark migration as started
if start == 0 && !migration.MigrationStartedAt.Valid {
if _, err := d.db.Exec(`UPDATE migration SET migration_started_at = ? WHERE id = ?`, time.Now(), migration.ID); err != nil {
return err
}
}

if err := d.uploadInBatches(ctx, keys, values); err != nil {
log.WithContext(ctx).WithError(err).Error("Failed to migrate some data")
continue
// Retrieve and upload data for current batch
if err := d.processSingleBatch(ctx, batchKeys); err != nil {
log.WithContext(ctx).WithError(err).WithField("migration_id", migration.ID).WithField("batch-keys-count", len(batchKeys)).Error("Failed to process batch")
return fmt.Errorf("failed to process batch: %w - exiting now", err)
}
}

// Mark migration as finished if all keys are migrated
var leftMigKeys MigrationKeys
err = d.db.Select(&leftMigKeys, `SELECT key FROM meta_migration WHERE migration_id = ? AND is_migrated = ?`, migration.ID, false)
if err != nil {
return fmt.Errorf("failed to fetch keys for migration %d: %w", migration.ID, err)
}

if len(leftMigKeys) == 0 {
if _, err := d.db.Exec(`UPDATE migration SET migration_finished_at = ? WHERE id = ?`, time.Now(), migration.ID); err != nil {
return fmt.Errorf("failed to mark migration %d as finished: %w", migration.ID, err)
}
}

log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("tota-keys-count", totalKeys).WithField("migrated_in_current_iteration", nonMigratedKeys).Info("Migration processed successfully")

return nil
}

func (d *MigrationMetaStore) processSingleBatch(ctx context.Context, keys []string) error {
values, nkeys, err := d.retrieveBatchValuesToMigrate(ctx, keys)
if err != nil {
return fmt.Errorf("failed to retrieve batch values: %w", err)
}

if err := d.uploadInBatches(ctx, nkeys, values); err != nil {
return fmt.Errorf("failed to upload batch values: %w", err)
}

return nil
}

func (d *MigrationMetaStore) uploadInBatches(ctx context.Context, keys []string, values [][]byte) error {
@@ -501,12 +579,18 @@ func (d *MigrationMetaStore) uploadInBatches(ctx context.Context, keys []string,
continue
}

if err := batchDeleteRecords(d.p2pDataStore, uploadedKeys); err != nil {
if err := batchSetMigratedRecords(d.p2pDataStore, uploadedKeys); err != nil {
log.WithContext(ctx).WithError(err).Errorf("Failed to delete batch %d", i+1)
lastError = err
continue
}

if err := d.batchSetMigrated(uploadedKeys); err != nil {
log.WithContext(ctx).WithError(err).Errorf("Failed to batch is_migrated %d", i+1)
lastError = err
continue
}

log.WithContext(ctx).Infof("Successfully uploaded and deleted records for batch %d of %d", i+1, batches)
}
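
Note: batchSetMigratedRecords, which replaces batchDeleteRecords above, is defined outside this diff. A rough sketch of what it presumably does, assuming the p2p data store exposes a database/sql-style Exec and that the data table carries the is_on_cloud flag that retrieveBatchValuesToMigrate filters on below:

// Sketch only; the real batchSetMigratedRecords is not part of this diff.
// The store type (*sql.DB) and the is_on_cloud column are assumptions taken
// from retrieveBatchValuesToMigrate further down.
func batchSetMigratedRecords(db *sql.DB, keys []string) error {
	if len(keys) == 0 {
		return nil
	}

	placeholders := strings.Repeat("?,", len(keys)-1) + "?"
	query := fmt.Sprintf("UPDATE data SET is_on_cloud = true WHERE key IN (%s)", placeholders)

	args := make([]interface{}, len(keys))
	for i, k := range keys {
		args[i] = k
	}

	_, err := db.Exec(query, args...)
	return err
}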

@@ -624,3 +708,74 @@ func (d *MigrationMetaStore) InsertMetaMigrationData(ctx context.Context, migrat

return nil
}

func (d *MigrationMetaStore) batchSetMigrated(keys []string) error {
if len(keys) == 0 {
log.P2P().Info("no keys provided for batch update (is_migrated)")
return nil
}

// Create a parameter string for SQL query (?, ?, ?, ...)
paramStr := strings.Repeat("?,", len(keys)-1) + "?"

// Create the SQL statement
query := fmt.Sprintf("UPDATE meta_migration set is_migrated = true WHERE key IN (%s)", paramStr)

// Execute the query
res, err := d.db.Exec(query, stringArgsToInterface(keys)...)
if err != nil {
return fmt.Errorf("cannot batch update records (is_migrated): %w", err)
}

// Optionally check rows affected
if rowsAffected, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected for batch update(is_migrated): %w", err)
} else if rowsAffected == 0 {
return fmt.Errorf("no rows affected for batch update (is_migrated)")
}

return nil
}

func (d *MigrationMetaStore) retrieveBatchValuesToMigrate(ctx context.Context, keys []string) ([][]byte, []string, error) {
if len(keys) == 0 {
return [][]byte{}, []string{}, nil // Return empty if no keys provided
}

placeholders := make([]string, len(keys))
args := make([]interface{}, len(keys))
for i, key := range keys {
placeholders[i] = "?"
args[i] = key
}

query := fmt.Sprintf(`SELECT key, data, is_on_cloud FROM data WHERE key IN (%s) AND is_on_cloud = false`, strings.Join(placeholders, ","))
rows, err := d.p2pDataStore.QueryContext(ctx, query, args...)
if err != nil {
return nil, nil, fmt.Errorf("failed to retrieve records: %w", err)
}
defer rows.Close()

// Pre-allocate slices for efficiency
values := make([][]byte, 0, len(keys))
foundKeys := make([]string, 0, len(keys))

for rows.Next() {
var key string
var value []byte
var isOnCloud bool
if err := rows.Scan(&key, &value, &isOnCloud); err != nil {
return nil, nil, fmt.Errorf("failed to scan key and value: %w", err)
}
if len(value) > 1 && !isOnCloud {
values = append(values, value)
foundKeys = append(foundKeys, key)
}
}

if err := rows.Err(); err != nil {
return nil, foundKeys, fmt.Errorf("rows processing error: %w", err)
}

return values, foundKeys, nil
}
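
stringArgsToInterface, used by batchSetMigrated above, is also defined elsewhere in the package; a minimal sketch of the conversion it presumably performs (turning the key slice into variadic query arguments) is:

// Sketch of the helper assumed by batchSetMigrated; the real
// stringArgsToInterface lives elsewhere in this package.
func stringArgsToInterface(args []string) []interface{} {
	out := make([]interface{}, len(args))
	for i, a := range args {
		out[i] = a
	}
	return out
}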