Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eth): check events are indexed within in requested range #12808

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ The Lotus v1.31.0 release introduces the new `ChainIndexer` subsystem, enhancing
- Make the ordering of event output for `eth_` APIs and `GetActorEventsRaw` consistent, sorting ascending on: epoch, message index, event index and original event entry order. ([filecoin-project/lotus#12623](https://github.com/filecoin-project/lotus/pull/12623))
- Return a consistent error when encountering null rounds in ETH RPC method calls. ([filecoin-project/lotus#12655](https://github.com/filecoin-project/lotus/pull/12655))
- Correct erroneous sector QAP-calculation upon sector extension in lotus-miner cli. ([filecoin-project/lotus#12720](https://github.com/filecoin-project/lotus/pull/12720))
- Return error if logs or events within range are not indexed. ([filecoin-project/lotus#12728](https://github.com/filecoin-project/lotus/pull/12728))


## 📝 Changelog

Expand Down
1 change: 0 additions & 1 deletion chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we removed it as not needed anywhere.

&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
Expand Down
119 changes: 103 additions & 16 deletions chain/index/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

var ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
var (
ErrMaxResultsReached = xerrors.New("filter matches too many events, try a more restricted filter")
ErrRangeInFuture = xerrors.New("range end is in the future")
)

const maxLookBackForWait = 120 // one hour of tipsets

Expand Down Expand Up @@ -236,13 +239,18 @@ func loadExecutedMessages(ctx context.Context, cs ChainStore, recomputeTipSetSta
return ems, nil
}

// checkTipsetIndexedStatus verifies if a specific tipset is indexed based on the EventFilter.
// It returns nil if the tipset is indexed, ErrNotFound if it's not indexed or not specified,
func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, f *EventFilter) error {
// checkFilterTipsetsIndexed verifies if a tipset, or a range of tipsets, specified by a given
// filter is indexed. It checks for the existence of non-null rounds at the range boundaries.
func (si *SqliteIndexer) checkFilterTipsetsIndexed(ctx context.Context, f *EventFilter) error {
// Three cases to consider:
// 1. Specific tipset is provided
// 2. Single tipset is specified by the height range (min=max)
// 3. Range of tipsets is specified by the height range (min!=max)
// We'll handle the first two cases here and the third case in checkRangeIndexedStatus

var tipsetKeyCid []byte
var err error

// Determine the tipset to check based on the filter
switch {
case f.TipsetCid != cid.Undef:
tipsetKeyCid = f.TipsetCid.Bytes()
Expand All @@ -253,16 +261,10 @@ func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, f *EventF
// this means that this is a null round and there exist no events for this epoch
return nil
}

return xerrors.Errorf("failed to get tipset key cid by height: %w", err)
}
default:
// This function distinguishes between two scenarios:
// 1. Missing events: The requested tipset is not present in the Index (an error condition).
// 2. Valid case: The tipset exists but contains no events (a normal situation).
// Currently, this distinction is only made for the common use case where a user requests events for a single tipset.
// TODO: Implement this functionality for a range of tipsets. This is expensive and not a common use case so it's deferred for now.
return nil
return si.checkRangeIndexedStatus(ctx, f.MinHeight, f.MaxHeight)
}

// If we couldn't determine a specific tipset, return ErrNotFound
Expand All @@ -280,7 +282,92 @@ func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, f *EventF
return ErrNotFound // Tipset is not indexed
}

// getTipsetKeyCidByHeight retrieves the tipset key CID for a given height.
// checkRangeIndexedStatus verifies if a range of tipsets specified by the given height range is
// indexed. It checks for the existence of non-null rounds at the range boundaries.
func (si *SqliteIndexer) checkRangeIndexedStatus(ctx context.Context, minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch) error {
head := si.cs.GetHeaviestTipSet()
if minHeight > head.Height() || maxHeight > head.Height() {
return ErrRangeInFuture
Copy link
Contributor

@aarshkshah1992 aarshkshah1992 Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to have the error mention the heaviest height we have in our state -> will make sync problems visible here.

}

// Find the first non-null round in the range
startCid, startHeight, err := si.findFirstNonNullRound(ctx, minHeight, maxHeight)
if err != nil {
return xerrors.Errorf("failed to find first non-null round: %w", err)
}
// If all rounds are null, consider the range valid
if startCid == nil {
return nil
}

// Find the last non-null round in the range
endCid, endHeight, err := si.findLastNonNullRound(ctx, maxHeight, minHeight)
if err != nil {
return xerrors.Errorf("failed to find last non-null round: %w", err)
}
// If all rounds are null, consider the range valid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is wrong

if endCid == nil {
return xerrors.Errorf("unexpected error finding last non-null round: all rounds are null but start round is not (%d to %d)", minHeight, maxHeight)
}

// Check indexing status for start and end tipsets
if err := si.checkTipsetIndexedStatus(ctx, startCid, startHeight); err != nil {
return err
}
if err := si.checkTipsetIndexedStatus(ctx, endCid, endHeight); err != nil {
return err
}
// Assume (not necessarily correctly, but likely) that all tipsets within the range are indexed

return nil
}

func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid)
if err != nil {
return xerrors.Errorf("failed to check if tipset at epoch %d is indexed: %w", height, err)
} else if exists {
return nil // has been indexed
}
return ErrNotFound
}

// findFirstNonNullRound finds the first non-null round starting from minHeight up to maxHeight.
// It updates the minHeight to the found height and returns the tipset key CID.
func (si *SqliteIndexer) findFirstNonNullRound(ctx context.Context, minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch) ([]byte, abi.ChainEpoch, error) {
for height := minHeight; height <= maxHeight; height++ {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err != nil {
if !errors.Is(err, ErrNotFound) {
return nil, 0, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
// else null round, keep searching
continue
}
minHeight = height // Update the minHeight to the found height
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to update minHeight to height here ? Can't we just return height ? IIUC, we're not passing a pointer to this function and so not really updating the original minHeight param

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha I see why. This is to zero down to range of non-null epochs. Yeah makes sense.

return cid, minHeight, nil
}
// All rounds are null
return nil, 0, nil
}

// findLastNonNullRound finds the last non-null round starting from maxHeight down to minHeight
func (si *SqliteIndexer) findLastNonNullRound(ctx context.Context, maxHeight abi.ChainEpoch, minHeight abi.ChainEpoch) ([]byte, abi.ChainEpoch, error) {
for height := maxHeight; height >= minHeight; height-- {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
maxHeight = height // Update the maxHeight to the found height
return cid, maxHeight, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, 0, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
}

return nil, 0, nil
}

// getTipsetKeyCidByHeight retrieves the tipset key CID for a given height from the ChainStore
func (si *SqliteIndexer) getTipsetKeyCidByHeight(ctx context.Context, height abi.ChainEpoch) ([]byte, error) {
ts, err := si.cs.GetTipsetByHeight(ctx, height, nil, false)
if err != nil {
Expand Down Expand Up @@ -452,15 +539,15 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
if height > 0 {
head := si.cs.GetHeaviestTipSet()
if head == nil {
return nil, errors.New("failed to get head: head is nil")
return nil, xerrors.New("failed to get head: head is nil")
}
headHeight := head.Height()
maxLookBackHeight := headHeight - maxLookBackForWait

// if the height is old enough, we'll assume the index is caught up to it and not bother
// waiting for it to be indexed
if height <= maxLookBackHeight {
return nil, si.checkTipsetIndexedStatus(ctx, f)
return nil, si.checkFilterTipsetsIndexed(ctx, f)
}
}

Expand All @@ -474,7 +561,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
}

if len(ces) == 0 {
return nil, si.checkTipsetIndexedStatus(ctx, f)
return nil, si.checkFilterTipsetsIndexed(ctx, f)
}
}

Expand Down
39 changes: 29 additions & 10 deletions chain/index/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ func TestGetEventsForFilterNoEvents(t *testing.T) {
si, _, cs := setupWithHeadIndexed(t, headHeight, rng)
t.Cleanup(func() { _ = si.Close() })

// Create a fake tipset at height 1
fakeTipSet1 := fakeTipSet(t, rng, 1, nil)

// Set the dummy chainstore to return this tipset for height 1
cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // empty DB
cs.SetTipSetByCid(t, fakeTipSet1)
// Create a fake tipset at various heights used in the test
fakeTipsets := make(map[abi.ChainEpoch]*types.TipSet)
for _, ts := range []abi.ChainEpoch{1, 10, 20} {
fakeTipsets[ts] = fakeTipSet(t, rng, ts, nil)
cs.SetTipsetByHeightAndKey(ts, fakeTipsets[ts].Key(), fakeTipsets[ts])
cs.SetTipSetByCid(t, fakeTipsets[ts])
}

// tipset is not indexed
f := &EventFilter{
Expand All @@ -46,7 +47,7 @@ func TestGetEventsForFilterNoEvents(t *testing.T) {
require.True(t, errors.Is(err, ErrNotFound))
require.Equal(t, 0, len(ces))

tsCid, err := fakeTipSet1.Key().Cid()
tsCid, err := fakeTipsets[1].Key().Cid()
require.NoError(t, err)
f = &EventFilter{
TipsetCid: tsCid,
Expand All @@ -58,7 +59,7 @@ func TestGetEventsForFilterNoEvents(t *testing.T) {

// tipset is indexed but has no events
err = withTx(ctx, si.db, func(tx *sql.Tx) error {
return si.indexTipset(ctx, tx, fakeTipSet1)
return si.indexTipset(ctx, tx, fakeTipsets[1])
})
require.NoError(t, err)

Expand All @@ -73,13 +74,31 @@ func TestGetEventsForFilterNoEvents(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(ces))

// search for a range that is absent
// search for a range that is not indexed
f = &EventFilter{
MinHeight: 10,
MaxHeight: 20,
}
ces, err = si.GetEventsForFilter(ctx, f)
require.ErrorIs(t, err, ErrNotFound)
require.Equal(t, 0, len(ces))

// search for a range (end) that is in the future
f = &EventFilter{
MinHeight: 10,
MaxHeight: 200,
}
ces, err = si.GetEventsForFilter(ctx, f)
require.ErrorIs(t, err, ErrRangeInFuture)
require.Equal(t, 0, len(ces))

// search for a range (start too) that is in the future
f = &EventFilter{
MinHeight: 100,
MaxHeight: 200,
}
ces, err = si.GetEventsForFilter(ctx, f)
require.NoError(t, err)
require.ErrorIs(t, err, ErrRangeInFuture)
require.Equal(t, 0, len(ces))
}

Expand Down
Loading