Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
akaladarshi committed Aug 26, 2024
1 parent a2a46fa commit 2f2a23f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 118 deletions.
24 changes: 15 additions & 9 deletions chain/ethhashlookup/eth_transaction_hash_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"errors"
"strconv"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -38,10 +41,9 @@ const (
type EthTxHashLookup struct {
db *sql.DB

stmtInsertTxHash *sql.Stmt
stmtGetCidFromHash *sql.Stmt
stmtGetHashFromCid *sql.Stmt
stmtDeleteOlderThan *sql.Stmt
stmtInsertTxHash *sql.Stmt
stmtGetCidFromHash *sql.Stmt
stmtGetHashFromCid *sql.Stmt
}

func NewTransactionHashLookup(ctx context.Context, path string) (*EthTxHashLookup, error) {
Expand Down Expand Up @@ -78,10 +80,6 @@ func (ei *EthTxHashLookup) initStatements() (err error) {
if err != nil {
return xerrors.Errorf("prepare stmtGetHashFromCid: %w", err)
}
ei.stmtDeleteOlderThan, err = ei.db.Prepare(deleteOlderThan)
if err != nil {
return xerrors.Errorf("prepare stmtDeleteOlderThan: %w", err)
}
return nil
}

Expand Down Expand Up @@ -133,10 +131,18 @@ func (ei *EthTxHashLookup) DeleteEntriesOlderThan(days int) (int64, error) {
return 0, xerrors.New("db closed")
}

res, err := ei.stmtDeleteOlderThan.Exec("-" + strconv.Itoa(days) + " day")
epochs := abi.ChainEpoch(buildconstants.BlockDelaySecs * builtin.SecondsInDay * uint64(days))
return DeleteEntriesOlderThan(ei.db, epochs)
}

// DeleteEntriesOlderThan deletes entries older than the given number of epochs from now.
func DeleteEntriesOlderThan(db *sql.DB, epochs abi.ChainEpoch) (int64, error) {
secondsAgo := int64(epochs) * int64(buildconstants.BlockDelaySecs)
res, err := db.Exec(deleteOlderThan, "-"+strconv.FormatInt(secondsAgo, 10)+" seconds")
if err != nil {
return 0, err
}

return res.RowsAffected()
}

Expand Down
152 changes: 43 additions & 109 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (

// database related constants
const (
databaseType = "sqlite"
indexesDBDir = "sqlite"
databaseDriver = "sqlite3"

eventTable = "event"
Expand Down Expand Up @@ -745,15 +745,13 @@ var pruneMsgIndexCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
srvc, err := lcli.GetFullNodeServices(cctx)
srv, err := lcli.GetFullNodeServices(cctx)
if err != nil {
return err
}
defer srv.Close() //nolint:errcheck

api := srvc.FullNodeAPI()
closer := srvc.Close

defer closer() //nolint:errcheck
api := srv.FullNodeAPI()
ctx := lcli.ReqContext(cctx)

basePath, startHeight, err := parsePruneArgs(ctx, cctx, api)
Expand Down Expand Up @@ -926,7 +924,7 @@ var pruneTxHashCmd = &cli.Command{
return xerrors.Errorf("error parsing prune args: %w", err)
}

if err := pruneTxIndex(ctx, api, basePath, startHeight); err != nil {
if err := pruneTxIndex(basePath, startHeight); err != nil {
return xerrors.Errorf("error pruning txindex: %w", err)
}

Expand Down Expand Up @@ -959,7 +957,7 @@ var pruneEventsCmd = &cli.Command{
return xerrors.Errorf("error parsing prune args: %w", err)
}

if err := pruneEventsIndex(ctx, basePath, startHeight); err != nil {
if err := pruneEventsIndex(basePath, startHeight); err != nil {
return xerrors.Errorf("error pruning events index: %w", err)
}

Expand Down Expand Up @@ -1004,15 +1002,15 @@ var pruneAllIndexesCmd = &cli.Command{
})

g.Go(func() error {
if err := pruneTxIndex(ctx, api, basePath, startHeight); err != nil {
if err := pruneTxIndex(basePath, startHeight); err != nil {
return xerrors.Errorf("error pruning txindex: %w", err)
}

return nil
})

g.Go(func() error {
if err := pruneEventsIndex(ctx, basePath, startHeight); err != nil {
if err := pruneEventsIndex(basePath, startHeight); err != nil {
return xerrors.Errorf("error pruning events index: %w", err)
}

Expand All @@ -1024,18 +1022,17 @@ var pruneAllIndexesCmd = &cli.Command{
}

// pruneEventsIndex is a helper function that prunes the events index.db for a number of epochs starting from a specified height
func pruneEventsIndex(_ context.Context, basePath string, startHeight int64) error {
func pruneEventsIndex(basePath string, startHeight int64) error {
log.Infof("pruning events index")

dbPath := path.Join(basePath, databaseType, filter.DefaultDbFilename)
dbPath := path.Join(basePath, indexesDBDir, filter.DefaultDbFilename)
db, err := sql.Open(databaseDriver, dbPath+"?_txlock=immediate")
if err != nil {
return err
}

defer func() {
err := db.Close()
if err != nil {
if err = db.Close(); err != nil {
fmt.Printf("ERROR: closing db: %s", err)
}
}()
Expand All @@ -1046,149 +1043,87 @@ func pruneEventsIndex(_ context.Context, basePath string, startHeight int64) err
}

rollback := func(tableName string) {
if err := tx.Rollback(); err != nil {
if err = tx.Rollback(); err != nil {
fmt.Printf("ERROR: rollback %s tx :%s", tableName, err)
}
}

res, err := tx.Exec(deleteEventFromStartHeight, startHeight)
log.Infof("pruning `%s` table", eventEntryTable)

res, err := tx.Exec(deleteEventEntriesByEventIDs, startHeight)
if err != nil {
rollback(eventTable)
return xerrors.Errorf("error deleting event: %w", err)
rollback(eventEntryTable)
return xerrors.Errorf("error deleting event entries: %w", err)
}

var totalRowsAffected int64
rowsAffected, err := res.RowsAffected()
eventEntryTableRowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("error getting rows affected: %w", err)
}

totalRowsAffected += rowsAffected
log.Infof("pruned %s table, rows affected: %d", eventTable, rowsAffected)
log.Infof("pruning `%s` table", eventTable)

res, err = tx.Exec(deleteEventEntriesByEventIDs, startHeight)
res, err = tx.Exec(deleteEventFromStartHeight, startHeight)
if err != nil {
rollback(eventEntryTable)
return xerrors.Errorf("error deleting event entries: %w", err)
rollback(eventTable)
return xerrors.Errorf("error deleting event: %w", err)
}

rowsAffected, err = res.RowsAffected()
eventTableRowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("error getting rows affected: %w", err)
}

totalRowsAffected += rowsAffected
log.Infof("pruned %s table, rows affected: %d", eventEntryTable, rowsAffected)
log.Infof("pruning `%s` table", eventTable)

res, err = tx.Exec(deleteEventsSeenByHeight, startHeight)
res, err = tx.Exec(deleteEventsSeenByHeight, eventsSeenTable)
if err != nil {
rollback(eventsSeenTable)
return xerrors.Errorf("error deleting events seen: %w", err)
}

rowsAffected, err = res.RowsAffected()
eventSeenTableRowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("error getting rows affected: %w", err)
}

err = tx.Commit()
if err != nil {
if err = tx.Commit(); err != nil {
return xerrors.Errorf("error committing tx: %w", err)
}

totalRowsAffected += rowsAffected
log.Infof("pruned %s table, rows affected: %d", eventsSeenTable, rowsAffected)
log.Infof("pruned %s table, rows affected: %d", eventTable, eventTableRowsAffected)

log.Infof("pruned %s table, rows affected: %d", eventEntryTable, eventEntryTableRowsAffected)

log.Infof("pruned %s table, rows affected: %d", eventsSeenTable, eventSeenTableRowsAffected)

totalRowsAffected := eventEntryTableRowsAffected + eventTableRowsAffected + eventSeenTableRowsAffected
log.Infof("Done pruning all events indexes, total rows affected: %d", totalRowsAffected)
return nil
}

// pruneTxIndex is a helper function that prunes the txindex.db for a number of epochs starting from a specified height
func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, startHeight int64) error {
func pruneTxIndex(basePath string, startHeight int64) error {
log.Infof("pruning txindex")

dbPath := path.Join(basePath, databaseType, ethhashlookup.DefaultDbFilename)
dbPath := path.Join(basePath, indexesDBDir, ethhashlookup.DefaultDbFilename)
db, err := sql.Open(databaseDriver, dbPath)
if err != nil {
return err
}

defer func() {
err := db.Close()
if err != nil {
if err = db.Close(); err != nil {
fmt.Printf("ERROR: closing db: %s", err)
}
}()

deleteStmt, err := db.Prepare(deleteTxHashIndexByCID)
if err != nil {
return xerrors.Errorf("failed to prepare tx hash index delete statement: %w", err)
}

// we want to start pruning older than the specified height
startHeight = startHeight - 1

currTs, err := api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(startHeight), types.EmptyTSK)
if err != nil {
return err
}

var totalRowsAffected int64
for currTs != nil {
select {
case <-ctx.Done():
fmt.Println("request cancelled")
return nil
default:
}

for _, blockheader := range currTs.Blocks() {
blkMsgs, err := api.ChainGetBlockMessages(ctx, blockheader.Cid())
if err != nil {
log.Infof("failed to get block messages at height: %d", currTs.Height())
continue // should we break here?
}

for _, smsg := range blkMsgs.SecpkMessages {
// we are only storing delegated messages.
// This will reduce unnecessary calls to db.
if smsg.Signature.Type != crypto.SigTypeDelegated {
continue
}

cid := smsg.Cid().String()
res, err := deleteStmt.Exec(cid)
if err != nil {
return fmt.Errorf("failed to delete tx hash index at cid: %s: %w", cid, err)
}

rowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}

if rowsAffected == 0 {
log.Infof("No tx hash index found at cid: %s", cid)
}

totalRowsAffected += rowsAffected
}

}

// move to the previous epoch
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
if err != nil {
return xerrors.Errorf("error walking chain: %w", err)
}
}

err = deleteStmt.Close()
rowsAffected, err := ethhashlookup.DeleteEntriesOlderThan(db, abi.ChainEpoch(startHeight))
if err != nil {
return xerrors.Errorf("error closing delete statement: %w", err)
return xerrors.Errorf("failed to delete entries: %w", err)
}

log.Infof("Done pruning db %s table %s, rows affected: %d", ethhashlookup.DefaultDbFilename, txHashIndexTable, totalRowsAffected)
log.Infof("Done pruning db %s table %s, rows affected: %d", ethhashlookup.DefaultDbFilename, txHashIndexTable, rowsAffected)

return nil
}
Expand All @@ -1197,15 +1132,14 @@ func pruneTxIndex(ctx context.Context, api lapi.FullNode, basePath string, start
func pruneMsgIndex(_ context.Context, basePath string, startHeight int64) error {
log.Infof("pruning msgindex")

dbPath := path.Join(basePath, databaseType, index.DefaultDbFilename)
dbPath := path.Join(basePath, indexesDBDir, index.DefaultDbFilename)
db, err := sql.Open(databaseDriver, dbPath)
if err != nil {
return err
}

defer func() {
err := db.Close()
if err != nil {
if err = db.Close(); err != nil {
fmt.Printf("ERROR: closing db: %s", err)
}
}()
Expand All @@ -1217,13 +1151,13 @@ func pruneMsgIndex(_ context.Context, basePath string, startHeight int64) error

res, err := tx.Exec(deleteMsgIndexFromStartHeight, startHeight)
if err != nil {
if err := tx.Rollback(); err != nil {
if err = tx.Rollback(); err != nil {
fmt.Printf("ERROR: rollback %s tx :%s", msgIndexTable, err)
}
return err
}

if err := tx.Commit(); err != nil {
if err = tx.Commit(); err != nil {
return err
}

Expand Down

0 comments on commit 2f2a23f

Please sign in to comment.