Skip to content

Commit

Permalink
[PSL-1237] implements a service to identify the data to be migrated
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique committed Aug 8, 2024
1 parent fe7a60a commit 0fbffbc
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 1 deletion.
115 changes: 114 additions & 1 deletion p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sqlite

import (
"context"
"database/sql"
"fmt"

"os"
Expand Down Expand Up @@ -284,7 +285,7 @@ func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) {
return
}

stmt, err := tx.Prepare("INSERT OR REPLACE INTO meta (key, last_access_time) VALUES (?, ?)")
stmt, err := tx.Prepare("INSERT OR REPLACE INTO meta (key, last_accessed) VALUES (?, ?)")
if err != nil {
tx.Rollback() // Roll back the transaction on error
log.WithContext(ctx).WithError(err).Error("Error preparing statement (commitLastAccessedUpdates)")
Expand Down Expand Up @@ -511,3 +512,115 @@ func (d *MigrationMetaStore) uploadInBatches(ctx context.Context, keys []string,

return lastError
}

// MetaStoreInterface groups the MigrationMetaStore methods that look up stale
// entries in the meta table and record them against a migration.
type MetaStoreInterface interface {
// GetCountOfStaleData returns the number of meta rows whose last_accessed
// value is earlier than staleTime.
GetCountOfStaleData(ctx context.Context, staleTime time.Time) (int, error)
// GetStaleDataInBatches returns one page of keys whose last_accessed value
// is earlier than duration (a point in time, despite the parameter name).
// The page starts at row batchNumber*batchSize and holds at most batchSize keys.
GetStaleDataInBatches(ctx context.Context, batchSize, batchNumber int, duration time.Time) ([]string, error)
// GetPendingMigrationID returns the ID of a migration whose
// migration_started_at is NULL, or 0 when there is no such migration.
GetPendingMigrationID(ctx context.Context) (int, error)
// CreateNewMigration inserts a new migration row and returns its ID.
CreateNewMigration(ctx context.Context) (int, error)
// InsertMetaMigrationData stores keys in meta_migration under migrationID.
InsertMetaMigrationData(ctx context.Context, migrationID int, keys []string) error
}

// GetCountOfStaleData reports how many rows of the meta table have a
// last_accessed value earlier than staleTime. The cutoff is entirely decided
// by the caller; on a query failure the count is 0 and the error is wrapped.
func (d *MigrationMetaStore) GetCountOfStaleData(ctx context.Context, staleTime time.Time) (int, error) {
	const countQuery = `SELECT COUNT(*) FROM meta WHERE last_accessed < ?`

	var total int
	if err := d.db.GetContext(ctx, &total, countQuery, staleTime); err != nil {
		return 0, fmt.Errorf("failed to get count of stale data: %w", err)
	}

	return total, nil
}

// GetStaleDataInBatches returns one page of keys from the meta table whose
// last_accessed value is earlier than duration (a point in time, despite the
// parameter name).
//
// batchNumber is zero-based: the page starts at row batchNumber*batchSize and
// holds at most batchSize keys. Rows are ordered by key because SQLite does
// not define a row order when ORDER BY is absent; with a stable order,
// consecutive batch numbers walk the matching rows without repeating or
// skipping any, as long as the set of matching rows does not change between
// calls.
//
// The returned slice is nil when the page is empty. Query, scan and iteration
// failures are returned wrapped.
func (d *MigrationMetaStore) GetStaleDataInBatches(ctx context.Context, batchSize, batchNumber int, duration time.Time) ([]string, error) {
	offset := batchNumber * batchSize

	query := `
SELECT key
FROM meta
WHERE last_accessed < ?
ORDER BY key
LIMIT ? OFFSET ?
`

	rows, err := d.db.QueryxContext(ctx, query, duration, batchSize, offset)
	if err != nil {
		return nil, fmt.Errorf("failed to get stale data in batches: %w", err)
	}
	defer rows.Close()

	var staleData []string
	for rows.Next() {
		var data string
		if err := rows.Scan(&data); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}
		staleData = append(staleData, data)
	}

	// Next returns false both at the end of the result set and on failure, so
	// the iteration error has to be checked explicitly.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate over rows: %w", err)
	}
	return staleData, nil
}

// GetPendingMigrationID looks up a migration row whose migration_started_at
// is NULL and returns its id. When no such row exists it returns 0 with a nil
// error; any other lookup failure is returned wrapped.
func (d *MigrationMetaStore) GetPendingMigrationID(ctx context.Context) (int, error) {
	const pendingQuery = `SELECT id FROM migration WHERE migration_started_at IS NULL LIMIT 1`

	var pendingID int
	switch err := d.db.GetContext(ctx, &pendingID, pendingQuery); {
	case err == nil:
		return pendingID, nil
	case err == sql.ErrNoRows:
		// No pending migrations
		return 0, nil
	default:
		return 0, fmt.Errorf("failed to get pending migration ID: %w", err)
	}
}

