From d9ae328d07d3cbc556abcdc1e8e7998a939beb4c Mon Sep 17 00:00:00 2001 From: Matee ullah Malik <46045452+mateeullahmalik@users.noreply.github.com> Date: Fri, 2 Aug 2024 00:44:20 +0500 Subject: [PATCH] [PSL-1239] cloud storage handler (#913) * [PSL-1239] cloud storage handler --- p2p/kademlia/dht_test.go | 2 +- p2p/kademlia/hashtable_test.go | 36 +++--- p2p/kademlia/store/cloud.go/cloud.go | 128 ++++++++++++++++++++++ p2p/kademlia/store/cloud.go/cloud_test.go | 102 +++++++++++++++++ p2p/kademlia/store/sqlite/replication.go | 40 ++++++- p2p/kademlia/store/sqlite/sqlite.go | 83 +++++++++++++- p2p/kademlia/store/sqlite/sqlite_test.go | 2 +- p2p/p2p.go | 5 +- supernode/cmd/app.go | 4 +- 9 files changed, 371 insertions(+), 31 deletions(-) create mode 100644 p2p/kademlia/store/cloud.go/cloud.go create mode 100644 p2p/kademlia/store/cloud.go/cloud_test.go diff --git a/p2p/kademlia/dht_test.go b/p2p/kademlia/dht_test.go index 2c4f62d92..8d81ab80b 100644 --- a/p2p/kademlia/dht_test.go +++ b/p2p/kademlia/dht_test.go @@ -132,7 +132,7 @@ func (ts *testSuite) SetupSuite() { defaultReplicateInterval := time.Second * 3600 defaultRepublishInterval := time.Second * 3600 * 24 - dbStore, err := sqlite.NewStore(ts.ctx, filepath.Join(workDir, "p2p"), defaultReplicateInterval, defaultRepublishInterval) + dbStore, err := sqlite.NewStore(ts.ctx, filepath.Join(workDir, "p2p"), defaultReplicateInterval, defaultRepublishInterval, nil) if err != nil { ts.T().Fatalf("new sqlite store: %v", err) } diff --git a/p2p/kademlia/hashtable_test.go b/p2p/kademlia/hashtable_test.go index b10c5a36f..0fe6f1f57 100644 --- a/p2p/kademlia/hashtable_test.go +++ b/p2p/kademlia/hashtable_test.go @@ -45,12 +45,12 @@ func TestClosestContactsWithIncludingNode(t *testing.T) { ignoredNodes: make([]*Node, 0), topCount: 6, expectedResults: &NodeList{Nodes: []*Node{ - &Node{IP: "3.18.240.77"}, - &Node{IP: "38.242.151.208"}, - &Node{IP: "154.38.160.234"}, - &Node{IP: "154.38.161.58"}, - &Node{IP: "154.12.235.12"}, - 
&Node{IP: "38.242.151.234"}, + {IP: "3.18.240.77"}, + {IP: "38.242.151.208"}, + {IP: "154.38.160.234"}, + {IP: "154.38.161.58"}, + {IP: "154.12.235.12"}, + {IP: "38.242.151.234"}, }, }, }, @@ -81,12 +81,12 @@ func TestClosestContactsWithIncludingNode(t *testing.T) { ignoredNodes: make([]*Node, 0), topCount: 6, expectedResults: &NodeList{Nodes: []*Node{ - &Node{IP: "154.38.161.58"}, - &Node{IP: "154.38.160.234"}, - &Node{IP: "38.242.151.208"}, - &Node{IP: "3.18.240.77"}, - &Node{IP: "154.12.235.12"}, - &Node{IP: "149.102.147.122"}, + {IP: "154.38.161.58"}, + {IP: "154.38.160.234"}, + {IP: "38.242.151.208"}, + {IP: "3.18.240.77"}, + {IP: "154.12.235.12"}, + {IP: "149.102.147.122"}, }, }, }, @@ -117,12 +117,12 @@ func TestClosestContactsWithIncludingNode(t *testing.T) { topCount: 6, targetKey: "517b1003b805793f35f4242f481a48cd45e5431a2af6e10339cbcf97f7b1a27e", expectedResults: &NodeList{Nodes: []*Node{ - &Node{IP: "18.189.251.53"}, - &Node{IP: "149.102.147.113"}, - &Node{IP: "154.12.235.19"}, - &Node{IP: "154.38.161.44"}, - &Node{IP: "95.111.253.200"}, - &Node{IP: "3.19.48.187"}, + {IP: "18.189.251.53"}, + {IP: "149.102.147.113"}, + {IP: "154.12.235.19"}, + {IP: "154.38.161.44"}, + {IP: "95.111.253.200"}, + {IP: "3.19.48.187"}, }, }, }, diff --git a/p2p/kademlia/store/cloud.go/cloud.go b/p2p/kademlia/store/cloud.go/cloud.go new file mode 100644 index 000000000..cc8a3f76e --- /dev/null +++ b/p2p/kademlia/store/cloud.go/cloud.go @@ -0,0 +1,128 @@ +package cloud + +import ( + "bytes" + "fmt" + + "os" + "os/exec" + "path/filepath" + "sync" + + "github.com/pastelnetwork/gonode/common/log" +) + +type Storage interface { + Store(key string, data []byte) (string, error) + Fetch(key string) ([]byte, error) + StoreBatch(data [][]byte) error + FetchBatch(keys []string) (map[string][]byte, error) +} + +type RcloneStorage struct { + bucketName string + specName string +} + +func NewRcloneStorage(bucketName, specName string) *RcloneStorage { + return &RcloneStorage{ + bucketName: 
bucketName, + specName: specName, + } +} + +func (r *RcloneStorage) Store(key string, data []byte) (string, error) { + filePath := filepath.Join(os.TempDir(), key) + + // Write data to a temporary file using os.WriteFile + if err := os.WriteFile(filePath, data, 0644); err != nil { + return "", fmt.Errorf("failed to write data to file: %w", err) + } + + // Construct the remote path where the file will be stored + // This example places the file at the root of the remote, but you can modify the path as needed + remotePath := fmt.Sprintf("%s:%s/%s", r.specName, r.bucketName, key) + + // Use rclone to copy the file to the remote + cmd := exec.Command("rclone", "copyto", filePath, remotePath) + if err := cmd.Run(); err != nil { + // Clean up the local file if the upload fails + os.Remove(filePath) + return "", fmt.Errorf("rclone command failed: %w", err) + } + + // Delete the local file after successful upload + go func() { + if err := os.Remove(filePath); err != nil { + log.Error("failed to delete local file", "path", filePath, "error", err) + } + }() + + // Return the remote path where the file was stored + return remotePath, nil +} + +func (r *RcloneStorage) Fetch(key string) ([]byte, error) { + // Construct the rclone command to fetch the file + cmd := exec.Command("rclone", "cat", fmt.Sprintf("%s:%s/%s", r.specName, r.bucketName, key)) + var out bytes.Buffer + cmd.Stdout = &out + err := cmd.Run() + if err != nil { + return nil, fmt.Errorf("rclone command failed: %w - out %s", err, out.String()) + } + + return out.Bytes(), nil +} + +func (r *RcloneStorage) StoreBatch(data [][]byte) error { + // Placeholder for StoreBatch implementation + return nil +} + +func (r *RcloneStorage) FetchBatch(keys []string) (map[string][]byte, error) { + results := make(map[string][]byte) + errs := make(map[string]error) + var mu sync.Mutex + + semaphore := make(chan struct{}, 50) + + var wg sync.WaitGroup + for _, key := range keys { + wg.Add(1) + semaphore <- struct{}{} // Acquire a 
token + + go func(key string) { + defer wg.Done() + data, err := r.Fetch(key) + + func() { + mu.Lock() + defer mu.Unlock() + if err != nil { + errs[key] = err + } else { + results[key] = data + } + + }() + <-semaphore // Release the token + }(key) + } + + wg.Wait() + + if len(results) > 0 { + return results, nil + } + + if len(errs) > 0 { + combinedError := fmt.Errorf("errors occurred in fetching keys") + for k, e := range errs { + combinedError = fmt.Errorf("%v; key %s error: %v", combinedError, k, e) + } + return nil, combinedError + } + + return results, nil +} diff --git a/p2p/kademlia/store/cloud.go/cloud_test.go b/p2p/kademlia/store/cloud.go/cloud_test.go new file mode 100644 index 000000000..e3c0b412f --- /dev/null +++ b/p2p/kademlia/store/cloud.go/cloud_test.go @@ -0,0 +1,102 @@ +package cloud + +/* +import ( + "fmt" + "os" + "testing" +) + +func TestRcloneStorage_Fetch(t *testing.T) { + SetRcloneB2Env("0058a83f04e57580000000002", "K005lP8Wd0Gvr4JOdEI6e6BTAyt6iZA") + storage := NewRcloneStorage("pastel-test", "b2") + + tests := map[string]struct { + key string + expected string + wantErr bool + }{ + "File Exists": { + key: "copy_45707.txt", + expected: "expected content of testfile.txt", + wantErr: false, + }, + "File Does Not Exist": { + key: "copy_45707_nonexistent.txt", + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + data, err := storage.Fetch(tc.key) + if (err != nil) != tc.wantErr { + t.Errorf("Fetch() error = %v, wantErr %v", err, tc.wantErr) + } + if !tc.wantErr && string(data) != tc.expected { + t.Errorf("Fetch() got = %v, want %v", string(data), tc.expected) + } + }) + } +} + +// SetRcloneB2Env sets the environment variables for rclone configuration. 
+func SetRcloneB2Env(accountID, appKey string) error { + err := os.Setenv("RCLONE_CONFIG_B2_TYPE", "b2") + if err != nil { + return fmt.Errorf("failed to set RCLONE_CONFIG_B2_TYPE: %w", err) + } + + err = os.Setenv("RCLONE_CONFIG_B2_ACCOUNT", accountID) + if err != nil { + return fmt.Errorf("failed to set RCLONE_CONFIG_B2_ACCOUNT: %w", err) + } + + err = os.Setenv("RCLONE_CONFIG_B2_KEY", appKey) + if err != nil { + return fmt.Errorf("failed to set RCLONE_CONFIG_B2_KEY: %w", err) + } + + return nil +} + + +func TestRcloneStorage_FetchBatch(t *testing.T) { + storage := NewRcloneStorage("mybucket") + + tests := map[string]struct { + keys []string + expected map[string]string + wantErr bool + }{ + "Multiple Files Exist": { + keys: []string{"testfile1.txt", "testfile2.txt"}, // Ensure these files exist + expected: map[string]string{ + "testfile1.txt": "content of testfile1.txt", + "testfile2.txt": "content of testfile2.txt", + }, + wantErr: false, + }, + "Some Files Do Not Exist": { + keys: []string{"testfile1.txt", "doesNotExist.txt"}, + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + results, err := storage.FetchBatch(tc.keys) + if (err != nil) != tc.wantErr { + t.Errorf("FetchBatch() error = %v, wantErr %v", err, tc.wantErr) + } + if !tc.wantErr { + for k, v := range tc.expected { + if results[k] != v { + t.Errorf("FetchBatch() got = %v, want %v for key %v", string(results[k]), v, k) + } + } + } + }) + } +} +*/ diff --git a/p2p/kademlia/store/sqlite/replication.go b/p2p/kademlia/store/sqlite/replication.go index aa2e8a4ec..e73d598af 100644 --- a/p2p/kademlia/store/sqlite/replication.go +++ b/p2p/kademlia/store/sqlite/replication.go @@ -434,7 +434,7 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt keyToIndex[keys[i]] = i } - query := fmt.Sprintf(`SELECT key, data FROM data WHERE key IN (%s)`, strings.Join(placeholders, ",")) + query := fmt.Sprintf(`SELECT key, data, is_on_cloud FROM data 
WHERE key IN (%s)`, strings.Join(placeholders, ",")) rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, 0, fmt.Errorf("failed to retrieve records: %w", err) @@ -442,12 +442,14 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt defer rows.Close() values := make([][]byte, len(keys)) + var cloudKeys []string keysFound := 0 for rows.Next() { var key string var value []byte - if err := rows.Scan(&key, &value); err != nil { + var is_on_cloud bool + if err := rows.Scan(&key, &value, &is_on_cloud); err != nil { return nil, keysFound, fmt.Errorf("failed to scan key and value: %w", err) } @@ -455,7 +457,12 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt values[idx] = value keysFound++ - PostAccessUpdate([]string{key}) + if s.isCloudBackupOn() { + if len(value) == 0 && is_on_cloud { + cloudKeys = append(cloudKeys, key) + } + PostAccessUpdate([]string{key}) + } } } @@ -463,6 +470,33 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt return nil, keysFound, fmt.Errorf("rows processing error: %w", err) } + if len(cloudKeys) > 0 { + // Fetch from cloud + cloudValues, err := s.cloud.FetchBatch(cloudKeys) + if err != nil { + log.WithContext(ctx).WithError(err).Error("failed to fetch from cloud") + } + + for key, value := range cloudValues { + if idx, found := keyToIndex[key]; found { + values[idx] = value + keysFound++ + } + } + + go func() { + datList := make([][]byte, 0, len(cloudValues)) + for _, v := range cloudValues { + datList = append(datList, v) + } + + // Store the fetched data in the local store + if err := s.StoreBatch(ctx, datList, 0, false); err != nil { + log.WithError(err).Error("failed to store fetched data in local store") + } + }() + } + return values, keysFound, nil } diff --git a/p2p/kademlia/store/sqlite/sqlite.go b/p2p/kademlia/store/sqlite/sqlite.go index 8c3cf260a..934bfcacf 100644 --- a/p2p/kademlia/store/sqlite/sqlite.go +++ 
b/p2p/kademlia/store/sqlite/sqlite.go @@ -11,6 +11,7 @@ import ( "time" "github.com/pastelnetwork/gonode/common/log" + "github.com/pastelnetwork/gonode/p2p/kademlia/store/cloud.go" "github.com/cenkalti/backoff" "github.com/jmoiron/sqlx" @@ -50,6 +51,7 @@ type Worker struct { type Store struct { db *sqlx.DB worker *Worker + cloud cloud.Storage } // Record is a data record @@ -61,10 +63,11 @@ type Record struct { CreatedAt time.Time UpdatedAt time.Time ReplicatedAt time.Time + IsOnCloud bool `db:"is_on_cloud"` } // NewStore returns a new store -func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Duration) (*Store, error) { +func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Duration, cloud cloud.Storage) (*Store, error) { worker := &Worker{ JobQueue: make(chan Job, 500), quit: make(chan bool), @@ -90,6 +93,7 @@ func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Durat s := &Store{ worker: worker, db: db, + cloud: cloud, } if !s.checkStore() { @@ -118,6 +122,10 @@ func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Durat log.WithContext(ctx).WithError(err).Error("URGENT! unable to create attemps column in p2p database") } + if err := s.ensureIsOnCloudColumn(); err != nil { + log.WithContext(ctx).WithError(err).Error("URGENT! unable to create is_on_cloud column in p2p database") + } + if err := s.ensureLastSeenColumn(); err != nil { log.WithContext(ctx).WithError(err).Error("URGENT! 
unable to create datatype column in p2p database") } @@ -159,6 +167,10 @@ func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Durat return s, nil } +func (s *Store) isCloudBackupOn() bool { + return s.cloud != nil +} + func (s *Store) checkStore() bool { query := `SELECT name FROM sqlite_master WHERE type='table' AND name='data'` var name string @@ -166,6 +178,39 @@ func (s *Store) checkStore() bool { return err == nil } +func (s *Store) ensureIsOnCloudColumn() error { + rows, err := s.db.Query("PRAGMA table_info(data)") + if err != nil { + return fmt.Errorf("failed to fetch table 'data' info: %w", err) + } + defer rows.Close() + + for rows.Next() { + var cid, notnull, pk int + var name, dtype string + var dfltValue *string + err = rows.Scan(&cid, &name, &dtype, &notnull, &dfltValue, &pk) + if err != nil { + return fmt.Errorf("failed to scan row: %w", err) + } + + if name == "is_on_cloud" { + return nil + } + } + + if err := rows.Err(); err != nil { + return fmt.Errorf("error during iteration: %w", err) + } + + _, err = s.db.Exec(`ALTER TABLE data ADD COLUMN is_on_cloud BOOL DEFAULT false`) + if err != nil { + return fmt.Errorf("failed to add column 'is_on_cloud' to table 'data': %w", err) + } + + return nil +} + func (s *Store) ensureDatatypeColumn() error { rows, err := s.db.Query("PRAGMA table_info(data)") if err != nil { @@ -382,9 +427,32 @@ func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) { return nil, fmt.Errorf("failed to get record by key %s: %w", hkey, err) } - PostAccessUpdate([]string{hkey}) + if s.isCloudBackupOn() { + PostAccessUpdate([]string{hkey}) + } + + if len(r.Data) > 0 { + return r.Data, nil + } + + if !r.IsOnCloud { + return nil, fmt.Errorf("failed to retrieve data from cloud: data is neither on cloud nor on local - this shouldn't happen") + } + + if !s.isCloudBackupOn() { + return nil, fmt.Errorf("failed to retrieve data from cloud: data is supposed to be on cloud but backup is not enabled") + } + + 
data, err := s.cloud.Fetch(r.Key) + if err != nil { + return nil, fmt.Errorf("failed to retrieve data from cloud: %w", err) + } + + if err := s.Store(context.Background(), key, data, r.Datatype, r.Isoriginal); err != nil { + return nil, fmt.Errorf("failed to store data retrieved from cloud: %w", err) + } - return r.Data, nil + return data, nil } // Checkpoint method for the store @@ -460,7 +528,10 @@ func (s *Store) storeRecord(key []byte, value []byte, typ int, isOriginal bool) if err != nil { return fmt.Errorf("error storing data: %w", err) } - PostKeysInsert([]UpdateMessage{{Key: hkey, LastAccessTime: time.Now(), Size: len(value)}}) + + if s.isCloudBackupOn() { + PostKeysInsert([]UpdateMessage{{Key: hkey, LastAccessTime: time.Now(), Size: len(value)}}) + } return nil } @@ -524,7 +595,9 @@ func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) erro return fmt.Errorf("error storing data: %w", err) } - PostKeysInsert(hkeys) + if s.isCloudBackupOn() { + PostKeysInsert(hkeys) + } return nil } diff --git a/p2p/kademlia/store/sqlite/sqlite_test.go b/p2p/kademlia/store/sqlite/sqlite_test.go index 71ad766f4..4abd27b3f 100644 --- a/p2p/kademlia/store/sqlite/sqlite_test.go +++ b/p2p/kademlia/store/sqlite/sqlite_test.go @@ -21,7 +21,7 @@ func TestStoreAndRetrieve(t *testing.T) { defer os.RemoveAll(tempDir) dbPath := filepath.Join(tempDir, "test.db") - store, err := NewStore(context.Background(), dbPath, time.Minute, time.Minute) + store, err := NewStore(context.Background(), dbPath, time.Minute, time.Minute, nil) if err != nil { t.Fatalf("failed to create store: %v", err) } diff --git a/p2p/p2p.go b/p2p/p2p.go index 2312f8e41..82b314fcf 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/pastelnetwork/gonode/p2p/kademlia/store/cloud.go" "github.com/pastelnetwork/gonode/p2p/kademlia/store/meta" "github.com/btcsuite/btcutil/base58" @@ -261,8 +262,8 @@ func (s *p2p) configure(ctx context.Context) error { } // New 
returns a new p2p instance. -func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store) (P2P, error) { - store, err := sqlite.NewStore(ctx, config.DataDir, defaultReplicateInterval, defaultRepublishInterval) +func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store, cloud cloud.Storage) (P2P, error) { + store, err := sqlite.NewStore(ctx, config.DataDir, defaultReplicateInterval, defaultRepublishInterval, cloud) if err != nil { return nil, errors.Errorf("new kademlia store: %w", err) } diff --git a/supernode/cmd/app.go b/supernode/cmd/app.go index fbf534995..cce88c79f 100644 --- a/supernode/cmd/app.go +++ b/supernode/cmd/app.go @@ -26,6 +26,7 @@ import ( "github.com/pastelnetwork/gonode/dupedetection/ddclient" "github.com/pastelnetwork/gonode/mixins" "github.com/pastelnetwork/gonode/p2p" + "github.com/pastelnetwork/gonode/p2p/kademlia/store/cloud.go" "github.com/pastelnetwork/gonode/pastel" "github.com/pastelnetwork/gonode/supernode/configs" "github.com/pastelnetwork/gonode/supernode/debug" @@ -224,7 +225,8 @@ func runApp(ctx context.Context, config *configs.Config) error { // p2p service (currently using kademlia) config.P2P.SetWorkDir(config.WorkDir) config.P2P.ID = config.PastelID - p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore) + cloudStorage := cloud.NewRcloneStorage("bucket", "spec") + p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore, cloudStorage) if err != nil { return errors.Errorf("could not create p2p service, %w", err) }