Skip to content

Commit e0c8beb

Browse files
authored
Merge pull request #3751 from nspcc-dev/debug-upload-bin
cli: add debug on retry in `upload-bin`
2 parents 9834b83 + 46cbfab commit e0c8beb

File tree

1 file changed

+17
-14
lines changed

1 file changed

+17
-14
lines changed

cli/util/upload_bin.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func uploadBin(ctx *cli.Context) error {
9191
var errNet error
9292
net, errNet = p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{})
9393
return errNet
94-
}, maxRetries)
94+
}, maxRetries, debug)
9595
if err != nil {
9696
return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1)
9797
}
@@ -101,7 +101,7 @@ func uploadBin(ctx *cli.Context) error {
101101
err = retry(func() error {
102102
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
103103
return err
104-
}, maxRetries)
104+
}, maxRetries, debug)
105105
if err != nil {
106106
return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1)
107107
}
@@ -121,7 +121,7 @@ func uploadBin(ctx *cli.Context) error {
121121
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
122122
}
123123
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
124-
i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries)
124+
i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug)
125125
if err != nil {
126126
return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1)
127127
}
@@ -134,13 +134,16 @@ func uploadBin(ctx *cli.Context) error {
134134
}
135135

136136
// retry function with exponential backoff.
137-
func retry(action func() error, maxRetries uint) error {
137+
func retry(action func() error, maxRetries uint, debug bool) error {
138138
var err error
139139
backoff := neofs.InitialBackoff
140-
for range maxRetries {
140+
for i := range maxRetries {
141141
if err = action(); err == nil {
142142
return nil // Success, no retry needed.
143143
}
144+
if debug {
145+
fmt.Printf("Retry %d: %v\n", i, err)
146+
}
144147
time.Sleep(backoff) // Backoff before retrying.
145148
backoff *= time.Duration(neofs.BackoffFactor)
146149
if backoff > neofs.MaxBackoff {
@@ -185,7 +188,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
185188
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
186189
}
187190
return nil
188-
}, maxRetries)
191+
}, maxRetries, debug)
189192
if errGet != nil {
190193
select {
191194
case errCh <- errGet:
@@ -225,7 +228,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
225228
fmt.Fprintf(ctx.App.Writer, "Uploaded block %d with object ID: %s\n", blockIndex, resOid.String())
226229
}
227230
return errUpload
228-
}, maxRetries)
231+
}, maxRetries, debug)
229232
if errRetr != nil {
230233
select {
231234
case errCh <- errRetr:
@@ -265,7 +268,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
265268
var errUpload error
266269
_, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buf, attrs, homomorphicHashingDisabled)
267270
return errUpload
268-
}, maxRetries)
271+
}, maxRetries, debug)
269272
if err != nil {
270273
return fmt.Errorf("failed to upload index file: %w", err)
271274
}
@@ -277,7 +280,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
277280
}
278281

279282
// searchIndexFile returns the ID and buffer for the next index file to be uploaded.
280-
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) {
283+
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
281284
var (
282285
// buf is used to store OIDs of the uploaded blocks.
283286
buf = make([]byte, indexFileSize*neofs.OIDSize)
@@ -292,7 +295,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
292295
// Search for existing index files.
293296
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
294297
for i := 0; ; i++ {
295-
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, errCh, filters)
298+
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters)
296299
count := 0
297300
for range indexIDs {
298301
count++
@@ -329,7 +332,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
329332
var errGet error
330333
obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
331334
return errGet
332-
}, maxRetries)
335+
}, maxRetries, debug)
333336
if errRetr != nil {
334337
select {
335338
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
@@ -354,7 +357,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
354357
}
355358

356359
// Search for blocks within the index file range.
357-
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, errCh)
360+
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh)
358361
for id := range objIDs {
359362
oidCh <- id
360363
}
@@ -373,7 +376,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
373376
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
374377
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
375378
// OID search is finished. Errors are sent to errCh in a non-blocking way.
376-
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
379+
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
377380
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)
378381
go func() {
379382
var wg sync.WaitGroup
@@ -413,7 +416,7 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, accou
413416
var errBlockSearch error
414417
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
415418
return errBlockSearch
416-
}, maxRetries)
419+
}, maxRetries, debug)
417420
if err != nil {
418421
select {
419422
case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):

0 commit comments

Comments
 (0)