Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integration test for migration worker #928

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type MigrationMetaStore struct {

updates sync.Map
inserts sync.Map

minKeysForMigration int
}

// NewMigrationMetaStore initializes the MigrationMetaStore.
Expand Down Expand Up @@ -94,6 +96,7 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
insertTicker: time.NewTicker(commitInsertsInterval),
migrationExecutionTicker: time.NewTicker(migrationExecutionTicker),
cloud: cloud,
minKeysForMigration: minKeysToMigrate,
}

if err := handler.migrateMeta(); err != nil {
Expand Down Expand Up @@ -580,7 +583,7 @@ func (d *MigrationMetaStore) processMigrationInBatches(ctx context.Context, migr
return nil
}

if totalKeys < minKeysToMigrate {
if totalKeys < d.minKeysForMigration {
log.WithContext(ctx).WithField("migration_id", migration.ID).WithField("keys-count", totalKeys).Info("Skipping migration due to insufficient keys")
return nil
}
Expand Down
162 changes: 162 additions & 0 deletions p2p/kademlia/store/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package sqlite

import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
Expand Down Expand Up @@ -184,3 +185,164 @@ func TestStore(t *testing.T) {
os.Remove("data001.sqlite3-wal")

}

func TestBatchStoreAndRetrieveCloud(t *testing.T) {
err := os.Setenv("RCLONE_CONFIG_B2_TYPE", "b2")
if err != nil {
t.Fatalf("failed to set RCLONE_CONFIG_B2_TYPE: %v", err)
}

if os.Getenv("B2_ACCOUNT_ID") == "" {
t.Skip("Skipping test as it requires B2_ACCOUNT_ID")
}

err = os.Setenv("RCLONE_CONFIG_B2_ACCOUNT", os.Getenv("B2_ACCOUNT_ID"))
if err != nil {
t.Fatalf("failed to set RCLONE_CONFIG_B2_ACCOUNT: %v", err)
}

err = os.Setenv("RCLONE_CONFIG_B2_KEY", os.Getenv("B2_API_KEY"))
if err != nil {
t.Fatalf("failed to set RCLONE_CONFIG_B2_KEY: %v", err)
}

cloud := cloud.NewRcloneStorage("Pastel-Devnet-Storage-Layer-Cloud-Expansion", "b2")
ctx, cancel := context.WithCancel(context.Background())

mst, err := NewMigrationMetaStore(ctx, ".", cloud)

mst.updateTicker.Stop()
mst.insertTicker.Stop()

// override the tickers for testing
mst.updateTicker = time.NewTicker(2 * time.Second)
mst.insertTicker = time.NewTicker(2 * time.Second)
mst.minKeysForMigration = 2
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
store, err := NewStore(ctx, ".", cloud, mst)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}

r1 := []byte("test-record-1-meta")
r2 := []byte("test-record-2-meta")
r3 := []byte("test-record-3-meta")

hashed, err := utils.Sha3256hash(r1)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r1Key := hex.EncodeToString(hashed)

hashed, err = utils.Sha3256hash(r2)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r2Key := hex.EncodeToString(hashed)

hashed, err = utils.Sha3256hash(r3)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r3Key := hex.EncodeToString(hashed)

defer func() {

os.Remove("data001.sqlite3")
os.Remove("data001-migration-meta.sqlite3")
os.Remove("data001.sqlite3-shm")
os.Remove("data001.sqlite3-wal")
}()

err = store.storeBatchRecord([][]byte{r1, r2, r3}, 0, true)
if err != nil {
t.Fatalf("failed to store record: %v", err)
}

time.Sleep(3 * time.Second)

type record struct {
Key string `db:"key"`
LastAcccessed time.Time `db:"last_accessed"`
AccessCount int `db:"access_count"`
DataSize int `db:"data_size"`
}

var keys []record
err = store.migrationStore.db.Select(&keys, "SELECT key,last_accessed,access_count,data_size FROM meta where key in (?, ?, ?)", r1Key, r2Key, r3Key)
if err != nil {
t.Fatalf("failed to retrieve record: %v", err)
}

if len(keys) != 3 {
t.Fatalf("expected 3 records, got %d", len(keys))
}

time.Sleep(1 * time.Second)

migID, err := mst.CreateNewMigration(ctx)
if err != nil {
t.Fatalf("failed to create migration: %v", err)
}

err = mst.InsertMetaMigrationData(ctx, migID, []string{r1Key, r2Key, r3Key})
if err != nil {
t.Fatalf("failed to insert meta migration data: %v", err)
}

err = mst.processMigrationInBatches(ctx, Migration{ID: migID})
if err != nil {
t.Fatalf("failed to process migration in batches: %v", err)
}

vals, _, err := store.RetrieveBatchValues(context.Background(), []string{r1Key, r2Key, r3Key}, true)
if err != nil {
t.Fatalf("failed to retrieve record: %v", err)
}

if len(vals) != 3 {
t.Fatalf("expected 3 records, got %d", len(vals))
}

if !bytes.Equal(vals[0], r1) {
t.Fatalf("expected %s, got %s", r1, vals[0])
}

if !bytes.Equal(vals[1], r2) {
t.Fatalf("expected %s, got %s", r2, vals[1])
}

if !bytes.Equal(vals[2], r3) {
t.Fatalf("expected %s, got %s", r3, vals[2])
}

// Allow some time for goroutines to exit
time.Sleep(5 * time.Second)

cancel() // Signal all contexts to finish
mst.updateTicker.Stop()
mst.insertTicker.Stop()

time.Sleep(1 * time.Second)

err = mst.cloud.Delete(r1Key)
if err != nil {
t.Fatalf("failed to delete record: %v", err)
}

err = mst.cloud.Delete(r2Key)
if err != nil {
t.Fatalf("failed to delete record: %v", err)
}

err = mst.cloud.Delete(r3Key)
if err != nil {
t.Fatalf("failed to delete record: %v", err)
}

}
Loading