// CreateNewMigration inserts a row into the migration table with created_at
// and updated_at both set to the current time, and returns the ID reported
// for the inserted row.
func (d *MigrationMetaStore) CreateNewMigration(ctx context.Context) (int, error) {
	const insertQuery = `INSERT INTO migration (created_at, updated_at) VALUES (?, ?)`

	// A single timestamp is used for both columns so they are exactly equal.
	timestamp := time.Now()

	res, execErr := d.db.ExecContext(ctx, insertQuery, timestamp, timestamp)
	if execErr != nil {
		return 0, fmt.Errorf("failed to create new migration: %w", execErr)
	}

	newID, idErr := res.LastInsertId()
	if idErr != nil {
		return 0, fmt.Errorf("failed to get last insert ID: %w", idErr)
	}

	return int(newID), nil
}

// InsertMetaMigrationData stores every key in keys as a row of the
// meta_migration table under migrationID, with created_at and updated_at set
// to the current time.
//
// All rows are written in one transaction: if preparing the statement, any
// single insert, or the commit fails, the transaction is rolled back and the
// wrapped error is returned. An empty keys slice is a no-op that returns nil
// without touching the database.
func (d *MigrationMetaStore) InsertMetaMigrationData(ctx context.Context, migrationID int, keys []string) error {
	if len(keys) == 0 {
		return nil
	}

	tx, err := d.db.BeginTxx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to start transaction: %w", err)
	}
	// Covers every early return below. After a successful Commit the call
	// only reports that the transaction is already finished, so its result is
	// deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	stmt, err := tx.PreparexContext(ctx, `INSERT INTO meta_migration (key, migration_id, created_at, updated_at) VALUES (?, ?, ?, ?)`)
	if err != nil {
		return fmt.Errorf("failed to prepare statement: %w", err)
	}
	defer stmt.Close()

	// One timestamp for the whole batch keeps the rows of a batch consistent.
	now := time.Now()
	for _, key := range keys {
		if _, err := stmt.ExecContext(ctx, key, migrationID, now, now); err != nil {
			return fmt.Errorf("failed to insert meta migration data: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	return nil
}
78 changes: 78 additions & 0 deletions supernode/services/metamigrator/migration_data_identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package metamigrator

import (
"context"
"fmt"
"github.com/pastelnetwork/gonode/common/log"
"time"
)

const (
// batchSize is the number of keys per batch: it is the divisor used by
// getNumOfBatches and the page size passed to GetStaleDataInBatches.
batchSize = 10000
)

var (
// staleTime is a cutoff three months before the current time.
// NOTE: the initializer runs once, when the package is initialized, so the
// value does not move forward while the process keeps running.
staleTime = time.Now().AddDate(0, -3, 0)
)

// IdentifyMigrationData finds the keys in the meta store that have not been
// accessed for three months and records them against a migration.
//
// It reuses the pending migration reported by the meta store or, when there
// is none (ID 0), creates a new one. It then counts the stale keys, splits
// them into batches of batchSize, and copies each batch into the
// migration-meta table under that migration's ID.
//
// It returns nil when there is nothing to migrate or when every batch has
// been stored; the first failing store call aborts the run and its error is
// returned wrapped.
func (task *MetaMigratorTask) IdentifyMigrationData(ctx context.Context) (err error) {
	// The cutoff is computed on every run. A value fixed at process start
	// would drift further into the past the longer the service stays up.
	cutoff := time.Now().AddDate(0, -3, 0)

	var migrationID int

	migrationID, err = task.service.metaStore.GetPendingMigrationID(ctx)
	if err != nil {
		log.WithContext(ctx).WithError(err).Error("error retrieving pending migration")
		return fmt.Errorf("failed to get pending migration ID: %w", err)
	}

	// An ID of 0 means the store found no pending migration.
	if migrationID == 0 {
		log.WithContext(ctx).Info("creating new migration")

		migrationID, err = task.service.metaStore.CreateNewMigration(ctx)
		if err != nil {
			log.WithContext(ctx).WithError(err).Error("error creating new migration")
			return fmt.Errorf("failed to create new migration: %w", err)
		}
	}
	log.WithContext(ctx).WithField("migration_id", migrationID).Info("migration info has been sorted")

	totalCount, err := task.service.metaStore.GetCountOfStaleData(ctx, cutoff)
	if err != nil {
		log.WithContext(ctx).WithError(err).Error("error retrieving stale data count")
		return fmt.Errorf("failed to get count of stale data: %w", err)
	}

	if totalCount == 0 {
		log.WithContext(ctx).Info("no stale data found to migrate")
		return nil
	}
	log.WithContext(ctx).WithField("total_keys", totalCount).Info("total-data that needs to migrate has been identified")

	numOfBatches := getNumOfBatches(totalCount)
	log.WithContext(ctx).WithField("no_of_batches", numOfBatches).Info("batches required to store migration-meta has been calculated")

	for batchNo := 0; batchNo < numOfBatches; batchNo++ {
		staleData, err := task.service.metaStore.GetStaleDataInBatches(ctx, batchSize, batchNo, cutoff)
		if err != nil {
			log.WithContext(ctx).WithError(err).Error("error retrieving batch of stale data")
			return fmt.Errorf("failed to get stale data in batch %d: %w", batchNo, err)
		}

		// The set of stale keys can shrink after the count was taken; once a
		// page comes back empty, every later page is empty as well.
		if len(staleData) == 0 {
			break
		}

		if err := task.service.metaStore.InsertMetaMigrationData(ctx, migrationID, staleData); err != nil {
			log.WithContext(ctx).WithError(err).Error("error inserting batch of stale data to migration-meta")
			return fmt.Errorf("failed to insert stale data for migration %d: %w", migrationID, err)
		}

		log.WithContext(ctx).WithField("batch", batchNo).Debug("data added to migration-meta for migration")
	}

	return nil
}

// getNumOfBatches returns how many batches of batchSize are needed to hold
// totalCount items: the integer quotient, plus one more batch whenever the
// division leaves a remainder.
func getNumOfBatches(totalCount int) int {
	fullBatches, leftover := totalCount/batchSize, totalCount%batchSize
	if leftover == 0 {
		return fullBatches
	}

	return fullBatches + 1
}
59 changes: 59 additions & 0 deletions supernode/services/metamigrator/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package metamigrator

import (
"context"
"time"

"github.com/pastelnetwork/gonode/common/log"
"github.com/pastelnetwork/gonode/common/utils"
"github.com/pastelnetwork/gonode/p2p/kademlia/store/sqlite"
"github.com/pastelnetwork/gonode/supernode/services/common"
)

const (
// logPrefix names this service; it is not referenced in the code shown here.
logPrefix = "MetaMigrator"
// defaultMetaMigratorDataIdentifier is how long Run waits between two
// checks for data to migrate.
defaultMetaMigratorDataIdentifier = 24 * time.Hour
// lowSpaceThresholdGB is the threshold handed to utils.CheckDiskSpace.
lowSpaceThresholdGB = 50 // in GB

)

// MetaMigratorService represents the MetaMigrator service. Its Run loop
// periodically checks the disk space and, when it is low, starts a task that
// identifies the data to migrate.
type MetaMigratorService struct {
*common.SuperNodeService
// metaStore is the store the service's tasks query for stale keys and
// write migration records to.
metaStore *sqlite.MigrationMetaStore
}

// Run starts the MetaMigrator service task. It blocks until ctx is cancelled,
// waking up every defaultMetaMigratorDataIdentifier to run one identification
// cycle. The wait is measured from the end of the previous cycle. Failures
// inside a cycle are logged and never stop the loop; Run always returns nil.
func (service *MetaMigratorService) Run(ctx context.Context) error {
	// A single reusable timer replaces a fresh time.After timer per
	// iteration, and the deferred Stop releases it as soon as Run returns
	// instead of leaving it armed for the rest of the interval.
	timer := time.NewTimer(defaultMetaMigratorDataIdentifier)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			service.identifyIfDiskSpaceLow(ctx)
			// The channel has just been drained, so Reset is safe here.
			timer.Reset(defaultMetaMigratorDataIdentifier)
		case <-ctx.Done():
			log.Println("Context done being called in IdentifyMigrationData loop in service.go")
			return nil
		}
	}
}

