Skip to content

Commit

Permalink
tests: rewrite triggerTrackerFlush test helper (algorand#5876)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Dec 22, 2023
1 parent 55cbb7f commit 3a80a40
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 52 deletions.
15 changes: 12 additions & 3 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ type catchpointTracker struct {
// catchpoint files even before the protocol upgrade took place.
forceCatchpointFileWriting bool

// catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound and
// `lastCatchpointLabel`.
// catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound,
// lastCatchpointLabel and balancesTrie.
catchpointsMu deadlock.RWMutex

// cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()),
Expand Down Expand Up @@ -555,16 +555,19 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans
}

var trie *merkletrie.Trie
ct.catchpointsMu.Lock()
if ct.balancesTrie == nil {
trie, err = merkletrie.MakeTrie(mc, trackerdb.TrieMemoryConfig)
if err != nil {
ct.log.Warnf("unable to create merkle trie during committedUpTo: %v", err)
ct.catchpointsMu.Unlock()
return err
}
ct.balancesTrie = trie
} else {
ct.balancesTrie.SetCommitter(mc)
}
ct.catchpointsMu.Unlock()
treeTargetRound = dbRound + basics.Round(offset)
}

Expand Down Expand Up @@ -610,14 +613,14 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans
}

func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
ct.catchpointsMu.Lock()
if ct.balancesTrie != nil {
_, err := ct.balancesTrie.Evict(false)
if err != nil {
ct.log.Warnf("merkle trie failed to evict: %v", err)
}
}

ct.catchpointsMu.Lock()
ct.roundDigest = ct.roundDigest[dcc.offset:]
ct.consensusVersion = ct.consensusVersion[dcc.offset:]
ct.cachedDBRound = dcc.newBase()
Expand Down Expand Up @@ -986,7 +989,9 @@ func (ct *catchpointTracker) handleCommitError(dcc *deferredCommitContext) {
// Specifically, modifications to the trie happen through accountsUpdateBalances,
// which happens before commit to disk. Errors in this tracker, subsequent trackers, or the commit to disk may cause the trie cache to be incorrect,
// affecting the perceived root on subsequent rounds
ct.catchpointsMu.Lock()
ct.balancesTrie = nil
ct.catchpointsMu.Unlock()
ct.cancelWrite(dcc)
}

Expand Down Expand Up @@ -1276,9 +1281,11 @@ func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx tracke
if err != nil {
return err
}
ct.catchpointsMu.Lock()
if ct.balancesTrie == nil {
trie, trieErr := merkletrie.MakeTrie(mc, trackerdb.TrieMemoryConfig)
if trieErr != nil {
ct.catchpointsMu.Unlock()
return trieErr
}
ct.balancesTrie = trie
Expand All @@ -1288,8 +1295,10 @@ func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx tracke

trieBalancesHash, err := ct.balancesTrie.RootHash()
if err != nil {
ct.catchpointsMu.Unlock()
return err
}
ct.catchpointsMu.Unlock()

cw, err := tx.MakeCatchpointWriter()
if err != nil {
Expand Down
92 changes: 43 additions & 49 deletions ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"runtime"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -1443,30 +1442,43 @@ func benchLedgerCache(b *testing.B, startRound basics.Round) {
}
}

func triggerTrackerFlush(t *testing.T, l *Ledger, genesisInitState ledgercore.InitState) {
// triggerTrackerFlush is based in the commit flow but executed it in a single (this) goroutine.
func triggerTrackerFlush(t *testing.T, l *Ledger) {
l.trackers.mu.Lock()
initialDbRound := l.trackers.dbRound
currentDbRound := initialDbRound
l.trackers.lastFlushTime = time.Time{}
dbRound := l.trackers.dbRound
l.trackers.mu.Unlock()

const timeout = 3 * time.Second
started := time.Now()
rnd := l.Latest()
minBlock := rnd
maxLookback := basics.Round(0)
for _, lt := range l.trackers.trackers {
retainRound, lookback := lt.committedUpTo(rnd)
if retainRound < minBlock {
minBlock = retainRound
}
if lookback > maxLookback {
maxLookback = lookback
}
}

// We can't truly wait for scheduleCommit to take place, which means without waiting using sleeps
// we might beat scheduleCommit's addition to accountsWriting, making our wait on it continue immediately.
// The solution is to continue to add blocks and wait for the advancement of l.trackers.dbRound,
// which is a side effect of postCommit's success.
for currentDbRound == initialDbRound {
time.Sleep(50 * time.Microsecond)
require.True(t, time.Since(started) < timeout)
addEmptyValidatedBlock(t, l, genesisInitState.Accounts)
l.WaitForCommit(l.Latest())
l.trackers.mu.RLock()
currentDbRound = l.trackers.dbRound
l.trackers.mu.RUnlock()
dcc := &deferredCommitContext{
deferredCommitRange: deferredCommitRange{
lookback: maxLookback,
},
}

l.trackers.mu.RLock()
cdr := l.trackers.produceCommittingTask(rnd, dbRound, &dcc.deferredCommitRange)
if cdr != nil {
dcc.deferredCommitRange = *cdr
} else {
dcc = nil
}
l.trackers.mu.RUnlock()
if dcc != nil {
l.trackers.accountsWriting.Add(1)
l.trackers.commitRound(dcc)
}
l.trackers.waitAccountsWriting()
}

func testLedgerReload(t *testing.T, cfg config.Local) {
Expand Down Expand Up @@ -1646,7 +1658,7 @@ func TestLedgerVerifiesOldStateProofs(t *testing.T) {
backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
defer backlogPool.Shutdown()

triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)
l.WaitForCommit(l.Latest())
blk := createBlkWithStateproof(t, maxBlocks, proto, genesisInitState, l, accounts)
_, err = l.Validate(context.Background(), blk, backlogPool)
Expand All @@ -1656,7 +1668,7 @@ func TestLedgerVerifiesOldStateProofs(t *testing.T) {
addDummyBlock(t, addresses, proto, l, initKeys, genesisInitState)
}

triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)
addDummyBlock(t, addresses, proto, l, initKeys, genesisInitState)
l.WaitForCommit(l.Latest())
// At this point the block queue go-routine will start removing block . However, it might not complete the task
Expand Down Expand Up @@ -2767,11 +2779,11 @@ func verifyVotersContent(t *testing.T, expected map[basics.Round]*ledgercore.Vot

func triggerDeleteVoters(t *testing.T, l *Ledger, genesisInitState ledgercore.InitState) {
// We make the ledger flush tracker data to allow votersTracker to advance lowestRound
triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)

// We add another block to make the block queue query the voter's tracker lowest round again, which allows it to forget
// rounds based on the new lowest round.
triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)
}

func testVotersReloadFromDisk(t *testing.T, cfg config.Local) {
Expand All @@ -2796,7 +2808,7 @@ func testVotersReloadFromDisk(t *testing.T, cfg config.Local) {

// at this point the database should contain the voter for round 256 but the voters for round 512 should be in deltas
l.WaitForCommit(l.Latest())
triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)
vtSnapshot := l.acctsOnline.voters.votersForRoundCache

// ensuring no tree was evicted.
Expand Down Expand Up @@ -3028,7 +3040,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
}

l.WaitForCommit(l.Latest())
triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)

verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
numOfStateProofs-1, proto.StateProofInterval, true, trackerDB)
Expand All @@ -3037,7 +3049,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
1, proto.StateProofInterval, true, trackerMemory)

