Skip to content

Commit

Permalink
[PSL-1239] cloud storage handler (#913)
Browse files Browse the repository at this point in the history
* [PSL-1239] cloud storage handler
  • Loading branch information
mateeullahmalik authored Aug 1, 2024
1 parent 167864a commit d9ae328
Show file tree
Hide file tree
Showing 9 changed files with 371 additions and 31 deletions.
2 changes: 1 addition & 1 deletion p2p/kademlia/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (ts *testSuite) SetupSuite() {
defaultReplicateInterval := time.Second * 3600
defaultRepublishInterval := time.Second * 3600 * 24

dbStore, err := sqlite.NewStore(ts.ctx, filepath.Join(workDir, "p2p"), defaultReplicateInterval, defaultRepublishInterval)
dbStore, err := sqlite.NewStore(ts.ctx, filepath.Join(workDir, "p2p"), defaultReplicateInterval, defaultRepublishInterval, nil)
if err != nil {
ts.T().Fatalf("new sqlite store: %v", err)
}
Expand Down
36 changes: 18 additions & 18 deletions p2p/kademlia/hashtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ func TestClosestContactsWithIncludingNode(t *testing.T) {
ignoredNodes: make([]*Node, 0),
topCount: 6,
expectedResults: &NodeList{Nodes: []*Node{
&Node{IP: "3.18.240.77"},
&Node{IP: "38.242.151.208"},
&Node{IP: "154.38.160.234"},
&Node{IP: "154.38.161.58"},
&Node{IP: "154.12.235.12"},
&Node{IP: "38.242.151.234"},
{IP: "3.18.240.77"},
{IP: "38.242.151.208"},
{IP: "154.38.160.234"},
{IP: "154.38.161.58"},
{IP: "154.12.235.12"},
{IP: "38.242.151.234"},
},
},
},
Expand Down Expand Up @@ -81,12 +81,12 @@ func TestClosestContactsWithIncludingNode(t *testing.T) {
ignoredNodes: make([]*Node, 0),
topCount: 6,
expectedResults: &NodeList{Nodes: []*Node{
&Node{IP: "154.38.161.58"},
&Node{IP: "154.38.160.234"},
&Node{IP: "38.242.151.208"},
&Node{IP: "3.18.240.77"},
&Node{IP: "154.12.235.12"},
&Node{IP: "149.102.147.122"},
{IP: "154.38.161.58"},
{IP: "154.38.160.234"},
{IP: "38.242.151.208"},
{IP: "3.18.240.77"},
{IP: "154.12.235.12"},
{IP: "149.102.147.122"},
},
},
},
Expand Down Expand Up @@ -117,12 +117,12 @@ func TestClosestContactsWithIncludingNode(t *testing.T) {
topCount: 6,
targetKey: "517b1003b805793f35f4242f481a48cd45e5431a2af6e10339cbcf97f7b1a27e",
expectedResults: &NodeList{Nodes: []*Node{
&Node{IP: "18.189.251.53"},
&Node{IP: "149.102.147.113"},
&Node{IP: "154.12.235.19"},
&Node{IP: "154.38.161.44"},
&Node{IP: "95.111.253.200"},
&Node{IP: "3.19.48.187"},
{IP: "18.189.251.53"},
{IP: "149.102.147.113"},
{IP: "154.12.235.19"},
{IP: "154.38.161.44"},
{IP: "95.111.253.200"},
{IP: "3.19.48.187"},
},
},
},
Expand Down
128 changes: 128 additions & 0 deletions p2p/kademlia/store/cloud.go/cloud.go (NOTE(review): the directory is named "cloud.go" like a file — likely accidental; consider renaming the folder to "cloud")
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package cloud

import (
"bytes"
"fmt"

"os"
"os/exec"
"path/filepath"
"sync"

"github.com/pastelnetwork/gonode/common/log"
)

// Storage abstracts a remote blob store keyed by string. Implementations
// upload opaque byte payloads and retrieve them later by key.
type Storage interface {
	// Store uploads data under key and returns the remote path it was stored at.
	Store(key string, data []byte) (string, error)
	// Fetch downloads the payload previously stored under key.
	Fetch(key string) ([]byte, error)
	// StoreBatch uploads multiple payloads in one call.
	// NOTE(review): no keys are taken here — implementations presumably derive
	// keys from the data; confirm against callers.
	StoreBatch(data [][]byte) error
	// FetchBatch downloads the payloads for the given keys, returning the
	// subset that could be fetched.
	FetchBatch(keys []string) (map[string][]byte, error)
}

// RcloneStorage implements Storage by shelling out to the rclone CLI.
// specName is the rclone remote (config section) name and bucketName the
// bucket within it; objects live at "<specName>:<bucketName>/<key>".
type RcloneStorage struct {
	bucketName string // target bucket on the remote
	specName   string // rclone remote/config-section name
}

// NewRcloneStorage returns an RcloneStorage that reads and writes objects in
// the given bucket through the named rclone remote spec.
func NewRcloneStorage(bucketName, specName string) *RcloneStorage {
	storage := RcloneStorage{bucketName: bucketName, specName: specName}
	return &storage
}

// Store uploads data under key to the configured rclone remote and returns
// the remote path ("spec:bucket/key") on success.
//
// The payload is first staged in a temp file because rclone copies from
// disk; the staging file is removed before returning regardless of outcome.
func (r *RcloneStorage) Store(key string, data []byte) (string, error) {
	filePath := filepath.Join(os.TempDir(), key)

	// Write data to a temporary staging file.
	if err := os.WriteFile(filePath, data, 0644); err != nil {
		return "", fmt.Errorf("failed to write data to file: %w", err)
	}

	// Always clean up the staging file synchronously (the original spawned a
	// goroutine for this and ignored removal errors on the failure path).
	// A failed removal is logged but does not fail the upload.
	defer func() {
		if err := os.Remove(filePath); err != nil {
			log.Error("failed to delete local file", "path", filePath, "error", err)
		}
	}()

	// Remote object path: <spec>:<bucket>/<key>.
	remotePath := fmt.Sprintf("%s:%s/%s", r.specName, r.bucketName, key)

	// Capture combined output so rclone's own diagnostics reach the caller
	// instead of being discarded.
	cmd := exec.Command("rclone", "copyto", filePath, remotePath)
	if out, err := cmd.CombinedOutput(); err != nil {
		return "", fmt.Errorf("rclone command failed: %w - out %s", err, string(out))
	}

	// Return the remote path where the file was stored.
	return remotePath, nil
}

// Fetch downloads the object stored under key from the rclone remote and
// returns its contents.
func (r *RcloneStorage) Fetch(key string) ([]byte, error) {
	// "rclone cat" streams the object's bytes to stdout.
	cmd := exec.Command("rclone", "cat", fmt.Sprintf("%s:%s/%s", r.specName, r.bucketName, key))
	var out bytes.Buffer
	var stderr bytes.Buffer
	cmd.Stdout = &out
	// rclone writes diagnostics to stderr, not stdout — the original included
	// stdout in the error, which is always empty on failure.
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		return nil, fmt.Errorf("rclone command failed: %w - out %s", err, stderr.String())
	}

	return out.Bytes(), nil
}

// StoreBatch is not yet implemented; it currently accepts any input and
// reports success without uploading anything.
// NOTE(review): returning nil here silently drops data — confirm callers do
// not depend on this succeeding before it is implemented.
func (r *RcloneStorage) StoreBatch(data [][]byte) error {
	// Placeholder for StoreBatch implementation
	return nil
}

// FetchBatch retrieves the objects for keys concurrently, with at most 50
// rclone invocations in flight at a time.
//
// Partial success is tolerated: if at least one key was fetched, the
// successful subset is returned with a nil error. Only when every key fails
// is a combined error (listing each key's failure) returned.
func (r *RcloneStorage) FetchBatch(keys []string) (map[string][]byte, error) {
	fetched := make(map[string][]byte)
	failures := make(map[string]error)
	var stateMu sync.Mutex

	// Bound concurrency so we do not spawn an unbounded number of rclone processes.
	tokens := make(chan struct{}, 50)

	var wg sync.WaitGroup
	for _, k := range keys {
		wg.Add(1)
		tokens <- struct{}{} // acquire a slot

		go func(k string) {
			defer wg.Done()
			defer func() { <-tokens }() // release the slot

			value, fetchErr := r.Fetch(k)

			stateMu.Lock()
			if fetchErr != nil {
				failures[k] = fetchErr
			} else {
				fetched[k] = value
			}
			stateMu.Unlock()
		}(k)
	}

	wg.Wait()

	// Any success at all wins: errors for the remaining keys are dropped.
	if len(fetched) > 0 {
		return fetched, nil
	}

	// Everything failed — fold every per-key error into one chained error.
	if len(failures) > 0 {
		combined := fmt.Errorf("errors occurred in fetching keys")
		for key, keyErr := range failures {
			combined = fmt.Errorf("%v; key %s error: %v", combined, key, keyErr)
		}
		return nil, combined
	}

	// No keys requested: empty result, no error.
	return fetched, nil
}
102 changes: 102 additions & 0 deletions p2p/kademlia/store/cloud.go/cloud_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package cloud

/*
import (
"fmt"
"os"
"testing"
)
func TestRcloneStorage_Fetch(t *testing.T) {
SetRcloneB2Env(os.Getenv("B2_ACCOUNT_ID"), os.Getenv("B2_APP_KEY")) // NOTE(review): real B2 credentials were hard-coded and committed here — revoke them immediately and load from the environment
storage := NewRcloneStorage("pastel-test", "b2")
tests := map[string]struct {
key string
expected string
wantErr bool
}{
"File Exists": {
key: "copy_45707.txt",
expected: "expected content of testfile.txt",
wantErr: false,
},
"File Does Not Exist": {
key: "copy_45707_nonexistent.txt",
wantErr: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
data, err := storage.Fetch(tc.key)
if (err != nil) != tc.wantErr {
t.Errorf("Fetch() error = %v, wantErr %v", err, tc.wantErr)
}
if !tc.wantErr && string(data) != tc.expected {
t.Errorf("Fetch() got = %v, want %v", string(data), tc.expected)
}
})
}
}
// SetRcloneB2Env sets the environment variables for rclone configuration.
func SetRcloneB2Env(accountID, appKey string) error {
err := os.Setenv("RCLONE_CONFIG_B2_TYPE", "b2")
if err != nil {
return fmt.Errorf("failed to set RCLONE_CONFIG_B2_TYPE: %w", err)
}
err = os.Setenv("RCLONE_CONFIG_B2_ACCOUNT", accountID)
if err != nil {
return fmt.Errorf("failed to set RCLONE_CONFIG_B2_ACCOUNT: %w", err)
}
err = os.Setenv("RCLONE_CONFIG_B2_KEY", appKey)
if err != nil {
return fmt.Errorf("failed to set RCLONE_CONFIG_B2_KEY: %w", err)
}
return nil
}
func TestRcloneStorage_FetchBatch(t *testing.T) {
storage := NewRcloneStorage("mybucket", "b2") // NOTE(review): added missing specName argument — the one-argument call would not compile if this test were re-enabled
tests := map[string]struct {
keys []string
expected map[string]string
wantErr bool
}{
"Multiple Files Exist": {
keys: []string{"testfile1.txt", "testfile2.txt"}, // Ensure these files exist
expected: map[string]string{
"testfile1.txt": "content of testfile1.txt",
"testfile2.txt": "content of testfile2.txt",
},
wantErr: false,
},
"Some Files Do Not Exist": {
keys: []string{"testfile1.txt", "doesNotExist.txt"},
wantErr: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
results, err := storage.FetchBatch(tc.keys)
if (err != nil) != tc.wantErr {
t.Errorf("FetchBatch() error = %v, wantErr %v", err, tc.wantErr)
}
if !tc.wantErr {
for k, v := range tc.expected {
if results[k] != v {
t.Errorf("FetchBatch() got = %v, want %v for key %v", string(results[k]), v, k)
}
}
}
})
}
}
*/
40 changes: 37 additions & 3 deletions p2p/kademlia/store/sqlite/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,35 +434,69 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt
keyToIndex[keys[i]] = i
}

