diff --git a/p2p/kademlia/store/sqlite/meta_worker.go b/p2p/kademlia/store/sqlite/meta_worker.go index 732b8c966..48c456421 100644 --- a/p2p/kademlia/store/sqlite/meta_worker.go +++ b/p2p/kademlia/store/sqlite/meta_worker.go @@ -2,6 +2,7 @@ package sqlite import ( "context" + "database/sql" "fmt" "os" @@ -284,7 +285,7 @@ func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) { return } - stmt, err := tx.Prepare("INSERT OR REPLACE INTO meta (key, last_access_time) VALUES (?, ?)") + stmt, err := tx.Prepare("INSERT OR REPLACE INTO meta (key, last_accessed) VALUES (?, ?)") if err != nil { tx.Rollback() // Roll back the transaction on error log.WithContext(ctx).WithError(err).Error("Error preparing statement (commitLastAccessedUpdates)") @@ -511,3 +512,115 @@ func (d *MigrationMetaStore) uploadInBatches(ctx context.Context, keys []string, return lastError } + +type MetaStoreInterface interface { + GetCountOfStaleData(ctx context.Context, staleTime time.Time) (int, error) + GetStaleDataInBatches(ctx context.Context, batchSize, batchNumber int, duration time.Time) ([]string, error) + GetPendingMigrationID(ctx context.Context) (int, error) + CreateNewMigration(ctx context.Context) (int, error) + InsertMetaMigrationData(ctx context.Context, migrationID int, keys []string) error +} + +// GetCountOfStaleData returns the count of stale data where last_accessed is 3 months before. +func (d *MigrationMetaStore) GetCountOfStaleData(ctx context.Context, staleTime time.Time) (int, error) { + var count int + query := `SELECT COUNT(*) FROM meta WHERE last_accessed < ?` + + err := d.db.GetContext(ctx, &count, query, staleTime) + if err != nil { + return 0, fmt.Errorf("failed to get count of stale data: %w", err) + } + return count, nil +} + +// GetStaleDataInBatches retrieves stale data entries in batches from the meta table. +func (d *MigrationMetaStore) GetStaleDataInBatches(ctx context.Context, batchSize, batchNumber int, duration time.Time) ([]string, error) { + offset := batchNumber * batchSize + + query := ` + SELECT key + FROM meta + WHERE last_accessed < ? + LIMIT ? OFFSET ? + ` + + rows, err := d.db.QueryxContext(ctx, query, duration, batchSize, offset) + if err != nil { + return nil, fmt.Errorf("failed to get stale data in batches: %w", err) + } + defer rows.Close() + + var staleData []string + for rows.Next() { + var data string + if err := rows.Scan(&data); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + staleData = append(staleData, data) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate over rows: %w", err) + } + return staleData, nil +} + +func (d *MigrationMetaStore) GetPendingMigrationID(ctx context.Context) (int, error) { + var migrationID int + query := `SELECT id FROM migration WHERE migration_started_at IS NULL LIMIT 1` + + err := d.db.GetContext(ctx, &migrationID, query) + if err == sql.ErrNoRows { + return 0, nil // No pending migrations + } else if err != nil { + return 0, fmt.Errorf("failed to get pending migration ID: %w", err) + } + + return migrationID, nil +} + +func (d *MigrationMetaStore) CreateNewMigration(ctx context.Context) (int, error) { + query := `INSERT INTO migration (created_at, updated_at) VALUES (?, ?)` + now := time.Now() + + result, err := d.db.ExecContext(ctx, query, now, now) + if err != nil { + return 0, fmt.Errorf("failed to create new migration: %w", err) + } + + migrationID, err := result.LastInsertId() + if err != nil { + return 0, fmt.Errorf("failed to get last insert ID: %w", err) + } + + return int(migrationID), nil +} + +func (d *MigrationMetaStore) InsertMetaMigrationData(ctx context.Context, migrationID int, keys []string) error { + tx, err := d.db.BeginTxx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to start transaction: %w", err) + } + + stmt, err := tx.Preparex(`INSERT INTO meta_migration (key, migration_id, created_at, updated_at) VALUES (?, ?, ?, ?)`) + if err != nil { + tx.Rollback() + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + now := time.Now() + for _, key := range keys { + if _, err := stmt.Exec(key, migrationID, now, now); err != nil { + tx.Rollback() + return fmt.Errorf("failed to insert meta migration data: %w", err) + } + } + + if err := tx.Commit(); err != nil { + tx.Rollback() + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} diff --git a/supernode/services/metamigrator/migration_data_identifier.go b/supernode/services/metamigrator/migration_data_identifier.go new file mode 100644 index 000000000..9bdbafbbd --- /dev/null +++ b/supernode/services/metamigrator/migration_data_identifier.go @@ -0,0 +1,78 @@ +package metamigrator + +import ( + "context" + "fmt" + "github.com/pastelnetwork/gonode/common/log" + "time" +) + +const ( + batchSize = 10000 +) + +var ( + staleTime = time.Now().AddDate(0, -3, 0) +) + +func (task *MetaMigratorTask) IdentifyMigrationData(ctx context.Context) (err error) { + var migrationID int + + migrationID, err = task.service.metaStore.GetPendingMigrationID(ctx) + if err != nil { + log.WithContext(ctx).WithError(err).Error("error retrieving pending migration") + return fmt.Errorf("failed to get pending migration ID: %w", err) + } + + if migrationID == 0 { + log.WithContext(ctx).Info("creating new migration") + + migrationID, err = task.service.metaStore.CreateNewMigration(ctx) + if err != nil { + log.WithContext(ctx).WithError(err).Error("error creating new migration") + return fmt.Errorf("failed to create new migration: %w", err) + } + } + log.WithContext(ctx).WithField("migration_id", migrationID).Info("migration info has been sorted") + + totalCount, err := task.service.metaStore.GetCountOfStaleData(ctx, staleTime) + if err != nil { + log.WithContext(ctx).WithError(err).Error("error retrieving stale data count") + return fmt.Errorf("failed to get count of stale data: %w", err) + } + + if totalCount == 0 { + log.WithContext(ctx).Info("no stale data found to migrate") + return nil + } + log.WithContext(ctx).WithField("total_keys", totalCount).Info("total-data that needs to migrate has been identified") + + numOfBatches := getNumOfBatches(totalCount) + log.WithContext(ctx).WithField("no_of_batches", numOfBatches).Info("batches required to store migration-meta has been calculated") + + for batchNo := 0; batchNo < numOfBatches; batchNo++ { + staleData, err := task.service.metaStore.GetStaleDataInBatches(ctx, batchSize, batchNo, staleTime) + if err != nil { + log.WithContext(ctx).Error("error retrieving batch of stale data") + return fmt.Errorf("failed to get stale data in batch %d: %w", batchNo, err) + } + + if err := task.service.metaStore.InsertMetaMigrationData(ctx, migrationID, staleData); err != nil { + log.WithContext(ctx).Error("error inserting batch of stale data to migration-meta") + return fmt.Errorf("failed to insert stale data for migration %d: %w", migrationID, err) + } + + log.WithContext(ctx).WithField("batch", batchNo).Debug("data added to migration-meta for migration") + } + + return nil +} + +func getNumOfBatches(totalCount int) int { + numBatches := totalCount / batchSize + if totalCount%batchSize != 0 { + numBatches++ + } + + return numBatches +} diff --git a/supernode/services/metamigrator/service.go b/supernode/services/metamigrator/service.go new file mode 100644 index 000000000..b8ed3457e --- /dev/null +++ b/supernode/services/metamigrator/service.go @@ -0,0 +1,59 @@ +package metamigrator + +import ( + "context" + "time" + + "github.com/pastelnetwork/gonode/common/log" + "github.com/pastelnetwork/gonode/common/utils" + "github.com/pastelnetwork/gonode/p2p/kademlia/store/sqlite" + "github.com/pastelnetwork/gonode/supernode/services/common" +) + +const ( + logPrefix = "MetaMigrator" + defaultMetaMigratorDataIdentifier = 24 * time.Hour + lowSpaceThresholdGB = 50 // in GB + +) + +// MetaMigratorService represents the MetaMigrator service. +type MetaMigratorService struct { + *common.SuperNodeService + metaStore *sqlite.MigrationMetaStore +} + +// Run starts the MetaMigrator service task +func (service *MetaMigratorService) Run(ctx context.Context) error { + for { + select { + case <-time.After(defaultMetaMigratorDataIdentifier): + + isLow, err := utils.CheckDiskSpace(lowSpaceThresholdGB) + if err != nil { + log.WithContext(ctx).WithField("method", "MetaMigratorService").WithError(err).Error("check disk space failed") + continue + } + + if !isLow { + continue + } + + task := NewMetaMigratorTask(service) + if err := task.IdentifyMigrationData(ctx); err != nil { + log.WithContext(ctx).WithError(err).Error("failed to identify migration data") + } + case <-ctx.Done(): + log.Println("Context done being called in IdentifyMigrationData loop in service.go") + return nil + } + } +} + +// NewService returns a new MetaMigratorService instance. +func NewService(metaStore *sqlite.MigrationMetaStore) *MetaMigratorService { + return &MetaMigratorService{ + SuperNodeService: common.NewSuperNodeService(nil, nil, nil), + metaStore: metaStore, + } +} diff --git a/supernode/services/metamigrator/task.go b/supernode/services/metamigrator/task.go new file mode 100644 index 000000000..9a5ec5e28 --- /dev/null +++ b/supernode/services/metamigrator/task.go @@ -0,0 +1,13 @@ +package metamigrator + +// MetaMigratorTask is the task of identifying migration data and then migrating that data to cloud. +type MetaMigratorTask struct { + service *MetaMigratorService +} + +// NewMetaMigratorTask returns a new MetaMigratorTask instance. +func NewMetaMigratorTask(service *MetaMigratorService) *MetaMigratorTask { + return &MetaMigratorTask{ + service: service, + } +}