From b7dd1e3eb3c32694770c626cb8f663649e911270 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 15 Aug 2024 19:10:51 +0530 Subject: [PATCH 01/11] feat: add prune all indexes command --- cmd/lotus-shed/indexes.go | 398 +++++++++++++++++++++++++++++++++++++- 1 file changed, 391 insertions(+), 7 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 07c934d5162..a9ef296acc9 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -12,6 +12,7 @@ import ( "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -20,6 +21,9 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/ethhashlookup" + "github.com/filecoin-project/lotus/chain/events/filter" + "github.com/filecoin-project/lotus/chain/index" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" @@ -27,13 +31,33 @@ import ( const ( // same as in chain/events/index.go - eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? 
AND message_index=?` - eventCount = `SELECT COUNT(*) FROM event WHERE tipset_key_cid=?` - entryCount = `SELECT COUNT(*) FROM event_entry JOIN event ON event_entry.event_id=event.id WHERE event.tipset_key_cid=?` - insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` - insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` - upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` - tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` + eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + eventCount = `SELECT COUNT(*) FROM event WHERE tipset_key_cid=?` + entryCount = `SELECT COUNT(*) FROM event_entry JOIN event ON event_entry.event_id=event.id WHERE event.tipset_key_cid=?` + insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` + insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` + upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` + tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` + selectEventIdsFromHeight = `SELECT id, height FROM event WHERE height < ?` + selectEventIdsBetweenRange = `SELECT id, height FROM event WHERE height < ? 
AND height >= ?` +) + +// delete queries +const ( + deleteMsgIndexFromStartHeight = `DELETE FROM messages WHERE epoch < ?` + deleteMsgIndexBetweenRange = `DELETE FROM messages WHERE epoch < ? AND epoch >= ?` + + deleteTxHashIndexByCID = `DELETE FROM eth_tx_hashes WHERE cid = ?` + + deleteEventsByIDs = `DELETE FROM event WHERE id IN (%s)` + deleteEventEntriesByIDs = `DELETE FROM event_entry WHERE event_id IN (%s)` + deleteEventSeenByHeights = `DELETE FROM events_seen WHERE height IN (%s)` +) + +// database related constants +const ( + databaseType = "sqlite" + databaseDriver = "sqlite3" ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -51,6 +75,7 @@ var indexesCmd = &cli.Command{ withCategory("txhash", backfillTxHashCmd), withCategory("events", backfillEventsCmd), withCategory("events", inspectEventsCmd), + withCategory("indexes", pruneAllIndexesCmd), }, } @@ -879,3 +904,362 @@ var backfillTxHashCmd = &cli.Command{ return nil }, } + +// pruneAllIndexesCmd is a command that prunes all the indexes +var pruneAllIndexesCmd = &cli.Command{ + Name: "prune-all-indexes", + Usage: "Prune all the index for a number of epochs starting from a specified height", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "from", + Value: 0, + Usage: "the tipset height to start backfilling from (0 is head of chain)", + }, + &cli.IntFlag{ + Name: "epochs", + Value: 0, + Usage: "the number of epochs to prune (working backwards, 0 means prune all from the specified height)", + }, + }, + Action: func(cctx *cli.Context) error { + srv, err := lcli.GetFullNodeServices(cctx) + if err != nil { + return err + } + defer srv.Close() //nolint:errcheck + + api := srv.FullNodeAPI() + ctx := lcli.ReqContext(cctx) + + basePath, err := homedir.Expand(cctx.String("repo")) + if err != nil { + return err + } + + startHeight := cctx.Int64("from") + if startHeight == 0 { + currTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + + startHeight += int64(currTs.Height()) + + if 
startHeight < 0 || startHeight == 0 { + return xerrors.Errorf("bogus start height %d", startHeight) + } + } + + epochs := cctx.Int64("epochs") + + var g = new(errgroup.Group) + + g.Go(func() error { + if err := pruneMsgIndex(ctx, basePath, startHeight, epochs); err != nil { + return xerrors.Errorf("error pruning msgindex: %w", err) + } + + return nil + }) + + g.Go(func() error { + if err := pruneTxIndex(ctx, api, basePath, startHeight, epochs); err != nil { + return xerrors.Errorf("error pruning txindex: %w", err) + } + + return nil + }) + + g.Go(func() error { + if err := pruneEventsIndex(ctx, basePath, startHeight, epochs); err != nil { + return xerrors.Errorf("error pruning events index: %w", err) + } + + return nil + }) + + return g.Wait() + }, +} + +// pruneEventsIndex is a helper function that prunes the events index.db for a number of epochs starting from a specified height +func pruneEventsIndex(_ context.Context, basePath string, startHeight, epochs int64) error { + dbPath := path.Join(basePath, databaseType, filter.DefaultDbFilename) + db, err := sql.Open(databaseDriver, dbPath+"?_txlock=immediate") + if err != nil { + return err + } + + defer func() { + err := db.Close() + if err != nil { + fmt.Printf("ERROR: closing db: %s", err) + } + }() + + tx, err := db.Begin() + if err != nil { + return err + } + + var rows *sql.Rows + var res sql.Result + if epochs == 0 { + rows, err = tx.Query(selectEventIdsFromHeight, startHeight) + } else { + lowerBound := startHeight - epochs + if lowerBound < 0 { + lowerBound = 0 + } + + rows, err = tx.Query(selectEventIdsBetweenRange, startHeight, lowerBound) + } + + if err != nil { + if err := tx.Rollback(); err != nil { + fmt.Printf("ERROR: rollback: %s", err) + } + return err + } + + defer func() { + _ = rows.Close() + }() + + deleteEventQuery, deleteEventEntriesQuery, deleteEventSeenQuery, err := getEventsDBDeleteQueries(rows) + if err != nil { + return xerrors.Errorf("error getting delete query: %w", err) + } + + res, 
err = tx.Exec(deleteEventQuery) + if err != nil { + if err := tx.Rollback(); err != nil { + fmt.Printf("ERROR: rollback event: %s", err) + } + return err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return xerrors.Errorf("error getting event table rows affected: %s", err) + } + + log.Infof("pruned %s table, rows affected: %d", "event", rowsAffected) + + res, err = tx.Exec(deleteEventEntriesQuery) + if err != nil { + if err := tx.Rollback(); err != nil { + fmt.Printf("ERROR: rollback event_entries: %s", err) + } + return err + } + + rowsAffected, err = res.RowsAffected() + if err != nil { + return xerrors.Errorf("error getting event entries table rows affected: %s", err) + } + + log.Infof("pruned %s table, rows affected: %d", "event_entries", rowsAffected) + + res, err = tx.Exec(deleteEventSeenQuery) + if err != nil { + if err := tx.Rollback(); err != nil { + fmt.Printf("ERROR: rollback event_seen: %s", err) + } + return err + } + + if err := tx.Commit(); err != nil { + return err + } + + rowsAffected, err = res.RowsAffected() + if err != nil { + return xerrors.Errorf("error getting event seen table rows affected: %s", err) + } + + log.Infof("pruned %s table, rows affected: %d", "event_seen", rowsAffected) + + return nil +} + +func getEventsDBDeleteQueries(rows *sql.Rows) (string, string, string, error) { + var ( + eventIds []int64 + heights []int64 + ) + for rows.Next() { + var ( + id int64 + height int64 + ) + if err := rows.Scan(&id, &height); err != nil { + return "", "", "", xerrors.Errorf("error scanning event id: %w", err) + } + eventIds = append(eventIds, id) + heights = append(heights, height) + } + + if err := rows.Err(); err != nil { + return "", "", "", xerrors.Errorf("error iterating event ids: %w", err) + } + + if len(eventIds) == 0 { + log.Info("No events to delete") + return "", "", "", nil + } + + // Delete event entries + idPlaceholders := strings.Repeat("?,", len(eventIds)) + idPlaceholders = 
idPlaceholders[:len(idPlaceholders)-1] // Remove trailing comma + + deleteEventTableQuery := fmt.Sprintf(deleteEventsByIDs, idPlaceholders) + deleteEventEntriesTableQuery := fmt.Sprintf(deleteEventEntriesByIDs, idPlaceholders) + + heightPlaceholders := strings.Repeat("?,", len(heights)) + heightPlaceholders = heightPlaceholders[:len(heightPlaceholders)-1] // Remove trailing comma + + deleteEventSeenTableQuery := fmt.Sprintf(deleteEventSeenByHeights, heightPlaceholders) + + return deleteEventTableQuery, deleteEventEntriesTableQuery, deleteEventSeenTableQuery, nil +} + +// pruneTxIndex is a helper function that prunes the txindex.db for a number of epochs starting from a specified height +func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, startHeight, epochs int64) error { + dbPath := path.Join(basePath, databaseType, ethhashlookup.DefaultDbFilename) + db, err := sql.Open(databaseDriver, dbPath) + if err != nil { + return err + } + + defer func() { + err := db.Close() + if err != nil { + fmt.Printf("ERROR: closing db: %s", err) + } + }() + + // we want to delete the txhash for the previous epoch + startHeight = startHeight - 1 + + deleteStmt, err := db.Prepare(deleteTxHashIndexByCID) + if err != nil { + return xerrors.Errorf("failed to prepare tx hash index delete statement: %w", err) + } + + currTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + + var totalRowsAffected int64 = 0 + for i := 0; i < int(epochs); i++ { + epoch := abi.ChainEpoch(startHeight - int64(i)) + + select { + case <-ctx.Done(): + fmt.Println("request cancelled") + return nil + default: + } + + currTsk := currTs.Parents() + execTs, err := api.ChainGetTipSet(ctx, currTsk) + if err != nil { + return fmt.Errorf("failed to call ChainGetTipSet for %s: %w", currTsk, err) + } + + for _, blockheader := range execTs.Blocks() { + blkMsgs, err := api.ChainGetBlockMessages(ctx, blockheader.Cid()) + if err != nil { + log.Infof("failed to get block messages at epoch: %d", 
epoch) + continue // should we break here? + } + + for _, smsg := range blkMsgs.SecpkMessages { + // we are only storing delegated messages. + // This will reduce unnecessary calls to db. + if smsg.Signature.Type != crypto.SigTypeDelegated { + continue + } + + cid := smsg.Cid().String() + res, err := deleteStmt.Exec(cid) + if err != nil { + return fmt.Errorf("failed to delete tx hash index at cid: %s: %w", cid, err) + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + log.Infof("No tx hash index found at cid: %s", cid) + } + + totalRowsAffected += rowsAffected + } + + } + + currTs = execTs + } + + log.Infof("Done pruning %s, rows affected: %d", ethhashlookup.DefaultDbFilename, totalRowsAffected) + + return nil +} + +// pruneMsgIndex is a helper function that prunes the msgindex.db for a number of epochs starting from a specified height +func pruneMsgIndex(_ context.Context, basePath string, startHeight int64, epochs int64) error { + dbPath := path.Join(basePath, databaseType, index.DefaultDbFilename) + db, err := sql.Open(databaseDriver, dbPath) + if err != nil { + return err + } + + defer func() { + err := db.Close() + if err != nil { + fmt.Printf("ERROR: closing db: %s", err) + } + }() + + tx, err := db.Begin() + if err != nil { + return err + } + + var res sql.Result + if epochs == 0 { // if epochs is 0, delete all messages before the start height + res, err = tx.Exec(deleteMsgIndexFromStartHeight, startHeight) + } else { // if epochs is not 0, delete messages between the start height and the start height - epochs + lowerBound := startHeight - epochs + if lowerBound < 0 { // if lowerBound is negative, set it to 0 (delete all messages from the start height) + lowerBound = 0 + } + + res, err = tx.Exec(deleteMsgIndexBetweenRange, startHeight, lowerBound) + } + + if err != nil { + if err := tx.Rollback(); err != nil { + fmt.Printf("ERROR: rollback: %s", err) + } 
+ return err + } + + if err := tx.Commit(); err != nil { + return err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return xerrors.Errorf("error getting rows affected: %s", err) + } + + log.Infof("Done pruning %s, rows affected: %d", index.DefaultDbFilename, rowsAffected) + + return nil +} From a11ac44a7067c3d2c6af7d6ff79d7280779ba5ce Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 16 Aug 2024 15:21:06 +0530 Subject: [PATCH 02/11] feat: remove epochs and update delete queries --- cmd/lotus-shed/indexes.go | 359 +++++++++++++++++++------------------- 1 file changed, 177 insertions(+), 182 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index a9ef296acc9..bfb51ccf478 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -31,33 +31,36 @@ import ( const ( // same as in chain/events/index.go - eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - eventCount = `SELECT COUNT(*) FROM event WHERE tipset_key_cid=?` - entryCount = `SELECT COUNT(*) FROM event_entry JOIN event ON event_entry.event_id=event.id WHERE event.tipset_key_cid=?` - insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` - insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` - upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` - tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` - selectEventIdsFromHeight = `SELECT id, height FROM event WHERE height < ?` - selectEventIdsBetweenRange = `SELECT id, height FROM event WHERE height < ? 
AND height >= ?` + eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + eventCount = `SELECT COUNT(*) FROM event WHERE tipset_key_cid=?` + entryCount = `SELECT COUNT(*) FROM event_entry JOIN event ON event_entry.event_id=event.id WHERE event.tipset_key_cid=?` + insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` + insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` + upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` + tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` ) // delete queries const ( deleteMsgIndexFromStartHeight = `DELETE FROM messages WHERE epoch < ?` - deleteMsgIndexBetweenRange = `DELETE FROM messages WHERE epoch < ? 
AND epoch >= ?` deleteTxHashIndexByCID = `DELETE FROM eth_tx_hashes WHERE cid = ?` - deleteEventsByIDs = `DELETE FROM event WHERE id IN (%s)` - deleteEventEntriesByIDs = `DELETE FROM event_entry WHERE event_id IN (%s)` - deleteEventSeenByHeights = `DELETE FROM events_seen WHERE height IN (%s)` + deleteEventFromStartHeight = `DELETE FROM event WHERE height < ?;` + deleteEventEntriesByEventIDs = `DELETE FROM event_entry WHERE event_id IN (SELECT id FROM event WHERE height < ?);` + deleteEventsSeenByHeight = `DELETE FROM events_seen WHERE height < ?;` ) // database related constants const ( databaseType = "sqlite" databaseDriver = "sqlite3" + + eventTable = "event" + eventEntryTable = "event_entry" + eventsSeenTable = "events_seen" + msgIndexTable = "messages" + txHashIndexTable = "eth_tx_hashes" ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -73,8 +76,10 @@ var indexesCmd = &cli.Command{ withCategory("msgindex", backfillMsgIndexCmd), withCategory("msgindex", pruneMsgIndexCmd), withCategory("txhash", backfillTxHashCmd), + withCategory("txhash", pruneTxHashCmd), withCategory("events", backfillEventsCmd), withCategory("events", inspectEventsCmd), + withCategory("events", pruneEventsCmd), withCategory("indexes", pruneAllIndexesCmd), }, } @@ -707,11 +712,12 @@ var backfillMsgIndexCmd = &cli.Command{ var pruneMsgIndexCmd = &cli.Command{ Name: "prune-msgindex", - Usage: "Prune the msgindex.db for messages included before a given epoch", + Usage: "Prune the msgindex.db for messages older than a given height", Flags: []cli.Flag{ &cli.IntFlag{ - Name: "from", - Usage: "height to start the prune; if negative it indicates epochs from current head", + Name: "older-than", + Value: 0, + Usage: "height to start pruning events older than this epoch, 0 means start from head", }, }, Action: func(cctx *cli.Context) error { @@ -723,7 +729,7 @@ var pruneMsgIndexCmd = &cli.Command{ defer closer() ctx := lcli.ReqContext(cctx) - startHeight := int64(cctx.Int("from")) + 
startHeight := int64(cctx.Int("older-than")) if startHeight < 0 { curTs, err := api.ChainHead(ctx) if err != nil { @@ -742,33 +748,9 @@ var pruneMsgIndexCmd = &cli.Command{ return err } - dbPath := path.Join(basePath, "sqlite", "msgindex.db") - db, err := sql.Open("sqlite3", dbPath) + err = pruneMsgIndex(ctx, basePath, startHeight) if err != nil { - return err - } - - defer func() { - err := db.Close() - if err != nil { - fmt.Printf("ERROR: closing db: %s", err) - } - }() - - tx, err := db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec("DELETE FROM messages WHERE epoch < ?", startHeight); err != nil { - if err := tx.Rollback(); err != nil { - fmt.Printf("ERROR: rollback: %s", err) - } - return err - } - - if err := tx.Commit(); err != nil { - return err + return xerrors.Errorf("error pruning msgindex: %w", err) } return nil @@ -905,20 +887,61 @@ var backfillTxHashCmd = &cli.Command{ }, } -// pruneAllIndexesCmd is a command that prunes all the indexes -var pruneAllIndexesCmd = &cli.Command{ - Name: "prune-all-indexes", - Usage: "Prune all the index for a number of epochs starting from a specified height", +var pruneTxHashCmd = &cli.Command{ + Name: "prune-txhash", + Usage: "Prune the txhash.db for txs older than a given height", Flags: []cli.Flag{ &cli.IntFlag{ - Name: "from", + Name: "older-than", Value: 0, - Usage: "the tipset height to start backfilling from (0 is head of chain)", + Usage: "height to start pruning txhashes older than this epoch, 0 means start from head", }, + }, + Action: func(cctx *cli.Context) error { + srv, err := lcli.GetFullNodeServices(cctx) + if err != nil { + return err + } + defer srv.Close() //nolint:errcheck + + api := srv.FullNodeAPI() + ctx := lcli.ReqContext(cctx) + + basePath, err := homedir.Expand(cctx.String("repo")) + if err != nil { + return err + } + + startHeight := cctx.Int64("older-than") + if startHeight == 0 { + currTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + + startHeight += 
int64(currTs.Height()) + + if startHeight == 0 { + return xerrors.Errorf("bogus start height %d", startHeight) + } + } + + if err := pruneTxIndex(ctx, api, basePath, startHeight); err != nil { + return xerrors.Errorf("error pruning txindex: %w", err) + } + + return nil + }, +} + +var pruneEventsCmd = &cli.Command{ + Name: "prune-events", + Usage: "Prune the events.db for events older than a given height", + Flags: []cli.Flag{ &cli.IntFlag{ - Name: "epochs", + Name: "older-than", Value: 0, - Usage: "the number of epochs to prune (working backwards, 0 means prune all from the specified height)", + Usage: "height to start pruning events older than this epoch, 0 means start from head", }, }, Action: func(cctx *cli.Context) error { @@ -936,7 +959,7 @@ var pruneAllIndexesCmd = &cli.Command{ return err } - startHeight := cctx.Int64("from") + startHeight := cctx.Int64("older-than") if startHeight == 0 { currTs, err := api.ChainHead(ctx) if err != nil { @@ -950,12 +973,58 @@ var pruneAllIndexesCmd = &cli.Command{ } } - epochs := cctx.Int64("epochs") + if err := pruneEventsIndex(ctx, basePath, startHeight); err != nil { + return xerrors.Errorf("error pruning events index: %w", err) + } + + return nil + }, +} + +// pruneAllIndexesCmd is a command that prunes all the indexes +var pruneAllIndexesCmd = &cli.Command{ + Name: "prune-all-indexes", + Usage: "Prune all the indexes DBs (msgindex, txhash, events) older than a given height", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "older-than", + Value: 0, + Usage: "height to start pruning events older than this epoch, 0 means start from head", + }, + }, + Action: func(cctx *cli.Context) error { + srv, err := lcli.GetFullNodeServices(cctx) + if err != nil { + return err + } + defer srv.Close() //nolint:errcheck + + api := srv.FullNodeAPI() + ctx := lcli.ReqContext(cctx) + + basePath, err := homedir.Expand(cctx.String("repo")) + if err != nil { + return err + } + + startHeight := cctx.Int64("older-than") + if startHeight == 0 { + 
currTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + + startHeight += int64(currTs.Height()) + + if startHeight < 0 || startHeight == 0 { + return xerrors.Errorf("bogus start height %d", startHeight) + } + } var g = new(errgroup.Group) g.Go(func() error { - if err := pruneMsgIndex(ctx, basePath, startHeight, epochs); err != nil { + if err := pruneMsgIndex(ctx, basePath, startHeight); err != nil { return xerrors.Errorf("error pruning msgindex: %w", err) } @@ -963,7 +1032,7 @@ var pruneAllIndexesCmd = &cli.Command{ }) g.Go(func() error { - if err := pruneTxIndex(ctx, api, basePath, startHeight, epochs); err != nil { + if err := pruneTxIndex(ctx, api, basePath, startHeight); err != nil { return xerrors.Errorf("error pruning txindex: %w", err) } @@ -971,7 +1040,7 @@ var pruneAllIndexesCmd = &cli.Command{ }) g.Go(func() error { - if err := pruneEventsIndex(ctx, basePath, startHeight, epochs); err != nil { + if err := pruneEventsIndex(ctx, basePath, startHeight); err != nil { return xerrors.Errorf("error pruning events index: %w", err) } @@ -983,7 +1052,7 @@ var pruneAllIndexesCmd = &cli.Command{ } // pruneEventsIndex is a helper function that prunes the events index.db for a number of epochs starting from a specified height -func pruneEventsIndex(_ context.Context, basePath string, startHeight, epochs int64) error { +func pruneEventsIndex(_ context.Context, basePath string, startHeight int64) error { dbPath := path.Join(basePath, databaseType, filter.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath+"?_txlock=immediate") if err != nil { @@ -1002,130 +1071,66 @@ func pruneEventsIndex(_ context.Context, basePath string, startHeight, epochs in return err } - var rows *sql.Rows - var res sql.Result - if epochs == 0 { - rows, err = tx.Query(selectEventIdsFromHeight, startHeight) - } else { - lowerBound := startHeight - epochs - if lowerBound < 0 { - lowerBound = 0 - } - - rows, err = tx.Query(selectEventIdsBetweenRange, startHeight, lowerBound) 
- } - - if err != nil { + rollback := func(tableName string) { if err := tx.Rollback(); err != nil { - fmt.Printf("ERROR: rollback: %s", err) + fmt.Printf("ERROR: rollback %s tx :%s", tableName, err) } - return err } - defer func() { - _ = rows.Close() - }() - - deleteEventQuery, deleteEventEntriesQuery, deleteEventSeenQuery, err := getEventsDBDeleteQueries(rows) - if err != nil { - return xerrors.Errorf("error getting delete query: %w", err) - } - - res, err = tx.Exec(deleteEventQuery) + res, err := tx.Exec(deleteEventFromStartHeight, startHeight) if err != nil { - if err := tx.Rollback(); err != nil { - fmt.Printf("ERROR: rollback event: %s", err) - } - return err + rollback(eventTable) + return xerrors.Errorf("error deleting event: %w", err) } + var totalRowsAffected int64 = 0 rowsAffected, err := res.RowsAffected() if err != nil { - return xerrors.Errorf("error getting event table rows affected: %s", err) + return xerrors.Errorf("error getting rows affected: %w", err) } - log.Infof("pruned %s table, rows affected: %d", "event", rowsAffected) + totalRowsAffected += rowsAffected + log.Infof("pruned %s table, rows affected: %d", eventTable, rowsAffected) - res, err = tx.Exec(deleteEventEntriesQuery) + res, err = tx.Exec(deleteEventEntriesByEventIDs, startHeight) if err != nil { - if err := tx.Rollback(); err != nil { - fmt.Printf("ERROR: rollback event_entries: %s", err) - } - return err + rollback(eventEntryTable) + return xerrors.Errorf("error deleting event entries: %w", err) } rowsAffected, err = res.RowsAffected() if err != nil { - return xerrors.Errorf("error getting event entries table rows affected: %s", err) + return xerrors.Errorf("error getting rows affected: %w", err) } - log.Infof("pruned %s table, rows affected: %d", "event_entries", rowsAffected) + totalRowsAffected += rowsAffected + log.Infof("pruned %s table, rows affected: %d", eventEntryTable, rowsAffected) - res, err = tx.Exec(deleteEventSeenQuery) + res, err = tx.Exec(deleteEventsSeenByHeight, 
startHeight) if err != nil { - if err := tx.Rollback(); err != nil { - fmt.Printf("ERROR: rollback event_seen: %s", err) - } - return err - } - - if err := tx.Commit(); err != nil { - return err + rollback(eventsSeenTable) + return xerrors.Errorf("error deleting events seen: %w", err) } rowsAffected, err = res.RowsAffected() if err != nil { - return xerrors.Errorf("error getting event seen table rows affected: %s", err) + return xerrors.Errorf("error getting rows affected: %w", err) } - log.Infof("pruned %s table, rows affected: %d", "event_seen", rowsAffected) - - return nil -} - -func getEventsDBDeleteQueries(rows *sql.Rows) (string, string, string, error) { - var ( - eventIds []int64 - heights []int64 - ) - for rows.Next() { - var ( - id int64 - height int64 - ) - if err := rows.Scan(&id, &height); err != nil { - return "", "", "", xerrors.Errorf("error scanning event id: %w", err) - } - eventIds = append(eventIds, id) - heights = append(heights, height) - } - - if err := rows.Err(); err != nil { - return "", "", "", xerrors.Errorf("error iterating event ids: %w", err) - } - - if len(eventIds) == 0 { - log.Info("No events to delete") - return "", "", "", nil + err = tx.Commit() + if err != nil { + return xerrors.Errorf("error committing tx: %w", err) } - // Delete event entries - idPlaceholders := strings.Repeat("?,", len(eventIds)) - idPlaceholders = idPlaceholders[:len(idPlaceholders)-1] // Remove trailing comma - - deleteEventTableQuery := fmt.Sprintf(deleteEventsByIDs, idPlaceholders) - deleteEventEntriesTableQuery := fmt.Sprintf(deleteEventEntriesByIDs, idPlaceholders) - - heightPlaceholders := strings.Repeat("?,", len(heights)) - heightPlaceholders = heightPlaceholders[:len(heightPlaceholders)-1] // Remove trailing comma + totalRowsAffected += rowsAffected + log.Infof("pruned %s table, rows affected: %d", eventsSeenTable, rowsAffected) - deleteEventSeenTableQuery := fmt.Sprintf(deleteEventSeenByHeights, heightPlaceholders) - - return deleteEventTableQuery, 
deleteEventEntriesTableQuery, deleteEventSeenTableQuery, nil + log.Infof("Done pruning all events indexes, total rows affected: %d", totalRowsAffected) + return nil } // pruneTxIndex is a helper function that prunes the txindex.db for a number of epochs starting from a specified height -func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, startHeight, epochs int64) error { +func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, startHeight int64) error { dbPath := path.Join(basePath, databaseType, ethhashlookup.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath) if err != nil { @@ -1139,23 +1144,21 @@ func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, start } }() - // we want to delete the txhash for the previous epoch - startHeight = startHeight - 1 - deleteStmt, err := db.Prepare(deleteTxHashIndexByCID) if err != nil { return xerrors.Errorf("failed to prepare tx hash index delete statement: %w", err) } - currTs, err := api.ChainHead(ctx) + // we want to start pruning older than the specified height + startHeight = startHeight - 1 + + currTs, err := api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(startHeight), types.EmptyTSK) if err != nil { return err } var totalRowsAffected int64 = 0 - for i := 0; i < int(epochs); i++ { - epoch := abi.ChainEpoch(startHeight - int64(i)) - + for currTs != nil { select { case <-ctx.Done(): fmt.Println("request cancelled") @@ -1163,16 +1166,10 @@ func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, start default: } - currTsk := currTs.Parents() - execTs, err := api.ChainGetTipSet(ctx, currTsk) - if err != nil { - return fmt.Errorf("failed to call ChainGetTipSet for %s: %w", currTsk, err) - } - - for _, blockheader := range execTs.Blocks() { + for _, blockheader := range currTs.Blocks() { blkMsgs, err := api.ChainGetBlockMessages(ctx, blockheader.Cid()) if err != nil { - log.Infof("failed to get block messages at epoch: %d", epoch) + 
log.Infof("failed to get block messages at height: %d", currTs.Height()) continue // should we break here? } @@ -1203,16 +1200,25 @@ func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, start } - currTs = execTs + // move to the previous epoch + currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) + if err != nil { + return xerrors.Errorf("error walking chain: %w", err) + } + } + + err = deleteStmt.Close() + if err != nil { + return xerrors.Errorf("error closing delete statement: %w", err) } - log.Infof("Done pruning %s, rows affected: %d", ethhashlookup.DefaultDbFilename, totalRowsAffected) + log.Infof("Done pruning db %s table %s, rows affected: %d", ethhashlookup.DefaultDbFilename, txHashIndexTable, totalRowsAffected) return nil } -// pruneMsgIndex is a helper function that prunes the msgindex.db for a number of epochs starting from a specified height -func pruneMsgIndex(_ context.Context, basePath string, startHeight int64, epochs int64) error { +// pruneMsgIndex is a helper function that prunes the msgindex.db for messages older than a given height +func pruneMsgIndex(_ context.Context, basePath string, startHeight int64) error { dbPath := path.Join(basePath, databaseType, index.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath) if err != nil { @@ -1231,21 +1237,10 @@ func pruneMsgIndex(_ context.Context, basePath string, startHeight int64, epochs return err } - var res sql.Result - if epochs == 0 { // if epochs is 0, delete all messages before the start height - res, err = tx.Exec(deleteMsgIndexFromStartHeight, startHeight) - } else { // if epochs is not 0, delete messages between the start height and the start height - epochs - lowerBound := startHeight - epochs - if lowerBound < 0 { // if lowerBound is negative, set it to 0 (delete all messages from the start height) - lowerBound = 0 - } - - res, err = tx.Exec(deleteMsgIndexBetweenRange, startHeight, lowerBound) - } - + res, err := tx.Exec(deleteMsgIndexFromStartHeight, 
startHeight) if err != nil { if err := tx.Rollback(); err != nil { - fmt.Printf("ERROR: rollback: %s", err) + fmt.Printf("ERROR: rollback %s tx :%s", msgIndexTable, err) } return err } @@ -1256,10 +1251,10 @@ func pruneMsgIndex(_ context.Context, basePath string, startHeight int64, epochs rowsAffected, err := res.RowsAffected() if err != nil { - return xerrors.Errorf("error getting rows affected: %s", err) + return xerrors.Errorf("error getting rows affected: %w", err) } - log.Infof("Done pruning %s, rows affected: %d", index.DefaultDbFilename, rowsAffected) + log.Infof("Done pruning db %s table %s, rows affected: %d", index.DefaultDbFilename, msgIndexTable, rowsAffected) return nil } From fe483b3aea38b56973f4accc656b49036a1c633d Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 16 Aug 2024 18:52:24 +0530 Subject: [PATCH 03/11] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14d892a7f1e..e87a75add24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ - https://github.com/filecoin-project/lotus/pull/12332: fix: ETH RPC: receipts: use correct txtype in receipts - https://github.com/filecoin-project/lotus/pull/12335: fix: lotus-shed: store processed tipset after backfilling events - https://github.com/filecoin-project/lotus/pull/12341: fix: miner: Fix DDO pledge math +- https://github.com/filecoin-project/lotus/pull/12393: feat(index): add lotus shed commands to prune all indexes ## ☢️ Upgrade Warnings ☢️ From 88f04cc3aed0b560086d154e66ae2e6842ba3ae0 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 16 Aug 2024 18:59:08 +0530 Subject: [PATCH 04/11] fix: ci issues --- cmd/lotus-shed/indexes.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index bfb51ccf478..8e8ed2f3a41 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -816,7 +816,7 @@ var backfillTxHashCmd = &cli.Command{ 
return err } - var totalRowsAffected int64 = 0 + var totalRowsAffected int64 for i := 0; i < epochs; i++ { epoch := abi.ChainEpoch(startHeight - int64(i)) @@ -1083,7 +1083,7 @@ func pruneEventsIndex(_ context.Context, basePath string, startHeight int64) err return xerrors.Errorf("error deleting event: %w", err) } - var totalRowsAffected int64 = 0 + var totalRowsAffected int64 rowsAffected, err := res.RowsAffected() if err != nil { return xerrors.Errorf("error getting rows affected: %w", err) @@ -1157,7 +1157,7 @@ func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, start return err } - var totalRowsAffected int64 = 0 + var totalRowsAffected int64 for currTs != nil { select { case <-ctx.Done(): From 67817cc405b08e4049012ad60c5e7f3cfa798f3b Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 19 Aug 2024 19:56:04 +0530 Subject: [PATCH 05/11] address comments --- cmd/lotus-shed/indexes.go | 107 +++++++++++++++----------------------- 1 file changed, 42 insertions(+), 65 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 8e8ed2f3a41..e8df436d5d3 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -68,6 +68,30 @@ func withCategory(cat string, cmd *cli.Command) *cli.Command { return cmd } +// parsePruneArgs is a utility function that parses the arguments shared by the prune commands +func parsePruneArgs(ctx context.Context, cctx *cli.Context, api lapi.FullNode) (basePath string, startHeight int64, err error) { + basePath, err = homedir.Expand(cctx.String("repo")) + if err != nil { + return "", 0, xerrors.Errorf("failed to get base path: %w", err) + } + + startHeight = cctx.Int64("older-than") + if startHeight == 0 { + currTs, err := api.ChainHead(ctx) + if err != nil { + return "", 0, xerrors.Errorf("failed to get chain head: %w", err) + } + + startHeight += int64(currTs.Height()) + + if startHeight <= 0 { + return "", 0, xerrors.Errorf("invalid start height %d", startHeight) + } + } + + return
basePath, startHeight, nil +} + var indexesCmd = &cli.Command{ Name: "indexes", Usage: "Commands related to managing sqlite indexes", @@ -721,31 +745,20 @@ var pruneMsgIndexCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - api, closer, err := lcli.GetFullNodeAPI(cctx) + srvc, err := lcli.GetFullNodeServices(cctx) if err != nil { return err } + api := srvc.FullNodeAPI() + closer := srvc.Close + defer closer() ctx := lcli.ReqContext(cctx) - startHeight := int64(cctx.Int("older-than")) - if startHeight < 0 { - curTs, err := api.ChainHead(ctx) - if err != nil { - return err - } - - startHeight += int64(curTs.Height()) - - if startHeight < 0 { - return xerrors.Errorf("bogus start height %d", startHeight) - } - } - - basePath, err := homedir.Expand(cctx.String("repo")) + basePath, startHeight, err := parsePruneArgs(ctx, cctx, api) if err != nil { - return err + return xerrors.Errorf("error parsing prune args: %w", err) } err = pruneMsgIndex(ctx, basePath, startHeight) @@ -907,23 +920,9 @@ var pruneTxHashCmd = &cli.Command{ api := srv.FullNodeAPI() ctx := lcli.ReqContext(cctx) - basePath, err := homedir.Expand(cctx.String("repo")) + basePath, startHeight, err := parsePruneArgs(ctx, cctx, api) if err != nil { - return err - } - - startHeight := cctx.Int64("older-than") - if startHeight == 0 { - currTs, err := api.ChainHead(ctx) - if err != nil { - return err - } - - startHeight += int64(currTs.Height()) - - if startHeight == 0 { - return xerrors.Errorf("bogus start height %d", startHeight) - } + return xerrors.Errorf("error parsing prune args: %w", err) } if err := pruneTxIndex(ctx, api, basePath, startHeight); err != nil { @@ -954,23 +953,9 @@ var pruneEventsCmd = &cli.Command{ api := srv.FullNodeAPI() ctx := lcli.ReqContext(cctx) - basePath, err := homedir.Expand(cctx.String("repo")) + basePath, startHeight, err := parsePruneArgs(ctx, cctx, api) if err != nil { - return err - } - - startHeight := cctx.Int64("older-than") - if startHeight == 0 { - 
currTs, err := api.ChainHead(ctx) - if err != nil { - return err - } - - startHeight += int64(currTs.Height()) - - if startHeight < 0 || startHeight == 0 { - return xerrors.Errorf("bogus start height %d", startHeight) - } + return xerrors.Errorf("error parsing prune args: %w", err) } if err := pruneEventsIndex(ctx, basePath, startHeight); err != nil { @@ -1002,23 +987,9 @@ var pruneAllIndexesCmd = &cli.Command{ api := srv.FullNodeAPI() ctx := lcli.ReqContext(cctx) - basePath, err := homedir.Expand(cctx.String("repo")) + basePath, startHeight, err := parsePruneArgs(ctx, cctx, api) if err != nil { - return err - } - - startHeight := cctx.Int64("older-than") - if startHeight == 0 { - currTs, err := api.ChainHead(ctx) - if err != nil { - return err - } - - startHeight += int64(currTs.Height()) - - if startHeight < 0 || startHeight == 0 { - return xerrors.Errorf("bogus start height %d", startHeight) - } + return xerrors.Errorf("error parsing prune args: %w", err) } var g = new(errgroup.Group) @@ -1053,6 +1024,8 @@ var pruneAllIndexesCmd = &cli.Command{ // pruneEventsIndex is a helper function that prunes the events index.db for a number of epochs starting from a specified height func pruneEventsIndex(_ context.Context, basePath string, startHeight int64) error { + log.Infof("pruning events index") + dbPath := path.Join(basePath, databaseType, filter.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath+"?_txlock=immediate") if err != nil { @@ -1131,6 +1104,8 @@ func pruneEventsIndex(_ context.Context, basePath string, startHeight int64) err // pruneTxIndex is a helper function that prunes the txindex.db for a number of epochs starting from a specified height func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, startHeight int64) error { + log.Infof("pruning txindex") + dbPath := path.Join(basePath, databaseType, ethhashlookup.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath) if err != nil { @@ -1219,6 +1194,8 @@ func 
pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, start // pruneMsgIndex is a helper function that prunes the msgindex.db for messages older than a given height func pruneMsgIndex(_ context.Context, basePath string, startHeight int64) error { + log.Infof("pruning msgindex") + dbPath := path.Join(basePath, databaseType, index.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath) if err != nil { From fb1758e9556ac72da96a4f10800372f7ddb25acb Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 20 Aug 2024 16:52:39 +0530 Subject: [PATCH 06/11] fix: ci issues --- cmd/lotus-shed/indexes.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index e8df436d5d3..abf62c4e03e 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -653,7 +653,7 @@ var backfillMsgIndexCmd = &cli.Command{ return err } - defer closer() + defer closer() //nolint:errcheck ctx := lcli.ReqContext(cctx) curTs, err := api.ChainHead(ctx) @@ -753,7 +753,7 @@ var pruneMsgIndexCmd = &cli.Command{ api := srvc.FullNodeAPI() closer := srvc.Close - defer closer() + defer closer() //nolint:errcheck ctx := lcli.ReqContext(cctx) basePath, startHeight, err := parsePruneArgs(ctx, cctx, api) @@ -790,7 +790,8 @@ var backfillTxHashCmd = &cli.Command{ if err != nil { return err } - defer closer() + + defer closer() //nolint:errcheck ctx := lcli.ReqContext(cctx) From 2f2a23f85aee1223bb222d37b7e016471674d314 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 26 Aug 2024 11:48:04 +0530 Subject: [PATCH 07/11] address comments --- .../eth_transaction_hash_lookup.go | 24 +-- cmd/lotus-shed/indexes.go | 152 +++++------------- 2 files changed, 58 insertions(+), 118 deletions(-) diff --git a/chain/ethhashlookup/eth_transaction_hash_lookup.go b/chain/ethhashlookup/eth_transaction_hash_lookup.go index 2a34e37aa03..66e6a7b947d 100644 --- a/chain/ethhashlookup/eth_transaction_hash_lookup.go +++ 
b/chain/ethhashlookup/eth_transaction_hash_lookup.go @@ -6,6 +6,9 @@ import ( "errors" "strconv" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/builtin" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/ipfs/go-cid" _ "github.com/mattn/go-sqlite3" "golang.org/x/xerrors" @@ -38,10 +41,9 @@ const ( type EthTxHashLookup struct { db *sql.DB - stmtInsertTxHash *sql.Stmt - stmtGetCidFromHash *sql.Stmt - stmtGetHashFromCid *sql.Stmt - stmtDeleteOlderThan *sql.Stmt + stmtInsertTxHash *sql.Stmt + stmtGetCidFromHash *sql.Stmt + stmtGetHashFromCid *sql.Stmt } func NewTransactionHashLookup(ctx context.Context, path string) (*EthTxHashLookup, error) { @@ -78,10 +80,6 @@ func (ei *EthTxHashLookup) initStatements() (err error) { if err != nil { return xerrors.Errorf("prepare stmtGetHashFromCid: %w", err) } - ei.stmtDeleteOlderThan, err = ei.db.Prepare(deleteOlderThan) - if err != nil { - return xerrors.Errorf("prepare stmtDeleteOlderThan: %w", err) - } return nil } @@ -133,10 +131,18 @@ func (ei *EthTxHashLookup) DeleteEntriesOlderThan(days int) (int64, error) { return 0, xerrors.New("db closed") } - res, err := ei.stmtDeleteOlderThan.Exec("-" + strconv.Itoa(days) + " day") + epochs := abi.ChainEpoch(buildconstants.BlockDelaySecs * builtin.SecondsInDay * uint64(days)) + return DeleteEntriesOlderThan(ei.db, epochs) +} + +// DeleteEntriesOlderThan deletes entries older than the given number of epochs from now. 
+func DeleteEntriesOlderThan(db *sql.DB, epochs abi.ChainEpoch) (int64, error) { + secondsAgo := int64(epochs) * int64(buildconstants.BlockDelaySecs) + res, err := db.Exec(deleteOlderThan, "-"+strconv.FormatInt(secondsAgo, 10)+" seconds") if err != nil { return 0, err } + return res.RowsAffected() } diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index abf62c4e03e..de0665b104f 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -53,7 +53,7 @@ const ( // database related constants const ( - databaseType = "sqlite" + indexesDBDir = "sqlite" databaseDriver = "sqlite3" eventTable = "event" @@ -745,15 +745,13 @@ var pruneMsgIndexCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - srvc, err := lcli.GetFullNodeServices(cctx) + srv, err := lcli.GetFullNodeServices(cctx) if err != nil { return err } + defer srv.Close() //nolint:errcheck - api := srvc.FullNodeAPI() - closer := srvc.Close - - defer closer() //nolint:errcheck + api := srv.FullNodeAPI() ctx := lcli.ReqContext(cctx) basePath, startHeight, err := parsePruneArgs(ctx, cctx, api) @@ -926,7 +924,7 @@ var pruneTxHashCmd = &cli.Command{ return xerrors.Errorf("error parsing prune args: %w", err) } - if err := pruneTxIndex(ctx, api, basePath, startHeight); err != nil { + if err := pruneTxIndex(basePath, startHeight); err != nil { return xerrors.Errorf("error pruning txindex: %w", err) } @@ -959,7 +957,7 @@ var pruneEventsCmd = &cli.Command{ return xerrors.Errorf("error parsing prune args: %w", err) } - if err := pruneEventsIndex(ctx, basePath, startHeight); err != nil { + if err := pruneEventsIndex(basePath, startHeight); err != nil { return xerrors.Errorf("error pruning events index: %w", err) } @@ -1004,7 +1002,7 @@ var pruneAllIndexesCmd = &cli.Command{ }) g.Go(func() error { - if err := pruneTxIndex(ctx, api, basePath, startHeight); err != nil { + if err := pruneTxIndex(basePath, startHeight); err != nil { return xerrors.Errorf("error pruning txindex: %w", err) 
} @@ -1012,7 +1010,7 @@ var pruneAllIndexesCmd = &cli.Command{ }) g.Go(func() error { - if err := pruneEventsIndex(ctx, basePath, startHeight); err != nil { + if err := pruneEventsIndex(basePath, startHeight); err != nil { return xerrors.Errorf("error pruning events index: %w", err) } @@ -1024,18 +1022,17 @@ var pruneAllIndexesCmd = &cli.Command{ } // pruneEventsIndex is a helper function that prunes the events index.db for a number of epochs starting from a specified height -func pruneEventsIndex(_ context.Context, basePath string, startHeight int64) error { +func pruneEventsIndex(basePath string, startHeight int64) error { log.Infof("pruning events index") - dbPath := path.Join(basePath, databaseType, filter.DefaultDbFilename) + dbPath := path.Join(basePath, indexesDBDir, filter.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath+"?_txlock=immediate") if err != nil { return err } defer func() { - err := db.Close() - if err != nil { + if err = db.Close(); err != nil { fmt.Printf("ERROR: closing db: %s", err) } }() @@ -1046,149 +1043,87 @@ func pruneEventsIndex(_ context.Context, basePath string, startHeight int64) err } rollback := func(tableName string) { - if err := tx.Rollback(); err != nil { + if err = tx.Rollback(); err != nil { fmt.Printf("ERROR: rollback %s tx :%s", tableName, err) } } - res, err := tx.Exec(deleteEventFromStartHeight, startHeight) + log.Infof("pruning `%s` table", eventEntryTable) + + res, err := tx.Exec(deleteEventEntriesByEventIDs, startHeight) if err != nil { - rollback(eventTable) - return xerrors.Errorf("error deleting event: %w", err) + rollback(eventEntryTable) + return xerrors.Errorf("error deleting event entries: %w", err) } - var totalRowsAffected int64 - rowsAffected, err := res.RowsAffected() + eventEntryTableRowsAffected, err := res.RowsAffected() if err != nil { return xerrors.Errorf("error getting rows affected: %w", err) } - totalRowsAffected += rowsAffected - log.Infof("pruned %s table, rows affected: %d", 
eventTable, rowsAffected) + log.Infof("pruning `%s` table", eventTable) - res, err = tx.Exec(deleteEventEntriesByEventIDs, startHeight) + res, err = tx.Exec(deleteEventFromStartHeight, startHeight) if err != nil { - rollback(eventEntryTable) - return xerrors.Errorf("error deleting event entries: %w", err) + rollback(eventTable) + return xerrors.Errorf("error deleting event: %w", err) } - rowsAffected, err = res.RowsAffected() + eventTableRowsAffected, err := res.RowsAffected() if err != nil { return xerrors.Errorf("error getting rows affected: %w", err) } - totalRowsAffected += rowsAffected - log.Infof("pruned %s table, rows affected: %d", eventEntryTable, rowsAffected) + log.Infof("pruning `%s` table", eventsSeenTable) - res, err = tx.Exec(deleteEventsSeenByHeight, startHeight) + res, err = tx.Exec(deleteEventsSeenByHeight, startHeight) if err != nil { rollback(eventsSeenTable) return xerrors.Errorf("error deleting events seen: %w", err) } - rowsAffected, err = res.RowsAffected() + eventSeenTableRowsAffected, err := res.RowsAffected() if err != nil { return xerrors.Errorf("error getting rows affected: %w", err) } - err = tx.Commit() - if err != nil { + if err = tx.Commit(); err != nil { return xerrors.Errorf("error committing tx: %w", err) } - totalRowsAffected += rowsAffected - log.Infof("pruned %s table, rows affected: %d", eventsSeenTable, rowsAffected) + log.Infof("pruned %s table, rows affected: %d", eventTable, eventTableRowsAffected) + + log.Infof("pruned %s table, rows affected: %d", eventEntryTable, eventEntryTableRowsAffected) + log.Infof("pruned %s table, rows affected: %d", eventsSeenTable, eventSeenTableRowsAffected) + + totalRowsAffected := eventEntryTableRowsAffected + eventTableRowsAffected + eventSeenTableRowsAffected log.Infof("Done pruning all events indexes, total rows affected: %d", totalRowsAffected) return nil } // pruneTxIndex is a helper function that prunes the txindex.db for a number of epochs starting from a specified height -func
pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, startHeight int64) error { +func pruneTxIndex(basePath string, startHeight int64) error { log.Infof("pruning txindex") - dbPath := path.Join(basePath, databaseType, ethhashlookup.DefaultDbFilename) + dbPath := path.Join(basePath, indexesDBDir, ethhashlookup.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath) if err != nil { return err } defer func() { - err := db.Close() - if err != nil { + if err = db.Close(); err != nil { fmt.Printf("ERROR: closing db: %s", err) } }() - deleteStmt, err := db.Prepare(deleteTxHashIndexByCID) - if err != nil { - return xerrors.Errorf("failed to prepare tx hash index delete statement: %w", err) - } - - // we want to start pruning older than the specified height - startHeight = startHeight - 1 - - currTs, err := api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(startHeight), types.EmptyTSK) - if err != nil { - return err - } - - var totalRowsAffected int64 - for currTs != nil { - select { - case <-ctx.Done(): - fmt.Println("request cancelled") - return nil - default: - } - - for _, blockheader := range currTs.Blocks() { - blkMsgs, err := api.ChainGetBlockMessages(ctx, blockheader.Cid()) - if err != nil { - log.Infof("failed to get block messages at height: %d", currTs.Height()) - continue // should we break here? - } - - for _, smsg := range blkMsgs.SecpkMessages { - // we are only storing delegated messages. - // This will reduce unnecessary calls to db. 
- if smsg.Signature.Type != crypto.SigTypeDelegated { - continue - } - - cid := smsg.Cid().String() - res, err := deleteStmt.Exec(cid) - if err != nil { - return fmt.Errorf("failed to delete tx hash index at cid: %s: %w", cid, err) - } - - rowsAffected, err := res.RowsAffected() - if err != nil { - return fmt.Errorf("failed to get rows affected: %w", err) - } - - if rowsAffected == 0 { - log.Infof("No tx hash index found at cid: %s", cid) - } - - totalRowsAffected += rowsAffected - } - - } - - // move to the previous epoch - currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) - if err != nil { - return xerrors.Errorf("error walking chain: %w", err) - } - } - - err = deleteStmt.Close() + rowsAffected, err := ethhashlookup.DeleteEntriesOlderThan(db, abi.ChainEpoch(startHeight)) if err != nil { - return xerrors.Errorf("error closing delete statement: %w", err) + return xerrors.Errorf("failed to delete entries: %w", err) } - log.Infof("Done pruning db %s table %s, rows affected: %d", ethhashlookup.DefaultDbFilename, txHashIndexTable, totalRowsAffected) + log.Infof("Done pruning db %s table %s, rows affected: %d", ethhashlookup.DefaultDbFilename, txHashIndexTable, rowsAffected) return nil } @@ -1197,15 +1132,14 @@ func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, start func pruneMsgIndex(_ context.Context, basePath string, startHeight int64) error { log.Infof("pruning msgindex") - dbPath := path.Join(basePath, databaseType, index.DefaultDbFilename) + dbPath := path.Join(basePath, indexesDBDir, index.DefaultDbFilename) db, err := sql.Open(databaseDriver, dbPath) if err != nil { return err } defer func() { - err := db.Close() - if err != nil { + if err = db.Close(); err != nil { fmt.Printf("ERROR: closing db: %s", err) } }() @@ -1217,13 +1151,13 @@ func pruneMsgIndex(_ context.Context, basePath string, startHeight int64) error res, err := tx.Exec(deleteMsgIndexFromStartHeight, startHeight) if err != nil { - if err := tx.Rollback(); err != nil 
{ + if err = tx.Rollback(); err != nil { fmt.Printf("ERROR: rollback %s tx :%s", msgIndexTable, err) } return err } - if err := tx.Commit(); err != nil { + if err = tx.Commit(); err != nil { return err } From e25ebeed7713731df45c6aa214e8e026ad5771d7 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 26 Aug 2024 11:55:40 +0530 Subject: [PATCH 08/11] fix: ci issues --- chain/ethhashlookup/eth_transaction_hash_lookup.go | 6 +++--- cmd/lotus-shed/indexes.go | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/chain/ethhashlookup/eth_transaction_hash_lookup.go b/chain/ethhashlookup/eth_transaction_hash_lookup.go index 66e6a7b947d..7dd57bc737a 100644 --- a/chain/ethhashlookup/eth_transaction_hash_lookup.go +++ b/chain/ethhashlookup/eth_transaction_hash_lookup.go @@ -6,13 +6,13 @@ import ( "errors" "strconv" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/builtin" - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/ipfs/go-cid" _ "github.com/mattn/go-sqlite3" "golang.org/x/xerrors" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/builtin" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/types/ethtypes" "github.com/filecoin-project/lotus/lib/sqlite" ) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index de0665b104f..1d4680d21b1 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -44,8 +44,6 @@ const ( const ( deleteMsgIndexFromStartHeight = `DELETE FROM messages WHERE epoch < ?` - deleteTxHashIndexByCID = `DELETE FROM eth_tx_hashes WHERE cid = ?` - deleteEventFromStartHeight = `DELETE FROM event WHERE height < ?;` deleteEventEntriesByEventIDs = `DELETE FROM event_entry WHERE event_id IN (SELECT id FROM event WHERE height < ?);` deleteEventsSeenByHeight = `DELETE FROM events_seen WHERE height < ?;` From 2d29ff4b8045b964538f7acbb015010af59d2523 Mon 
Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 26 Aug 2024 13:07:06 +0530 Subject: [PATCH 09/11] fix: ci issue --- chain/ethhashlookup/eth_transaction_hash_lookup.go | 1 + 1 file changed, 1 insertion(+) diff --git a/chain/ethhashlookup/eth_transaction_hash_lookup.go b/chain/ethhashlookup/eth_transaction_hash_lookup.go index 7dd57bc737a..57d6820bb74 100644 --- a/chain/ethhashlookup/eth_transaction_hash_lookup.go +++ b/chain/ethhashlookup/eth_transaction_hash_lookup.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/builtin" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/types/ethtypes" "github.com/filecoin-project/lotus/lib/sqlite" From cb1c30fab4c38c7a484a0601ae348926f9617937 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 26 Aug 2024 13:43:55 +0530 Subject: [PATCH 10/11] small change --- cmd/lotus-shed/indexes.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 1d4680d21b1..dc3b666e5bb 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -757,7 +757,7 @@ var pruneMsgIndexCmd = &cli.Command{ return xerrors.Errorf("error parsing prune args: %w", err) } - err = pruneMsgIndex(ctx, basePath, startHeight) + err = pruneMsgIndex(basePath, startHeight) if err != nil { return xerrors.Errorf("error pruning msgindex: %w", err) } @@ -992,7 +992,7 @@ var pruneAllIndexesCmd = &cli.Command{ var g = new(errgroup.Group) g.Go(func() error { - if err := pruneMsgIndex(ctx, basePath, startHeight); err != nil { + if err := pruneMsgIndex(basePath, startHeight); err != nil { return xerrors.Errorf("error pruning msgindex: %w", err) } @@ -1127,7 +1127,7 @@ func pruneTxIndex(basePath string, startHeight int64) error { } // pruneMsgIndex is a helper function that prunes the msgindex.db for messages older than a given height -func pruneMsgIndex(_ 
context.Context, basePath string, startHeight int64) error { +func pruneMsgIndex(basePath string, startHeight int64) error { log.Infof("pruning msgindex") dbPath := path.Join(basePath, indexesDBDir, index.DefaultDbFilename) From 3519664c9e33a4c83a1f63372da2bf0c0afb1f52 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 2 Sep 2024 17:05:41 +0530 Subject: [PATCH 11/11] feat: use days for pruning tx db --- .../eth_transaction_hash_lookup.go | 12 ++---- cmd/lotus-shed/indexes.go | 40 ++++++++++++------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/chain/ethhashlookup/eth_transaction_hash_lookup.go b/chain/ethhashlookup/eth_transaction_hash_lookup.go index 57d6820bb74..a07e5d17cab 100644 --- a/chain/ethhashlookup/eth_transaction_hash_lookup.go +++ b/chain/ethhashlookup/eth_transaction_hash_lookup.go @@ -10,10 +10,6 @@ import ( _ "github.com/mattn/go-sqlite3" "golang.org/x/xerrors" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/builtin" - - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/types/ethtypes" "github.com/filecoin-project/lotus/lib/sqlite" ) @@ -132,14 +128,12 @@ func (ei *EthTxHashLookup) DeleteEntriesOlderThan(days int) (int64, error) { return 0, xerrors.New("db closed") } - epochs := abi.ChainEpoch(buildconstants.BlockDelaySecs * builtin.SecondsInDay * uint64(days)) - return DeleteEntriesOlderThan(ei.db, epochs) + return DeleteEntriesOlderThan(ei.db, days) } // DeleteEntriesOlderThan deletes entries older than the given number of epochs from now. 
-func DeleteEntriesOlderThan(db *sql.DB, epochs abi.ChainEpoch) (int64, error) { - secondsAgo := int64(epochs) * int64(buildconstants.BlockDelaySecs) - res, err := db.Exec(deleteOlderThan, "-"+strconv.FormatInt(secondsAgo, 10)+" seconds") +func DeleteEntriesOlderThan(db *sql.DB, days int) (int64, error) { + res, err := db.Exec(deleteOlderThan, "-"+strconv.Itoa(days)+" day") if err != nil { return 0, err } diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index dc3b666e5bb..1d5351d3590 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -899,12 +899,11 @@ var backfillTxHashCmd = &cli.Command{ var pruneTxHashCmd = &cli.Command{ Name: "prune-txhash", - Usage: "Prune the txhash.db for txs older than a given height", + Usage: "Prune the txhash.db for txs older than provided days", Flags: []cli.Flag{ &cli.IntFlag{ - Name: "older-than", - Value: 0, - Usage: "height to start pruning txhashes older than this epoch, 0 means start from head", + Name: "older-than-days", + Usage: "prune tx hashes older than this number of days, 0 means prune all", }, }, Action: func(cctx *cli.Context) error { @@ -914,15 +913,17 @@ var pruneTxHashCmd = &cli.Command{ } defer srv.Close() //nolint:errcheck - api := srv.FullNodeAPI() - ctx := lcli.ReqContext(cctx) - - basePath, startHeight, err := parsePruneArgs(ctx, cctx, api) + basePath, err := homedir.Expand(cctx.String("repo")) if err != nil { - return xerrors.Errorf("error parsing prune args: %w", err) + return xerrors.Errorf("failed to get base path: %w", err) } - if err := pruneTxIndex(basePath, startHeight); err != nil { + days := cctx.Int("older-than-days") + if days < 0 { + return xerrors.Errorf("invalid number of days to prune: %d", days) + } + + if err := pruneTxIndex(basePath, days); err != nil { return xerrors.Errorf("error pruning txindex: %w", err) } @@ -971,7 +972,11 @@ var pruneAllIndexesCmd = &cli.Command{ &cli.IntFlag{ Name: "older-than", Value: 0, - Usage: "height to start pruning events 
older than this epoch, 0 means start from head", + Usage: "height to start pruning msg index and events older than this epoch, 0 means start from head", + }, + &cli.IntFlag{ + Name: "older-than-days", + Usage: "prune txhashes older than this number of days", }, }, Action: func(cctx *cli.Context) error { @@ -1000,7 +1005,12 @@ var pruneAllIndexesCmd = &cli.Command{ }) g.Go(func() error { - if err := pruneTxIndex(basePath, startHeight); err != nil { + days := cctx.Int("older-than-days") + if days <= 0 { + return xerrors.Errorf("invalid number of days to prune: %d", days) + } + + if err := pruneTxIndex(basePath, days); err != nil { return xerrors.Errorf("error pruning txindex: %w", err) } @@ -1100,8 +1110,8 @@ func pruneEventsIndex(basePath string, startHeight int64) error { return nil } -// pruneTxIndex is a helper function that prunes the txindex.db for a number of epochs starting from a specified height -func pruneTxIndex(basePath string, startHeight int64) error { +// pruneTxIndex is a helper function that prunes the txindex.db for txs older than a given number of days +func pruneTxIndex(basePath string, days int) error { log.Infof("pruning txindex") dbPath := path.Join(basePath, indexesDBDir, ethhashlookup.DefaultDbFilename) @@ -1116,7 +1126,7 @@ func pruneTxIndex(basePath string, startHeight int64) error { } }() - rowsAffected, err := ethhashlookup.DeleteEntriesOlderThan(db, abi.ChainEpoch(startHeight)) + rowsAffected, err := ethhashlookup.DeleteEntriesOlderThan(db, days) if err != nil { return xerrors.Errorf("failed to delete entries: %w", err) }