diff --git a/p2p/kademlia/store/sqlite/meta_worker.go b/p2p/kademlia/store/sqlite/meta_worker.go
index 17451e090..c0a6c8183 100644
--- a/p2p/kademlia/store/sqlite/meta_worker.go
+++ b/p2p/kademlia/store/sqlite/meta_worker.go
@@ -58,6 +58,8 @@ type MigrationMetaStore struct {
 
 	updates sync.Map
 	inserts sync.Map
+
+	minKeysForMigration int
 }
 
 // NewMigrationMetaStore initializes the MigrationMetaStore.
@@ -94,6 +96,7 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
 		insertTicker:             time.NewTicker(commitInsertsInterval),
 		migrationExecutionTicker: time.NewTicker(migrationExecutionTicker),
 		cloud:                    cloud,
+		minKeysForMigration:      minKeysToMigrate,
 	}
 
 	if err := handler.migrateMeta(); err != nil {
@@ -580,7 +583,7 @@ func (d *MigrationMetaStore) processMigrationInBatches(ctx context.Context, migr
 		return nil
 	}
 
-	if totalKeys < minKeysToMigrate {
+	if totalKeys < d.minKeysForMigration {
 		log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("keys-count", totalKeys).Info("Skipping migration due to insufficient keys")
 		return nil
 	}
diff --git a/p2p/kademlia/store/sqlite/sqlite_test.go b/p2p/kademlia/store/sqlite/sqlite_test.go
index a8c032ef6..32fd183fb 100644
--- a/p2p/kademlia/store/sqlite/sqlite_test.go
+++ b/p2p/kademlia/store/sqlite/sqlite_test.go
@@ -4,6 +4,7 @@
 package sqlite
 
 import (
+	"bytes"
 	"context"
 	"crypto/rand"
 	"encoding/hex"
@@ -184,3 +185,164 @@ func TestStore(t *testing.T) {
 
 	os.Remove("data001.sqlite3-wal")
 }
+
+func TestBatchStoreAndRetrieveCloud(t *testing.T) {
+	err := os.Setenv("RCLONE_CONFIG_B2_TYPE", "b2")
+	if err != nil {
+		t.Fatalf("failed to set RCLONE_CONFIG_B2_TYPE: %v", err)
+	}
+
+	if os.Getenv("B2_ACCOUNT_ID") == "" {
+		t.Skip("Skipping test as it requires B2_ACCOUNT_ID")
+	}
+
+	err = os.Setenv("RCLONE_CONFIG_B2_ACCOUNT", os.Getenv("B2_ACCOUNT_ID"))
+	if err != nil {
+		t.Fatalf("failed to set RCLONE_CONFIG_B2_ACCOUNT: %v", err)
+	}
+
+	err = os.Setenv("RCLONE_CONFIG_B2_KEY", os.Getenv("B2_API_KEY"))
+	if err != nil {
+		t.Fatalf("failed to set RCLONE_CONFIG_B2_KEY: %v", err)
+	}
+
+	cloud := cloud.NewRcloneStorage("Pastel-Devnet-Storage-Layer-Cloud-Expansion", "b2")
+	ctx, cancel := context.WithCancel(context.Background())
+
+	mst, err := NewMigrationMetaStore(ctx, ".", cloud)
+
+	mst.updateTicker.Stop()
+	mst.insertTicker.Stop()
+
+	// override the tickers for testing
+	mst.updateTicker = time.NewTicker(2 * time.Second)
+	mst.insertTicker = time.NewTicker(2 * time.Second)
+	mst.minKeysForMigration = 2
+	if err != nil {
+		t.Fatalf("failed to create store: %v", err)
+	}
+	store, err := NewStore(ctx, ".", cloud, mst)
+	if err != nil {
+		t.Fatalf("failed to create store: %v", err)
+	}
+
+	r1 := []byte("test-record-1-meta")
+	r2 := []byte("test-record-2-meta")
+	r3 := []byte("test-record-3-meta")
+
+	hashed, err := utils.Sha3256hash(r1)
+	if err != nil {
+		t.Fatalf("failed to hash record: %v", err)
+	}
+
+	r1Key := hex.EncodeToString(hashed)
+
+	hashed, err = utils.Sha3256hash(r2)
+	if err != nil {
+		t.Fatalf("failed to hash record: %v", err)
+	}
+
+	r2Key := hex.EncodeToString(hashed)
+
+	hashed, err = utils.Sha3256hash(r3)
+	if err != nil {
+		t.Fatalf("failed to hash record: %v", err)
+	}
+
+	r3Key := hex.EncodeToString(hashed)
+
+	defer func() {
+
+		os.Remove("data001.sqlite3")
+		os.Remove("data001-migration-meta.sqlite3")
+		os.Remove("data001.sqlite3-shm")
+		os.Remove("data001.sqlite3-wal")
+	}()
+
+	err = store.storeBatchRecord([][]byte{r1, r2, r3}, 0, true)
+	if err != nil {
+		t.Fatalf("failed to store record: %v", err)
+	}
+
+	time.Sleep(3 * time.Second)
+
+	type record struct {
+		Key           string    `db:"key"`
+		LastAcccessed time.Time `db:"last_accessed"`
+		AccessCount   int       `db:"access_count"`
+		DataSize      int       `db:"data_size"`
+	}
+
+	var keys []record
+	err = store.migrationStore.db.Select(&keys, "SELECT key,last_accessed,access_count,data_size FROM meta where key in (?, ?, ?)", r1Key, r2Key, r3Key)
+	if err != nil {
+		t.Fatalf("failed to retrieve record: %v", err)
+	}
+
+	if len(keys) != 3 {
+		t.Fatalf("expected 3 records, got %d", len(keys))
+	}
+
+	time.Sleep(1 * time.Second)
+
+	migID, err := mst.CreateNewMigration(ctx)
+	if err != nil {
+		t.Fatalf("failed to create migration: %v", err)
+	}
+
+	err = mst.InsertMetaMigrationData(ctx, migID, []string{r1Key, r2Key, r3Key})
+	if err != nil {
+		t.Fatalf("failed to insert meta migration data: %v", err)
+	}
+
+	err = mst.processMigrationInBatches(ctx, Migration{ID: migID})
+	if err != nil {
+		t.Fatalf("failed to process migration in batches: %v", err)
+	}
+
+	vals, _, err := store.RetrieveBatchValues(context.Background(), []string{r1Key, r2Key, r3Key}, true)
+	if err != nil {
+		t.Fatalf("failed to retrieve record: %v", err)
+	}
+
+	if len(vals) != 3 {
+		t.Fatalf("expected 3 records, got %d", len(vals))
+	}
+
+	if !bytes.Equal(vals[0], r1) {
+		t.Fatalf("expected %s, got %s", r1, vals[0])
+	}
+
+	if !bytes.Equal(vals[1], r2) {
+		t.Fatalf("expected %s, got %s", r2, vals[1])
+	}
+
+	if !bytes.Equal(vals[2], r3) {
+		t.Fatalf("expected %s, got %s", r3, vals[2])
+	}
+
+	// Allow some time for goroutines to exit
+	time.Sleep(5 * time.Second)
+
+	cancel() // Signal all contexts to finish
+	mst.updateTicker.Stop()
+	mst.insertTicker.Stop()
+
+	time.Sleep(1 * time.Second)
+
+	err = mst.cloud.Delete(r1Key)
+	if err != nil {
+		t.Fatalf("failed to delete record: %v", err)
+	}
+
+	err = mst.cloud.Delete(r2Key)
+	if err != nil {
+		t.Fatalf("failed to delete record: %v", err)
+	}
+
+	err = mst.cloud.Delete(r3Key)
+	if err != nil {
+		t.Fatalf("failed to delete record: %v", err)
+	}
+
+}