Skip to content

Commit

Permalink
Implement local rate limiting and parallel fan-out for batch NFD fetc…
Browse files Browse the repository at this point in the history
…hing from csv file input
  • Loading branch information
pbennett committed Oct 9, 2024
1 parent fb53988 commit 3083bc9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 11 deletions.
4 changes: 4 additions & 0 deletions nfd-helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"regexp"
"strconv"
"strings"
"time"

"github.com/algorand/go-algorand-sdk/v2/transaction"
Expand Down Expand Up @@ -252,6 +253,9 @@ func isRateLimited(err error) (*nfdapi.RateLimited, bool) {
if limit, match := swaggerError.Model().(nfdapi.RateLimited); match {
return &limit, true
}
if strings.Contains(string(swaggerError.Body()), "429 Too Many Requests") {
return &nfdapi.RateLimited{0, 0}, true
}
}
return nil, false
}
Expand Down
60 changes: 49 additions & 11 deletions recips.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import (
"os"
"sort"
"strings"
"time"

"github.com/algorand/go-algorand-sdk/v2/types"
"github.com/antihax/optional"
"github.com/mailgun/holster/v4/syncutil"
"github.com/ssgreg/repeat"
"golang.org/x/time/rate"

"github.com/TxnLab/batch-asset-send/lib/misc"
nfdapi "github.com/TxnLab/batch-asset-send/lib/nfdapi/swagger"
Expand Down Expand Up @@ -77,21 +81,55 @@ func getNfdsToChooseFrom(config *BatchSendConfig) ([]*nfdapi.NfdRecord, error) {
)
if config.Destination.CsvFile != "" {
// read data from the csv file determining which column contains the nfd name (with column name 'name', or 'nfd')
var csvRecords []map[string]string
var (
fanSize = 40
csvRecords []map[string]string
fanOut = syncutil.NewFanOut(fanSize)
limiter = rate.NewLimiter(rate.Every(time.Minute/3900), fanSize)
)
csvRecords, err = processCsvFile(config.Destination.CsvFile)
if err == nil {
for _, csvRecord := range csvRecords {
view := "brief"
if len(config.Destination.VerifiedRequirements) > 0 {
view = "full"
misc.Infof(logger, "..read %d NFDs from csv file", len(csvRecords))
nfdFetchChan := make(chan *nfdapi.NfdRecord, fanSize)
go func() {
for _, csvRecord := range csvRecords {
fanOut.Run(func(val any) error {
nfdName := val.(string)
view := "brief"
if len(config.Destination.VerifiedRequirements) > 0 {
view = "full"
}
var (
fetchedNfd nfdapi.NfdRecord
err error
)
err = retryNfdApiCalls(func() error {
if err := limiter.Wait(ctx); err != nil {
return repeat.HintStop(err)
}
fetchedNfd, _, err = api.NfdApi.NfdGetNFD(ctx, nfdName, &nfdapi.NfdApiNfdGetNFDOpts{
View: optional.NewString(view),
})
return err
})
if err != nil {
return fmt.Errorf("error in getNfdsToChooseFrom: failed to fetch NFD: %s from API: %w", nfdName, err)
}
nfdFetchChan <- &fetchedNfd
return nil
}, csvRecord["nfd"])
}
fetchedNfd, _, err := api.NfdApi.NfdGetNFD(ctx, csvRecord["nfd"], &nfdapi.NfdApiNfdGetNFDOpts{
View: optional.NewString(view),
})
if err != nil {
return nil, fmt.Errorf("error in getNfdsToChooseFrom: failed to fetch NFD: %s from API: %w", csvRecord["nfd"], err)
errs := fanOut.Wait()
for _, err := range errs {
logger.Error(fmt.Sprintf("error in getNfdsToChooseFrom: %v", err))
}
close(nfdFetchChan)
}()
for nfd := range nfdFetchChan {
nfdRecords = append(nfdRecords, nfd)
if len(nfdRecords)%1000 == 0 {
misc.Infof(logger, "..fetched %d NFDs", len(nfdRecords))
}
nfdRecords = append(nfdRecords, &fetchedNfd)
}
}
} else {
Expand Down

0 comments on commit 3083bc9

Please sign in to comment.