Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PSL-1213] handle bottlenecks during registration of huge files ~5GB #896

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/types/ticket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type File struct {

type Files []*File

// Names returns the FileID of every entry in f, in the same order as f.
// The returned slice is always non-nil and has exactly len(f) elements.
// Note that, despite the method name, the values are FileIDs.
func (f Files) Names() []string {
	ids := make([]string, len(f))
	for i := range f {
		ids[i] = f[i].FileID
	}
	return ids
}

type RegistrationAttempt struct {
ID int
FileID string
Expand Down
2 changes: 1 addition & 1 deletion p2p/kademlia/store/sqlite/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *Store) UpdateReplicationInfo(_ context.Context, rep domain.NodeReplicat
_, err := s.db.Exec(`UPDATE replication_info SET ip = ?, is_active = ?, is_adjusted = ?, lastReplicatedAt = ?, updatedAt =?, port = ?, last_seen = ? WHERE id = ?`,
rep.IP, rep.Active, rep.IsAdjusted, rep.LastReplicatedAt, rep.UpdatedAt, rep.Port, string(rep.ID), rep.LastSeen)
if err != nil {
return fmt.Errorf("failed to update replicated records: %v", err)
return fmt.Errorf("failed to update replication info: %v", err)
}

return err
Expand Down
2 changes: 1 addition & 1 deletion p2p/kademlia/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (s *Store) updateKeyReplication(key []byte, replicatedAt time.Time) error {
keyStr := hex.EncodeToString(key)
_, err := s.db.Exec(`UPDATE data SET replicatedAt = ? WHERE key = ?`, replicatedAt, keyStr)
if err != nil {
return fmt.Errorf("failed to update replicated records: %v", err)
return fmt.Errorf("failed to update key replication: %v", err)
}

return err
Expand Down
33 changes: 17 additions & 16 deletions pastel/jsonrpc/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import (
"time"

"encoding/json"

"golang.org/x/sync/semaphore"
)

const (
jsonrpcVersion = "2.0"
timeout = 70 * time.Second
httpTimeout = 60 * time.Second
maxConcurrentRequests = int64(350)
jsonrpcVersion = "2.0"
timeout = 70 * time.Second
httpTimeout = 60 * time.Second
maxConcurrentRequests = int64(650)
)

// RPCClient sends JSON-RPC requests over HTTP to the provided JSON-RPC backend.
Expand Down Expand Up @@ -248,7 +249,7 @@ type rpcClient struct {
endpoint string
httpClient *http.Client
customHeaders map[string]string
sem *semaphore.Weighted
sem *semaphore.Weighted
}

// RPCClientOpts can be provided to NewClientWithOpts() to change configuration of RPCClient.
Expand Down Expand Up @@ -318,12 +319,12 @@ func NewClientWithOpts(endpoint string, opts *RPCClientOpts) RPCClient {
httpClient: &http.Client{
Timeout: httpTimeout,
Transport: &http.Transport{
DisableKeepAlives: false, // explicitly enable keep-alives - although its the default behavior
MaxIdleConnsPerHost: 75, // increase the number of idle connections per host as we are connecting to the same host
IdleConnTimeout: 60 * time.Second,
DisableKeepAlives: false, // explicitly enable keep-alives - although its the default behavior
MaxIdleConnsPerHost: 250, // increase the number of idle connections per host as we are connecting to the same host
IdleConnTimeout: 30 * time.Second,
},
},
sem: semaphore.NewWeighted(maxConcurrentRequests),
sem: semaphore.NewWeighted(maxConcurrentRequests),
customHeaders: make(map[string]string),
}

Expand Down Expand Up @@ -454,9 +455,9 @@ func (client *rpcClient) newRequest(ctx context.Context, req interface{}) (*http

func (client *rpcClient) doCall(cctx context.Context, RPCRequest *RPCRequest) (*RPCResponse, error) {
if err := client.sem.Acquire(cctx, 1); err != nil {
return nil, fmt.Errorf("waiting for semaphore on rpc call on %v", err.Error())
}
defer client.sem.Release(1)
return nil, fmt.Errorf("waiting for semaphore on rpc call on %v", err.Error())
}
defer client.sem.Release(1)

ctx, cancel := context.WithTimeout(cctx, timeout)
defer cancel()
Expand All @@ -465,7 +466,7 @@ func (client *rpcClient) doCall(cctx context.Context, RPCRequest *RPCRequest) (*
if err != nil {
return nil, fmt.Errorf("rpc call %v() on %v: %v", RPCRequest.Method, client.endpoint, err.Error())
}

httpResponse, err := client.httpClient.Do(httpRequest)
if err != nil {
return nil, fmt.Errorf("rpc call %v() on %v: %v", RPCRequest.Method, httpRequest.URL.String(), err.Error())
Expand Down Expand Up @@ -507,9 +508,9 @@ func (client *rpcClient) doCall(cctx context.Context, RPCRequest *RPCRequest) (*

func (client *rpcClient) doBatchCall(rpcRequest []*RPCRequest) ([]*RPCResponse, error) {
if err := client.sem.Acquire(context.Background(), 1); err != nil {
return nil, fmt.Errorf("waiting for semaphore on rpc batch call on %v", err.Error())
}
defer client.sem.Release(1)
return nil, fmt.Errorf("waiting for semaphore on rpc batch call on %v", err.Error())
}
defer client.sem.Release(1)

httpRequest, err := client.newRequest(context.Background(), rpcRequest)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion raptorq/node/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

const (
logPrefix = "grpc-raptorqClient"
defaultConnectTimeout = 45 * time.Second
defaultConnectTimeout = 120 * time.Second
)

type client struct{}
Expand Down
2 changes: 1 addition & 1 deletion raptorq/node/grpc/raptorq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
inputEncodeFileName = "input.data"
symbolIDFileSubDir = "meta"
symbolFileSubdir = "symbols"
concurrency = 4
concurrency = 1
)

type raptorQ struct {
Expand Down
2 changes: 1 addition & 1 deletion supernode/services/cascaderegister/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (task *CascadeRegistrationTask) ValidateAndRegister(_ context.Context, tick
log.WithContext(ctx).Infof("isPrimary: %t", task.NetworkHandler.ConnectedTo == nil)
if err = task.signAndSendCascadeTicket(ctx, task.NetworkHandler.ConnectedTo == nil); err != nil {
log.WithContext(ctx).WithError(err).Errorf("signed and send Cascade ticket")
err = errors.Errorf("signed and send NFT ticket: %w", err)
err = errors.Errorf("signed and send Cascade ticket: %w", err)
return nil
}

Expand Down
47 changes: 36 additions & 11 deletions supernode/services/common/storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
const (
loadSymbolsBatchSize = 2500
storeSymbolsPercent = 10
concurrency = 2
)

// StorageHandler provides common logic for RQ and P2P operations
Expand All @@ -36,7 +37,8 @@ type StorageHandler struct {
TaskID string
TxID string

store rqstore.Store
store rqstore.Store
semaphore chan struct{}
}

// NewStorageHandler creates instance of StorageHandler
Expand All @@ -49,6 +51,7 @@ func NewStorageHandler(p2p p2p.Client, rq rqnode.ClientInterface,
rqAddress: rqAddress,
rqDir: rqDir,
store: store,
semaphore: make(chan struct{}, concurrency),
}
}

Expand Down Expand Up @@ -86,7 +89,7 @@ func (h *StorageHandler) GenerateRaptorQSymbols(ctx context.Context, data []byte
}

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 2 * time.Minute
b.MaxElapsedTime = 3 * time.Minute
b.InitialInterval = 200 * time.Millisecond

var conn rqnode.Connection
Expand All @@ -111,9 +114,19 @@ func (h *StorageHandler) GenerateRaptorQSymbols(ctx context.Context, data []byte
RqFilesDir: h.rqDir,
})

encodeResp, err := rqService.RQEncode(ctx, data, h.TxID, h.store)
if err != nil {
return nil, errors.Errorf("create raptorq symbol from data %s: %w", name, err)
b.Reset()

encodeResp := &rqnode.Encode{}
if err := backoff.Retry(backoff.Operation(func() error {
var err error
encodeResp, err = rqService.RQEncode(ctx, data, h.TxID, h.store)
if err != nil {
return errors.Errorf("create raptorq symbol from data %s: %w", name, err)
}

return nil
}), b); err != nil {
return nil, fmt.Errorf("retry do rqencode service: %w", err)
}

return encodeResp.Symbols, nil
Expand All @@ -122,15 +135,15 @@ func (h *StorageHandler) GenerateRaptorQSymbols(ctx context.Context, data []byte
// GetRaptorQEncodeInfo calls RQ service to get Encoding info and list of RQIDs
func (h *StorageHandler) GetRaptorQEncodeInfo(ctx context.Context,
data []byte, num uint32, hash string, pastelID string,
) (*rqnode.EncodeInfo, error) {
) (encodeInfo *rqnode.EncodeInfo, err error) {
if h.RqClient == nil {
log.WithContext(ctx).Warnf("RQ Server is not initialized")
return nil, errors.Errorf("RQ Server is not initialized")
}

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 2 * time.Minute
b.InitialInterval = 200 * time.Millisecond
b.MaxElapsedTime = 3 * time.Minute
b.InitialInterval = 500 * time.Millisecond

var conn rqnode.Connection
if err := backoff.Retry(backoff.Operation(func() error {
Expand All @@ -154,9 +167,16 @@ func (h *StorageHandler) GetRaptorQEncodeInfo(ctx context.Context,
RqFilesDir: h.rqDir,
})

encodeInfo, err := rqService.EncodeInfo(ctx, data, num, hash, pastelID)
if err != nil {
return nil, errors.Errorf("get raptorq encode info: %w", err)
b.Reset()
if err := backoff.Retry(backoff.Operation(func() error {
var err error
encodeInfo, err = rqService.EncodeInfo(ctx, data, num, hash, pastelID)
if err != nil {
return errors.Errorf("get raptorq encode info: %w", err)
}
return nil
}), b); err != nil {
return nil, fmt.Errorf("retry do encode info on raptorq service: %w", err)
}

return encodeInfo, nil
Expand Down Expand Up @@ -231,6 +251,11 @@ func (h *StorageHandler) storeSymbolsInP2P(ctx context.Context, dir string, batc
}

func (h *StorageHandler) StoreRaptorQSymbolsIntoP2P(ctx context.Context, data []byte, name string) error {
h.semaphore <- struct{}{} // Acquire slot
defer func() {
<-h.semaphore // Release the semaphore slot
}()

// Generate the keys for RaptorQ symbols, with empty values
log.WithContext(ctx).Info("generating RaptorQ symbols")
keysMap, err := h.GenerateRaptorQSymbols(ctx, data, name)
Expand Down
2 changes: 1 addition & 1 deletion supernode/services/senseregister/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (task *SenseRegistrationTask) ValidateAndRegister(_ context.Context, ticket
log.WithContext(ctx).Infof("isPrimary: %t", task.NetworkHandler.ConnectedTo == nil)
if err = task.signAndSendSenseTicket(ctx, task.NetworkHandler.ConnectedTo == nil); err != nil {
log.WithContext(ctx).WithError(err).Errorf("sign and send Sense ticket")
err = errors.Errorf("signed and send NFT ticket")
err = errors.Errorf("signed and send Sense ticket")
return nil
}

Expand Down
43 changes: 31 additions & 12 deletions walletnode/api/services/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/pastelnetwork/gonode/common/types"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/pastelnetwork/gonode/common/types"

"github.com/gorilla/websocket"
"github.com/pastelnetwork/gonode/common/errors"
"github.com/pastelnetwork/gonode/common/log"
Expand All @@ -29,7 +30,7 @@ import (
)

const (
maxFileSize = 350 * 1024 * 1024 // 350MB in bytes
maxFileSize = 300 * 1024 * 1024 // 300MB in bytes
maxFileRegistrationAttempts = 3
downloadDeadline = 30 * time.Minute
)
Expand Down Expand Up @@ -287,6 +288,14 @@ func (service *CascadeAPIHandler) Download(ctx context.Context, p *cascade.Downl
Txid string
}

// Create directory with p.Txid
folderPath := filepath.Join(service.config.StaticFilesDir, p.Txid)
if _, err := os.Stat(folderPath); os.IsNotExist(err) {
if err := os.MkdirAll(folderPath, os.ModePerm); err != nil {
return nil, err
}
}

// Channel to control the concurrency of downloads
sem := make(chan struct{}, 3) // Max 3 concurrent downloads
taskResults := make(chan *DownloadResult)
Expand All @@ -296,7 +305,26 @@ func (service *CascadeAPIHandler) Download(ctx context.Context, p *cascade.Downl
defer cancel()

// Starting multiple download tasks
tasks := 0
for _, txID := range txIDs {

filename, size, err := service.download.GetFilenameAndSize(ctx, txID)
if err != nil {
return nil, err
}

// check if file already exists
filePath := filepath.Join(folderPath, filename)
if fileinfo, err := os.Stat(filePath); err == nil {
if fileinfo.Size() == int64(size) {
log.WithContext(ctx).WithField("filename", filename).WithField("txid", txID).Info("skipping file download as it already exists")
continue
} else {
log.WithContext(ctx).WithField("filename", filename).WithField("txid", txID).Warn("file exists but downloading again due to size mismatch")
}
}

tasks++
go func(txID string) {
sem <- struct{}{} // Acquiring the semaphore
defer func() { <-sem }() // Releasing the semaphore
Expand Down Expand Up @@ -341,17 +369,8 @@ func (service *CascadeAPIHandler) Download(ctx context.Context, p *cascade.Downl
}(txID)
}

// Create directory with p.Txid
folderPath := filepath.Join(service.config.StaticFilesDir, p.Txid)
if _, err := os.Stat(folderPath); os.IsNotExist(err) {
if err := os.MkdirAll(folderPath, os.ModePerm); err != nil {
cancel()
return nil, err
}
}

var filePath string
for i := 0; i < len(txIDs); i++ {
for i := 0; i < tasks; i++ {
select {
case res := <-taskResults:
filePath = filepath.Join(folderPath, res.Filename)
Expand Down
3 changes: 2 additions & 1 deletion walletnode/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
appUsage = "WalletNode" // TODO: Write a clear description.
rqFilesDir = "rqfiles"
staticFilesDir = "files"
cascadeFiles = "cascadefiles"
)

var (
Expand All @@ -52,7 +53,7 @@ var (
defaultPastelConfigFile = filepath.Join(defaultPath, "pastel.conf")
defaultRqFilesDir = filepath.Join(defaultPath, rqFilesDir)
defaultStaticFilesDir = filepath.Join(defaultPath, staticFilesDir)
defaultCascadeFilesDir = filepath.Join(defaultPath, "files")
defaultCascadeFilesDir = filepath.Join(defaultPath, cascadeFiles)
)

// NewApp configures our app by parsing command line flags, config files, and setting up logging and temporary directories
Expand Down
5 changes: 3 additions & 2 deletions walletnode/services/cascaderegister/config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package cascaderegister

import (
"path/filepath"

"github.com/pastelnetwork/gonode/common/configurer"
"github.com/pastelnetwork/gonode/walletnode/services/common"
"path/filepath"
)

const (
Expand All @@ -17,7 +18,7 @@ const (
)

var (
defaultCascadeFilesDir = filepath.Join(configurer.DefaultPath(), "files")
defaultCascadeFilesDir = filepath.Join(configurer.DefaultPath(), "cascadefiles")
)

// Config contains the settings for cascade registration.
Expand Down
Loading
Loading