query := fmt.Sprintf(`SELECT key, data FROM data WHERE key IN (%s)`, strings.Join(placeholders, ","))
query := fmt.Sprintf(`SELECT key, data, is_on_cloud FROM data WHERE key IN (%s)`, strings.Join(placeholders, ","))
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, 0, fmt.Errorf("failed to retrieve records: %w", err)
}
defer rows.Close()

values := make([][]byte, len(keys))
var cloudKeys []string
keysFound := 0

for rows.Next() {
var key string
var value []byte
if err := rows.Scan(&key, &value); err != nil {
var is_on_cloud bool
if err := rows.Scan(&key, &value, &is_on_cloud); err != nil {
return nil, keysFound, fmt.Errorf("failed to scan key and value: %w", err)
}

if idx, found := keyToIndex[key]; found {
values[idx] = value
keysFound++

PostAccessUpdate([]string{key})
if s.isCloudBackupOn() {
if len(value) == 0 && is_on_cloud {
cloudKeys = append(cloudKeys, key)
}
PostAccessUpdate([]string{key})
}
}
}

if err := rows.Err(); err != nil {
return nil, keysFound, fmt.Errorf("rows processing error: %w", err)
}

if len(cloudKeys) > 0 {
// Fetch from cloud
cloudValues, err := s.cloud.FetchBatch(cloudKeys)
if err != nil {
log.WithContext(ctx).WithError(err).Error("failed to fetch from cloud")
}

for key, value := range cloudValues {
if idx, found := keyToIndex[key]; found {
values[idx] = value
keysFound++
}
}

go func() {
datList := make([][]byte, 0, len(cloudValues))
for _, v := range cloudValues {
datList = append(datList, v)
}

// Store the fetched data in the local store
if err := s.StoreBatch(ctx, datList, 0, false); err != nil {
log.WithError(err).Error("failed to store fetched data in local store")
}
}()
}

return values, keysFound, nil
}

Expand Down
Loading

0 comments on commit d9ae328

Please sign in to comment.