// identifyIfDiskSpaceLow runs one cycle of the service: it checks the disk
// space against lowSpaceThresholdGB and, only when the check reports the
// space as low, identifies the data to migrate. Errors are logged, not
// returned.
func (service *MetaMigratorService) identifyIfDiskSpaceLow(ctx context.Context) {
	isLow, err := utils.CheckDiskSpace(lowSpaceThresholdGB)
	if err != nil {
		log.WithContext(ctx).WithField("method", "MetaMigratorService").WithError(err).Error("check disk space failed")
		return
	}

	if !isLow {
		return
	}

	task := NewMetaMigratorTask(service)
	if err := task.IdentifyMigrationData(ctx); err != nil {
		log.WithContext(ctx).WithError(err).Error("failed to identify migration data")
	}
}

// NewService builds a MetaMigratorService that works against metaStore. The
// embedded SuperNodeService is created with all three of its constructor
// arguments set to nil.
func NewService(metaStore *sqlite.MigrationMetaStore) *MetaMigratorService {
	svc := &MetaMigratorService{metaStore: metaStore}
	svc.SuperNodeService = common.NewSuperNodeService(nil, nil, nil)

	return svc
}
13 changes: 13 additions & 0 deletions supernode/services/metamigrator/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package metamigrator

// MetaMigratorTask is the task of identifying migration data and then migrating that data to cloud.
type MetaMigratorTask struct {
// service is the owning MetaMigratorService; the task reaches the meta
// store through it.
service *MetaMigratorService
}

// NewMetaMigratorTask creates a MetaMigratorTask bound to the given service.
func NewMetaMigratorTask(service *MetaMigratorService) *MetaMigratorTask {
	task := new(MetaMigratorTask)
	task.service = service

	return task
}

0 comments on commit 0fbffbc

Please sign in to comment.