Skip to content

Commit

Permalink
[PSL-1259] improvements made in meta migration workers
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique committed Aug 28, 2024
1 parent 92bc759 commit 7ad605e
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 32 deletions.
2 changes: 1 addition & 1 deletion common/storage/migratemeta/migrate_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/pastelnetwork/gonode/p2p/kademlia/store/sqlite"
)

func migrateKeys(ctx context.Context, p2p p2p.P2P, ph *mixins.PastelHandler, metaMigratorStore *sqlite.MigrationMetaStore, txid string) error {
func MigrateKeys(ctx context.Context, p2p p2p.P2P, ph *mixins.PastelHandler, metaMigratorStore *sqlite.MigrationMetaStore, txid string) error {
time.Sleep(5 * time.Minute)

logger := log.WithContext(ctx).WithField("txid", txid)
Expand Down
54 changes: 26 additions & 28 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ type UpdateMessage struct {

// MigrationMetaStore manages database operations.
type MigrationMetaStore struct {
db *sqlx.DB
p2pDataStore *sqlx.DB
cloud cloud.Storage
db *sqlx.DB
p2pDataStore *sqlx.DB
cloud cloud.Storage
isSyncInProgress bool

updateTicker *time.Ticker
insertTicker *time.Ticker
Expand Down Expand Up @@ -110,10 +111,13 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor

go func() {
if handler.isMetaSyncRequired() {
handler.isSyncInProgress = true
err := handler.syncMetaWithData(ctx)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
}

handler.isSyncInProgress = false
}
}()

Expand Down Expand Up @@ -229,35 +233,36 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
data_size = EXCLUDED.data_size,
access_count = access_count + 1`

for {
continueProcessing := true
for continueProcessing {
rows, err := d.p2pDataStore.Queryx(query, metaSyncBatchSize, offset)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error querying p2p data store")
return err
log.WithContext(ctx).WithError(err).Error("error querying p2p data store")
break
}

tx, err := d.db.Beginx()
if err != nil {
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to start transaction")
return err
log.WithContext(ctx).WithError(err).Error("failed to start transaction")
continue
}

stmt, err := tx.Prepare(insertQuery)
if err != nil {
tx.Rollback()
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to prepare statement")
return err
log.WithContext(ctx).WithError(err).Error("failed to prepare statement")
continue
}

var batchProcessed bool
var recordsProcessed int
for rows.Next() {
var r Record
var t *time.Time

if err := rows.Scan(&r.Key, &r.Data, &t); err != nil {
log.WithContext(ctx).WithError(err).Error("Error scanning row from p2p data store")
log.WithContext(ctx).WithError(err).Error("error scanning row from p2p data store")
continue
}
if t != nil {
Expand All @@ -269,36 +274,29 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
continue
}

batchProcessed = true
recordsProcessed++
}

stmt.Close()
if err := rows.Err(); err != nil {
tx.Rollback()
rows.Close()
log.WithContext(ctx).WithError(err).Error("Error iterating rows")
return err
log.WithContext(ctx).WithError(err).Error("error iterating rows")
}

if batchProcessed {
if recordsProcessed > 0 {
if err := tx.Commit(); err != nil {
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to commit transaction")
return err
}
} else {
tx.Rollback()
rows.Close()
break
}

rows.Close()
if !batchProcessed {
tx.Rollback()
log.WithContext(ctx).Info("no rows processed, rolling back and breaking.")
break
stmt.Close()

if recordsProcessed == 0 {
continueProcessing = false // No more records to process
} else {
offset += metaSyncBatchSize
}
offset += metaSyncBatchSize //
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion p2p/kademlia/store/sqlite/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func retrieveBatchValues(ctx context.Context, db *sqlx.DB, keys []string, getFro
values[idx] = value
keysFound++

if s.IsCloudBackupOn() {
if s.IsCloudBackupOn() && !s.migrationStore.isSyncInProgress {
if len(value) == 0 && is_on_cloud {
cloudKeys = append(cloudKeys, key)
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/kademlia/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) {
return nil, fmt.Errorf("failed to get record by key %s: %w", hkey, err)
}

if s.IsCloudBackupOn() {
if s.IsCloudBackupOn() && !s.migrationStore.isSyncInProgress {
PostAccessUpdate([]string{hkey})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

const (
batchSize = 10000
batchSize = 5000
)

var (
Expand Down

0 comments on commit 7ad605e

Please sign in to comment.