Skip to content

Commit

Permalink
Merge pull request #13 from coinbase/patrick/better-inactive-reconcil…
Browse files Browse the repository at this point in the history
…iation

Improve Inactive Account Reconciliation
  • Loading branch information
patrick-ogrady authored Apr 30, 2020
2 parents 037ff78 + baa7907 commit 0f29df2
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 17 deletions.
1 change: 1 addition & 0 deletions cmd/check_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func runCheckAccountCmd(cmd *cobra.Command, args []string) {
ServerURL,
fetcher.WithBlockConcurrency(BlockConcurrency),
fetcher.WithTransactionConcurrency(TransactionConcurrency),
fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime),
)

primaryNetwork, _, err := fetcher.InitializeAsserter(ctx)
Expand Down
1 change: 1 addition & 0 deletions cmd/check_complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func runCheckCompleteCmd(cmd *cobra.Command, args []string) {
ServerURL,
fetcher.WithBlockConcurrency(BlockConcurrency),
fetcher.WithTransactionConcurrency(TransactionConcurrency),
fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime),
)

// TODO: sync and reconcile on subnetworks, if they exist.
Expand Down
1 change: 1 addition & 0 deletions cmd/check_quick.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func runCheckQuickCmd(cmd *cobra.Command, args []string) {
ServerURL,
fetcher.WithBlockConcurrency(BlockConcurrency),
fetcher.WithTransactionConcurrency(TransactionConcurrency),
fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime),
)

primaryNetwork, _, err := fetcher.InitializeAsserter(ctx)
Expand Down
11 changes: 11 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,23 @@ import (
"io/ioutil"
"log"
"path"
"time"

"github.com/coinbase/rosetta-cli/internal/reconciler"

"github.com/spf13/cobra"
)

const (
// ExtendedRetryElapsedTime is used to override the default fetcher
// retry elapsed time. In practice, extending the retry elapsed time
// has prevented retry exhaustion errors when many goroutines are
// used to fetch data from the Rosetta server.
//
// TODO: make configurable
ExtendedRetryElapsedTime = 5 * time.Minute
)

var (
rootCmd = &cobra.Command{
Use: "rosetta-cli",
Expand Down
69 changes: 52 additions & 17 deletions internal/reconciler/stateful_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"errors"
"fmt"
"log"
"math/rand"
"sync"
"time"

"github.com/coinbase/rosetta-cli/internal/logger"
Expand Down Expand Up @@ -66,6 +66,12 @@ const (
// inactiveReconciliationSleep is used as the time.Duration
// to sleep when there are no seen accounts to reconcile.
inactiveReconciliationSleep = 5 * time.Second

// inactiveReconciliationRequiredDepth is the minimum
// number of blocks the reconciler should wait between
// inactive reconciliations.
// TODO: make configurable
inactiveReconciliationRequiredDepth = 500
)

var (
Expand Down Expand Up @@ -106,7 +112,12 @@ type StatefulReconciler struct {

// seenAccts are stored for inactive account
// reconciliation.
seenAccts []*AccountCurrency
seenAccts []*AccountCurrency
inactiveQueue []*storage.BalanceChange

// inactiveQueueMutex needed because we can't peek at the tip
// of a channel to determine when it is ready to look at.
inactiveQueueMutex sync.Mutex
}

// NewStateful creates a new StatefulReconciler.
Expand All @@ -130,6 +141,7 @@ func NewStateful(
changeQueue: make(chan *storage.BalanceChange, backlogThreshold),
highWaterMark: -1,
seenAccts: make([]*AccountCurrency, 0),
inactiveQueue: make([]*storage.BalanceChange, 0),
}
}

Expand All @@ -152,7 +164,7 @@ func (r *StatefulReconciler) QueueChanges(
select {
case r.changeQueue <- change:
default:
log.Printf("skipping enqueue because backlog\n")
log.Println("skipping active enqueue because backlog")
}
}

Expand Down Expand Up @@ -349,10 +361,7 @@ func (r *StatefulReconciler) accountReconciliation(
return nil
}

if !inactive && !ContainsAccountCurrency(r.seenAccts, accountCurrency) {
r.seenAccts = append(r.seenAccts, accountCurrency)
}

r.inactiveAccountQueue(inactive, accountCurrency, liveBlock)
return r.logger.ReconcileSuccessStream(
ctx,
reconciliationType,
Expand All @@ -368,6 +377,29 @@ func (r *StatefulReconciler) accountReconciliation(
return nil
}

func (r *StatefulReconciler) inactiveAccountQueue(
inactive bool,
accountCurrency *AccountCurrency,
liveBlock *types.BlockIdentifier,
) {
// Only enqueue the first time we see an account on an active reconciliation.
shouldEnqueueInactive := false
if !inactive && !ContainsAccountCurrency(r.seenAccts, accountCurrency) {
r.seenAccts = append(r.seenAccts, accountCurrency)
shouldEnqueueInactive = true
}

if inactive || shouldEnqueueInactive {
r.inactiveQueueMutex.Lock()
r.inactiveQueue = append(r.inactiveQueue, &storage.BalanceChange{
Account: accountCurrency.Account,
Currency: accountCurrency.Currency,
Block: liveBlock,
})
r.inactiveQueueMutex.Unlock()
}
}

// simpleAccountCurrency returns a string that is a simple
// representation of an AccountCurrency struct.
func simpleAccountCurrency(
Expand Down Expand Up @@ -432,18 +464,20 @@ func (r *StatefulReconciler) reconcileActiveAccounts(
func (r *StatefulReconciler) reconcileInactiveAccounts(
ctx context.Context,
) error {
randSource := rand.NewSource(time.Now().UnixNano())
randGenerator := rand.New(randSource)
for ctx.Err() == nil {
if len(r.seenAccts) > 0 {
randAcct := r.seenAccts[randGenerator.Intn(len(r.seenAccts))]
txn := r.storage.NewDatabaseTransaction(ctx, false)
head, err := r.storage.GetHeadBlockIdentifier(ctx, txn)
if err != nil {
return err
}
txn.Discard(ctx)

txn := r.storage.NewDatabaseTransaction(ctx, false)
head, err := r.storage.GetHeadBlockIdentifier(ctx, txn)
if err != nil {
return err
}
txn.Discard(ctx)
r.inactiveQueueMutex.Lock()
if len(r.inactiveQueue) > 0 &&
r.inactiveQueue[0].Block.Index+inactiveReconciliationRequiredDepth < head.Index {
randAcct := r.inactiveQueue[0]
r.inactiveQueue = r.inactiveQueue[1:]
r.inactiveQueueMutex.Unlock()

block, amount, err := r.bestBalance(
ctx,
Expand All @@ -467,6 +501,7 @@ func (r *StatefulReconciler) reconcileInactiveAccounts(
return err
}
} else {
r.inactiveQueueMutex.Unlock()
time.Sleep(inactiveReconciliationSleep)
}
}
Expand Down

0 comments on commit 0f29df2

Please sign in to comment.