Skip to content

Commit

Permalink
fix: events index: record processed epochs and tipsets for events and…
Browse files Browse the repository at this point in the history
… eth_get_log blocks till requested tipset has been indexed (#12080)

* record seen event epochs

* create correct index

* migrate to version 6

* fix typo

* test both conditions

* changes as per review

* record reverted tipsets

* see if tipsets has events and has not been reverted

* sub/unsub tipset updates from the index

* eth_get_logs should wait for events

* fix naming

* changes as per review

* solve issue with missing events

* use correct var

* changes as per review

* add unique constraint

* fix test wait

* check for events at min_height as well

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* reduce duplication

---------

Co-authored-by: Rod Vagg <rod@vagg.org>
  • Loading branch information
aarshkshah1992 and rvagg authored Jun 20, 2024
1 parent e229617 commit c87e2f2
Show file tree
Hide file tree
Showing 3 changed files with 424 additions and 12 deletions.
253 changes: 246 additions & 7 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -62,9 +63,13 @@ var ddls = []string{
value BLOB NOT NULL
)`,

createTableEventsSeen,

createIndexEventEntryIndexedKey,
createIndexEventEntryCodecValue,
createIndexEventEntryEventId,
createIndexEventsSeenHeight,
createIndexEventsSeenTipsetKeyCid,

// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
Expand All @@ -76,20 +81,27 @@ var ddls = []string{
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (4)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (5)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (6)`,
}

var (
log = logging.Logger("filter")
)

const (
schemaVersion = 5
schemaVersion = 6

eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?`
restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?`
getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen`
isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?`

createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
Expand All @@ -99,6 +111,17 @@ const (
createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);`
createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);`
createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`

createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key_cid BLOB NOT NULL,
reverted INTEGER NOT NULL,
UNIQUE(height, tipset_key_cid)
)`

createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);`
createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);`
)

type EventIndex struct {
Expand All @@ -109,8 +132,27 @@ type EventIndex struct {
stmtInsertEntry *sql.Stmt
stmtRevertEventsInTipset *sql.Stmt
stmtRestoreEvent *sql.Stmt
stmtUpsertEventsSeen *sql.Stmt
stmtRevertEventSeen *sql.Stmt
stmtRestoreEventSeen *sql.Stmt

stmtIsTipsetProcessed *sql.Stmt
stmtGetMaxHeightInIndex *sql.Stmt
stmtIsHeightProcessed *sql.Stmt

mu sync.Mutex
subIdCounter uint64
updateSubs map[uint64]*updateSub
}

type updateSub struct {
ctx context.Context
ch chan EventIndexUpdated
cancel context.CancelFunc
}

type EventIndexUpdated struct{}

func (ei *EventIndex) initStatements() (err error) {
ei.stmtEventExists, err = ei.db.Prepare(eventExists)
if err != nil {
Expand All @@ -137,6 +179,36 @@ func (ei *EventIndex) initStatements() (err error) {
return xerrors.Errorf("prepare stmtRestoreEvent: %w", err)
}

ei.stmtUpsertEventsSeen, err = ei.db.Prepare(upsertEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtUpsertEventsSeen: %w", err)
}

ei.stmtRevertEventSeen, err = ei.db.Prepare(revertEventSeen)
if err != nil {
return xerrors.Errorf("prepare stmtRevertEventSeen: %w", err)
}

ei.stmtRestoreEventSeen, err = ei.db.Prepare(restoreEventSeen)
if err != nil {
return xerrors.Errorf("prepare stmtRestoreEventSeen: %w", err)
}

ei.stmtIsTipsetProcessed, err = ei.db.Prepare(isTipsetProcessed)
if err != nil {
return xerrors.Errorf("prepare isTipsetProcessed: %w", err)
}

ei.stmtGetMaxHeightInIndex, err = ei.db.Prepare(getMaxHeightInIndex)
if err != nil {
return xerrors.Errorf("prepare getMaxHeightInIndex: %w", err)
}

ei.stmtIsHeightProcessed, err = ei.db.Prepare(isHeightProcessed)
if err != nil {
return xerrors.Errorf("prepare isHeightProcessed: %w", err)
}

return nil
}

Expand Down Expand Up @@ -402,9 +474,59 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context) error {
return xerrors.Errorf("commit transaction: %w", err)
}

log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
return nil
}

func (ei *EventIndex) migrateToVersion6(ctx context.Context) error {
now := time.Now()

tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()

stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err)
}
_, err = stmtCreateTableEventsSeen.ExecContext(ctx)
if err != nil {
return xerrors.Errorf("create table events_seen: %w", err)
}

_, err = tx.ExecContext(ctx, createIndexEventsSeenHeight)
if err != nil {
return xerrors.Errorf("create index events_seen_height: %w", err)
}
_, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid)
if err != nil {
return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err)
}

