Skip to content

Commit

Permalink
catchpoints: more support for EnableOnlineAccountCatchpoints (algoran…
Browse files Browse the repository at this point in the history
  • Loading branch information
cce authored Jan 8, 2025
1 parent 0b8d9e0 commit b2d41d0
Show file tree
Hide file tree
Showing 16 changed files with 497 additions and 89 deletions.
8 changes: 8 additions & 0 deletions cmd/catchpointdump/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ var databaseCmd = &cobra.Command{
if err != nil {
reportErrorf("Unable to print state proof verification database : %v", err)
}
err = printOnlineAccounts(ledgerTrackerFilename, ledgerTrackerStaging, outFile)
if err != nil {
reportErrorf("Unable to print online accounts : %v", err)
}
err = printOnlineRoundParams(ledgerTrackerFilename, ledgerTrackerStaging, outFile)
if err != nil {
reportErrorf("Unable to print online round params : %v", err)
}
},
}

Expand Down
86 changes: 84 additions & 2 deletions cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ var fileCmd = &cobra.Command{
if err != nil {
reportErrorf("Unable to print state proof verification database : %v", err)
}
err = printOnlineAccounts("./ledger.tracker.sqlite", true, outFile)
if err != nil {
reportErrorf("Unable to print online accounts : %v", err)
}
err = printOnlineRoundParams("./ledger.tracker.sqlite", true, outFile)
if err != nil {
reportErrorf("Unable to print online round params : %v", err)
}
}
},
}
Expand Down Expand Up @@ -219,8 +227,17 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
if err != nil {
return fileHeader, err
}
fmt.Printf("accounts digest=%s, spver digest=%s, onlineaccounts digest=%s onlineroundparams digest=%s\n\n",
fmt.Printf("accounts digest=%s, spver digest=%s, onlineaccounts digest=%s onlineroundparams digest=%s\n",
balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash)

fmt.Printf("Catchpoint label: %s\n", fileHeader.Catchpoint)
// make v7 label
v7Label := ledgercore.MakeCatchpointLabelMakerV7(fileHeader.BlocksRound, &fileHeader.BlockHeaderDigest, &balanceHash, fileHeader.Totals, &spverHash)
fmt.Printf("catchpoint v7 label: %s\n", ledgercore.MakeLabel(v7Label))

// make v8 label (current)
v8Label := ledgercore.MakeCatchpointLabelMakerCurrent(fileHeader.BlocksRound, &fileHeader.BlockHeaderDigest, &balanceHash, fileHeader.Totals, &spverHash, &onlineAccountsHash, &onlineRoundParamsHash)
fmt.Printf("catchpoint v8 label: %s\n\n", ledgercore.MakeLabel(v8Label))
}
return fileHeader, nil
}
Expand Down Expand Up @@ -296,6 +313,8 @@ func printAccountsDatabase(databaseName string, stagingTables bool, fileHeader l
"Catchpoint: %s",
"Total Accounts: %d",
"Total KVs: %d",
"Total Online Accounts: %d",
"Total Online Round Params: %d",
"Total Chunks: %d",
}
var headerValues = []interface{}{
Expand All @@ -306,6 +325,8 @@ func printAccountsDatabase(databaseName string, stagingTables bool, fileHeader l
fileHeader.Catchpoint,
fileHeader.TotalAccounts,
fileHeader.TotalKVs,
fileHeader.TotalOnlineAccounts,
fileHeader.TotalOnlineRoundParams,
fileHeader.TotalChunks,
}
// safety check
Expand Down Expand Up @@ -511,7 +532,6 @@ func printKeyValue(writer *bufio.Writer, key, value []byte) {
}

func printKeyValueStore(databaseName string, stagingTables bool, outFile *os.File) error {
fmt.Printf("\n")
printDumpingCatchpointProgressLine(0, 50, 0)
lastProgressUpdate := time.Now()
progress := uint64(0)
Expand Down Expand Up @@ -560,3 +580,65 @@ func printKeyValueStore(databaseName string, stagingTables bool, outFile *os.Fil
return nil
})
}

func printOnlineAccounts(databaseName string, stagingTables bool, outFile *os.File) error {
fileWriter := bufio.NewWriterSize(outFile, 1024*1024)
defer fileWriter.Flush()

dbAccessor, err := db.MakeAccessor(databaseName, true, false)
if err != nil || dbAccessor.Handle == nil {
return err
}

return dbAccessor.Atomic(func(ctx context.Context, tx *sql.Tx) error {
rows, err := sqlitedriver.MakeOnlineAccountsIter(ctx, tx, stagingTables, 0)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
row, err := rows.GetItem()
if err != nil {
return err
}
jsonData, err := json.Marshal(row)
if err != nil {
return err
}

fmt.Fprintf(fileWriter, "onlineaccount: %s\n", string(jsonData))
}
return nil
})
}

