Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(monitor/indexer): track number of fuzzy overrides per stream #1808

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion lib/xchain/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ func (c ConfLevel) Valid() bool {

// IsFuzzy returns true if this confirmation level is not ConfFinalized.
func (c ConfLevel) IsFuzzy() bool {
return c == ConfLatest
return !c.IsFinalized()
}

// IsFinalized returns true if this confirmation level is ConfFinalized.
func (c ConfLevel) IsFinalized() bool {
return c == ConfFinalized
}

// Label returns a short label for the confirmation level.
Expand Down
2 changes: 1 addition & 1 deletion lib/xchain/types_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func TestConfLevelFuzzy(t *testing.T) {
t.Parallel()
var fuzzies []ConfLevel
for conf := ConfUnknown; conf < confSentinel; conf++ {
if conf.IsFuzzy() {
if conf.Valid() && conf.IsFuzzy() {
fuzzies = append(fuzzies, conf)
}
}
Expand Down
36 changes: 30 additions & 6 deletions monitor/xmonitor/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func Start(
xprov xchain.Provider,
db db.DB,
) error {
indexer, err := newIndexer(db, network.StreamName)
indexer, err := newIndexer(db, xprov, network.StreamName)
if err != nil {
return errors.Wrap(err, "create indexer")
}
Expand Down Expand Up @@ -59,6 +59,7 @@ func Start(
// newIndexer creates a new indexer using the provided DB.
func newIndexer(
db db.DB,
xprov xchain.Provider,
streamNamer func(xchain.StreamID) string,
) (*indexer, error) {
schema := &ormv1alpha1.ModuleSchemaDescriptor{SchemaFile: []*ormv1alpha1.ModuleSchemaDescriptor_FileEntry{
Expand All @@ -78,6 +79,7 @@ func newIndexer(
}

return &indexer{
xprov: xprov,
streamNamer: streamNamer,
blockTable: dbStore.BlockTable(),
msgLinkTable: dbStore.MsgLinkTable(),
Expand All @@ -90,6 +92,7 @@ func newIndexer(
// indexer indexes xchain blocks and messages.
type indexer struct {
mu sync.RWMutex
xprov xchain.Provider
blockTable BlockTable
msgLinkTable MsgLinkTable
cursorTable CursorTable
Expand Down Expand Up @@ -377,13 +380,19 @@ func (i *indexer) instrumentMsg(ctx context.Context, link *MsgLink) error {
return errors.New("receipt not found in receipt block [BUG]")
}

override, err := isFuzzyOverride(ctx, i.xprov, receipt)
if err != nil {
return err
}

// Instrument sample
s := sample{
Stream: i.streamNamer(msg.StreamID),
XDApp: i.xdapp(msg.SourceMsgSender),
Latency: receiptBlock.Timestamp.Sub(msgBlock.Timestamp),
Success: receipt.Success,
ExcessGas: umath.SubtractOrZero(msg.DestGasLimit, receipt.GasUsed),
Stream: i.streamNamer(msg.StreamID),
XDApp: i.xdapp(msg.SourceMsgSender),
Latency: receiptBlock.Timestamp.Sub(msgBlock.Timestamp),
Success: receipt.Success,
ExcessGas: umath.SubtractOrZero(msg.DestGasLimit, receipt.GasUsed),
FuzzyOverride: override,
}
i.sampleFunc(s)

Expand All @@ -399,6 +408,21 @@ func (i *indexer) instrumentMsg(ctx context.Context, link *MsgLink) error {
return nil
}

// isFuzzyOverride returns true if this was a fuzzy xchain message that was
// submitted by a finalized confirmation attestation (and not by a fuzzy attestation).
func isFuzzyOverride(ctx context.Context, xprov xchain.Provider, receipt xchain.Receipt) (bool, error) {
if receipt.ShardID.ConfLevel().IsFinalized() {
return false, nil // Only fuzzy shards can be overridden.
}

sub, err := xprov.GetSubmission(ctx, receipt.DestChainID, receipt.TxHash)
if err != nil {
return false, errors.Wrap(err, "get submission")
}

return sub.AttHeader.ChainVersion.ConfLevel.IsFinalized(), nil
}

// xdapp returns the xdapp name for the given sender address or "unknown".
func (i *indexer) xdapp(sender common.Address) string {
resp, ok := i.xdapps[sender]
Expand Down
47 changes: 41 additions & 6 deletions monitor/xmonitor/indexer/indexer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/omni-network/omni/lib/umath"
"github.com/omni-network/omni/lib/xchain"

"github.com/ethereum/go-ethereum/common"

dbm "github.com/cosmos/cosmos-db"
fuzz "github.com/google/gofuzz"
"github.com/stretchr/testify/require"
Expand All @@ -24,7 +26,7 @@ func TestIndexer(t *testing.T) {

streamNamer := func(s xchain.StreamID) string { return fmt.Sprint(s) }

indexer, err := newIndexer(db, streamNamer)
indexer, err := newIndexer(db, mockXProvider{}, streamNamer)
require.NoError(t, err)
var samples []sample
indexer.sampleFunc = func(s sample) {
Expand Down Expand Up @@ -110,14 +112,23 @@ func TestIndexer(t *testing.T) {

func makeSample(blocks []xchain.Block, receipts []xchain.Receipt, msgs []xchain.Msg, idx int) sample {
return sample{
Stream: fmt.Sprint(receipts[idx].StreamID),
XDApp: "unknown",
Latency: getReceiptBlock(blocks, receipts[idx].MsgID).Timestamp.Sub(getMsgBlock(blocks, msgs[idx].MsgID).Timestamp),
ExcessGas: umath.SubtractOrZero(msgs[idx].DestGasLimit, receipts[idx].GasUsed),
Success: receipts[idx].Success,
Stream: fmt.Sprint(receipts[idx].StreamID),
XDApp: "unknown",
Latency: getReceiptBlock(blocks, receipts[idx].MsgID).Timestamp.Sub(getMsgBlock(blocks, msgs[idx].MsgID).Timestamp),
ExcessGas: umath.SubtractOrZero(msgs[idx].DestGasLimit, receipts[idx].GasUsed),
Success: receipts[idx].Success,
FuzzyOverride: expectFuzzyOverride(receipts[idx]),
}
}

func expectFuzzyOverride(receipt xchain.Receipt) bool {
if !receipt.ConfLevel().IsFuzzy() {
return false
}

return !mockConfLevel(receipt.TxHash).IsFuzzy()
}

func getReceiptBlock(blocks []xchain.Block, msgID xchain.MsgID) xchain.Block {
for _, block := range blocks {
_, err := block.ReceiptByID(msgID)
Expand Down Expand Up @@ -148,3 +159,27 @@ func fuzzBlock(f *fuzz.Fuzzer, msgs []xchain.Msg, receipts []xchain.Receipt) xch

return resp
}

type mockXProvider struct {
xchain.Provider
}

func (m mockXProvider) GetSubmission(_ context.Context, chainID uint64, txHash common.Hash) (xchain.Submission, error) {
return xchain.Submission{
AttHeader: xchain.AttestHeader{
ChainVersion: xchain.ChainVersion{
ID: chainID,
ConfLevel: mockConfLevel(txHash),
},
},
}, nil
}

func mockConfLevel(txHash common.Hash) xchain.ConfLevel {
confLevel = xchain.ConfFinalized
if txHash[0]%2 == 0 {
confLevel = xchain.ConfLatest
}

return confLevel
}
21 changes: 16 additions & 5 deletions monitor/xmonitor/indexer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ var (
Help: "Total number of reverted cross chain transactions per stream per xdapp",
}, []string{"stream", "xdapp"})

fuzzyOverrideCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "monitor",
Subsystem: "indexer",
Name: "fuzzy_override_total",
Help: "Total number of fuzzy override cross chain transactions per stream per xdapp",
}, []string{"stream", "xdapp"})

excessGasHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "monitor",
Subsystem: "indexer",
Expand All @@ -40,11 +47,12 @@ var (
)

type sample struct {
Stream string
XDApp string
Latency time.Duration
ExcessGas uint64
Success bool
Stream string
XDApp string
Latency time.Duration
ExcessGas uint64
Success bool
FuzzyOverride bool
}

func instrumentSample(s sample) {
Expand All @@ -53,6 +61,9 @@ func instrumentSample(s sample) {
} else {
revertCounter.WithLabelValues(s.Stream, s.XDApp).Inc()
}
if s.FuzzyOverride {
fuzzyOverrideCounter.WithLabelValues(s.Stream, s.XDApp).Inc()
}
latencyHist.WithLabelValues(s.Stream, s.XDApp).Observe(s.Latency.Seconds())
excessGasHist.WithLabelValues(s.Stream, s.XDApp).Observe(float64(s.ExcessGas))
}
Loading