// INSERT an entry in the events_seen table for all epochs we do have events for in our DB
_, err = tx.ExecContext(ctx, `
INSERT OR IGNORE INTO events_seen (height, tipset_key_cid, reverted)
SELECT DISTINCT height, tipset_key_cid, reverted FROM event
`)
if err != nil {
return xerrors.Errorf("insert events into events_seen: %w", err)
}

_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (6)")
if err != nil {
return xerrors.Errorf("increment _meta version: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.vacuumDBAndCheckpointWAL(ctx)

log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
log.Infof("Successfully migrated event index from version 5 to version 6 in %s", time.Since(now))
return nil
}

Expand Down Expand Up @@ -502,6 +624,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
version = 5
}

if version == 5 {
log.Infof("Upgrading event index from version 5 to version 6")
err = eventIndex.migrateToVersion6(ctx)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not migrate event index schema from version 5 to version 6: %w", err)
}
version = 6
}

if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
Expand All @@ -514,6 +646,8 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err)
}

eventIndex.updateSubs = make(map[uint64]*updateSub)

return &eventIndex, nil
}

Expand All @@ -524,6 +658,60 @@ func (ei *EventIndex) Close() error {
return ei.db.Close()
}

func (ei *EventIndex) SubscribeUpdates() (chan EventIndexUpdated, func()) {
subCtx, subCancel := context.WithCancel(context.Background())
ch := make(chan EventIndexUpdated)

tSub := &updateSub{
ctx: subCtx,
cancel: subCancel,
ch: ch,
}

ei.mu.Lock()
subId := ei.subIdCounter
ei.subIdCounter++
ei.updateSubs[subId] = tSub
ei.mu.Unlock()

unSubscribeF := func() {
ei.mu.Lock()
tSub, ok := ei.updateSubs[subId]
if !ok {
ei.mu.Unlock()
return
}
delete(ei.updateSubs, subId)
ei.mu.Unlock()

// cancel the subscription
tSub.cancel()
}

return tSub.ch, unSubscribeF
}

func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) {
row := ei.stmtGetMaxHeightInIndex.QueryRowContext(ctx)
var maxHeight uint64
err := row.Scan(&maxHeight)
return maxHeight, err
}

func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) {
row := ei.stmtIsHeightProcessed.QueryRowContext(ctx, height)
var exists bool
err := row.Scan(&exists)
return exists, err
}

func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) {
row := ei.stmtIsTipsetProcessed.QueryRowContext(ctx, tipsetKeyCid)
var exists bool
err := row.Scan(&exists)
return exists, err
}

func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
Expand All @@ -532,18 +720,46 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
// rollback the transaction (a no-op if the transaction was already committed)
defer func() { _ = tx.Rollback() }()

tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}

// lets handle the revert case first, since its simpler and we can simply mark all events in this tipset as reverted and return
if revert {
_, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes())
if err != nil {
return xerrors.Errorf("revert event: %w", err)
}

_, err = tx.Stmt(ei.stmtRevertEventSeen).Exec(te.msgTs.Height(), tsKeyCid.Bytes())
if err != nil {
return xerrors.Errorf("revert event seen: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.mu.Lock()
tSubs := make([]*updateSub, 0, len(ei.updateSubs))
for _, tSub := range ei.updateSubs {
tSubs = append(tSubs, tSub)
}
ei.mu.Unlock()

for _, tSub := range tSubs {
tSub := tSub
select {
case tSub.ch <- EventIndexUpdated{}:
case <-tSub.ctx.Done():
// subscription was cancelled, ignore
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

Expand Down Expand Up @@ -571,11 +787,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
addressLookups[ev.Emitter] = addr
}

tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}

// check if this event already exists in the database
var entryID sql.NullInt64
err = tx.Stmt(ei.stmtEventExists).QueryRow(
Expand Down Expand Up @@ -655,11 +866,39 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}
}

// this statement will mark the tipset as processed and will insert a new row if it doesn't exist
// or update the reverted field to false if it does
_, err = tx.Stmt(ei.stmtUpsertEventsSeen).Exec(
te.msgTs.Height(),
tsKeyCid.Bytes(),
)
if err != nil {
return xerrors.Errorf("exec upsert events seen: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.mu.Lock()
tSubs := make([]*updateSub, 0, len(ei.updateSubs))
for _, tSub := range ei.updateSubs {
tSubs = append(tSubs, tSub)
}
ei.mu.Unlock()

for _, tSub := range tSubs {
tSub := tSub
select {
case tSub.ch <- EventIndexUpdated{}:
case <-tSub.ctx.Done():
// subscription was cancelled, ignore
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

Expand Down
Loading

0 comments on commit c87e2f2

Please sign in to comment.