Skip to content

Commit

Permalink
integration test for migration worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mateeullahmalik committed Aug 19, 2024
1 parent f8a1218 commit 77adc38
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 1 deletion.
5 changes: 4 additions & 1 deletion p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type MigrationMetaStore struct {

updates sync.Map
inserts sync.Map

minKeysForMigration int
}

// NewMigrationMetaStore initializes the MigrationMetaStore.
Expand Down Expand Up @@ -94,6 +96,7 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
insertTicker: time.NewTicker(commitInsertsInterval),
migrationExecutionTicker: time.NewTicker(migrationExecutionTicker),
cloud: cloud,
minKeysForMigration: minKeysToMigrate,
}

if err := handler.migrateMeta(); err != nil {
Expand Down Expand Up @@ -580,7 +583,7 @@ func (d *MigrationMetaStore) processMigrationInBatches(ctx context.Context, migr
return nil
}

if totalKeys < minKeysToMigrate {
if totalKeys < d.minKeysForMigration {
log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("keys-count", totalKeys).Info("Skipping migration due to insufficient keys")
return nil
}
Expand Down
162 changes: 162 additions & 0 deletions p2p/kademlia/store/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package sqlite

import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
Expand Down Expand Up @@ -184,3 +185,164 @@ func TestStore(t *testing.T) {
os.Remove("data001.sqlite3-wal")

}

func TestBatchStoreAndRetrieveCloud(t *testing.T) {
err := os.Setenv("RCLONE_CONFIG_B2_TYPE", "b2")
if err != nil {
t.Fatalf("failed to set RCLONE_CONFIG_B2_TYPE: %v", err)
}

if os.Getenv("B2_ACCOUNT_ID") == "" {
t.Skip("Skipping test as it requires B2_ACCOUNT_ID")
}

err = os.Setenv("RCLONE_CONFIG_B2_ACCOUNT", os.Getenv("B2_ACCOUNT_ID"))
if err != nil {
t.Fatalf("failed to set RCLONE_CONFIG_B2_ACCOUNT: %v", err)
}

err = os.Setenv("RCLONE_CONFIG_B2_KEY", os.Getenv("B2_API_KEY"))
if err != nil {
t.Fatalf("failed to set RCLONE_CONFIG_B2_KEY: %v", err)
}

cloud := cloud.NewRcloneStorage("Pastel-Devnet-Storage-Layer-Cloud-Expansion", "b2")
ctx, cancel := context.WithCancel(context.Background())

mst, err := NewMigrationMetaStore(ctx, ".", cloud)

mst.updateTicker.Stop()
mst.insertTicker.Stop()

// override the tickers for testing
mst.updateTicker = time.NewTicker(2 * time.Second)
mst.insertTicker = time.NewTicker(2 * time.Second)
mst.minKeysForMigration = 2
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
store, err := NewStore(ctx, ".", cloud, mst)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}

r1 := []byte("test-record-1-meta")
r2 := []byte("test-record-2-meta")
r3 := []byte("test-record-3-meta")

hashed, err := utils.Sha3256hash(r1)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r1Key := hex.EncodeToString(hashed)

hashed, err = utils.Sha3256hash(r2)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r2Key := hex.EncodeToString(hashed)

hashed, err = utils.Sha3256hash(r3)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r3Key := hex.EncodeToString(hashed)

defer func() {

os.Remove("data001.sqlite3")
os.Remove("data001-migration-meta.sqlite3")
os.Remove("data001.sqlite3-shm")
os.Remove("data001.sqlite3-wal")
}()

err = store.storeBatchRecord([][]byte{r1, r2, r3}, 0, true)
if err != nil {
t.Fatalf("failed to store record: %v", err)
}

time.Sleep(3 * time.Second)

type record struct {
Key string `db:"key"`
LastAcccessed time.Time `db:"last_accessed"`
AccessCount int `db:"access_count"`
DataSize int `db:"data_size"`
}

var keys []record
err = store.migrationStore.db.Select(&keys, "SELECT key,last_accessed,access_count,data_size FROM meta where key in (?, ?, ?)", r1Key, r2Key, r3Key)
if err != nil {
t.Fatalf("failed to retrieve record: %v", err)
}

if len(keys) != 3 {
t.Fatalf("expected 3 records, got %d", len(keys))
}

time.Sleep(1 * time.Second)

migID, err := mst.CreateNewMigration(ctx)
if err != nil {
t.Fatalf("failed to create migration: %v", err)
}

err = mst.InsertMetaMigrationData(ctx, migID, []string{r1Key, r2Key, r3Key})
if err != nil {
t.Fatalf("failed to insert meta migration data: %v", err)
}

err = mst.processMigrationInBatches(ctx, Migration{ID: migID})
if err != nil {
t.Fatalf("failed to process migration in batches: %v", err)
}

vals, _, err := store.RetrieveBatchValues(context.Background(), []string{r1Key, r2Key, r3Key}, true)
if err != nil {
t.Fatalf("failed to retrieve record: %v", err)
}

if len(vals) != 3 {
t.Fatalf("expected 3 records, got %d", len(vals))
}

if !bytes.Equal(vals[0], r1) {
t.Fatalf("expected %s, got %s", r1, vals[0])
}

if !bytes.Equal(vals[1], r2) {
t.Fatalf("expected %s, got %s", r2, vals[1])
}

if !bytes.Equal(vals[2], r3) {
t.Fatalf("expected %s, got %s", r3, vals[2])
}

// Allow some time for goroutines to exit
time.Sleep(5 * time.Second)

cancel() // Signal all contexts to finish
mst.updateTicker.Stop()
mst.insertTicker.Stop()

time.Sleep(1 * time.Second)

err = mst.cloud.Delete(r1Key)
if err != nil {
t.Fatalf("failed to delete record: %v", err)
}

err = mst.cloud.Delete(r2Key)
if err != nil {
t.Fatalf("failed to delete record: %v", err)
}

err = mst.cloud.Delete(r3Key)
if err != nil {
t.Fatalf("failed to delete record: %v", err)
}

}

0 comments on commit 77adc38

Please sign in to comment.