Skip to content

Commit

Permalink
services: add default values for NeoFSBlockFetcher configuration
Browse files Browse the repository at this point in the history
The minimum sufficient configuration is Addresses and ContainerID,
example:
```
  NeoFSBlockFetcher:
    Enabled: true
    Addresses:
      - st1.storage.fs.neo.org:8080
      - st2.storage.fs.neo.org:8080
      - st3.storage.fs.neo.org:8080
      - st4.storage.fs.neo.org:8080
    ContainerID: "87JRc7vyWcjW8uS32LMoLTAj4ckCzFZWfKbacjU3sAob"
```

Close #3718

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
  • Loading branch information
AliceInHunterland committed Dec 12, 2024
1 parent cb4b21f commit e59780a
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 145 deletions.
5 changes: 3 additions & 2 deletions cli/util/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/cli/txctx"
vmcli "github.com/nspcc-dev/neo-go/cli/vm"
"github.com/nspcc-dev/neo-go/pkg/services/neofs"
"github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -74,7 +75,7 @@ func NewCommands() []*cli.Command {
&cli.UintFlag{
Name: "index-file-size",
Usage: "Size of index file",
Value: 128000,
Value: neofs.DefaultIndexFileSize,
},
&cli.UintFlag{
Name: "workers",
Expand All @@ -89,7 +90,7 @@ func NewCommands() []*cli.Command {
&cli.UintFlag{
Name: "retries",
Usage: "Maximum number of Neo/NeoFS node request retries",
Value: 5,
Value: neofs.MaxRetries,
Action: func(context *cli.Context, u uint) error {
if u < 1 {
return cli.Exit("retries should be greater than 0", 1)
Expand Down
70 changes: 20 additions & 50 deletions cli/util/upload_bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package util

import (
"context"
"crypto/sha256"
"fmt"
"slices"
"strconv"
Expand All @@ -14,7 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/services/neofs"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
Expand All @@ -30,35 +29,6 @@ import (
"github.com/urfave/cli/v2"
)

const (
// Number of objects to search in a batch. We need to search with EQ filter to
// avoid partially-completed SEARCH responses. If EQ search haven't found object
// the object will be uploaded one more time which may lead to duplicating objects.
// We will have a risk of duplicates until #3645 is resolved (NeoFS guarantees
// search results).
searchBatchSize = 1
// Size of object ID.
oidSize = sha256.Size
)

// Constants related to retry mechanism.
const (
// Initial backoff duration.
initialBackoff = 500 * time.Millisecond
// Backoff multiplier.
backoffFactor = 2
// Maximum backoff duration.
maxBackoff = 20 * time.Second
)

// Constants related to NeoFS pool request timeouts.
// Such big values are used to avoid NeoFS pool timeouts during block search and upload.
const (
defaultDialTimeout = 10 * time.Minute
defaultStreamTimeout = 10 * time.Minute
defaultHealthcheckTimeout = 10 * time.Second
)

// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
type poolWrapper struct {
*pool.Pool
Expand Down Expand Up @@ -103,9 +73,9 @@ func uploadBin(ctx *cli.Context) error {
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)

params := pool.DefaultOptions()
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
params.SetNodeDialTimeout(defaultDialTimeout)
params.SetNodeStreamTimeout(defaultStreamTimeout)
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
Expand Down Expand Up @@ -166,15 +136,15 @@ func uploadBin(ctx *cli.Context) error {
// retry function with exponential backoff.
func retry(action func() error, maxRetries uint) error {
var err error
backoff := initialBackoff
backoff := neofs.InitialBackoff

Check warning on line 139 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L139

Added line #L139 was not covered by tests
for range maxRetries {
if err = action(); err == nil {
return nil // Success, no retry needed.
}
time.Sleep(backoff) // Backoff before retrying.
backoff *= time.Duration(backoffFactor)
if backoff > maxBackoff {
backoff = maxBackoff
backoff *= time.Duration(neofs.BackoffFactor)
if backoff > neofs.MaxBackoff {
backoff = neofs.MaxBackoff

Check warning on line 147 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L145-L147

Added lines #L145 - L147 were not covered by tests
}
}
return err // Return the last error after exhausting retries.
Expand All @@ -193,15 +163,15 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
errCh = make(chan error)
doneCh = make(chan struct{})
wg sync.WaitGroup
emptyOID = make([]byte, oidSize)
emptyOID = make([]byte, neofs.OidSize)

Check warning on line 166 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L166

Added line #L166 was not covered by tests
)
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1)
wg.Add(int(numWorkers))
for i := range numWorkers {
go func(i uint) {
defer wg.Done()
for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers {
if slices.Compare(buf[blockIndex%indexFileSize*oidSize:blockIndex%indexFileSize*oidSize+oidSize], emptyOID) != 0 {
if slices.Compare(buf[blockIndex%indexFileSize*neofs.OidSize:blockIndex%indexFileSize*neofs.OidSize+neofs.OidSize], emptyOID) != 0 {

Check warning on line 174 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L174

Added line #L174 was not covered by tests
if debug {
fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex)
}
Expand Down Expand Up @@ -263,7 +233,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
}
return
}
resOid.Encode(buf[blockIndex%indexFileSize*oidSize:])
resOid.Encode(buf[blockIndex%indexFileSize*neofs.OidSize:])

Check warning on line 236 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L236

Added line #L236 was not covered by tests
}
}(i)
}
Expand All @@ -281,9 +251,9 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1)

// Additional check for empty OIDs in the buffer.
for k := uint(0); k < (indexFileEnd-indexFileStart)*oidSize; k += oidSize {
if slices.Compare(buf[k:k+oidSize], emptyOID) == 0 {
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oidSize, indexFileStart/indexFileSize*indexFileSize+k/oidSize)
for k := uint(0); k < (indexFileEnd-indexFileStart)*neofs.OidSize; k += neofs.OidSize {
if slices.Compare(buf[k:k+neofs.OidSize], emptyOID) == 0 {
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/neofs.OidSize, indexFileStart/indexFileSize*indexFileSize+k/neofs.OidSize)

Check warning on line 256 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L254-L256

Added lines #L254 - L256 were not covered by tests
}
}
if indexFileEnd-indexFileStart == indexFileSize {
Expand All @@ -310,7 +280,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) {
var (
// buf is used to store OIDs of the uploaded blocks.
buf = make([]byte, indexFileSize*oidSize)
buf = make([]byte, indexFileSize*neofs.OidSize)

Check warning on line 283 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L283

Added line #L283 was not covered by tests
doneCh = make(chan struct{})
errCh = make(chan error)

Expand Down Expand Up @@ -377,7 +347,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
}
pos := uint(blockIndex) % indexFileSize
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
id.Encode(buf[pos*oidSize:])
id.Encode(buf[pos*neofs.OidSize:])

Check warning on line 350 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L350

Added line #L350 was not covered by tests
}
}
}()
Expand All @@ -404,15 +374,15 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
var res = make(chan oid.ID, 2*searchBatchSize)
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)

Check warning on line 377 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L377

Added line #L377 was not covered by tests
go func() {
var wg sync.WaitGroup
defer close(res)

for i := startIndex; i < endIndex; i += searchBatchSize * maxParallelSearches {
for i := startIndex; i < endIndex; i += neofs.DefaultSearchBatchSize * maxParallelSearches {

Check warning on line 382 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L382

Added line #L382 was not covered by tests
for j := range maxParallelSearches {
start := i + j*searchBatchSize
end := start + searchBatchSize
start := i + j*neofs.DefaultSearchBatchSize
end := start + neofs.DefaultSearchBatchSize

Check warning on line 385 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L384-L385

Added lines #L384 - L385 were not covered by tests

if start >= endIndex {
break
Expand Down
14 changes: 0 additions & 14 deletions pkg/config/application_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,6 @@ func TestNeoFSBlockFetcherValidation(t *testing.T) {
shouldFail: true,
errMsg: "BQueueSize (5) is lower than OIDBatchSize (10)",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
SkipIndexFilesSearch: false,
IndexFileSize: 0,
},
shouldFail: true,
errMsg: "IndexFileSize is not set",
},
}

for _, c := range cases {
Expand Down
3 changes: 0 additions & 3 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,5 @@ func (cfg *NeoFSBlockFetcher) Validate() error {
if len(cfg.Addresses) == 0 {
return errors.New("addresses are not set")
}
if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 {
return errors.New("IndexFileSize is not set")
}
return nil
}
4 changes: 4 additions & 0 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/services/neofs"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -224,6 +225,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
s.NeoFSBlockFetcherCfg.BQueueSize = neofs.DefaultQueueCacheSize
}
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
Expand Down
78 changes: 23 additions & 55 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package blockfetcher

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
Expand All @@ -16,7 +15,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/services/neofs"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
Expand All @@ -28,43 +27,6 @@ import (
"go.uber.org/zap"
)

const (
// oidSize is the size of the object ID in NeoFS.
oidSize = sha256.Size
// defaultTimeout is the default timeout for NeoFS requests.
defaultTimeout = 5 * time.Minute
// defaultOIDBatchSize is the default number of OIDs to search and fetch at once.
defaultOIDBatchSize = 8000
// defaultDownloaderWorkersCount is the default number of workers downloading blocks.
defaultDownloaderWorkersCount = 100
)

// Constants related to NeoFS pool request timeouts.
const (
// defaultDialTimeout is a default timeout used to establish connection with
// NeoFS storage nodes.
defaultDialTimeout = 30 * time.Second
// defaultStreamTimeout is a default timeout used for NeoFS streams processing.
// It has significantly large value to reliably avoid timeout problems with heavy
// SEARCH requests.
defaultStreamTimeout = 10 * time.Minute
// defaultHealthcheckTimeout is a timeout for request to NeoFS storage node to
// decide if it is alive.
defaultHealthcheckTimeout = 10 * time.Second
)

// Constants related to retry mechanism.
const (
// maxRetries is the maximum number of retries for a single operation.
maxRetries = 5
// initialBackoff is the initial backoff duration.
initialBackoff = 500 * time.Millisecond
// backoffFactor is the factor by which the backoff duration is multiplied.
backoffFactor = 2
// maxBackoff is the maximum backoff duration.
maxBackoff = 20 * time.Second
)

// Ledger is an interface to Blockchain sufficient for Service.
type Ledger interface {
GetConfig() config.Blockchain
Expand Down Expand Up @@ -143,22 +105,28 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
}
}
if cfg.Timeout <= 0 {
cfg.Timeout = defaultTimeout
cfg.Timeout = neofs.DefaultTimeout
}
if cfg.OIDBatchSize <= 0 {
cfg.OIDBatchSize = defaultOIDBatchSize
cfg.OIDBatchSize = neofs.DefaultQueueCacheSize / 2
}
if cfg.DownloaderWorkersCount <= 0 {
cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount
cfg.DownloaderWorkersCount = neofs.DefaultDownloaderWorkersCount
}
if cfg.IndexFileSize <= 0 {
cfg.IndexFileSize = neofs.DefaultIndexFileSize
}
if cfg.BlockAttribute == "" {
cfg.BlockAttribute = neofs.DefaultBlockAttribute
}
if len(cfg.Addresses) == 0 {
return nil, errors.New("no addresses provided")
if cfg.IndexFileAttribute == "" {
cfg.IndexFileAttribute = neofs.DefaultIndexFileAttribute
}

params := pool.DefaultOptions()
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
params.SetNodeDialTimeout(defaultDialTimeout)
params.SetNodeStreamTimeout(defaultStreamTimeout)
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
p, err := pool.New(pool.NewFlatNodeParams(cfg.Addresses), user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey), params)
if err != nil {
return nil, err
Expand Down Expand Up @@ -357,7 +325,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
defer rc.Close()
oidBytes := make([]byte, oidSize)
oidBytes := make([]byte, neofs.OidSize)

Check warning on line 328 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L328

Added line #L328 was not covered by tests
oidsProcessed := 0

for {
Expand Down Expand Up @@ -397,7 +365,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
func (bfs *Service) fetchOIDsBySearch() error {
startIndex := bfs.chain.BlockHeight()
//We need to search with EQ filter to avoid partially-completed SEARCH responses.
batchSize := uint32(1)
batchSize := uint32(neofs.DefaultSearchBatchSize)

Check warning on line 368 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L368

Added line #L368 was not covered by tests

for {
select {
Expand Down Expand Up @@ -513,7 +481,7 @@ func (bfs *Service) IsActive() bool {
func (bfs *Service) retry(action func() error) error {
var (
err error
backoff = initialBackoff
backoff = neofs.InitialBackoff

Check warning on line 484 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L484

Added line #L484 was not covered by tests
timer = time.NewTimer(0)
)
defer func() {
Expand All @@ -525,11 +493,11 @@ func (bfs *Service) retry(action func() error) error {
}
}()

for i := range maxRetries {
for i := range neofs.MaxRetries {

Check warning on line 496 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L496

Added line #L496 was not covered by tests
if err = action(); err == nil {
return nil
}
if i == maxRetries-1 {
if i == neofs.MaxRetries-1 {

Check warning on line 500 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L500

Added line #L500 was not covered by tests
break
}
timer.Reset(backoff)
Expand All @@ -539,9 +507,9 @@ func (bfs *Service) retry(action func() error) error {
case <-bfs.ctx.Done():
return bfs.ctx.Err()
}
backoff *= time.Duration(backoffFactor)
if backoff > maxBackoff {
backoff = maxBackoff
backoff *= time.Duration(neofs.BackoffFactor)
if backoff > neofs.MaxBackoff {
backoff = neofs.MaxBackoff

Check warning on line 512 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L510-L512

Added lines #L510 - L512 were not covered by tests
}
}
return err
Expand Down
Loading

0 comments on commit e59780a

Please sign in to comment.