diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go
index 32216fcb20..7745e33f55 100644
--- a/ledger/catchpointtracker.go
+++ b/ledger/catchpointtracker.go
@@ -151,8 +151,8 @@ type catchpointTracker struct {
 	// catchpoint files even before the protocol upgrade took place.
 	forceCatchpointFileWriting bool
 
-	// catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound and
-	// `lastCatchpointLabel`.
+	// catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound,
+	// lastCatchpointLabel and balancesTrie.
 	catchpointsMu deadlock.RWMutex
 
 	// cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()),
@@ -555,16 +555,19 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans
 		}
 
 		var trie *merkletrie.Trie
+		ct.catchpointsMu.Lock()
 		if ct.balancesTrie == nil {
 			trie, err = merkletrie.MakeTrie(mc, trackerdb.TrieMemoryConfig)
 			if err != nil {
 				ct.log.Warnf("unable to create merkle trie during committedUpTo: %v", err)
+				ct.catchpointsMu.Unlock()
 				return err
 			}
 			ct.balancesTrie = trie
 		} else {
 			ct.balancesTrie.SetCommitter(mc)
 		}
+		ct.catchpointsMu.Unlock()
 		treeTargetRound = dbRound + basics.Round(offset)
 	}
 
@@ -610,6 +613,7 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans
 }
 
 func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
+	ct.catchpointsMu.Lock()
 	if ct.balancesTrie != nil {
 		_, err := ct.balancesTrie.Evict(false)
 		if err != nil {
@@ -617,7 +621,6 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit
 		}
 	}
 
-	ct.catchpointsMu.Lock()
 	ct.roundDigest = ct.roundDigest[dcc.offset:]
 	ct.consensusVersion = ct.consensusVersion[dcc.offset:]
 	ct.cachedDBRound = dcc.newBase()
@@ -986,7 +989,9 @@ func (ct *catchpointTracker) handleCommitError(dcc *deferredCommitContext) {
 	// Specifically, modifications to the trie happen through accountsUpdateBalances,
 	// which happens before commit to disk. Errors in this tracker, subsequent trackers, or the commit to disk may cause the trie cache to be incorrect,
 	// affecting the perceived root on subsequent rounds
+	ct.catchpointsMu.Lock()
 	ct.balancesTrie = nil
+	ct.catchpointsMu.Unlock()
 
 	ct.cancelWrite(dcc)
 }
@@ -1276,9 +1281,11 @@ func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx tracke
 	if err != nil {
 		return err
 	}
+	ct.catchpointsMu.Lock()
 	if ct.balancesTrie == nil {
 		trie, trieErr := merkletrie.MakeTrie(mc, trackerdb.TrieMemoryConfig)
 		if trieErr != nil {
+			ct.catchpointsMu.Unlock()
 			return trieErr
 		}
 		ct.balancesTrie = trie
@@ -1288,8 +1295,10 @@ func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx tracke
 
 	trieBalancesHash, err := ct.balancesTrie.RootHash()
 	if err != nil {
+		ct.catchpointsMu.Unlock()
		return err
 	}
+	ct.catchpointsMu.Unlock()
 
 	cw, err := tx.MakeCatchpointWriter()
 	if err != nil {
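The catchpointtracker.go hunks above establish one invariant: balancesTrie is read and written only while catchpointsMu is held, it is created lazily on first use, and every early-return path releases the lock explicitly rather than with defer, because the lock must not stay held across the rest of the long-running commit. Below is a minimal, self-contained sketch of that locking shape; the types and names are illustrative, not the go-algorand API.

package main

import (
	"errors"
	"fmt"
	"sync"
)

type trie struct{ root string }

func makeTrie(fail bool) (*trie, error) {
	if fail {
		return nil, errors.New("make trie failed")
	}
	return &trie{root: "empty"}, nil
}

type tracker struct {
	mu   sync.RWMutex
	trie *trie // guarded by mu; created lazily on the first commit
}

func (t *tracker) ensureTrie(failInit bool) error {
	t.mu.Lock()
	if t.trie == nil {
		tr, err := makeTrie(failInit)
		if err != nil {
			// explicit Unlock on the error path: a deferred unlock would keep
			// mu held while the caller goes on to slow, unrelated commit work
			t.mu.Unlock()
			return err
		}
		t.trie = tr
	}
	t.mu.Unlock()
	return nil
}

func main() {
	t := &tracker{}
	fmt.Println(t.ensureTrie(false)) // <nil>
	fmt.Println(t.ensureTrie(true))  // <nil> again: the trie already exists
}

A deferred unlock around the whole function would be simpler, but it would serialize the entire commit behind catchpointsMu; the diff instead holds the lock only around the accesses to the trie pointer itself.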
diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go
index 19bb6a0796..172643d48c 100644
--- a/ledger/ledger_test.go
+++ b/ledger/ledger_test.go
@@ -27,7 +27,6 @@ import (
 	"runtime"
 	"sort"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/require"
 	"golang.org/x/exp/slices"
@@ -1443,30 +1442,43 @@ func benchLedgerCache(b *testing.B, startRound basics.Round) {
 	}
 }
 
-func triggerTrackerFlush(t *testing.T, l *Ledger, genesisInitState ledgercore.InitState) {
+// triggerTrackerFlush is based on the commit flow, but executes it synchronously, in a single (this) goroutine.
+func triggerTrackerFlush(t *testing.T, l *Ledger) {
 	l.trackers.mu.Lock()
-	initialDbRound := l.trackers.dbRound
-	currentDbRound := initialDbRound
-	l.trackers.lastFlushTime = time.Time{}
+	dbRound := l.trackers.dbRound
 	l.trackers.mu.Unlock()
 
-	const timeout = 3 * time.Second
-	started := time.Now()
+	rnd := l.Latest()
+	minBlock := rnd
+	maxLookback := basics.Round(0)
+	for _, lt := range l.trackers.trackers {
+		retainRound, lookback := lt.committedUpTo(rnd)
+		if retainRound < minBlock {
+			minBlock = retainRound
+		}
+		if lookback > maxLookback {
+			maxLookback = lookback
+		}
+	}
 
-	// We can't truly wait for scheduleCommit to take place, which means without waiting using sleeps
-	// we might beat scheduleCommit's addition to accountsWriting, making our wait on it continue immediately.
-	// The solution is to continue to add blocks and wait for the advancement of l.trackers.dbRound,
-	// which is a side effect of postCommit's success.
-	for currentDbRound == initialDbRound {
-		time.Sleep(50 * time.Microsecond)
-		require.True(t, time.Since(started) < timeout)
-		addEmptyValidatedBlock(t, l, genesisInitState.Accounts)
-		l.WaitForCommit(l.Latest())
-		l.trackers.mu.RLock()
-		currentDbRound = l.trackers.dbRound
-		l.trackers.mu.RUnlock()
+	dcc := &deferredCommitContext{
+		deferredCommitRange: deferredCommitRange{
+			lookback: maxLookback,
+		},
+	}
+
+	l.trackers.mu.RLock()
+	cdr := l.trackers.produceCommittingTask(rnd, dbRound, &dcc.deferredCommitRange)
+	if cdr != nil {
+		dcc.deferredCommitRange = *cdr
+	} else {
+		dcc = nil
+	}
+	l.trackers.mu.RUnlock()
+
+	if dcc != nil {
+		l.trackers.accountsWriting.Add(1)
+		l.trackers.commitRound(dcc)
 	}
-	l.trackers.waitAccountsWriting()
 }
 
 func testLedgerReload(t *testing.T, cfg config.Local) {
@@ -1646,7 +1658,7 @@ func TestLedgerVerifiesOldStateProofs(t *testing.T) {
 	backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
 	defer backlogPool.Shutdown()
 
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 	l.WaitForCommit(l.Latest())
 	blk := createBlkWithStateproof(t, maxBlocks, proto, genesisInitState, l, accounts)
 	_, err = l.Validate(context.Background(), blk, backlogPool)
@@ -1656,7 +1668,7 @@ func TestLedgerVerifiesOldStateProofs(t *testing.T) {
 		addDummyBlock(t, addresses, proto, l, initKeys, genesisInitState)
 	}
 
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 	addDummyBlock(t, addresses, proto, l, initKeys, genesisInitState)
 	l.WaitForCommit(l.Latest())
 	// At this point the block queue go-routine will start removing block . However, it might not complete the task
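In the rewritten helper above, the commit runs on the test's own goroutine: committedUpTo collects each tracker's lookback, produceCommittingTask decides whether there is anything to flush, and commitRound performs the flush inline after accountsWriting.Add(1). Here is a minimal sketch of why this removes the old poll-and-timeout loop, assuming (as in the registry's commit path) that commitRound signals accountsWriting.Done when it finishes; the registry type below is illustrative, not the ledger's.

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu              sync.RWMutex
	dbRound         int
	accountsWriting sync.WaitGroup
}

// commitRound mirrors the Add/Done pairing used by the tracker registry:
// the scheduler does accountsWriting.Add(1), the commit does the work and
// signals Done when the new base round is durable.
func (r *registry) commitRound(newBase int) {
	defer r.accountsWriting.Done()
	r.mu.Lock()
	r.dbRound = newBase
	r.mu.Unlock()
}

// triggerFlush is the synchronous shape of the test helper: no sleeping
// and no polling of dbRound — the commit has finished when the call returns.
func (r *registry) triggerFlush(target int) {
	r.accountsWriting.Add(1)
	r.commitRound(target) // runs inline; Done fires before we return
}

func main() {
	r := &registry{}
	r.triggerFlush(10)
	r.accountsWriting.Wait() // no-op: nothing is in flight after the inline commit
	fmt.Println(r.dbRound)   // 10
}

Because the flush is complete when the helper returns, callers such as TestLedgerVerifiesOldStateProofs no longer need to feed filler blocks and sleep while waiting for a background committer to advance dbRound.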
@@ -2767,11 +2779,11 @@ func verifyVotersContent(t *testing.T, expected map[basics.Round]*ledgercore.Vot
 
 func triggerDeleteVoters(t *testing.T, l *Ledger, genesisInitState ledgercore.InitState) {
 	// We make the ledger flush tracker data to allow votersTracker to advance lowestRound
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 
 	// We add another block to make the block queue query the voter's tracker lowest round again, which allows it to forget
 	// rounds based on the new lowest round.
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 }
 
 func testVotersReloadFromDisk(t *testing.T, cfg config.Local) {
@@ -2796,7 +2808,7 @@ func testVotersReloadFromDisk(t *testing.T, cfg config.Local) {
 
 	// at this point the database should contain the voter for round 256 but the voters for round 512 should be in deltas
 	l.WaitForCommit(l.Latest())
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 
 	vtSnapshot := l.acctsOnline.voters.votersForRoundCache
 	// ensuring no tree was evicted.
@@ -3028,7 +3040,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
 	}
 
 	l.WaitForCommit(l.Latest())
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 
 	verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
 		numOfStateProofs-1, proto.StateProofInterval, true, trackerDB)
@@ -3037,7 +3049,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
 		1, proto.StateProofInterval, true, trackerMemory)
 
 	l.WaitForCommit(l.Latest())
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 
 	verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
 		numOfStateProofs, proto.StateProofInterval, true, spverDBLoc)
@@ -3063,7 +3075,7 @@ func TestLedgerSPVerificationTracker(t *testing.T) {
 	}
 
 	l.WaitForCommit(blk.BlockHeader.Round)
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 
 	verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound), 1,
 		proto.StateProofInterval, false, spverDBLoc)
@@ -3101,16 +3113,7 @@ func TestLedgerReloadStateProofVerificationTracker(t *testing.T) {
 	// trigger trackers flush
 	// first ensure the block is committed into blockdb
 	l.WaitForCommit(l.Latest())
-	// wait for any pending tracker flushes
-	l.trackers.waitAccountsWriting()
-	// force flush as needed
-	if l.LatestTrackerCommitted() < l.Latest()+basics.Round(cfg.MaxAcctLookback) {
-		l.trackers.mu.Lock()
-		l.trackers.lastFlushTime = time.Time{}
-		l.trackers.mu.Unlock()
-		l.notifyCommit(l.Latest())
-		l.trackers.waitAccountsWriting()
-	}
+	triggerTrackerFlush(t, l)
 
 	verifyStateProofVerificationTracking(t, &l.spVerification, basics.Round(firstStateProofContextTargetRound),
 		numOfStateProofs-1, proto.StateProofInterval, true, trackerDB)
@@ -3167,7 +3170,7 @@ func TestLedgerCatchpointSPVerificationTracker(t *testing.T) {
 	// Feeding blocks until we can know for sure we have at least one catchpoint written.
 	blk = feedBlocksUntilRound(t, l, blk, basics.Round(cfg.CatchpointInterval*2))
 	l.WaitForCommit(basics.Round(cfg.CatchpointInterval * 2))
-	triggerTrackerFlush(t, l, genesisInitState)
+	triggerTrackerFlush(t, l)
 
 	numTrackedDataFirstCatchpoint := (cfg.CatchpointInterval - proto.MaxBalLookback) / proto.StateProofInterval
 
@@ -3244,16 +3247,7 @@ func TestLedgerSPTrackerAfterReplay(t *testing.T) {
 	// first ensure the block is committed into blockdb
 	l.WaitForCommit(l.Latest())
 
-	// wait for any pending tracker flushes
-	l.trackers.waitAccountsWriting()
-	// force flush as needed
-	if l.LatestTrackerCommitted() < l.Latest()+basics.Round(cfg.MaxAcctLookback) {
-		l.trackers.mu.Lock()
-		l.trackers.lastFlushTime = time.Time{}
-		l.trackers.mu.Unlock()
-		l.notifyCommit(spblk.BlockHeader.Round)
-		l.trackers.waitAccountsWriting()
-	}
+	triggerTrackerFlush(t, l)
 
 	err = l.reloadLedger()
 	a.NoError(err)
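Both removed force-flush blocks above are replaced by the same helper, whose only conditional behavior comes from produceCommittingTask: there is work to do only when the latest round minus the maximum lookback has moved past the last round already flushed to the tracker DB. A hedged, self-contained sketch of that decision; the function and field names are illustrative, not the ledger's actual implementation.

package main

import "fmt"

type commitRange struct {
	oldBase  int // dbRound the commit starts from
	offset   int // number of rounds to flush
	lookback int // trailing rounds that must stay in memory
}

// produceCommittingTask returns nil when the lookback window hasn't
// advanced past dbRound, mirroring how the helper sets dcc to nil and
// skips the commit entirely.
func produceCommittingTask(latest, dbRound, lookback int) *commitRange {
	newBase := latest - lookback
	if newBase <= dbRound {
		return nil // nothing old enough to flush yet
	}
	return &commitRange{oldBase: dbRound, offset: newBase - dbRound, lookback: lookback}
}

func main() {
	fmt.Println(produceCommittingTask(320, 0, 64))   // &{0 256 64}
	fmt.Println(produceCommittingTask(320, 256, 64)) // <nil>
}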