Skip to content

Commit

Permalink
cli: add more debug info about retry to the upload-bin
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
  • Loading branch information
AliceInHunterland committed Dec 13, 2024
1 parent 38f635b commit cd1872f
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions cli/util/upload_bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func uploadBin(ctx *cli.Context) error {
err = retry(func() error {
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
return err
}, maxRetries)
}, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1)
}
Expand All @@ -121,7 +121,7 @@ func uploadBin(ctx *cli.Context) error {
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
}
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries)
i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1)
}
Expand All @@ -134,13 +134,16 @@ func uploadBin(ctx *cli.Context) error {
}

// retry function with exponential backoff.
func retry(action func() error, maxRetries uint) error {
func retry(action func() error, maxRetries uint, debug bool) error {
var err error
backoff := neofs.InitialBackoff
for range maxRetries {
for i := range maxRetries {
if err = action(); err == nil {
return nil // Success, no retry needed.
}
if debug {
fmt.Printf("Retry %d: %v\n", i, err)
}
time.Sleep(backoff) // Backoff before retrying.
backoff *= time.Duration(neofs.BackoffFactor)
if backoff > neofs.MaxBackoff {
Expand Down Expand Up @@ -185,7 +188,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
}
return nil
}, maxRetries)
}, maxRetries, debug)
if errGet != nil {
select {
case errCh <- errGet:
Expand Down Expand Up @@ -225,7 +228,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
fmt.Fprintf(ctx.App.Writer, "Uploaded block %d with object ID: %s\n", blockIndex, resOid.String())
}
return errUpload
}, maxRetries)
}, maxRetries, debug)
if errRetr != nil {
select {
case errCh <- errRetr:
Expand Down Expand Up @@ -265,7 +268,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
var errUpload error
_, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buf, attrs, homomorphicHashingDisabled)
return errUpload
}, maxRetries)
}, maxRetries, debug)
if err != nil {
return fmt.Errorf("failed to upload index file: %w", err)
}
Expand All @@ -277,7 +280,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
}

// searchIndexFile returns the ID and buffer for the next index file to be uploaded.
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) {
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
var (
// buf is used to store OIDs of the uploaded blocks.
buf = make([]byte, indexFileSize*neofs.OIDSize)
Expand All @@ -292,7 +295,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
// Search for existing index files.
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
for i := 0; ; i++ {
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, errCh, filters)
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters)
count := 0
for range indexIDs {
count++
Expand Down Expand Up @@ -329,7 +332,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
var errGet error
obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
return errGet
}, maxRetries)
}, maxRetries, debug)
if errRetr != nil {
select {
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
Expand All @@ -354,7 +357,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
}

// Search for blocks within the index file range.
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, errCh)
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh)
for id := range objIDs {
oidCh <- id
}
Expand All @@ -373,7 +376,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)
go func() {
var wg sync.WaitGroup
Expand Down Expand Up @@ -413,7 +416,7 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, accou
var errBlockSearch error
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
return errBlockSearch
}, maxRetries)
}, maxRetries, debug)
if err != nil {
select {
case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):
Expand Down

0 comments on commit cd1872f

Please sign in to comment.