diff --git a/p2p/kademlia/store/cloud.go/cloud.go b/p2p/kademlia/store/cloud.go/cloud.go index 0e2256d72..def88ba51 100644 --- a/p2p/kademlia/store/cloud.go/cloud.go +++ b/p2p/kademlia/store/cloud.go/cloud.go @@ -235,3 +235,41 @@ func (r *RcloneStorage) CheckCloudConnection() error { return nil } + +// DeleteBatch deletes a batch of files from the cloud identified by their keys. +func (r *RcloneStorage) DeleteBatch(keys []string) error { + var wg sync.WaitGroup + semaphore := make(chan struct{}, maxConcurrentUploads) // Use same max concurrency as uploads + + var mu sync.Mutex + var lastError error + + for _, key := range keys { + wg.Add(1) + semaphore <- struct{}{} // Acquire semaphore to limit concurrency + + go func(key string) { + defer wg.Done() + defer func() { <-semaphore }() // Ensure to release semaphore + + // Construct the remote path where the file is stored + remotePath := fmt.Sprintf("%s:%s/%s", r.specName, r.bucketName, key) + + // Use rclone to delete the file from the remote + cmd := exec.Command("rclone", "deletefile", remotePath) + + if err := cmd.Run(); err != nil { + mu.Lock() + if lastError == nil { + lastError = fmt.Errorf("failed to delete some files") + } + lastError = fmt.Errorf("%v; key %s error: %v", lastError, key, err) + mu.Unlock() + } + }(key) + } + + wg.Wait() // Wait for all goroutines to finish + + return lastError // Return the last error encountered, if any +} diff --git a/p2p/kademlia/store/sqlite/meta_worker.go b/p2p/kademlia/store/sqlite/meta_worker.go index d129e5f60..c97e63d76 100644 --- a/p2p/kademlia/store/sqlite/meta_worker.go +++ b/p2p/kademlia/store/sqlite/meta_worker.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "strings" "os" "path" @@ -113,6 +114,7 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor go handler.startLastAccessedUpdateWorker(ctx) go handler.startInsertWorker(ctx) go handler.startMigrationExecutionWorker(ctx) + log.WithContext(ctx).Info("MigrationMetaStore workers started") return handler, nil } @@ -192,16 +194,21 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error { return err } + log.WithContext(ctx).WithField("offset", offset).Info("Syncing meta with p2p data store") var batchUpdates []UpdateMessage found := false for rows.Next() { found = true var r Record + var t *time.Time - if err := rows.Scan(&r.Key, &r.Data, &r.UpdatedAt); err != nil { + if err := rows.Scan(&r.Key, &r.Data, &t); err != nil { log.WithContext(ctx).WithError(err).Error("Error scanning row from p2p data store") continue } + if t != nil { + r.UpdatedAt = *t + } dataSize := len(r.Data) batchUpdates = append(batchUpdates, UpdateMessage{ @@ -272,7 +279,6 @@ func (d *MigrationMetaStore) startLastAccessedUpdateWorker(ctx context.Context) d.commitLastAccessedUpdates(ctx) case <-ctx.Done(): log.WithContext(ctx).Info("Shutting down last accessed update worker") - d.commitLastAccessedUpdates(ctx) // Commit any remaining updates before shutdown return } } @@ -419,6 +425,21 @@ func (d *MigrationMetaStore) startMigrationExecutionWorker(ctx context.Context) } } +type Migration struct { + ID int `db:"id"` + MigrationStartedAt sql.NullTime `db:"migration_started_at"` + MigrationFinishedAt sql.NullTime `db:"migration_finished_at"` +} +type Migrations []Migration + +type MigrationKey struct { + Key string `db:"key"` + MigrationID int `db:"migration_id"` + IsMigrated bool `db:"is_migrated"` +} + +type MigrationKeys []MigrationKey + func (d *MigrationMetaStore) checkAndExecuteMigration(ctx context.Context) { // Check the available disk space isLow, err := utils.CheckDiskSpace(lowSpaceThresholdGB) @@ -426,56 +447,113 @@ func (d *MigrationMetaStore) checkAndExecuteMigration(ctx context.Context) { log.WithContext(ctx).WithError(err).Error("migration worker: check disk space failed") } - if !isLow { - // Disk space is sufficient, stop migration - return - } + //if !isLow { + // Disk space is sufficient, stop migration + //return + //} + log.WithContext(ctx).WithField("islow", isLow).Info("Starting data migration") // Step 1: Fetch pending migrations - var migrations []struct { - ID int `db:"id"` - } + var migrations Migrations - err = d.db.Select(&migrations, `SELECT id FROM migration WHERE migration_started_at IS NULL`) + err = d.db.Select(&migrations, `SELECT id FROM migration WHERE migration_started_at IS NULL or migration_finished_at IS NULL`) if err != nil { log.WithContext(ctx).WithError(err).Error("Failed to fetch pending migrations") return } + log.WithContext(ctx).WithField("count", len(migrations)).Info("Fetched pending migrations") - // Step 2: Iterate over each migration + // Iterate over each migration for _, migration := range migrations { - var keys []string - err := d.db.Select(&keys, `SELECT key FROM meta_migration WHERE migration_id = ?`, migration.ID) - if err != nil { - log.WithContext(ctx).WithError(err).Error("Failed to fetch keys for migration") + log.WithContext(ctx).WithField("migration_id", migration.ID).Info("Processing migration") + + if err := d.processMigrationInBatches(ctx, migration); err != nil { + log.WithContext(ctx).WithError(err).WithField("migration_id", migration.ID).Error("Failed to process migration") continue } + } +} - // Step 2.1: Check if there are at least 100 records - if len(keys) < minKeysToMigrate { - continue +func (d *MigrationMetaStore) processMigrationInBatches(ctx context.Context, migration Migration) error { + var migKeys MigrationKeys + err := d.db.Select(&migKeys, `SELECT key FROM meta_migration WHERE migration_id = ?`, migration.ID) + if err != nil { + return fmt.Errorf("failed to fetch keys for migration %d: %w", migration.ID, err) + } + + totalKeys := len(migKeys) + if totalKeys == 0 { + return nil + } + + if totalKeys < minKeysToMigrate { + log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("keys-count", totalKeys).Info("Skipping migration due to insufficient keys") + return nil + } + + migratedKeys := 0 + var keys []string + for _, key := range migKeys { + if key.IsMigrated { + migratedKeys++ + } else { + keys = append(keys, key.Key) } + } - // Step 2.2: Update migration_started_at to current timestamp - _, err = d.db.Exec(`UPDATE migration SET migration_started_at = ? WHERE id = ?`, time.Now(), migration.ID) - if err != nil { - log.WithContext(ctx).WithError(err).Error("Failed to update migration start time") - continue + nonMigratedKeys := len(keys) + maxBatchSize := 5000 + for start := 0; start < nonMigratedKeys; start += maxBatchSize { + end := start + maxBatchSize + if end > nonMigratedKeys { + end = nonMigratedKeys } - // Step 2.3: Retrieve data for keys - values, _, err := retrieveBatchValues(ctx, d.p2pDataStore, keys, false, Store{}) - if err != nil { - log.WithContext(ctx).WithError(err).Error("Failed to retrieve batch values") - continue + batchKeys := keys[start:end] + + // Mark migration as started + if start == 0 && !migration.MigrationStartedAt.Valid { + if _, err := d.db.Exec(`UPDATE migration SET migration_started_at = ? WHERE id = ?`, time.Now(), migration.ID); err != nil { + return err + } } - if err := d.uploadInBatches(ctx, keys, values); err != nil { - log.WithContext(ctx).WithError(err).Error("Failed to migrate some data") - continue + // Retrieve and upload data for current batch + if err := d.processSingleBatch(ctx, batchKeys); err != nil { + log.WithContext(ctx).WithError(err).WithField("migration_id", migration.ID).WithField("batch-keys-count", len(batchKeys)).Error("Failed to process batch") + return fmt.Errorf("failed to process batch: %w - exiting now", err) } + } + // Mark migration as finished if all keys are migrated + var leftMigKeys MigrationKeys + err = d.db.Select(&leftMigKeys, `SELECT key FROM meta_migration WHERE migration_id = ? and is_migrated = ?`, migration.ID, false) + if err != nil { + return fmt.Errorf("failed to fetch keys for migration %d: %w", migration.ID, err) } + + if len(leftMigKeys) == 0 { + if _, err := d.db.Exec(`UPDATE migration SET migration_finished_at = ? WHERE id = ?`, time.Now(), migration.ID); err != nil { + return fmt.Errorf("failed to mark migration %d as finished: %w", migration.ID, err) + } + } + + log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("tota-keys-count", totalKeys).WithField("migrated_in_current_iteration", nonMigratedKeys).Info("Migration processed successfully") + + return nil +} + +func (d *MigrationMetaStore) processSingleBatch(ctx context.Context, keys []string) error { + values, nkeys, err := d.retrieveBatchValuesToMigrate(ctx, keys) + if err != nil { + return fmt.Errorf("failed to retrieve batch values: %w", err) + } + + if err := d.uploadInBatches(ctx, nkeys, values); err != nil { + return fmt.Errorf("failed to upload batch values: %w", err) + } + + return nil } func (d *MigrationMetaStore) uploadInBatches(ctx context.Context, keys []string, values [][]byte) error { @@ -501,12 +579,18 @@ func (d *MigrationMetaStore) uploadInBatches(ctx context.Context, keys []string, continue } - if err := batchDeleteRecords(d.p2pDataStore, uploadedKeys); err != nil { + if err := batchSetMigratedRecords(d.p2pDataStore, uploadedKeys); err != nil { log.WithContext(ctx).WithError(err).Errorf("Failed to delete batch %d", i+1) lastError = err continue } + if err := d.batchSetMigrated(uploadedKeys); err != nil { + log.WithContext(ctx).WithError(err).Errorf("Failed to batch is_migrated %d", i+1) + lastError = err + continue + } + log.WithContext(ctx).Infof("Successfully uploaded and deleted records for batch %d of %d", i+1, batches) } @@ -624,3 +708,74 @@ func (d *MigrationMetaStore) InsertMetaMigrationData(ctx context.Context, migrat return nil } + +func (d *MigrationMetaStore) batchSetMigrated(keys []string) error { + if len(keys) == 0 { + log.P2P().Info("no keys provided for batch update (is_migrated)") + return nil + } + + // Create a parameter string for SQL query (?, ?, ?, ...) + paramStr := strings.Repeat("?,", len(keys)-1) + "?" + + // Create the SQL statement + query := fmt.Sprintf("UPDATE meta_migration set is_migrated = true WHERE key IN (%s)", paramStr) + + // Execute the query + res, err := d.db.Exec(query, stringArgsToInterface(keys)...) + if err != nil { + return fmt.Errorf("cannot batch update records (is_migrated): %w", err) + } + + // Optionally check rows affected + if rowsAffected, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected for batch update(is_migrated): %w", err) + } else if rowsAffected == 0 { + return fmt.Errorf("no rows affected for batch update (is_migrated)") + } + + return nil +} + +func (d *MigrationMetaStore) retrieveBatchValuesToMigrate(ctx context.Context, keys []string) ([][]byte, []string, error) { + if len(keys) == 0 { + return [][]byte{}, []string{}, nil // Return empty if no keys provided + } + + placeholders := make([]string, len(keys)) + args := make([]interface{}, len(keys)) + for i, key := range keys { + placeholders[i] = "?" + args[i] = key + } + + query := fmt.Sprintf(`SELECT key, data, is_on_cloud FROM data WHERE key IN (%s) AND is_on_cloud = false`, strings.Join(placeholders, ",")) + rows, err := d.p2pDataStore.QueryContext(ctx, query, args...) + if err != nil { + return nil, nil, fmt.Errorf("failed to retrieve records: %w", err) + } + defer rows.Close() + + // Assume pre-allocation for efficiency + values := make([][]byte, 0, len(keys)) + foundKeys := make([]string, 0, len(keys)) + + for rows.Next() { + var key string + var value []byte + var isOnCloud bool + if err := rows.Scan(&key, &value, &isOnCloud); err != nil { + return nil, nil, fmt.Errorf("failed to scan key and value: %w", err) + } + if len(value) > 1 && !isOnCloud { + values = append(values, value) + foundKeys = append(foundKeys, key) + } + } + + if err := rows.Err(); err != nil { + return nil, foundKeys, fmt.Errorf("rows processing error: %w", err) + } + + return values, foundKeys, nil +} diff --git a/p2p/kademlia/store/sqlite/sqlite.go b/p2p/kademlia/store/sqlite/sqlite.go index b2cd3e6a6..a9e23bdf7 100644 --- a/p2p/kademlia/store/sqlite/sqlite.go +++ b/p2p/kademlia/store/sqlite/sqlite.go @@ -514,7 +514,7 @@ func (s *Store) storeRecord(key []byte, value []byte, typ int, isOriginal bool) operation := func() error { now := time.Now().UTC() r := Record{Key: hkey, Data: value, UpdatedAt: now, Datatype: typ, Isoriginal: isOriginal, CreatedAt: now} - res, err := s.db.NamedExec(`INSERT INTO data(key, data, datatype, is_original, createdAt, updatedAt) values(:key, :data, :datatype, :isoriginal, :createdat, :updatedat) ON CONFLICT(key) DO UPDATE SET data=:data,updatedAt=:updatedat`, r) + res, err := s.db.NamedExec(`INSERT INTO data(key, data, datatype, is_original, createdAt, updatedAt, is_on_cloud) values(:key, :data, :datatype, :isoriginal, :createdat, :updatedat, false) ON CONFLICT(key) DO UPDATE SET data=:data,updatedAt=:updatedat,is_on_cloud=false`, r) if err != nil { return fmt.Errorf("cannot insert or update record with key %s: %w", hkey, err) } @@ -554,7 +554,7 @@ func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) erro } // Prepare insert statement - stmt, err := tx.PrepareNamed(`INSERT INTO data(key, data, datatype, is_original, createdAt, updatedAt) values(:key, :data, :datatype, :isoriginal, :createdat, :updatedat) ON CONFLICT(key) DO UPDATE SET data=:data,updatedAt=:updatedat`) + stmt, err := tx.PrepareNamed(`INSERT INTO data(key, data, datatype, is_original, createdAt, updatedAt, is_on_cloud) values(:key, :data, :datatype, :isoriginal, :createdat, :updatedat, false) ON CONFLICT(key) DO UPDATE SET data=:data,updatedAt=:updatedat,is_on_cloud=false`) if err != nil { if rollbackErr := tx.Rollback(); rollbackErr != nil { return fmt.Errorf("statement preparation failed, rollback failed: %v, original error: %w", rollbackErr, err) @@ -781,3 +781,31 @@ func stringArgsToInterface(args []string) []interface{} { } return iargs } + +func batchSetMigratedRecords(db *sqlx.DB, keys []string) error { + if len(keys) == 0 { + log.P2P().Info("no keys provided for batch update (migrated)") + return nil + } + + // Create a parameter string for SQL query (?, ?, ?, ...) + paramStr := strings.Repeat("?,", len(keys)-1) + "?" + + // Create the SQL statement + query := fmt.Sprintf("UPDATE data set data = X'', is_on_cloud = true WHERE key IN (%s)", paramStr) + + // Execute the query + res, err := db.Exec(query, stringArgsToInterface(keys)...) + if err != nil { + return fmt.Errorf("cannot batch update records (migrated): %w", err) + } + + // Optionally check rows affected + if rowsAffected, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected for batch update(migrated): %w", err) + } else if rowsAffected == 0 { + return fmt.Errorf("no rows affected for batch update (migrated)") + } + + return nil +} diff --git a/walletnode/go.mod b/walletnode/go.mod index b8ea5e4ce..112d37fb8 100644 --- a/walletnode/go.mod +++ b/walletnode/go.mod @@ -19,7 +19,6 @@ require ( github.com/json-iterator/go v1.1.12 github.com/pastelnetwork/gonode/common v0.0.0-20240229105633-1f295fe18563 github.com/pastelnetwork/gonode/mixins v0.0.0-00010101000000-000000000000 - github.com/pastelnetwork/gonode/p2p v0.0.0-00010101000000-000000000000 github.com/pastelnetwork/gonode/pastel v0.0.0-00010101000000-000000000000 github.com/pastelnetwork/gonode/proto v0.0.0-00010101000000-000000000000 github.com/pastelnetwork/gonode/raptorq v0.0.0-00010101000000-000000000000