Skip to content

Commit 9e09f56

Browse files
services: add default values for NeoFSBlockFetcher configuration
The minimum sufficient configuration is Addresses and ContainerID, example: ``` NeoFSBlockFetcher: Enabled: true Addresses: - st1.storage.fs.neo.org:8080 - st2.storage.fs.neo.org:8080 - st3.storage.fs.neo.org:8080 - st4.storage.fs.neo.org:8080 ContainerID: "87JRc7vyWcjW8uS32LMoLTAj4ckCzFZWfKbacjU3sAob" ``` Close #3718 Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
1 parent b7b47a4 commit 9e09f56

File tree

13 files changed

+193
-145
lines changed

13 files changed

+193
-145
lines changed

cli/util/convert.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/nspcc-dev/neo-go/cli/options"
1212
"github.com/nspcc-dev/neo-go/cli/txctx"
1313
vmcli "github.com/nspcc-dev/neo-go/cli/vm"
14+
"github.com/nspcc-dev/neo-go/pkg/services/neofs"
1415
"github.com/nspcc-dev/neo-go/pkg/vm"
1516
"github.com/urfave/cli/v2"
1617
)
@@ -74,7 +75,7 @@ func NewCommands() []*cli.Command {
7475
&cli.UintFlag{
7576
Name: "index-file-size",
7677
Usage: "Size of index file",
77-
Value: 128000,
78+
Value: neofs.DefaultIndexFileSize,
7879
},
7980
&cli.UintFlag{
8081
Name: "workers",
@@ -89,7 +90,7 @@ func NewCommands() []*cli.Command {
8990
&cli.UintFlag{
9091
Name: "retries",
9192
Usage: "Maximum number of Neo/NeoFS node request retries",
92-
Value: 5,
93+
Value: neofs.MaxRetries,
9394
Action: func(context *cli.Context, u uint) error {
9495
if u < 1 {
9596
return cli.Exit("retries should be greater than 0", 1)

cli/util/upload_bin.go

Lines changed: 20 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package util
22

33
import (
44
"context"
5-
"crypto/sha256"
65
"fmt"
76
"slices"
87
"strconv"
@@ -14,7 +13,7 @@ import (
1413
"github.com/nspcc-dev/neo-go/pkg/core/block"
1514
"github.com/nspcc-dev/neo-go/pkg/io"
1615
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
17-
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
16+
"github.com/nspcc-dev/neo-go/pkg/services/neofs"
1817
"github.com/nspcc-dev/neo-go/pkg/util"
1918
"github.com/nspcc-dev/neo-go/pkg/wallet"
2019
"github.com/nspcc-dev/neofs-sdk-go/checksum"
@@ -30,35 +29,6 @@ import (
3029
"github.com/urfave/cli/v2"
3130
)
3231

33-
const (
34-
// Number of objects to search in a batch. We need to search with EQ filter to
35-
// avoid partially-completed SEARCH responses. If EQ search haven't found object
36-
// the object will be uploaded one more time which may lead to duplicating objects.
37-
// We will have a risk of duplicates until #3645 is resolved (NeoFS guarantees
38-
// search results).
39-
searchBatchSize = 1
40-
// Size of object ID.
41-
oidSize = sha256.Size
42-
)
43-
44-
// Constants related to retry mechanism.
45-
const (
46-
// Initial backoff duration.
47-
initialBackoff = 500 * time.Millisecond
48-
// Backoff multiplier.
49-
backoffFactor = 2
50-
// Maximum backoff duration.
51-
maxBackoff = 20 * time.Second
52-
)
53-
54-
// Constants related to NeoFS pool request timeouts.
55-
// Such big values are used to avoid NeoFS pool timeouts during block search and upload.
56-
const (
57-
defaultDialTimeout = 10 * time.Minute
58-
defaultStreamTimeout = 10 * time.Minute
59-
defaultHealthcheckTimeout = 10 * time.Second
60-
)
61-
6232
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
6333
type poolWrapper struct {
6434
*pool.Pool
@@ -103,9 +73,9 @@ func uploadBin(ctx *cli.Context) error {
10373
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)
10474

10575
params := pool.DefaultOptions()
106-
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
107-
params.SetNodeDialTimeout(defaultDialTimeout)
108-
params.SetNodeStreamTimeout(defaultStreamTimeout)
76+
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
77+
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
78+
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
10979
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
11080
if err != nil {
11181
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
@@ -166,15 +136,15 @@ func uploadBin(ctx *cli.Context) error {
166136
// retry function with exponential backoff.
167137
func retry(action func() error, maxRetries uint) error {
168138
var err error
169-
backoff := initialBackoff
139+
backoff := neofs.InitialBackoff
170140
for range maxRetries {
171141
if err = action(); err == nil {
172142
return nil // Success, no retry needed.
173143
}
174144
time.Sleep(backoff) // Backoff before retrying.
175-
backoff *= time.Duration(backoffFactor)
176-
if backoff > maxBackoff {
177-
backoff = maxBackoff
145+
backoff *= time.Duration(neofs.BackoffFactor)
146+
if backoff > neofs.MaxBackoff {
147+
backoff = neofs.MaxBackoff
178148
}
179149
}
180150
return err // Return the last error after exhausting retries.
@@ -193,15 +163,15 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
193163
errCh = make(chan error)
194164
doneCh = make(chan struct{})
195165
wg sync.WaitGroup
196-
emptyOID = make([]byte, oidSize)
166+
emptyOID = make([]byte, neofs.OidSize)
197167
)
198168
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1)
199169
wg.Add(int(numWorkers))
200170
for i := range numWorkers {
201171
go func(i uint) {
202172
defer wg.Done()
203173
for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers {
204-
if slices.Compare(buf[blockIndex%indexFileSize*oidSize:blockIndex%indexFileSize*oidSize+oidSize], emptyOID) != 0 {
174+
if slices.Compare(buf[blockIndex%indexFileSize*neofs.OidSize:blockIndex%indexFileSize*neofs.OidSize+neofs.OidSize], emptyOID) != 0 {
205175
if debug {
206176
fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex)
207177
}
@@ -263,7 +233,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
263233
}
264234
return
265235
}
266-
resOid.Encode(buf[blockIndex%indexFileSize*oidSize:])
236+
resOid.Encode(buf[blockIndex%indexFileSize*neofs.OidSize:])
267237
}
268238
}(i)
269239
}
@@ -281,9 +251,9 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
281251
fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1)
282252

283253
// Additional check for empty OIDs in the buffer.
284-
for k := uint(0); k < (indexFileEnd-indexFileStart)*oidSize; k += oidSize {
285-
if slices.Compare(buf[k:k+oidSize], emptyOID) == 0 {
286-
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oidSize, indexFileStart/indexFileSize*indexFileSize+k/oidSize)
254+
for k := uint(0); k < (indexFileEnd-indexFileStart)*neofs.OidSize; k += neofs.OidSize {
255+
if slices.Compare(buf[k:k+neofs.OidSize], emptyOID) == 0 {
256+
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/neofs.OidSize, indexFileStart/indexFileSize*indexFileSize+k/neofs.OidSize)
287257
}
288258
}
289259
if indexFileEnd-indexFileStart == indexFileSize {
@@ -310,7 +280,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
310280
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) {
311281
var (
312282
// buf is used to store OIDs of the uploaded blocks.
313-
buf = make([]byte, indexFileSize*oidSize)
283+
buf = make([]byte, indexFileSize*neofs.OidSize)
314284
doneCh = make(chan struct{})
315285
errCh = make(chan error)
316286

@@ -377,7 +347,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
377347
}
378348
pos := uint(blockIndex) % indexFileSize
379349
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
380-
id.Encode(buf[pos*oidSize:])
350+
id.Encode(buf[pos*neofs.OidSize:])
381351
}
382352
}
383353
}()
@@ -404,15 +374,15 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
404374
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
405375
// OID search is finished. Errors are sent to errCh in a non-blocking way.
406376
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
407-
var res = make(chan oid.ID, 2*searchBatchSize)
377+
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)
408378
go func() {
409379
var wg sync.WaitGroup
410380
defer close(res)
411381

412-
for i := startIndex; i < endIndex; i += searchBatchSize * maxParallelSearches {
382+
for i := startIndex; i < endIndex; i += neofs.DefaultSearchBatchSize * maxParallelSearches {
413383
for j := range maxParallelSearches {
414-
start := i + j*searchBatchSize
415-
end := start + searchBatchSize
384+
start := i + j*neofs.DefaultSearchBatchSize
385+
end := start + neofs.DefaultSearchBatchSize
416386

417387
if start >= endIndex {
418388
break

pkg/config/application_config_test.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -206,20 +206,6 @@ func TestNeoFSBlockFetcherValidation(t *testing.T) {
206206
shouldFail: true,
207207
errMsg: "BQueueSize (5) is lower than OIDBatchSize (10)",
208208
},
209-
{
210-
cfg: NeoFSBlockFetcher{
211-
InternalService: InternalService{Enabled: true},
212-
Timeout: time.Second,
213-
ContainerID: validContainerID,
214-
Addresses: []string{"127.0.0.1"},
215-
OIDBatchSize: 10,
216-
BQueueSize: 20,
217-
SkipIndexFilesSearch: false,
218-
IndexFileSize: 0,
219-
},
220-
shouldFail: true,
221-
errMsg: "IndexFileSize is not set",
222-
},
223209
}
224210

225211
for _, c := range cases {

pkg/config/blockfetcher_config.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,5 @@ func (cfg *NeoFSBlockFetcher) Validate() error {
4444
if len(cfg.Addresses) == 0 {
4545
return errors.New("addresses are not set")
4646
}
47-
if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 {
48-
return errors.New("IndexFileSize is not set")
49-
}
5047
return nil
5148
}

pkg/network/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
3030
"github.com/nspcc-dev/neo-go/pkg/network/payload"
3131
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
32+
"github.com/nspcc-dev/neo-go/pkg/services/neofs"
3233
"github.com/nspcc-dev/neo-go/pkg/util"
3334
"go.uber.org/zap"
3435
)
@@ -224,6 +225,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
224225
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
225226

226227
s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
228+
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
229+
s.NeoFSBlockFetcherCfg.BQueueSize = neofs.DefaultQueueCacheSize
230+
}
227231
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
228232
var err error
229233
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,

pkg/services/blockfetcher/blockfetcher.go

Lines changed: 23 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package blockfetcher
22

33
import (
44
"context"
5-
"crypto/sha256"
65
"errors"
76
"fmt"
87
"io"
@@ -16,7 +15,7 @@ import (
1615
"github.com/nspcc-dev/neo-go/pkg/config"
1716
"github.com/nspcc-dev/neo-go/pkg/core/block"
1817
gio "github.com/nspcc-dev/neo-go/pkg/io"
19-
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
18+
"github.com/nspcc-dev/neo-go/pkg/services/neofs"
2019
"github.com/nspcc-dev/neo-go/pkg/wallet"
2120
"github.com/nspcc-dev/neofs-sdk-go/client"
2221
"github.com/nspcc-dev/neofs-sdk-go/container"
@@ -28,43 +27,6 @@ import (
2827
"go.uber.org/zap"
2928
)
3029

31-
const (
32-
// oidSize is the size of the object ID in NeoFS.
33-
oidSize = sha256.Size
34-
// defaultTimeout is the default timeout for NeoFS requests.
35-
defaultTimeout = 5 * time.Minute
36-
// defaultOIDBatchSize is the default number of OIDs to search and fetch at once.
37-
defaultOIDBatchSize = 8000
38-
// defaultDownloaderWorkersCount is the default number of workers downloading blocks.
39-
defaultDownloaderWorkersCount = 100
40-
)
41-
42-
// Constants related to NeoFS pool request timeouts.
43-
const (
44-
// defaultDialTimeout is a default timeout used to establish connection with
45-
// NeoFS storage nodes.
46-
defaultDialTimeout = 30 * time.Second
47-
// defaultStreamTimeout is a default timeout used for NeoFS streams processing.
48-
// It has significantly large value to reliably avoid timeout problems with heavy
49-
// SEARCH requests.
50-
defaultStreamTimeout = 10 * time.Minute
51-
// defaultHealthcheckTimeout is a timeout for request to NeoFS storage node to
52-
// decide if it is alive.
53-
defaultHealthcheckTimeout = 10 * time.Second
54-
)
55-
56-
// Constants related to retry mechanism.
57-
const (
58-
// maxRetries is the maximum number of retries for a single operation.
59-
maxRetries = 5
60-
// initialBackoff is the initial backoff duration.
61-
initialBackoff = 500 * time.Millisecond
62-
// backoffFactor is the factor by which the backoff duration is multiplied.
63-
backoffFactor = 2
64-
// maxBackoff is the maximum backoff duration.
65-
maxBackoff = 20 * time.Second
66-
)
67-
6830
// Ledger is an interface to Blockchain sufficient for Service.
6931
type Ledger interface {
7032
GetConfig() config.Blockchain
@@ -143,22 +105,28 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
143105
}
144106
}
145107
if cfg.Timeout <= 0 {
146-
cfg.Timeout = defaultTimeout
108+
cfg.Timeout = neofs.DefaultTimeout
147109
}
148110
if cfg.OIDBatchSize <= 0 {
149-
cfg.OIDBatchSize = defaultOIDBatchSize
111+
cfg.OIDBatchSize = neofs.DefaultQueueCacheSize / 2
150112
}
151113
if cfg.DownloaderWorkersCount <= 0 {
152-
cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount
114+
cfg.DownloaderWorkersCount = neofs.DefaultDownloaderWorkersCount
115+
}
116+
if cfg.IndexFileSize <= 0 {
117+
cfg.IndexFileSize = neofs.DefaultIndexFileSize
118+
}
119+
if cfg.BlockAttribute == "" {
120+
cfg.BlockAttribute = neofs.DefaultBlockAttribute
153121
}
154-
if len(cfg.Addresses) == 0 {
155-
return nil, errors.New("no addresses provided")
122+
if cfg.IndexFileAttribute == "" {
123+
cfg.IndexFileAttribute = neofs.DefaultIndexFileAttribute
156124
}
157125

158126
params := pool.DefaultOptions()
159-
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
160-
params.SetNodeDialTimeout(defaultDialTimeout)
161-
params.SetNodeStreamTimeout(defaultStreamTimeout)
127+
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
128+
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
129+
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
162130
p, err := pool.New(pool.NewFlatNodeParams(cfg.Addresses), user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey), params)
163131
if err != nil {
164132
return nil, err
@@ -357,7 +325,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
357325
// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
358326
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
359327
defer rc.Close()
360-
oidBytes := make([]byte, oidSize)
328+
oidBytes := make([]byte, neofs.OidSize)
361329
oidsProcessed := 0
362330

363331
for {
@@ -397,7 +365,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
397365
func (bfs *Service) fetchOIDsBySearch() error {
398366
startIndex := bfs.chain.BlockHeight()
399367
//We need to search with EQ filter to avoid partially-completed SEARCH responses.
400-
batchSize := uint32(1)
368+
batchSize := uint32(neofs.DefaultSearchBatchSize)
401369

402370
for {
403371
select {
@@ -513,7 +481,7 @@ func (bfs *Service) IsActive() bool {
513481
func (bfs *Service) retry(action func() error) error {
514482
var (
515483
err error
516-
backoff = initialBackoff
484+
backoff = neofs.InitialBackoff
517485
timer = time.NewTimer(0)
518486
)
519487
defer func() {
@@ -525,11 +493,11 @@ func (bfs *Service) retry(action func() error) error {
525493
}
526494
}()
527495

528-
for i := range maxRetries {
496+
for i := range neofs.MaxRetries {
529497
if err = action(); err == nil {
530498
return nil
531499
}
532-
if i == maxRetries-1 {
500+
if i == neofs.MaxRetries-1 {
533501
break
534502
}
535503
timer.Reset(backoff)
@@ -539,9 +507,9 @@ func (bfs *Service) retry(action func() error) error {
539507
case <-bfs.ctx.Done():
540508
return bfs.ctx.Err()
541509
}
542-
backoff *= time.Duration(backoffFactor)
543-
if backoff > maxBackoff {
544-
backoff = maxBackoff
510+
backoff *= time.Duration(neofs.BackoffFactor)
511+
if backoff > neofs.MaxBackoff {
512+
backoff = neofs.MaxBackoff
545513
}
546514
}
547515
return err

0 commit comments

Comments
 (0)