Skip to content

Commit

Permalink
[PSL-1258] fix cloud retrieve queries for download
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique committed Aug 21, 2024
1 parent f8a1218 commit 8c9865c
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 12 deletions.
7 changes: 7 additions & 0 deletions mixins/pasteld_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ func (pt *PastelHandler) RegTicket(ctx context.Context, RegTXID string) (*pastel
return &regTicket, nil
}

func (pt *PastelHandler) GetRQIDs(ctx context.Context, txid string) (rqIDs []string, err error) {
info, err := pt.GetTicketInfo(ctx, txid, pastel.ActionTypeCascade)

return info.RQIDs, nil
}

func (pt *PastelHandler) GetTicketInfo(ctx context.Context, txid, ttype string) (info TicketInfo, err error) {
info.EstimatedDownloadTime = defaultDownloadTimeout
switch ttype {
Expand Down Expand Up @@ -264,6 +270,7 @@ func (pt *PastelHandler) GetTicketInfo(ctx context.Context, txid, ttype string)
info.IsTicketPublic = cTicket.MakePubliclyAccessible
info.Filename = cTicket.FileName
info.FileType = cTicket.FileType
info.RQIDs = cTicket.RQIDs

est := getEstimatedDownloadSizeOnBytes(cTicket.OriginalFileSizeInBytes)
if est > defaultDownloadTimeout {
Expand Down
2 changes: 2 additions & 0 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ func (s *DHT) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
value, err := s.store.Retrieve(ctx, decoded)
if err == nil && len(value) > 0 {
return value, nil
} else if err != nil {
log.WithContext(ctx).WithField("db_key", dbKey).WithError(err).Error("error retrieving key")
}

// if queries only option is set, do not search just return error
Expand Down
12 changes: 6 additions & 6 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,14 +561,14 @@ func (d *MigrationMetaStore) checkAndExecuteMigration(ctx context.Context) {
for _, migration := range migrations {
log.WithContext(ctx).WithField("migration_id", migration.ID).Info("Processing migration")

if err := d.processMigrationInBatches(ctx, migration); err != nil {
if err := d.ProcessMigrationInBatches(ctx, migration); err != nil {
log.WithContext(ctx).WithError(err).WithField("migration_id", migration.ID).Error("Failed to process migration")
continue
}
}
}

func (d *MigrationMetaStore) processMigrationInBatches(ctx context.Context, migration Migration) error {
func (d *MigrationMetaStore) ProcessMigrationInBatches(ctx context.Context, migration Migration) error {
var migKeys MigrationKeys
err := d.db.Select(&migKeys, `SELECT key FROM meta_migration WHERE migration_id = ?`, migration.ID)
if err != nil {
Expand All @@ -580,10 +580,10 @@ func (d *MigrationMetaStore) processMigrationInBatches(ctx context.Context, migr
return nil
}

if totalKeys < minKeysToMigrate {
log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("keys-count", totalKeys).Info("Skipping migration due to insufficient keys")
return nil
}
//if totalKeys < minKeysToMigrate {
// log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("keys-count", totalKeys).Info("Skipping migration due to insufficient keys")
// return nil
//}

migratedKeys := 0
var keys []string
Expand Down
12 changes: 6 additions & 6 deletions p2p/kademlia/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ type Store struct {

// Record is a data record
type Record struct {
Key string
Data []byte
Datatype int
Isoriginal bool
Key string `db:"key"`
Data []byte `db:"data"`
Datatype int `db:"datatype"`
Isoriginal bool `db:"is_original"`
CreatedAt time.Time
UpdatedAt time.Time
ReplicatedAt time.Time
Expand Down Expand Up @@ -427,7 +427,7 @@ func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) {
hkey := hex.EncodeToString(key)

r := Record{}
err := s.db.Get(&r, `SELECT data FROM data WHERE key = ?`, hkey)
err := s.db.Get(&r, `SELECT data, is_on_cloud, is_original, datatype FROM data WHERE key = ?`, hkey)
if err != nil {
return nil, fmt.Errorf("failed to get record by key %s: %w", hkey, err)
}
Expand All @@ -448,7 +448,7 @@ func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) {
return nil, fmt.Errorf("failed to retrieve data from cloud: data is supposed to be on cloud but backup is not enabled")
}

data, err := s.cloud.Fetch(r.Key)
data, err := s.cloud.Fetch(hkey)
if err != nil {
return nil, fmt.Errorf("failed to retrieve data from cloud: %w", err)
}
Expand Down
72 changes: 72 additions & 0 deletions supernode/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package cmd

import (
"context"
"encoding/hex"
"fmt"
"github.com/btcsuite/btcutil/base58"
"io"
"os"
"path/filepath"
"time"

"github.com/pastelnetwork/gonode/p2p/kademlia/store/sqlite"
"github.com/pastelnetwork/gonode/supernode/services/metamigrator"
Expand Down Expand Up @@ -252,6 +255,12 @@ func runApp(ctx context.Context, config *configs.Config) error {
return errors.Errorf("could not create p2p service, %w", err)
}

//go func() {
// if err := migrateKeys(ctx, p2p, mixins.NewPastelHandler(pastelClient), metaMigratorStore, "6222bd2307ec59730f8ab2451bf9b8d94e79bbbaac43d692695f2e6baeaf6c42"); err != nil {
// log.WithContext(ctx).WithError(err).Error("error migrating keys")
// }
//}()

rqAddr := fmt.Sprint(config.RaptorQ.Host, ":", config.RaptorQ.Port)
// raptorq client
config.NftRegister.RaptorQServiceAddress = rqAddr
Expand Down Expand Up @@ -357,3 +366,66 @@ func runApp(ctx context.Context, config *configs.Config) error {

return runServices(ctx, grpc, p2p, nftRegister, nftDownload, senseRegister, cascadeRegister, statsMngr, debugSerivce, storageChallenger, selfHealing, collectionRegister, healthCheckChallenger, metaMigratorWorker)
}

func migrateKeys(ctx context.Context, p2p p2p.P2P, ph *mixins.PastelHandler, metaMigratorStore *sqlite.MigrationMetaStore, txid string) error {
time.Sleep(5 * time.Minute)

logger := log.WithContext(ctx).WithField("txid", txid)
batchSize := 1000

RQIDs, err := ph.GetRQIDs(ctx, txid)
if err != nil {
logger.WithError(err).Error("error retrieving RQIDs")
return err
}
logger.Info("RQIDs retrieved by TxID")

migrationID, err := metaMigratorStore.CreateNewMigration(ctx)
if err != nil {
return fmt.Errorf("failed to create migration: %w", err)
}
logger.Info("migration has been created")

var keys []string
for _, RQID := range RQIDs {
data, err := p2p.Retrieve(ctx, RQID, true)
if err != nil {
logger.WithError(err).Error("error retrieving key from p2p")
}

if len(data) != 0 {
decodedRQID := base58.Decode(RQID)
dbKey := hex.EncodeToString(decodedRQID)
keys = append(keys, dbKey)
} else {
continue
}

if len(keys) >= batchSize {
if err := metaMigratorStore.InsertMetaMigrationData(ctx, migrationID, keys); err != nil {
logger.WithError(err).Error("error inserting batch keys to meta-migration")
}

logger.Info("keys added to meta-migration")
keys = nil
}
}

if len(keys) > 0 {
if err := metaMigratorStore.InsertMetaMigrationData(ctx, migrationID, keys); err != nil {
logger.WithError(err).Error("error inserting batch of stale data to migration-meta")
return fmt.Errorf("failed to insert stale data for migration %d: %w", migrationID, err)
}

logger.Info("keys added to meta-migration")
}

if err := metaMigratorStore.ProcessMigrationInBatches(ctx, sqlite.Migration{
ID: migrationID,
}); err != nil {
logger.WithError(err).Error("error processing migration in batches")
}
logger.Info("keys have been migrated")

return nil
}

0 comments on commit 8c9865c

Please sign in to comment.