func printOnlineRoundParams(databaseName string, stagingTables bool, outFile *os.File) error {
fileWriter := bufio.NewWriterSize(outFile, 1024*1024)
defer fileWriter.Flush()

dbAccessor, err := db.MakeAccessor(databaseName, true, false)
if err != nil || dbAccessor.Handle == nil {
return err
}

return dbAccessor.Atomic(func(ctx context.Context, tx *sql.Tx) error {
rows, err := sqlitedriver.MakeOnlineRoundParamsIter(ctx, tx, stagingTables, 0)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
row, err := rows.GetItem()
if err != nil {
return err
}
jsonData, err := json.Marshal(row)
if err != nil {
return err
}

fmt.Fprintf(fileWriter, "onlineroundparams: %s\n", string(jsonData))
}
return nil
})
}
9 changes: 8 additions & 1 deletion cmd/catchpointdump/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,14 @@ func loadAndDump(addr string, tarFile string, genesisInitState ledgercore.InitSt
if err != nil {
return err
}

err = printOnlineAccounts("./ledger.tracker.sqlite", true, outFile)
if err != nil {
return err
}
err = printOnlineRoundParams("./ledger.tracker.sqlite", true, outFile)
if err != nil {
return err
}
}
return nil
}
94 changes: 92 additions & 2 deletions ledger/acctdeltas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2750,6 +2750,27 @@ func TestAccountOnlineRoundParams(t *testing.T) {
require.Equal(t, onlineRoundParams, dbOnlineRoundParams[1:])
require.Equal(t, maxRounds, int(endRound))

// Use MakeOnlineRoundParamsIter to dump all data, starting from 10
iter, err := sqlitedriver.MakeOnlineRoundParamsIter(context.Background(), tx, false, 10)
require.NoError(t, err)
defer iter.Close()
var roundParamsIterData []ledgercore.OnlineRoundParamsData
var roundParamsIterLastRound basics.Round
for iter.Next() {
item, err := iter.GetItem()
require.NoError(t, err)

var orpData ledgercore.OnlineRoundParamsData
err = protocol.Decode(item.Data, &orpData)
require.NoError(t, err)
roundParamsIterLastRound = item.Round

roundParamsIterData = append(roundParamsIterData, orpData)
}
require.Equal(t, onlineRoundParams[9:], roundParamsIterData)
require.Equal(t, maxRounds, int(roundParamsIterLastRound))

// Prune online round params to rnd 10
err = arw.AccountsPruneOnlineRoundParams(10)
require.NoError(t, err)

Expand All @@ -2772,6 +2793,15 @@ func TestAccountOnlineRoundParams(t *testing.T) {
func TestOnlineAccountsDeletion(t *testing.T) {
partitiontest.PartitionTest(t)

t.Run("delete", func(t *testing.T) {
runTestOnlineAccountsDeletion(t, testOnlineAccountsDeletion)
})
t.Run("excludeBefore", func(t *testing.T) {
runTestOnlineAccountsDeletion(t, testOnlineAccountsExcludeBefore)
})
}

func runTestOnlineAccountsDeletion(t *testing.T, assertFunc func(*testing.T, basics.Address, basics.Address, *sql.Tx)) {
dbs, _ := storetesting.DbOpenTest(t, true)
storetesting.SetDbLogging(t, dbs)
defer dbs.Close()
Expand All @@ -2783,8 +2813,6 @@ func TestOnlineAccountsDeletion(t *testing.T) {
var accts map[basics.Address]basics.AccountData
sqlitedriver.AccountsInitTest(t, tx, accts, protocol.ConsensusCurrentVersion)

arw := sqlitedriver.NewAccountsSQLReaderWriter(tx)

updates := compactOnlineAccountDeltas{}
addrA := ledgertesting.RandomAddress()
addrB := ledgertesting.RandomAddress()
Expand Down Expand Up @@ -2837,6 +2865,12 @@ func TestOnlineAccountsDeletion(t *testing.T) {
require.NoError(t, err)
require.Len(t, updated, 5)

assertFunc(t, addrA, addrB, tx)
}

func testOnlineAccountsDeletion(t *testing.T, addrA, addrB basics.Address, tx *sql.Tx) {
arw := sqlitedriver.NewAccountsSQLReaderWriter(tx)

queries, err := sqlitedriver.OnlineAccountsInitDbQueries(tx)
require.NoError(t, err)

Expand Down Expand Up @@ -2898,6 +2932,62 @@ func TestOnlineAccountsDeletion(t *testing.T) {
}
}

// same assertions as testOnlineAccountsDeletion but with excludeBefore
func testOnlineAccountsExcludeBefore(t *testing.T, addrA, addrB basics.Address, tx *sql.Tx) {
// Use MakeOnlineAccountsIter to dump all data, starting from rnd
getAcctDataForRound := func(rnd basics.Round, expectedCount int64) map[basics.Address][]*encoded.OnlineAccountRecordV6 {
it, err := sqlitedriver.MakeOnlineAccountsIter(context.Background(), tx, false, rnd)
require.NoError(t, err)

var count int64
ret := make(map[basics.Address][]*encoded.OnlineAccountRecordV6)
for it.Next() {
acct, err := it.GetItem()
require.NoError(t, err)
ret[acct.Address] = append(ret[acct.Address], acct)
count++
}
require.Equal(t, expectedCount, count)
return ret
}

for _, rnd := range []basics.Round{1, 2, 3} {
vals := getAcctDataForRound(rnd, 5)

history, ok := vals[addrA]
require.True(t, ok)
require.Len(t, history, 3)

history, ok = vals[addrB]
require.True(t, ok)
require.Len(t, history, 2)
}

for _, rnd := range []basics.Round{4, 5, 6, 7} {
vals := getAcctDataForRound(rnd, 3)

history, ok := vals[addrA]
require.True(t, ok)
require.Len(t, history, 1)

history, ok = vals[addrB]
require.True(t, ok)
require.Len(t, history, 2)
}

for _, rnd := range []basics.Round{8, 9} {
vals := getAcctDataForRound(rnd, 2)

history, ok := vals[addrA]
require.True(t, ok)
require.Len(t, history, 1)

history, ok = vals[addrB]
require.True(t, ok)
require.Len(t, history, 1)
}
}

type mockOnlineAccountsErrorWriter struct {
}

Expand Down
49 changes: 49 additions & 0 deletions ledger/acctonline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,7 @@ func TestAcctOnlineVotersLongerHistory(t *testing.T) {
require.NoError(t, err)
require.Equal(t, oa.latest()-basics.Round(conf.MaxAcctLookback), endRound)
require.Equal(t, maxBlocks-int(lowest)-int(conf.MaxAcctLookback)+1, len(dbOnlineRoundParams))
require.Equal(t, endRound, oa.cachedDBRoundOnline)

_, err = oa.onlineTotalsEx(lowest)
require.NoError(t, err)
Expand All @@ -1324,6 +1325,54 @@ func TestAcctOnlineVotersLongerHistory(t *testing.T) {
// ensure the cache size for addrA does not have more entries than maxBalLookback + 1
// +1 comes from the deletion before X without checking account state at X
require.Equal(t, maxBalLookback+1, oa.onlineAccountsCache.accounts[addrA].Len())

// Test if "excludeBefore" argument works for MakeOnlineAccountsIter & MakeOnlineRoundParamsIter
// when longer history is being used. Exclude rows older than round=lowest+2
excludeRound := lowest + 2

// Test MakeOnlineAccountsIter
var foundCount int
err = oa.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) error {
// read staging = false, excludeBefore = excludeRound
it, err2 := tx.MakeOnlineAccountsIter(ctx, false, excludeRound)
require.NoError(t, err2)
defer it.Close()

firstSeen := make(map[basics.Address]basics.Round)
for it.Next() {
acct, acctErr := it.GetItem()
require.NoError(t, acctErr)
// We expect all rows to either:
// - have updRound >= excludeRound
// - or have updRound < excludeRound, and only appear once in the iteration (no updates since excludeRound)
if acct.UpdateRound < excludeRound {
require.NotContains(t, firstSeen, acct.Address, "MakeOnlineAccountsIter produced two rows acct %s for dbRound %d updRound %d < excludeRound %d (first seen %d)", acct.Address, endRound, acct.UpdateRound, excludeRound, firstSeen[acct.Address])
}
firstSeen[acct.Address] = acct.UpdateRound
foundCount++
}
return nil
})
require.NoError(t, err)
require.True(t, foundCount > 0, "Should see some accounts that satisfy updRound >= excludeRound")

// Test MakeOnlineRoundParamsIter
foundCount = 0
err = oa.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) error {
it, err2 := tx.MakeOnlineRoundParamsIter(ctx, false, excludeRound)
require.NoError(t, err2)
defer it.Close()

for it.Next() {
roundParams, roundParamsErr := it.GetItem()
require.NoError(t, roundParamsErr)
require.True(t, roundParams.Round >= excludeRound, "MakeOnlineRoundParamsIter produced row for round %d < excludeRound %d", roundParams.Round, excludeRound)
foundCount++
}
return nil
})
require.NoError(t, err)
require.EqualValues(t, endRound-excludeRound+1, foundCount, "Should see all round params for rounds >= excludeRound")
}

// compareTopAccounts makes sure that accounts returned from OnlineTop function are sorted and contains the online accounts on the test
Expand Down
Loading

0 comments on commit b2d41d0

Please sign in to comment.