From 36f463121ed4238d8ea4fbaea76c9c158cc27577 Mon Sep 17 00:00:00 2001 From: corver Date: Mon, 14 Oct 2024 12:48:31 +0200 Subject: [PATCH] chore(monitor): remove emitcache --- monitor/app/app.go | 27 +- .../emitcache/emitcursor.cosmos_orm.go | 220 ----------- monitor/xmonitor/emitcache/emitcursor.pb.go | 200 ---------- monitor/xmonitor/emitcache/emitcursor.proto | 23 -- monitor/xmonitor/emitcache/emitcursorcache.go | 355 ------------------ .../emitcursorcache_internal_test.go | 124 ------ monitor/xmonitor/emitcache/metrics.go | 22 -- monitor/xmonitor/monitor.go | 36 +- 8 files changed, 10 insertions(+), 997 deletions(-) delete mode 100644 monitor/xmonitor/emitcache/emitcursor.cosmos_orm.go delete mode 100644 monitor/xmonitor/emitcache/emitcursor.pb.go delete mode 100644 monitor/xmonitor/emitcache/emitcursor.proto delete mode 100644 monitor/xmonitor/emitcache/emitcursorcache.go delete mode 100644 monitor/xmonitor/emitcache/emitcursorcache_internal_test.go delete mode 100644 monitor/xmonitor/emitcache/metrics.go diff --git a/monitor/app/app.go b/monitor/app/app.go index df6e471c4..330d21a0a 100644 --- a/monitor/app/app.go +++ b/monitor/app/app.go @@ -9,7 +9,6 @@ import ( "github.com/omni-network/omni/contracts/bindings" "github.com/omni-network/omni/halo/genutil/evm/predeploys" "github.com/omni-network/omni/lib/buildinfo" - "github.com/omni-network/omni/lib/cchain" cprovider "github.com/omni-network/omni/lib/cchain/provider" "github.com/omni-network/omni/lib/errors" "github.com/omni-network/omni/lib/ethclient" @@ -91,7 +90,7 @@ func Run(ctx context.Context, cfg Config) error { return errors.Wrap(err, "start AVS sync") } - if err := startXMonitor(ctx, cfg, network, ethClients, cprov, xprov); err != nil { + if err := xmonitor.Start(ctx, network, xprov, cprov, ethClients); err != nil { return errors.Wrap(err, "start xchain monitor") } @@ -140,30 +139,6 @@ func startIndexer( return indexer.Start(ctx, network, xprov, db) } -// startXMonitor starts the xchain offset/head monitoring. -func startXMonitor( - ctx context.Context, - cfg Config, - network netconf.Network, - ethClients map[uint64]ethclient.Client, - cprov cchain.Provider, - xprov xchain.Provider, -) error { - var db dbm.DB - if cfg.DBDir == "" { - log.Warn(ctx, "No --db-dir provided, using in-memory DB", nil) - db = dbm.NewMemDB() - } else { - var err error - db, err = dbm.NewGoLevelDB("emitcache", cfg.DBDir, nil) - if err != nil { - return errors.Wrap(err, "new golevel db") - } - } - - return xmonitor.Start(ctx, network, xprov, cprov, ethClients, db) -} - // serveMonitoring starts a goroutine that serves the monitoring API. It // returns a channel that will receive an error if the server fails to start. func serveMonitoring(address string) <-chan error { diff --git a/monitor/xmonitor/emitcache/emitcursor.cosmos_orm.go b/monitor/xmonitor/emitcache/emitcursor.cosmos_orm.go deleted file mode 100644 index d020218ca..000000000 --- a/monitor/xmonitor/emitcache/emitcursor.cosmos_orm.go +++ /dev/null @@ -1,220 +0,0 @@ -// Code generated by protoc-gen-go-cosmos-orm. DO NOT EDIT. - -package emitcache - -import ( - context "context" - ormlist "cosmossdk.io/orm/model/ormlist" - ormtable "cosmossdk.io/orm/model/ormtable" - ormerrors "cosmossdk.io/orm/types/ormerrors" -) - -type EmitCursorTable interface { - Insert(ctx context.Context, emitCursor *EmitCursor) error - InsertReturningId(ctx context.Context, emitCursor *EmitCursor) (uint64, error) - LastInsertedSequence(ctx context.Context) (uint64, error) - Update(ctx context.Context, emitCursor *EmitCursor) error - Save(ctx context.Context, emitCursor *EmitCursor) error - Delete(ctx context.Context, emitCursor *EmitCursor) error - Has(ctx context.Context, id uint64) (found bool, err error) - // Get returns nil and an error which responds true to ormerrors.IsNotFound() if the record was not found. - Get(ctx context.Context, id uint64) (*EmitCursor, error) - HasBySrcChainIdDstChainIdShardIdHeight(ctx context.Context, src_chain_id uint64, dst_chain_id uint64, shard_id uint64, height uint64) (found bool, err error) - // GetBySrcChainIdDstChainIdShardIdHeight returns nil and an error which responds true to ormerrors.IsNotFound() if the record was not found. - GetBySrcChainIdDstChainIdShardIdHeight(ctx context.Context, src_chain_id uint64, dst_chain_id uint64, shard_id uint64, height uint64) (*EmitCursor, error) - List(ctx context.Context, prefixKey EmitCursorIndexKey, opts ...ormlist.Option) (EmitCursorIterator, error) - ListRange(ctx context.Context, from, to EmitCursorIndexKey, opts ...ormlist.Option) (EmitCursorIterator, error) - DeleteBy(ctx context.Context, prefixKey EmitCursorIndexKey) error - DeleteRange(ctx context.Context, from, to EmitCursorIndexKey) error - - doNotImplement() -} - -type EmitCursorIterator struct { - ormtable.Iterator -} - -func (i EmitCursorIterator) Value() (*EmitCursor, error) { - var emitCursor EmitCursor - err := i.UnmarshalMessage(&emitCursor) - return &emitCursor, err -} - -type EmitCursorIndexKey interface { - id() uint32 - values() []interface{} - emitCursorIndexKey() -} - -// primary key starting index.. -type EmitCursorPrimaryKey = EmitCursorIdIndexKey - -type EmitCursorIdIndexKey struct { - vs []interface{} -} - -func (x EmitCursorIdIndexKey) id() uint32 { return 0 } -func (x EmitCursorIdIndexKey) values() []interface{} { return x.vs } -func (x EmitCursorIdIndexKey) emitCursorIndexKey() {} - -func (this EmitCursorIdIndexKey) WithId(id uint64) EmitCursorIdIndexKey { - this.vs = []interface{}{id} - return this -} - -type EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey struct { - vs []interface{} -} - -func (x EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey) id() uint32 { return 2 } -func (x EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey) values() []interface{} { return x.vs } -func (x EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey) emitCursorIndexKey() {} - -func (this EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey) WithSrcChainId(src_chain_id uint64) EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey { - this.vs = []interface{}{src_chain_id} - return this -} - -func (this EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey) WithSrcChainIdDstChainId(src_chain_id uint64, dst_chain_id uint64) EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey { - this.vs = []interface{}{src_chain_id, dst_chain_id} - return this -} - -func (this EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey) WithSrcChainIdDstChainIdShardId(src_chain_id uint64, dst_chain_id uint64, shard_id uint64) EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey { - this.vs = []interface{}{src_chain_id, dst_chain_id, shard_id} - return this -} - -func (this EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey) WithSrcChainIdDstChainIdShardIdHeight(src_chain_id uint64, dst_chain_id uint64, shard_id uint64, height uint64) EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey { - this.vs = []interface{}{src_chain_id, dst_chain_id, shard_id, height} - return this -} - -type emitCursorTable struct { - table ormtable.AutoIncrementTable -} - -func (this emitCursorTable) Insert(ctx context.Context, emitCursor *EmitCursor) error { - return this.table.Insert(ctx, emitCursor) -} - -func (this emitCursorTable) Update(ctx context.Context, emitCursor *EmitCursor) error { - return this.table.Update(ctx, emitCursor) -} - -func (this emitCursorTable) Save(ctx context.Context, emitCursor *EmitCursor) error { - return this.table.Save(ctx, emitCursor) -} - -func (this emitCursorTable) Delete(ctx context.Context, emitCursor *EmitCursor) error { - return this.table.Delete(ctx, emitCursor) -} - -func (this emitCursorTable) InsertReturningId(ctx context.Context, emitCursor *EmitCursor) (uint64, error) { - return this.table.InsertReturningPKey(ctx, emitCursor) -} - -func (this emitCursorTable) LastInsertedSequence(ctx context.Context) (uint64, error) { - return this.table.LastInsertedSequence(ctx) -} - -func (this emitCursorTable) Has(ctx context.Context, id uint64) (found bool, err error) { - return this.table.PrimaryKey().Has(ctx, id) -} - -func (this emitCursorTable) Get(ctx context.Context, id uint64) (*EmitCursor, error) { - var emitCursor EmitCursor - found, err := this.table.PrimaryKey().Get(ctx, &emitCursor, id) - if err != nil { - return nil, err - } - if !found { - return nil, ormerrors.NotFound - } - return &emitCursor, nil -} - -func (this emitCursorTable) HasBySrcChainIdDstChainIdShardIdHeight(ctx context.Context, src_chain_id uint64, dst_chain_id uint64, shard_id uint64, height uint64) (found bool, err error) { - return this.table.GetIndexByID(2).(ormtable.UniqueIndex).Has(ctx, - src_chain_id, - dst_chain_id, - shard_id, - height, - ) -} - -func (this emitCursorTable) GetBySrcChainIdDstChainIdShardIdHeight(ctx context.Context, src_chain_id uint64, dst_chain_id uint64, shard_id uint64, height uint64) (*EmitCursor, error) { - var emitCursor EmitCursor - found, err := this.table.GetIndexByID(2).(ormtable.UniqueIndex).Get(ctx, &emitCursor, - src_chain_id, - dst_chain_id, - shard_id, - height, - ) - if err != nil { - return nil, err - } - if !found { - return nil, ormerrors.NotFound - } - return &emitCursor, nil -} - -func (this emitCursorTable) List(ctx context.Context, prefixKey EmitCursorIndexKey, opts ...ormlist.Option) (EmitCursorIterator, error) { - it, err := this.table.GetIndexByID(prefixKey.id()).List(ctx, prefixKey.values(), opts...) - return EmitCursorIterator{it}, err -} - -func (this emitCursorTable) ListRange(ctx context.Context, from, to EmitCursorIndexKey, opts ...ormlist.Option) (EmitCursorIterator, error) { - it, err := this.table.GetIndexByID(from.id()).ListRange(ctx, from.values(), to.values(), opts...) - return EmitCursorIterator{it}, err -} - -func (this emitCursorTable) DeleteBy(ctx context.Context, prefixKey EmitCursorIndexKey) error { - return this.table.GetIndexByID(prefixKey.id()).DeleteBy(ctx, prefixKey.values()...) -} - -func (this emitCursorTable) DeleteRange(ctx context.Context, from, to EmitCursorIndexKey) error { - return this.table.GetIndexByID(from.id()).DeleteRange(ctx, from.values(), to.values()) -} - -func (this emitCursorTable) doNotImplement() {} - -var _ EmitCursorTable = emitCursorTable{} - -func NewEmitCursorTable(db ormtable.Schema) (EmitCursorTable, error) { - table := db.GetTable(&EmitCursor{}) - if table == nil { - return nil, ormerrors.TableNotFound.Wrap(string((&EmitCursor{}).ProtoReflect().Descriptor().FullName())) - } - return emitCursorTable{table.(ormtable.AutoIncrementTable)}, nil -} - -type EmitcursorStore interface { - EmitCursorTable() EmitCursorTable - - doNotImplement() -} - -type emitcursorStore struct { - emitCursor EmitCursorTable -} - -func (x emitcursorStore) EmitCursorTable() EmitCursorTable { - return x.emitCursor -} - -func (emitcursorStore) doNotImplement() {} - -var _ EmitcursorStore = emitcursorStore{} - -func NewEmitcursorStore(db ormtable.Schema) (EmitcursorStore, error) { - emitCursorTable, err := NewEmitCursorTable(db) - if err != nil { - return nil, err - } - - return emitcursorStore{ - emitCursorTable, - }, nil -} diff --git a/monitor/xmonitor/emitcache/emitcursor.pb.go b/monitor/xmonitor/emitcache/emitcursor.pb.go deleted file mode 100644 index 8459875ac..000000000 --- a/monitor/xmonitor/emitcache/emitcursor.pb.go +++ /dev/null @@ -1,200 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.35.1 -// protoc (unknown) -// source: monitor/xmonitor/emitcache/emitcursor.proto - -package emitcache - -import ( - _ "cosmossdk.io/api/cosmos/orm/v1" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// EmitCursor stores historical emit cursors for all evm portals. -type EmitCursor struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` // Auto-incremented ID - SrcChainId uint64 `protobuf:"varint,2,opt,name=src_chain_id,json=srcChainId,proto3" json:"src_chain_id,omitempty"` // Source chain ID as per https://chainlist.org - Height uint64 `protobuf:"varint,3,opt,name=height,proto3" json:"height,omitempty"` // Block Height - DstChainId uint64 `protobuf:"varint,4,opt,name=dst_chain_id,json=dstChainId,proto3" json:"dst_chain_id,omitempty"` // Destination Chain ID as per https://chainlist.org - ShardId uint64 `protobuf:"varint,5,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` // Shard ID - MsgOffset uint64 `protobuf:"varint,6,opt,name=msg_offset,json=msgOffset,proto3" json:"msg_offset,omitempty"` // XMsg Stream Offset -} - -func (x *EmitCursor) Reset() { - *x = EmitCursor{} - mi := &file_monitor_xmonitor_emitcache_emitcursor_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *EmitCursor) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*EmitCursor) ProtoMessage() {} - -func (x *EmitCursor) ProtoReflect() protoreflect.Message { - mi := &file_monitor_xmonitor_emitcache_emitcursor_proto_msgTypes[0] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use EmitCursor.ProtoReflect.Descriptor instead. -func (*EmitCursor) Descriptor() ([]byte, []int) { - return file_monitor_xmonitor_emitcache_emitcursor_proto_rawDescGZIP(), []int{0} -} - -func (x *EmitCursor) GetId() uint64 { - if x != nil { - return x.Id - } - return 0 -} - -func (x *EmitCursor) GetSrcChainId() uint64 { - if x != nil { - return x.SrcChainId - } - return 0 -} - -func (x *EmitCursor) GetHeight() uint64 { - if x != nil { - return x.Height - } - return 0 -} - -func (x *EmitCursor) GetDstChainId() uint64 { - if x != nil { - return x.DstChainId - } - return 0 -} - -func (x *EmitCursor) GetShardId() uint64 { - if x != nil { - return x.ShardId - } - return 0 -} - -func (x *EmitCursor) GetMsgOffset() uint64 { - if x != nil { - return x.MsgOffset - } - return 0 -} - -var File_monitor_xmonitor_emitcache_emitcursor_proto protoreflect.FileDescriptor - -var file_monitor_xmonitor_emitcache_emitcursor_proto_rawDesc = []byte{ - 0x0a, 0x2b, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x78, 0x6d, 0x6f, 0x6e, 0x69, 0x74, - 0x6f, 0x72, 0x2f, 0x65, 0x6d, 0x69, 0x74, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x65, 0x6d, 0x69, - 0x74, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1a, 0x6d, - 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x78, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, - 0x65, 0x6d, 0x69, 0x74, 0x63, 0x61, 0x63, 0x68, 0x65, 0x1a, 0x17, 0x63, 0x6f, 0x73, 0x6d, 0x6f, - 0x73, 0x2f, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x6f, 0x72, 0x6d, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x22, 0xf5, 0x01, 0x0a, 0x0a, 0x45, 0x6d, 0x69, 0x74, 0x43, 0x75, 0x72, 0x73, 0x6f, - 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x02, 0x69, - 0x64, 0x12, 0x20, 0x0a, 0x0c, 0x73, 0x72, 0x63, 0x5f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, - 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x73, 0x72, 0x63, 0x43, 0x68, 0x61, 0x69, - 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, 0x20, 0x0a, 0x0c, 0x64, - 0x73, 0x74, 0x5f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x0a, 0x64, 0x73, 0x74, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x19, 0x0a, - 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x73, 0x67, 0x5f, - 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6d, 0x73, - 0x67, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x3a, 0x41, 0xf2, 0x9e, 0xd3, 0x8e, 0x03, 0x3b, 0x0a, - 0x06, 0x0a, 0x02, 0x69, 0x64, 0x10, 0x01, 0x12, 0x2f, 0x0a, 0x29, 0x73, 0x72, 0x63, 0x5f, 0x63, - 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x2c, 0x64, 0x73, 0x74, 0x5f, 0x63, 0x68, 0x61, 0x69, - 0x6e, 0x5f, 0x69, 0x64, 0x2c, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x2c, 0x68, 0x65, - 0x69, 0x67, 0x68, 0x74, 0x10, 0x02, 0x18, 0x01, 0x18, 0x01, 0x42, 0xf4, 0x01, 0x0a, 0x1e, 0x63, - 0x6f, 0x6d, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x78, 0x6d, 0x6f, 0x6e, 0x69, - 0x74, 0x6f, 0x72, 0x2e, 0x65, 0x6d, 0x69, 0x74, 0x63, 0x61, 0x63, 0x68, 0x65, 0x42, 0x0f, 0x45, - 0x6d, 0x69, 0x74, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, - 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x6d, 0x6e, - 0x69, 0x2d, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x6f, 0x6d, 0x6e, 0x69, 0x2f, 0x6d, - 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x78, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, - 0x65, 0x6d, 0x69, 0x74, 0x63, 0x61, 0x63, 0x68, 0x65, 0xa2, 0x02, 0x03, 0x4d, 0x58, 0x45, 0xaa, - 0x02, 0x1a, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x58, 0x6d, 0x6f, 0x6e, 0x69, 0x74, - 0x6f, 0x72, 0x2e, 0x45, 0x6d, 0x69, 0x74, 0x63, 0x61, 0x63, 0x68, 0x65, 0xca, 0x02, 0x1a, 0x4d, - 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x5c, 0x58, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x5c, - 0x45, 0x6d, 0x69, 0x74, 0x63, 0x61, 0x63, 0x68, 0x65, 0xe2, 0x02, 0x26, 0x4d, 0x6f, 0x6e, 0x69, - 0x74, 0x6f, 0x72, 0x5c, 0x58, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x5c, 0x45, 0x6d, 0x69, - 0x74, 0x63, 0x61, 0x63, 0x68, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0xea, 0x02, 0x1c, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x3a, 0x3a, 0x58, 0x6d, - 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x3a, 0x3a, 0x45, 0x6d, 0x69, 0x74, 0x63, 0x61, 0x63, 0x68, - 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_monitor_xmonitor_emitcache_emitcursor_proto_rawDescOnce sync.Once - file_monitor_xmonitor_emitcache_emitcursor_proto_rawDescData = file_monitor_xmonitor_emitcache_emitcursor_proto_rawDesc -) - -func file_monitor_xmonitor_emitcache_emitcursor_proto_rawDescGZIP() []byte { - file_monitor_xmonitor_emitcache_emitcursor_proto_rawDescOnce.Do(func() { - file_monitor_xmonitor_emitcache_emitcursor_proto_rawDescData = protoimpl.X.CompressGZIP(file_monitor_xmonitor_emitcache_emitcursor_proto_rawDescData) - }) - return file_monitor_xmonitor_emitcache_emitcursor_proto_rawDescData -} - -var file_monitor_xmonitor_emitcache_emitcursor_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_monitor_xmonitor_emitcache_emitcursor_proto_goTypes = []any{ - (*EmitCursor)(nil), // 0: monitor.xmonitor.emitcache.EmitCursor -} -var file_monitor_xmonitor_emitcache_emitcursor_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_monitor_xmonitor_emitcache_emitcursor_proto_init() } -func file_monitor_xmonitor_emitcache_emitcursor_proto_init() { - if File_monitor_xmonitor_emitcache_emitcursor_proto != nil { - return - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_monitor_xmonitor_emitcache_emitcursor_proto_rawDesc, - NumEnums: 0, - NumMessages: 1, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_monitor_xmonitor_emitcache_emitcursor_proto_goTypes, - DependencyIndexes: file_monitor_xmonitor_emitcache_emitcursor_proto_depIdxs, - MessageInfos: file_monitor_xmonitor_emitcache_emitcursor_proto_msgTypes, - }.Build() - File_monitor_xmonitor_emitcache_emitcursor_proto = out.File - file_monitor_xmonitor_emitcache_emitcursor_proto_rawDesc = nil - file_monitor_xmonitor_emitcache_emitcursor_proto_goTypes = nil - file_monitor_xmonitor_emitcache_emitcursor_proto_depIdxs = nil -} diff --git a/monitor/xmonitor/emitcache/emitcursor.proto b/monitor/xmonitor/emitcache/emitcursor.proto deleted file mode 100644 index ec046e1d9..000000000 --- a/monitor/xmonitor/emitcache/emitcursor.proto +++ /dev/null @@ -1,23 +0,0 @@ -syntax = "proto3"; - -package monitor.xmonitor.emitcache; - -import "cosmos/orm/v1/orm.proto"; - -option go_package = "monitor/xmonitor/emitcache"; - -// EmitCursor stores historical emit cursors for all evm portals. -message EmitCursor { - option (cosmos.orm.v1.table) = { - id: 1; - primary_key: { fields: "id", auto_increment: true } - index: {id: 2, fields: "src_chain_id,dst_chain_id,shard_id,height", unique: true} // Allow querying and searching. - }; - - uint64 id = 1; // Auto-incremented ID - uint64 src_chain_id = 2; // Source chain ID as per https://chainlist.org - uint64 height = 3; // Block Height - uint64 dst_chain_id = 4; // Destination Chain ID as per https://chainlist.org - uint64 shard_id = 5; // Shard ID - uint64 msg_offset = 6; // XMsg Stream Offset -} diff --git a/monitor/xmonitor/emitcache/emitcursorcache.go b/monitor/xmonitor/emitcache/emitcursorcache.go deleted file mode 100644 index 8d14a90ba..000000000 --- a/monitor/xmonitor/emitcache/emitcursorcache.go +++ /dev/null @@ -1,355 +0,0 @@ -package emitcache - -import ( - "context" - "sync" - "time" - - "github.com/omni-network/omni/lib/errors" - "github.com/omni-network/omni/lib/log" - "github.com/omni-network/omni/lib/netconf" - "github.com/omni-network/omni/lib/umath" - "github.com/omni-network/omni/lib/xchain" - - ormv1alpha1 "cosmossdk.io/api/cosmos/orm/v1alpha1" - "cosmossdk.io/core/store" - "cosmossdk.io/orm/model/ormdb" - "cosmossdk.io/orm/model/ormlist" - "cosmossdk.io/orm/types/ormerrors" - db "github.com/cosmos/cosmos-db" - "golang.org/x/sync/errgroup" -) - -const ( - // cacheRetain is the number of block heights after which cursors are evicted from the cache. - cacheRetain = 10_000 - // cacheStartLag is the number of blocks behind latest to start streaming and populating the cache. - // 128 is the default number of historical block state that geth stores in non-archive mode. - cacheStartLag = 128 -) - -type Cache interface { - Get(ctx context.Context, height uint64, stream xchain.StreamID) (xchain.EmitCursor, bool, error) - AtOrBefore(ctx context.Context, height uint64, stream xchain.StreamID) (xchain.EmitCursor, bool, error) -} - -// Start subscribes the xprovider iot populate the emit cursor cache. -// It returns a cache that will be populated and trimmed asynchronously. -func Start( - ctx context.Context, - network netconf.Network, - xprov xchain.Provider, - db db.DB, -) (Cache, error) { - cache, err := newEmitCursorCache(db, network.StreamName, xprov.GetEmittedCursor) - if err != nil { - return nil, err - } - - for _, chain := range network.Chains { - callback := func(ctx context.Context, block xchain.Block) error { - // Ignore blocks that are not attested. - if !block.ShouldAttest(chain.AttestInterval) { - return nil - } - - // Update the emit cursor cache for each stream for this height (in parallel). - var eg errgroup.Group - for _, stream := range network.StreamsFrom(chain.ID) { - eg.Go(func() error { - ref := xchain.EmitRef{Height: &block.BlockHeight} - emit, _, err := xprov.GetEmittedCursor(ctx, ref, stream) - if err != nil { - latest, _ := xprov.ChainVersionHeight(ctx, xchain.ChainVersion{ID: chain.ID, ConfLevel: xchain.ConfLatest}) - return errors.Wrap(err, "get emit cursor", - "stream", network.StreamName(stream), - "lagging", umath.SubtractOrZero(latest, block.BlockHeight), - ) - } - // Populate a zero cursor if not found. - - return cache.set(ctx, block.BlockHeight, emit) - }) - } - - if err := eg.Wait(); err != nil { - // Log warn and continue, don't block or retry. - log.Warn(ctx, "Failed (partially) to populate emit cursor cache (skipping)", err, "height", block.BlockHeight, "chain", chain.Name) - } else { - log.Debug(ctx, "Populated emit cursor cache", "height", block.BlockHeight, "chain", chain.Name) - } - - return nil - } - - // Figure out where to start streaming from. - latest, err := xprov.ChainVersionHeight(ctx, xchain.ChainVersion{ID: chain.ID, ConfLevel: xchain.ConfLatest}) - if err != nil { - return nil, errors.Wrap(err, "latest height", "chain", chain.Name) - } - - fromHeight := umath.SubtractOrZero(latest, cacheStartLag) // Start as far back as cacheStartLag blocks. - if fromHeight < chain.DeployHeight { - fromHeight = chain.DeployHeight // But not before chain deploy height. - } - - req := xchain.ProviderRequest{ - ChainID: chain.ID, - Height: fromHeight, - ConfLevel: xchain.ConfLatest, // Stream latest height to ensure state is available for querying. - } - - log.Info(ctx, "Subscribing to xblocks to populate emit cursor cache", "chain", chain.Name, "from_height", fromHeight) - - if err := xprov.StreamAsync(ctx, req, callback); err != nil { - return nil, err - } - - // Start a goroutine to trim this chain. - go cache.trimForever(ctx, network.ID, chain.ID) - } - - return cache, nil -} - -// newEmitCursorCache creates a new emit cursor cache using the provided DB. -func newEmitCursorCache( - db db.DB, - streamNamer func(xchain.StreamID) string, - fallbackFunc func(context.Context, xchain.EmitRef, xchain.StreamID) (xchain.EmitCursor, bool, error), -) (*emitCursorCache, error) { - schema := &ormv1alpha1.ModuleSchemaDescriptor{SchemaFile: []*ormv1alpha1.ModuleSchemaDescriptor_FileEntry{ - {Id: 1, ProtoFileName: File_monitor_xmonitor_emitcache_emitcursor_proto.Path()}, - }} - - storeSvc := dbStoreService{DB: db} - - modDB, err := ormdb.NewModuleDB(schema, ormdb.ModuleDBOptions{KVStoreService: storeSvc}) - if err != nil { - return nil, errors.Wrap(err, "create ormdb module db") - } - - dbStore, err := NewEmitcursorStore(modDB) - if err != nil { - return nil, errors.Wrap(err, "create store") - } - - return &emitCursorCache{ - table: dbStore.EmitCursorTable(), - streamNamer: streamNamer, - fallbackFunc: fallbackFunc, - }, nil -} - -// emitCursorCache is a cache of the last 10k emit cursors for each stream. -// This is used to avoid querying chain state (emit cursor) for historical blocks -// as this requires archive nodes. -// Instead we cache the emit cursor of latest blocks, and query the cache for historical blocks -// while monitoring attested stream offsets. -type emitCursorCache struct { - mu sync.RWMutex - table EmitCursorTable - streamNamer func(xchain.StreamID) string - // fallbackFunc is called when a cursor is not found in the cache. - // This is a workaround for the cache not being fully populated yet. - fallbackFunc func(context.Context, xchain.EmitRef, xchain.StreamID) (xchain.EmitCursor, bool, error) -} - -// set adds a cursor to the cache for the given height and stream. -// It updates the cursor if it already exists. -func (c *emitCursorCache) set(ctx context.Context, height uint64, cursor xchain.EmitCursor) error { - c.mu.Lock() - defer c.mu.Unlock() - - err := c.table.Insert(ctx, &EmitCursor{ - SrcChainId: cursor.SourceChainID, - Height: height, - DstChainId: cursor.DestChainID, - ShardId: uint64(cursor.ShardID), - MsgOffset: cursor.MsgOffset, - }) - if errors.Is(err, ormerrors.UniqueKeyViolation) { - // Cursor already exists, update it - existing, err := c.table.GetBySrcChainIdDstChainIdShardIdHeight(ctx, cursor.SourceChainID, cursor.DestChainID, uint64(cursor.ShardID), height) - if err != nil { - return errors.Wrap(err, "get emit cursor") - } - existing.MsgOffset = cursor.MsgOffset - if err := c.table.Update(ctx, existing); err != nil { - return errors.Wrap(err, "update emit cursor") - } - - return nil - } else if err != nil { - return errors.Wrap(err, "insert emit cursor") - } - - return nil -} - -// Get returns the emit cursor for the given height and stream. -func (c *emitCursorCache) Get(ctx context.Context, height uint64, stream xchain.StreamID) (xchain.EmitCursor, bool, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - cursor, err := c.table.GetBySrcChainIdDstChainIdShardIdHeight(ctx, stream.SourceChainID, stream.DestChainID, uint64(stream.ShardID), height) - if ormerrors.IsNotFound(err) { - missCounter.WithLabelValues(c.streamNamer(stream)).Inc() - return c.fallbackFunc(ctx, xchain.HeightEmitRef(height), stream) - } else if err != nil { - return xchain.EmitCursor{}, false, errors.Wrap(err, "get emit cursor") - } - - hitCounter.WithLabelValues(c.streamNamer(stream)).Inc() - - return xchain.EmitCursor{ - StreamID: stream, - MsgOffset: cursor.GetMsgOffset(), - }, true, nil -} - -// AtOrBefore returns the stream emit cursor at-or-before the given height. -// Only attested heights are populated, so the first cursor at-or-before any -// height will return the correct cursor. -func (c *emitCursorCache) AtOrBefore(ctx context.Context, height uint64, stream xchain.StreamID) (xchain.EmitCursor, bool, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - start := EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey{}.WithSrcChainIdDstChainIdShardIdHeight( - stream.SourceChainID, - stream.DestChainID, - uint64(stream.ShardID), - 0, - ) - - end := EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey{}.WithSrcChainIdDstChainIdShardIdHeight( - stream.SourceChainID, - stream.DestChainID, - uint64(stream.ShardID), - height, - ) - - iter, err := c.table.ListRange(ctx, start, end, ormlist.Reverse(), ormlist.DefaultLimit(1)) - if err != nil { - return xchain.EmitCursor{}, false, errors.Wrap(err, "list emit cursor") - } - defer iter.Close() - - if !iter.Next() { - missCounter.WithLabelValues(c.streamNamer(stream)).Inc() - return c.fallbackFunc(ctx, xchain.HeightEmitRef(height), stream) - } - - cursor, err := iter.Value() - if err != nil { - return xchain.EmitCursor{}, false, errors.Wrap(err, "emit cursor value") - } - - if iter.Next() { - return xchain.EmitCursor{}, false, errors.New("multiple results [BUG]") - } - - hitCounter.WithLabelValues(c.streamNamer(stream)).Inc() - - return xchain.EmitCursor{ - StreamID: stream, - MsgOffset: cursor.GetMsgOffset(), - }, true, err -} - -func (c *emitCursorCache) trimForever(ctx context.Context, network netconf.ID, chainID uint64) { - period := time.Hour // Only trim once an hour, since it does table scans. - if network.IsEphemeral() { - period = time.Minute * 10 // Make it faster for ephemeral chains - } - - ticker := time.NewTicker(period) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - err := c.trimOnce(ctx, chainID, cacheRetain) - if ctx.Err() != nil { - return // Don't log error on shutdown - } else if err != nil { - log.Warn(ctx, "Trim emit cursor cache failed (will retry)", err, "chain", chainID) - } - } - } -} - -func (c *emitCursorCache) trimOnce(ctx context.Context, chainID uint64, retain uint64) error { - t0 := time.Now() - - ids, err := c.detectTrim(ctx, chainID, retain) - if err != nil { - return err - } - - for _, id := range ids { - c.mu.Lock() - err := c.table.DeleteBy(ctx, EmitCursorIdIndexKey{}.WithId(id)) - c.mu.Unlock() - if err != nil { - return errors.Wrap(err, "delete emit cursor cache") - } - } - - var first, last uint64 - if len(ids) > 0 { - first, last = ids[0], ids[len(ids)-1] - } - - log.Debug(ctx, "Trimmed emit cursor cache", "chain", chainID, "count", len(ids), "first", first, "last", last, "duration", time.Since(t0)) - - return nil -} - -// detectTrim returns a list of emit cursor IDs to delete from the cache. -// It returns the IDs of the oldest chainID cursors, excluding the most recent `retainHeight` cursor heights. -func (c *emitCursorCache) detectTrim(ctx context.Context, chainID uint64, retainHeights uint64) ([]uint64, error) { - // Not taking a read lock since this is a very slow an expensive table scan. - prefix := EmitCursorSrcChainIdDstChainIdShardIdHeightIndexKey{}.WithSrcChainId(chainID) - iter, err := c.table.List(ctx, prefix, ormlist.Reverse()) - if err != nil { - return nil, errors.Wrap(err, "delete emit cursor cache") - } - defer iter.Close() - - var ids []uint64 - var prevHeight uint64 - for iter.Next() { - cursor, err := iter.Value() - if err != nil { - return nil, errors.Wrap(err, "emit cursor value") - } - - if prevHeight == 0 { - prevHeight = cursor.GetHeight() - } else if retainHeights > 0 && cursor.GetHeight() < prevHeight { - // Skip the most recent `retainHeights` cursors. - prevHeight = cursor.GetHeight() - retainHeights-- - } - - if retainHeights > 0 { - continue - } - - ids = append(ids, cursor.GetId()) - } - - return ids, nil -} - -// dbStoreService wraps a cosmos-db instance and provides it via OpenKVStore. -type dbStoreService struct { - db.DB -} - -func (db dbStoreService) OpenKVStore(context.Context) store.KVStore { - return db.DB -} diff --git a/monitor/xmonitor/emitcache/emitcursorcache_internal_test.go b/monitor/xmonitor/emitcache/emitcursorcache_internal_test.go deleted file mode 100644 index 9f46dbf40..000000000 --- a/monitor/xmonitor/emitcache/emitcursorcache_internal_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package emitcache - -import ( - "context" - "math" - "testing" - - "github.com/omni-network/omni/lib/errors" - "github.com/omni-network/omni/lib/xchain" - - dbm "github.com/cosmos/cosmos-db" - "github.com/stretchr/testify/require" -) - -func TestEmitCursorCache(t *testing.T) { - t.Parallel() - db := dbm.NewMemDB() - - fallbackErr := errors.New("fallback") - fallbackFunc := func(context.Context, xchain.EmitRef, xchain.StreamID) (xchain.EmitCursor, bool, error) { - return xchain.EmitCursor{}, false, fallbackErr - } - streamNamer := func(xchain.StreamID) string { return "" } - - cache, err := newEmitCursorCache(db, streamNamer, fallbackFunc) - require.NoError(t, err) - ctx := context.Background() - - assertContains := func(t *testing.T, height uint64, stream xchain.StreamID, cursor xchain.EmitCursor) { - t.Helper() - c, ok, err := cache.Get(ctx, height, stream) - require.NoError(t, err) - require.True(t, ok) - require.Equal(t, cursor, c) - - c, ok, err = cache.AtOrBefore(ctx, height, stream) - require.NoError(t, err) - require.True(t, ok) - require.Equal(t, cursor, c) - } - - assertHighest := func(t *testing.T, stream xchain.StreamID, cursor xchain.EmitCursor) { - t.Helper() - const maxHeight = math.MaxUint64 - c, ok, err := cache.AtOrBefore(ctx, maxHeight, stream) - require.NoError(t, err) - require.True(t, ok) - require.Equal(t, cursor, c) - } - - assertNotContains := func(t *testing.T, height uint64, stream xchain.StreamID) { - t.Helper() - _, ok, err := cache.Get(ctx, height, stream) - require.ErrorIs(t, err, fallbackErr) - require.False(t, ok) - } - - set := func(t *testing.T, height uint64, cursor xchain.EmitCursor) { - t.Helper() - require.NoError(t, cache.set(ctx, height, cursor)) - } - - trim := func(t *testing.T, chainID uint64, retain uint64) { - t.Helper() - require.NoError(t, cache.trimOnce(ctx, chainID, retain)) - } - - stream1 := xchain.StreamID{SourceChainID: 1} - stream2 := xchain.StreamID{SourceChainID: 2} - - cursor11 := xchain.EmitCursor{StreamID: stream1, MsgOffset: 1} - cursor12 := xchain.EmitCursor{StreamID: stream1, MsgOffset: 2} - cursor21 := xchain.EmitCursor{StreamID: stream2, MsgOffset: 1} - cursor22 := xchain.EmitCursor{StreamID: stream2, MsgOffset: 2} - - stream99 := xchain.StreamID{SourceChainID: 99} - - assertNotContains(t, 1, stream1) - assertNotContains(t, 2, stream1) - set(t, 1, cursor12) - assertContains(t, 1, stream1, cursor12) - set(t, 1, cursor11) // Update it to 11 - assertContains(t, 1, stream1, cursor11) - trim(t, stream1.SourceChainID, 99) // Nothing trimmed - assertContains(t, 1, stream1, cursor11) - assertHighest(t, stream1, cursor11) - - assertNotContains(t, 2, stream1) - set(t, 2, cursor12) - trim(t, stream1.SourceChainID, 99) // Nothing trimmed - assertContains(t, 2, stream1, cursor12) - assertContains(t, 1, stream1, cursor11) - assertHighest(t, stream1, cursor12) - - assertNotContains(t, 1, stream2) - assertNotContains(t, 2, stream2) - set(t, 1, cursor21) - assertContains(t, 1, stream2, cursor21) - assertHighest(t, stream2, cursor21) - - assertNotContains(t, 2, stream2) - set(t, 2, cursor22) - assertContains(t, 2, stream2, cursor22) - assertContains(t, 1, stream2, cursor21) - assertHighest(t, stream2, cursor22) - - assertNotContains(t, 1, stream99) - assertNotContains(t, 2, stream99) - - trim(t, stream1.SourceChainID, 1) - assertNotContains(t, 1, stream1) - assertContains(t, 2, stream1, cursor12) - assertContains(t, 1, stream2, cursor21) - assertContains(t, 2, stream2, cursor22) - - trim(t, stream1.SourceChainID, 0) - assertNotContains(t, 2, stream1) - assertContains(t, 1, stream2, cursor21) - assertContains(t, 2, stream2, cursor22) - - trim(t, stream2.SourceChainID, 0) - assertNotContains(t, 2, stream2) - assertNotContains(t, 1, stream2) -} diff --git a/monitor/xmonitor/emitcache/metrics.go b/monitor/xmonitor/emitcache/metrics.go deleted file mode 100644 index 0f97968fe..000000000 --- a/monitor/xmonitor/emitcache/metrics.go +++ /dev/null @@ -1,22 +0,0 @@ -package emitcache - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -var ( - hitCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "monitor", - Subsystem: "emitcache", - Name: "hit_total", - Help: "Total number of emitcache hits per stream", - }, []string{"stream"}) - - missCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "monitor", - Subsystem: "emitcache", - Name: "miss_total", - Help: "Total number of emitcache misses per stream", - }, []string{"stream"}) -) diff --git a/monitor/xmonitor/monitor.go b/monitor/xmonitor/monitor.go index c4567953b..c415046d2 100644 --- a/monitor/xmonitor/monitor.go +++ b/monitor/xmonitor/monitor.go @@ -10,9 +10,6 @@ import ( "github.com/omni-network/omni/lib/log" "github.com/omni-network/omni/lib/netconf" "github.com/omni-network/omni/lib/xchain" - "github.com/omni-network/omni/monitor/xmonitor/emitcache" - - dbm "github.com/cosmos/cosmos-db" ) // Start starts the xchain monitoring goroutines. @@ -22,13 +19,7 @@ func Start( xprovider xchain.Provider, cprovider cchain.Provider, rpcClients map[uint64]ethclient.Client, - db dbm.DB, ) error { - cache, err := emitcache.Start(ctx, network, xprovider, db) - if err != nil { - return err - } - // Monitor the head of all chains, including consensus. for _, srcChain := range network.Chains { headsFunc := func(ctx context.Context) map[ethclient.HeadType]uint64 { @@ -41,7 +32,7 @@ func Start( } go monitorHeadsForever(ctx, srcChain, headsFunc) - go monitorAttestedForever(ctx, srcChain, cprovider, network, cache) + go monitorAttestedForever(ctx, srcChain, cprovider, network, xprovider) } // Monitors below only apply to EVM chains. @@ -51,7 +42,7 @@ func Start( continue } - go monitorOffsetsForever(ctx, xprovider, network, srcChain, dstChain, cache) + go monitorOffsetsForever(ctx, xprovider, network, srcChain, dstChain) } } @@ -145,7 +136,7 @@ func monitorAttestedForever( srcChain netconf.Chain, cprovider cchain.Provider, network netconf.Network, - cache emitcache.Cache, + xprovider xchain.Provider, ) { chainVer := srcChain.ChainVersions()[0] @@ -172,14 +163,9 @@ func monitorAttestedForever( for _, stream := range network.StreamsFrom(srcChain.ID) { name := network.StreamName(stream) - cursor, ok, err := cache.Get(ctx, att.BlockHeight, stream) + cursor, _, err := xprovider.GetEmittedCursor(ctx, xchain.EmitRef{Height: &att.BlockHeight}, stream) if err != nil { - log.Warn(ctx, "Getting cache failed (will retry)", err, "chain", srcChain.Name) - continue - } else if !ok { - log.Warn(ctx, "Emit cursor cache not populated", nil, - "height", att.BlockHeight, "stream", name) - + log.Warn(ctx, "Failed getting emit cursor (will retry)", err, "chain", srcChain.Name) continue } @@ -229,7 +215,6 @@ func monitorOffsetsForever( xprovider xchain.Provider, network netconf.Network, src, dst netconf.Chain, - cache emitcache.Cache, ) { ticker := time.NewTicker(time.Second * 30) defer ticker.Stop() @@ -239,7 +224,7 @@ func monitorOffsetsForever( case <-ctx.Done(): return case <-ticker.C: - err := monitorOffsetsOnce(ctx, xprovider, network, src, dst, cache) + err := monitorOffsetsOnce(ctx, xprovider, network, src, dst) if ctx.Err() != nil { return } else if err != nil { @@ -257,7 +242,6 @@ func monitorOffsetsOnce( xprovider xchain.Provider, network netconf.Network, src, dst netconf.Chain, - cache emitcache.Cache, ) error { var lastErr error for _, stream := range network.StreamsBetween(src.ID, dst.ID) { @@ -271,12 +255,10 @@ func monitorOffsetsOnce( continue // Don't monitor chains before finalized. } - emitted, ok, err := cache.AtOrBefore(ctx, height, stream) + confLevel := stream.ConfLevel() + emitted, _, err := xprovider.GetEmittedCursor(ctx, xchain.EmitRef{ConfLevel: &confLevel}, stream) if err != nil { - lastErr = errors.Wrap(err, "query cache") - continue - } else if !ok { - lastErr = errors.New("emit cursor cache not populated", "stream", network.StreamName(stream), "height", height) + lastErr = errors.Wrap(err, "get emit cursor", "stream", stream) continue }