l.WaitForCommit(l.Latest())
triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)

verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
numOfStateProofs, proto.StateProofInterval, true, spverDBLoc)
Expand All @@ -3063,7 +3075,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
}

l.WaitForCommit(blk.BlockHeader.Round)
triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)

verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
1, proto.StateProofInterval, false, spverDBLoc)
Expand Down Expand Up @@ -3101,16 +3113,7 @@ func TestLedgerReloadStateProofVerificationTracker(t *testing.T) {
// trigger trackers flush
// first ensure the block is committed into blockdb
l.WaitForCommit(l.Latest())
// wait for any pending tracker flushes
l.trackers.waitAccountsWriting()
// force flush as needed
if l.LatestTrackerCommitted() < l.Latest()+basics.Round(cfg.MaxAcctLookback) {
l.trackers.mu.Lock()
l.trackers.lastFlushTime = time.Time{}
l.trackers.mu.Unlock()
l.notifyCommit(l.Latest())
l.trackers.waitAccountsWriting()
}
triggerTrackerFlush(t, l)

verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
numOfStateProofs-1, proto.StateProofInterval, true, trackerDB)
Expand Down Expand Up @@ -3167,7 +3170,7 @@ func TestLedgerCatchpointSPVerificationTracker(t *testing.T) {
// Feeding blocks until we can know for sure we have at least one catchpoint written.
blk = feedBlocksUntilRound(t, l, blk, basics.Round(cfg.CatchpointInterval*2))
l.WaitForCommit(basics.Round(cfg.CatchpointInterval * 2))
triggerTrackerFlush(t, l, genesisInitState)
triggerTrackerFlush(t, l)

numTrackedDataFirstCatchpoint := (cfg.CatchpointInterval - proto.MaxBalLookback) / proto.StateProofInterval

Expand Down Expand Up @@ -3244,16 +3247,7 @@ func TestLedgerSPTrackerAfterReplay(t *testing.T) {

// first ensure the block is committed into blockdb
l.WaitForCommit(l.Latest())
// wait for any pending tracker flushes
l.trackers.waitAccountsWriting()
// force flush as needed
if l.LatestTrackerCommitted() < l.Latest()+basics.Round(cfg.MaxAcctLookback) {
l.trackers.mu.Lock()
l.trackers.lastFlushTime = time.Time{}
l.trackers.mu.Unlock()
l.notifyCommit(spblk.BlockHeader.Round)
l.trackers.waitAccountsWriting()
}
triggerTrackerFlush(t, l)

err = l.reloadLedger()
a.NoError(err)
Expand Down

0 comments on commit 3a80a40

Please sign in to comment.