diff --git a/cmd/app/app.go b/cmd/app/app.go index c2ef441..7effa46 100644 --- a/cmd/app/app.go +++ b/cmd/app/app.go @@ -45,7 +45,7 @@ func action(ctx *cli.Context) error { // Create db instance. db, err := utils.InitDB(cfg.DBConfig) if err != nil { - log.Error("failed to init db", "err", err) + log.Error("failed to connect to db", "err", err) return err } defer func() { @@ -97,7 +97,8 @@ func action(ctx *cli.Context) error { go utils.LoopWithContext(subCtx, time.Millisecond*1500, l1Watcher.ScanL1Chain) go utils.LoopWithContext(subCtx, time.Millisecond*1500, l2Watcher.ScanL2Chain) - go utils.LoopWithContext(subCtx, time.Millisecond*200, chainMonitor.ChainMonitor) + go utils.LoopWithContext(subCtx, time.Millisecond*200, chainMonitor.DepositConfirm) + go utils.LoopWithContext(subCtx, time.Millisecond*500, chainMonitor.WithdrawConfirm) // Catch CTRL-C to ensure a graceful shutdown. interrupt := make(chan os.Signal, 1) diff --git a/config.json b/config.json index 6a199d1..39aaee8 100644 --- a/config.json +++ b/config.json @@ -1,7 +1,7 @@ { "l1_config": { - "l1chain_url": "http://10.5.12.230:8545/l1", - "confirm": 1, + "l1_url": "http://10.5.12.230:8545/l1", + "confirm": "0x4", "start_number": 4041200, "l1_gateways": { "eth_gateway": "0x8A54A2347Da2562917304141ab67324615e9866d", @@ -17,8 +17,8 @@ } }, "l2_config": { - "l2chain_url": "http://10.5.11.195:8545", - "confirm": 6, + "l2_url": "http://10.5.11.195:8545", + "confirm": "0x4", "l2_gateways": { "eth_gateway": "0x91e8ADDFe1358aCa5314c644312d38237fC1101C", "weth_gateway": "0x481B20A927206aF7A754dB8b904B052e2781ea27", @@ -34,7 +34,7 @@ "slack_webhook_config": { "channel": "#chain_monitor", "user_name": "chain_monitor", - "webhook_url": "https://app.slack.com/huddle/T0232HPBN87/C05QUU3LSE9" + "webhook_url": "" }, "db_config": { "driver_name": "postgres", diff --git a/internal/config/config.go b/internal/config/config.go index 9e27514..e492e8a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -6,6 +6,7 @@ import ( "path/filepath" "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/rpc" ) // Gateway address list. @@ -30,16 +31,16 @@ type L1Contracts struct { // L1Config l1 chain config. type L1Config struct { L1Gateways *L1Contracts `json:"l1_gateways"` - L1ChainURL string `json:"l1chain_url"` - Confirm uint64 + L1URL string `json:"l1_url"` + Confirm rpc.BlockNumber StartNumber uint64 `json:"start_number"` } // L2Config l1 chain config. type L2Config struct { - L2gateways *Gateway `json:"l2_gateways"` - L2ChainURL string `json:"l2chain_url"` - Confirm uint64 + L2Gateways *Gateway `json:"l2_gateways"` + L2URL string `json:"l2_url"` + Confirm rpc.BlockNumber } // DBConfig db config diff --git a/internal/controller/api.go b/internal/controller/api.go index 1384331..3b4fd2e 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -27,11 +27,16 @@ func (m *ChainConfirm) ConfirmWithdrawRoot(ctx *gin.Context) { return } - confirmBlock, err := orm.GetConfirmMsgByNumber(m.db, req.Number) + l2Confirm, err := orm.GetL2ConfirmMsgByNumber(m.db, req.Number) if err != nil { types.RenderJSON(ctx, types.ErrConfirmWithdrawRootByNumber, err, nil) return } - types.RenderJSON(ctx, types.Success, nil, confirmBlock.WithdrawStatus && confirmBlock.DepositStatus) + types.RenderJSON( + ctx, + types.Success, + nil, + l2Confirm.WithdrawRootStatus && l2Confirm.DepositStatus, + ) } diff --git a/internal/controller/interface.go b/internal/controller/interface.go index f90013f..92edde9 100644 --- a/internal/controller/interface.go +++ b/internal/controller/interface.go @@ -3,7 +3,13 @@ package controller // WatcherAPI watcher api, relate to l1watcher and l2watcher. type WatcherAPI interface { IsReady() bool - StartNumber() uint64 + CurrentNumber() uint64 +} + +// L1WatcherAPI watcher api, relate to l1watcher and l2watcher. +type L1WatcherAPI interface { + WatcherAPI + L1StartNumber() uint64 } // MonitorAPI monitor public api, used by l1watcher and l2watcher. diff --git a/internal/controller/l1watcher/l1_watcher.go b/internal/controller/l1watcher/l1_watcher.go index 7391eaf..20679eb 100644 --- a/internal/controller/l1watcher/l1_watcher.go +++ b/internal/controller/l1watcher/l1_watcher.go @@ -14,9 +14,10 @@ import ( "chain-monitor/internal/config" "chain-monitor/internal/orm" + "chain-monitor/internal/utils" ) -var l1BatchSize uint64 = 100 +var l1BatchSize uint64 = 200 // L1Watcher return a new instance of L1Watcher. type L1Watcher struct { @@ -25,24 +26,25 @@ type L1Watcher struct { filter *l1Contracts + isOneByOne bool cacheLen int headerCache []*types.Header - curTime time.Time - startNumber uint64 - safeNumber uint64 + curTime time.Time + currNumber uint64 + safeNumber uint64 db *gorm.DB } // NewL1Watcher create a l1watcher instance. func NewL1Watcher(cfg *config.L1Config, db *gorm.DB) (*L1Watcher, error) { - client, err := ethclient.Dial(cfg.L1ChainURL) + client, err := ethclient.Dial(cfg.L1URL) if err != nil { return nil, err } - contracts, err := newL1Contracts(client, cfg.L1Gateways) + l2Filter, err := newL1Contracts(client, cfg.L1Gateways) if err != nil { return nil, err } @@ -51,24 +53,26 @@ func NewL1Watcher(cfg *config.L1Config, db *gorm.DB) (*L1Watcher, error) { if err != nil { return nil, err } - latestNumber, err := client.BlockNumber(context.Background()) + + // Get confirm number. + number, err := utils.GetLatestConfirmedBlockNumber(context.Background(), client, cfg.Confirm) if err != nil { return nil, err } - watcherClient := &L1Watcher{ + watcher := &L1Watcher{ cfg: cfg, db: db, client: client, - filter: contracts, + filter: l2Filter, cacheLen: 32, headerCache: make([]*types.Header, 0, 32), curTime: time.Now(), - startNumber: mathutil.MaxUint64(l1Block.Number, cfg.StartNumber), - safeNumber: latestNumber - cfg.Confirm, + currNumber: mathutil.MaxUint64(l1Block.Number, cfg.StartNumber), + safeNumber: number, } - return watcherClient, nil + return watcher, nil } // ScanL1Chain scan l1chain entrypoint function. @@ -84,7 +88,8 @@ func (l1 *L1Watcher) ScanL1Chain(ctx context.Context) { var count int // If we sync events one by one. - if start == end { + if l1.isOneByOne || start == end { + l1.isOneByOne = true var header *types.Header header, err = l1.checkReorg(ctx) if err != nil { @@ -111,32 +116,29 @@ func (l1 *L1Watcher) ScanL1Chain(ctx context.Context) { return } } - l1.setStartNumber(end) + l1.setCurrentNumber(end) log.Info("scan l1chain successful", "start", start, "end", end, "event_count", count) } func (l1 *L1Watcher) getStartAndEndNumber(ctx context.Context) (uint64, uint64, error) { var ( - start = l1.StartNumber() + 1 - end = start + l1BatchSize - 1 + start = l1.CurrentNumber() + 1 + end = mathutil.MinUint64(start+l1BatchSize-1, l1.SafeNumber()) ) - safeNumber := l1.SafeNumber() - uint64(l1.cacheLen/2) - if end <= safeNumber { + if start <= end { return start, end, nil } - if start < safeNumber { - return start, safeNumber - 1, nil - } // update latest number curTime := time.Now() if int(curTime.Sub(l1.curTime).Seconds()) >= 5 { - latestNumber, err := l1.client.BlockNumber(ctx) + number, err := utils.GetLatestConfirmedBlockNumber(ctx, l1.client, l1.cfg.Confirm) if err != nil { return 0, 0, err } - l1.setSafeNumber(latestNumber - l1.cfg.Confirm) + number = mathutil.MaxUint64(number, l1.SafeNumber()) + l1.setSafeNumber(number) l1.curTime = curTime } @@ -147,7 +149,7 @@ func (l1 *L1Watcher) getStartAndEndNumber(ctx context.Context) (uint64, uint64, func (l1 *L1Watcher) checkReorg(ctx context.Context) (*types.Header, error) { var number uint64 if len(l1.headerCache) == 0 { - number = l1.StartNumber() + number = l1.CurrentNumber() } else { number = l1.headerCache[len(l1.headerCache)-1].Number.Uint64() } @@ -169,16 +171,12 @@ func (l1 *L1Watcher) checkReorg(ctx context.Context) (*types.Header, error) { if header.ParentHash == latestHeader.Hash() { break } - if header.ParentHash != latestHeader.Hash() { - reorgNumbers = append(reorgNumbers, latestHeader.Number.Uint64()) - l1.headerCache = l1.headerCache[:len(l1.headerCache)-1] - var ( - parentHash = header.ParentHash - ) - header, err = l1.client.HeaderByHash(ctx, parentHash) - if err != nil { - return nil, err - } + // reorg appeared. + reorgNumbers = append(reorgNumbers, latestHeader.Number.Uint64()) + l1.headerCache = l1.headerCache[:len(l1.headerCache)-1] + header, err = l1.client.HeaderByNumber(ctx, latestHeader.Number) + if err != nil { + return nil, err } } diff --git a/internal/controller/l1watcher/l1_watcher_api.go b/internal/controller/l1watcher/l1_watcher_api.go index 6f4449e..cbe6ec4 100644 --- a/internal/controller/l1watcher/l1_watcher_api.go +++ b/internal/controller/l1watcher/l1_watcher_api.go @@ -6,13 +6,18 @@ import ( "chain-monitor/internal/controller" ) -// StartNumber return l1watcher start number. -func (l1 *L1Watcher) StartNumber() uint64 { - return atomic.LoadUint64(&l1.startNumber) +// L1StartNumber returns l1watcher start number. +func (l1 *L1Watcher) L1StartNumber() uint64 { + return l1.cfg.StartNumber } -func (l1 *L1Watcher) setStartNumber(number uint64) { - atomic.StoreUint64(&l1.startNumber, number) +// CurrentNumber return l1watcher start number. +func (l1 *L1Watcher) CurrentNumber() uint64 { + return atomic.LoadUint64(&l1.currNumber) +} + +func (l1 *L1Watcher) setCurrentNumber(number uint64) { + atomic.StoreUint64(&l1.currNumber, number) } // SafeNumber return safe number. @@ -26,7 +31,7 @@ func (l1 *L1Watcher) setSafeNumber(number uint64) { // IsReady if l1watcher is ready return true. func (l1 *L1Watcher) IsReady() bool { - return l1.StartNumber() == l1.SafeNumber() + return l1.CurrentNumber() == l1.SafeNumber() } // SetMonitor sets monitor api. diff --git a/internal/controller/l2watcher/l2_contracts.go b/internal/controller/l2watcher/l2_contracts.go index e76d817..15163c0 100644 --- a/internal/controller/l2watcher/l2_contracts.go +++ b/internal/controller/l2watcher/l2_contracts.go @@ -130,7 +130,7 @@ func newL2Contracts(l2chainURL string, db *gorm.DB, cfg *config.Gateway) (*l2Con } func (l2 *l2Contracts) initWithdraw(db *gorm.DB) error { - tx := db.Where("type = ?", orm.L2SentMessage) + tx := db.Where("type = ? AND msg_proof != ''", orm.L2SentMessage).Order("number DESC") var msg orm.L2MessengerEvent err := tx.Last(&msg).Error if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { diff --git a/internal/controller/l2watcher/l2_messenger.go b/internal/controller/l2watcher/l2_messenger.go index 1a26b2f..b72a2a0 100644 --- a/internal/controller/l2watcher/l2_messenger.go +++ b/internal/controller/l2watcher/l2_messenger.go @@ -6,6 +6,7 @@ import ( "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/log" "chain-monitor/bytecode/scroll/L2" "chain-monitor/internal/orm" @@ -42,25 +43,27 @@ func (l2 *l2Contracts) storeMessengerEvents(ctx context.Context, start, end uint // Calculate withdraw root. var ( - chainMonitors = make([]*orm.ChainConfirm, 0, end-start+1) + chainMonitors = make([]*orm.L2ChainConfirm, 0, end-start+1) msgSentEvents []*orm.L2MessengerEvent - latestProof []byte ) for number := start; number <= end; number++ { if l2.msgSentEvents[number] == nil { - chainMonitors = append(chainMonitors, &orm.ChainConfirm{ - Number: number, - WithdrawStatus: true, + chainMonitors = append(chainMonitors, &orm.L2ChainConfirm{ + Number: number, + WithdrawRootStatus: true, }) continue } msgs := l2.msgSentEvents[number] for i, msg := range msgs { proofs := l2.withdraw.AppendMessages([]common.Hash{common.HexToHash(msg.MsgHash)}) - latestProof = proofs[len(proofs)-1] + // Store the latest one for every block. + if i == len(msgs)-1 { + msg.MsgProof = common.Bytes2Hex(proofs[0]) + } msgSentEvents = append(msgSentEvents, msgs[i]) } - chainMonitors = append(chainMonitors, &orm.ChainConfirm{ + chainMonitors = append(chainMonitors, &orm.L2ChainConfirm{ Number: number, WithdrawRoot: l2.withdraw.MessageRoot(), }) @@ -71,8 +74,6 @@ func (l2 *l2Contracts) storeMessengerEvents(ctx context.Context, start, end uint return err } - // Just store the latest proof. - msgSentEvents[len(msgSentEvents)-1].MsgProof = common.Bytes2Hex(latestProof) // Store messenger events. if err := l2.tx.Model(&orm.L2MessengerEvent{}).Save(msgSentEvents).Error; err != nil { return err @@ -80,14 +81,14 @@ func (l2 *l2Contracts) storeMessengerEvents(ctx context.Context, start, end uint return nil } -func (l2 *l2Contracts) storeWithdrawRoots(ctx context.Context, chainMonitors []*orm.ChainConfirm) error { +func (l2 *l2Contracts) storeWithdrawRoots(ctx context.Context, chainMonitors []*orm.L2ChainConfirm) error { var ( numbers []uint64 withdrawRoots []common.Hash err error ) for _, monitor := range chainMonitors { - if !monitor.WithdrawStatus { + if !monitor.WithdrawRootStatus { numbers = append(numbers, monitor.Number) } } @@ -105,24 +106,25 @@ func (l2 *l2Contracts) storeWithdrawRoots(ctx context.Context, chainMonitors []* if len(withdrawRoots) == 0 { break } - if monitor.WithdrawStatus { + if monitor.WithdrawRootStatus { continue } expectRoot := withdrawRoots[0] withdrawRoots = withdrawRoots[1:] - monitor.WithdrawStatus = monitor.WithdrawRoot == expectRoot + monitor.WithdrawRootStatus = monitor.WithdrawRoot == expectRoot // If the withdraw root doesn't match, alert it. - if !monitor.WithdrawStatus { + if !monitor.WithdrawRootStatus { msg := fmt.Sprintf( "withdraw root doestn't match, l2_number: %d, expect_withdraw_root: %s, actual_withdraw_root: %s", monitor.Number, expectRoot.String(), monitor.WithdrawRoot.String(), ) + log.Error("withdraw root doesn't match", "number", monitor.Number, "expect_root", expectRoot.String(), "actual_root", monitor.WithdrawRoot.String()) go l2.monitorAPI.SlackNotify(msg) } } - if err = l2.tx.Model(&orm.ChainConfirm{}).Save(chainMonitors).Error; err != nil { + if err = l2.tx.Model(&orm.L2ChainConfirm{}).Save(chainMonitors).Error; err != nil { return err } return nil diff --git a/internal/controller/l2watcher/l2_watcher.go b/internal/controller/l2watcher/l2_watcher.go index e72fa0a..0436918 100644 --- a/internal/controller/l2watcher/l2_watcher.go +++ b/internal/controller/l2watcher/l2_watcher.go @@ -2,16 +2,19 @@ package l2watcher import ( "context" + "fmt" "math/big" "time" - "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core/types" "github.com/scroll-tech/go-ethereum/ethclient" "github.com/scroll-tech/go-ethereum/log" "gorm.io/gorm" + "modernc.org/mathutil" "chain-monitor/internal/config" "chain-monitor/internal/orm" + "chain-monitor/internal/utils" ) var l2BatchSize uint64 = 500 @@ -23,16 +26,20 @@ type L2Watcher struct { filter *l2Contracts - curTime time.Time - startNumber uint64 - safeNumber uint64 + isOneByOne bool + cacheLen int + headerCache []*types.Header + + curTime time.Time + currNumber uint64 + safeNumber uint64 db *gorm.DB } // NewL2Watcher initializes a new L2Watcher with the given L2 configuration and database connection. func NewL2Watcher(cfg *config.L2Config, db *gorm.DB) (*L2Watcher, error) { - client, err := ethclient.Dial(cfg.L2ChainURL) + client, err := ethclient.Dial(cfg.L2URL) if err != nil { return nil, err } @@ -41,13 +48,15 @@ func NewL2Watcher(cfg *config.L2Config, db *gorm.DB) (*L2Watcher, error) { if err != nil { return nil, err } - latestNumber, err := client.BlockNumber(context.Background()) + + // Get confirm number. + number, err := utils.GetLatestConfirmedBlockNumber(context.Background(), client, cfg.Confirm) if err != nil { return nil, err } // Create a event filter instance. - l1Contracts, err := newL2Contracts(cfg.L2ChainURL, db, cfg.L2gateways) + l2Filter, err := newL2Contracts(cfg.L2URL, db, cfg.L2Gateways) if err != nil { return nil, err } @@ -56,28 +65,17 @@ func NewL2Watcher(cfg *config.L2Config, db *gorm.DB) (*L2Watcher, error) { cfg: cfg, db: db, client: client, - filter: l1Contracts, + filter: l2Filter, + cacheLen: 32, + headerCache: make([]*types.Header, 0, 32), curTime: time.Now(), - startNumber: l2Block.Number, - safeNumber: latestNumber - cfg.Confirm, + currNumber: l2Block.Number, + safeNumber: number, } return watcher, nil } -// WithdrawRoot fetches the root hash of withdrawal data from the storage of the MessageQueue contract. -func (l2 *L2Watcher) WithdrawRoot(ctx context.Context, number uint64) (common.Hash, error) { - data, err := l2.client.StorageAt( - ctx, - l2.cfg.L2gateways.MessageQueue, - common.BigToHash(big.NewInt(0)), big.NewInt(0).SetUint64(number), - ) - if err != nil { - return common.Hash{}, err - } - return common.BytesToHash(data), nil -} - // ScanL2Chain scans a range of blocks on the L2 chain for events. func (l2 *L2Watcher) ScanL2Chain(ctx context.Context) { start, end, err := l2.getStartAndEndNumber(ctx) @@ -89,38 +87,129 @@ func (l2 *L2Watcher) ScanL2Chain(ctx context.Context) { return } - cts := l2.filter - count, err := cts.ParseL2Events(ctx, l2.db, start, end) - if err != nil { - log.Error("failed to parse l2chain events", "start", start, "end", end, "err", err) - return + var count int + if l2.isOneByOne || start == end { + l2.isOneByOne = true + var header *types.Header + header, err = l2.checkReorg(ctx) + if err != nil { + log.Error("appear error when do l2chain reorg process", "number", start, "err", err) + return + } + start = header.Number.Uint64() + end = start + count, err = l2.filter.ParseL2Events(ctx, l2.db, start, end) + if err != nil { + log.Error("failed to parse l2chain events", "start", start, "end", end, "err", err) + return + } + // append block header + if len(l2.headerCache) >= l2.cacheLen { + l2.headerCache = l2.headerCache[:l2.cacheLen-1] + } + l2.headerCache = append(l2.headerCache, header) + } else { + count, err = l2.filter.ParseL2Events(ctx, l2.db, start, end) + if err != nil { + log.Error("failed to parse l2chain events", "start", start, "end", end, "err", err) + return + } } + l2.setCurrentNumber(end) - l2.setStartNumber(end) log.Info("scan l2chain successful", "start", start, "end", end, "event_count", count) } func (l2 *L2Watcher) getStartAndEndNumber(ctx context.Context) (uint64, uint64, error) { var ( - start = l2.StartNumber() + 1 - end = start + l2BatchSize - 1 + start = l2.CurrentNumber() + 1 + end = mathutil.MinUint64(start+l2BatchSize-1, l2.SafeNumber()) ) - - if end <= l2.SafeNumber() { + if start <= end { return start, end, nil } - if start < l2.SafeNumber() { - return start, l2.SafeNumber() - 1, nil - } curTime := time.Now() if int(curTime.Sub(l2.curTime).Seconds()) >= 5 { - latestNumber, err := l2.client.BlockNumber(ctx) + number, err := utils.GetLatestConfirmedBlockNumber(ctx, l2.client, l2.cfg.Confirm) if err != nil { return 0, 0, err } - l2.setSafeNumber(latestNumber - l2.cfg.Confirm) + number = mathutil.MaxUint64(number, l2.SafeNumber()) + l2.setSafeNumber(number) l2.curTime = curTime } return start, start, nil } + +func (l2 *L2Watcher) checkReorg(ctx context.Context) (*types.Header, error) { + var number uint64 + if len(l2.headerCache) == 0 { + number = l2.CurrentNumber() + } else { + number = l2.headerCache[len(l2.headerCache)-1].Number.Uint64() + } + number++ + + header, err := l2.client.HeaderByNumber(ctx, big.NewInt(0).SetUint64(number)) + if err != nil { + return nil, err + } + if len(l2.headerCache) == 0 { + return header, nil + } + + var reorgNumbers []uint64 + for len(l2.headerCache) > 0 { + latestHeader := l2.headerCache[len(l2.headerCache)-1] + if header.ParentHash == latestHeader.Hash() { + break + } + // reorg appeared. + reorgNumbers = append(reorgNumbers, latestHeader.Number.Uint64()) + l2.headerCache = l2.headerCache[:len(l2.headerCache)-1] + header, err = l2.client.HeaderByNumber(ctx, latestHeader.Number) + if err != nil { + return nil, err + } + } + + // TODO: A deeper rollback is required + if len(l2.headerCache) == 0 { + panic(fmt.Errorf("l2chain reorged too deep")) + } + + // Reorg stored events if the reorg headers is not empty. + if err = deleteReorgEvents(ctx, l2.db, reorgNumbers); err != nil { + return nil, err + } + + return header, nil +} + +func deleteReorgEvents(ctx context.Context, db *gorm.DB, numbers []uint64) error { + if len(numbers) == 0 { + return nil + } + + var ( + start = numbers[0] + end = numbers[len(numbers)-1] + tables = orm.L2Tables + result *gorm.DB + ) + tx := db.Begin().WithContext(ctx) + for _, tb := range tables { + // delete eth events. + result = tx.Where("number BETWEEN ? AND ?", start, end).Delete(tb) + if result.Error != nil { + tx.Rollback() + return result.Error + } + } + if err := tx.Commit().Error; err != nil { + tx.Rollback() + return err + } + return nil +} diff --git a/internal/controller/l2watcher/l2_watcher_api.go b/internal/controller/l2watcher/l2_watcher_api.go index d1bb2eb..53f9fb3 100644 --- a/internal/controller/l2watcher/l2_watcher_api.go +++ b/internal/controller/l2watcher/l2_watcher_api.go @@ -6,14 +6,14 @@ import ( "chain-monitor/internal/controller" ) -// StartNumber retrieves the current starting block number +// CurrentNumber retrieves the current starting block number // that the L2Watcher is tracking. -func (l2 *L2Watcher) StartNumber() uint64 { - return atomic.LoadUint64(&l2.startNumber) +func (l2 *L2Watcher) CurrentNumber() uint64 { + return atomic.LoadUint64(&l2.currNumber) } -func (l2 *L2Watcher) setStartNumber(number uint64) { - atomic.StoreUint64(&l2.startNumber, number) +func (l2 *L2Watcher) setCurrentNumber(number uint64) { + atomic.StoreUint64(&l2.currNumber, number) } // SafeNumber retrieves the current safe block number @@ -29,7 +29,7 @@ func (l2 *L2Watcher) setSafeNumber(number uint64) { // IsReady checks whether the L2Watcher is ready. It's considered ready // when the starting block number matches the safe block number. func (l2 *L2Watcher) IsReady() bool { - return l2.StartNumber() == l2.SafeNumber() + return l2.CurrentNumber() == l2.SafeNumber() } // SetMonitor sets the monitoring API for the L2Watcher. diff --git a/internal/controller/monitor/chain_monitor.go b/internal/controller/monitor/chain_monitor.go index 016572e..8b86e5c 100644 --- a/internal/controller/monitor/chain_monitor.go +++ b/internal/controller/monitor/chain_monitor.go @@ -1,7 +1,6 @@ package monitor import ( - "context" "encoding/json" "time" @@ -23,43 +22,31 @@ type ChainMonitor struct { notifyCli *resty.Client - l1watcher controller.WatcherAPI + l1watcher controller.L1WatcherAPI l2watcher controller.WatcherAPI - startNumber uint64 - safeNumber uint64 -} - -// SlackNotify sends an alert message to a Slack channel. -func (ch *ChainMonitor) SlackNotify(msg string) { - if ch.cfg.WebhookURL == "" { - return - } - hookContent := map[string]string{ - "channel": ch.cfg.Channel, - "username": ch.cfg.UserName, - "text": msg, - } - data, err := json.Marshal(hookContent) - if err != nil { - log.Error("failed to marshal hook content", "err", err) - return - } + // Used for deposit confirm loop. + depositStartNumber uint64 + depositSafeNumber uint64 - request := ch.notifyCli.R().SetHeader("Content-Type", "application/x-www-form-urlencoded") - request = request.SetFormData(map[string]string{"payload": string(data)}) - _, err = request.Post(ch.cfg.WebhookURL) - if err != nil { - log.Error("appear error when send slack message", "err", err) - } + // Used for withdraw confirm loop. + withdrawStartNumber uint64 + withdrawSafeNumber uint64 } // NewChainMonitor initializes a new instance of the ChainMonitor. -func NewChainMonitor(cfg *config.SlackWebhookConfig, db *gorm.DB, l1Watcher, l2Watcher controller.WatcherAPI) (*ChainMonitor, error) { - startNumber, err := orm.GetLatestConfirmedNumber(db) +func NewChainMonitor(cfg *config.SlackWebhookConfig, db *gorm.DB, l1Watcher controller.L1WatcherAPI, l2Watcher controller.WatcherAPI) (*ChainMonitor, error) { + depositStartNumber, err := orm.GetL2DepositNumber(db) + if err != nil { + return nil, err + } + withdrawStartNumber, err := orm.GetL1WithdrawNumber(db) if err != nil { return nil, err } + if withdrawStartNumber == 0 { + withdrawStartNumber = l1Watcher.L1StartNumber() + } // Use resty and init it. cli := resty.New() @@ -67,83 +54,37 @@ func NewChainMonitor(cfg *config.SlackWebhookConfig, db *gorm.DB, l1Watcher, l2W cli.SetTimeout(time.Second * 3) monitor := &ChainMonitor{ - cfg: cfg, - db: db, - notifyCli: cli, - startNumber: startNumber, - l1watcher: l1Watcher, - l2watcher: l2Watcher, + cfg: cfg, + db: db, + notifyCli: cli, + depositStartNumber: depositStartNumber, + withdrawStartNumber: withdrawStartNumber, + l1watcher: l1Watcher, + l2watcher: l2Watcher, } return monitor, nil } -// ChainMonitor monitors the blockchain and confirms the deposit events. -func (ch *ChainMonitor) ChainMonitor(ctx context.Context) { - // Make sure the l1Watcher is ready to use. - if !ch.l1watcher.IsReady() { - log.Debug("l1watcher is not ready, sleep 3 seconds") - time.Sleep(time.Second * 5) - return - } - start, end := ch.getStartAndEndNumber() - if end > ch.safeNumber { - log.Debug("l2watcher is not ready", "l2_start_number", ch.safeNumber) - time.Sleep(time.Second * 3) +// SlackNotify sends an alert message to a Slack channel. +func (ch *ChainMonitor) SlackNotify(msg string) { + if ch.cfg.WebhookURL == "" { return } - - // Make sure scan number is ready. - l2Number := ch.l2watcher.StartNumber() - if l2Number <= ch.startNumber { - log.Debug("l2watcher is not ready", "l2_start_number", l2Number) - time.Sleep(time.Second * 3) - return + hookContent := map[string]string{ + "channel": ch.cfg.Channel, + "username": ch.cfg.UserName, + "text": msg, } - - err := ch.db.Transaction(func(db *gorm.DB) error { - // confirm deposit events. - failedNumbers, err := ch.confirmDepositEvents(ctx, db, start, end) - if err != nil { - return err - } - // store - sTx := db.Model(&orm.ChainConfirm{}).Select("deposit_status", "confirm").Where("number BETWEEN ? AND ?", start, end) - sTx = sTx.Update("deposit_status", true).Update("confirm", true) - if sTx.Error != nil { - return sTx.Error - } - - if len(failedNumbers) > 0 { - fTx := db.Model(&orm.ChainConfirm{}).Select("deposit_status", "confirm").Where("number in ?", failedNumbers) - fTx = fTx.Update("deposit_status", false).Update("confirm", true) - if fTx.Error != nil { - return fTx.Error - } - } - - return nil - }) + data, err := json.Marshal(hookContent) if err != nil { - log.Error("failed to check deposit events", "start", start, "end", end, "err", err) - time.Sleep(time.Second * 10) + log.Error("failed to marshal hook content", "err", err) return } - ch.startNumber = end - - log.Info("confirm l2 blocks", "start", start, "end", end) -} -func (ch *ChainMonitor) getStartAndEndNumber() (uint64, uint64) { - var ( - start = ch.startNumber + 1 - end = start + batchSize - 1 - ) - ch.safeNumber = ch.l2watcher.StartNumber() - if end < ch.safeNumber { - return start, end - } - if start < ch.safeNumber { - return start, ch.safeNumber - 1 + request := ch.notifyCli.R().SetHeader("Content-Type", "application/x-www-form-urlencoded") + request = request.SetFormData(map[string]string{"payload": string(data)}) + _, err = request.Post(ch.cfg.WebhookURL) + if err != nil { + log.Error("appear error when send slack message", "err", err) } - return start, start } diff --git a/internal/controller/monitor/monitor_test.go b/internal/controller/monitor/chain_monitor_test.go similarity index 100% rename from internal/controller/monitor/monitor_test.go rename to internal/controller/monitor/chain_monitor_test.go diff --git a/internal/controller/monitor/common.go b/internal/controller/monitor/common.go new file mode 100644 index 0000000..8690781 --- /dev/null +++ b/internal/controller/monitor/common.go @@ -0,0 +1,14 @@ +package monitor + +type msgEvents struct { + L2Number uint64 `gorm:"l2_number"` + + L1TxHash string `gorm:"l1_tx_hash"` + L2TxHash string `gorm:"l2_tx_hash"` + + // asset fields + L1Amount string `gorm:"l1_amount"` + L2Amount string `gorm:"l2_amount"` + L1TokenID string `gorm:"l1_token_id"` + L2TokenID string `gorm:"l2_token_id"` +} diff --git a/internal/controller/monitor/depositconfirm.go b/internal/controller/monitor/deposit_confirm.go similarity index 55% rename from internal/controller/monitor/depositconfirm.go rename to internal/controller/monitor/deposit_confirm.go index 3efb29c..bc74c21 100644 --- a/internal/controller/monitor/depositconfirm.go +++ b/internal/controller/monitor/deposit_confirm.go @@ -3,43 +3,102 @@ package monitor import ( "context" "fmt" + "time" "github.com/scroll-tech/go-ethereum/log" "gorm.io/gorm" + "modernc.org/mathutil" "chain-monitor/internal/orm" ) -type msgEvents struct { - L2Number uint64 `gorm:"l2_number"` +var ( + l2ethSQL = `select + l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, + l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount +from l2_eth_events as l2ee full join l1_eth_events as l1ee + on l1ee.msg_hash = l2ee.msg_hash +where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` + l2erc20SQL = `select + l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, + l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount +from l2_erc20_events as l2ee full join l1_erc20_events as l1ee + on l1ee.msg_hash = l2ee.msg_hash +where l2ee.number BETWEEN ? AND ? and l2ee.type in (?, ?, ?, ?);` + l2erc721SQL = `select + l1ee.tx_hash as l1_tx_hash, l1ee.token_id as l1_token_id, + l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.token_id as l2_token_id +from l2_erc721_events as l2ee full join l1_erc721_events as l1ee + on l1ee.msg_hash = l2ee.msg_hash +where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` + l2erc1155SQL = `select + l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, l1ee.token_id as l1_token_id, + l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount, l2ee.token_id as l2_token_id +from l2_erc1155_events as l2ee full join l1_erc1155_events as l1ee + on l1ee.msg_hash = l2ee.msg_hash +where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` +) + +// DepositConfirm monitors the blockchain and confirms the deposit events. +func (ch *ChainMonitor) DepositConfirm(ctx context.Context) { + // Make sure the l1Watcher is ready to use. + if !ch.l1watcher.IsReady() { + log.Debug("l1watcher is not ready, sleep 3 seconds") + time.Sleep(time.Second * 3) + return + } + start, end := ch.getDepositStartAndEndNumber() + if end > ch.depositSafeNumber { + log.Debug("l2watcher is not ready", "l2_start_number", ch.depositSafeNumber) + time.Sleep(time.Second * 3) + return + } + // Get unmatched deposit + failedNumbers, err := ch.confirmDepositEvents(ctx, start, end) + if err != nil { - L1TxHash string `gorm:"l1_tx_hash"` - L2TxHash string `gorm:"l2_tx_hash"` + } + err = ch.db.Transaction(func(tx *gorm.DB) error { - // asset fields - L1Amount string `gorm:"l1_amount"` - L2Amount string `gorm:"l2_amount"` - L1TokenID string `gorm:"l1_token_id"` - L2TokenID string `gorm:"l2_token_id"` -} + // Update deposit records. + sTx := tx.Model(&orm.L2ChainConfirm{}).Select("deposit_status", "confirm"). + Where("number BETWEEN ? AND ?", start, end) + sTx = sTx.Update("deposit_status", true).Update("confirm", true) + if sTx.Error != nil { + return sTx.Error + } + + if len(failedNumbers) > 0 { + fTx := tx.Model(&orm.L2ChainConfirm{}).Select("deposit_status"). + Where("number in ?", failedNumbers) + fTx = fTx.Update("deposit_status", false) + if fTx.Error != nil { + return fTx.Error + } + } + + return nil + }) + if err != nil { + log.Error("failed to check deposit events", "start", start, "end", end, "err", err) + time.Sleep(time.Second * 5) + return + } + ch.depositStartNumber = end -func (ch *ChainMonitor) confirmDepositEvents(ctx context.Context, db *gorm.DB, start, end uint64) ([]uint64, error) { - db = db.WithContext(ctx) + log.Info("confirm layer2 deposit transactions", "start", start, "end", end) +} +func (ch *ChainMonitor) confirmDepositEvents(ctx context.Context, start, end uint64) ([]uint64, error) { var ( + db = ch.db.WithContext(ctx) failedNumbers []uint64 flagNumbers = map[uint64]bool{} ) // check eth events. var ethEvents []msgEvents - sql := `select - l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, - l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount -from l2_eth_events as l2ee full join l1_eth_events as l1ee - on l1ee.msg_hash = l2ee.msg_hash -where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` - db = db.Raw(sql, start, end, orm.L2FinalizeDepositETH) + db = db.Raw(l2ethSQL, start, end, orm.L2FinalizeDepositETH) if err := db.Scan(ðEvents).Error; err != nil { return nil, err } @@ -52,19 +111,13 @@ where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` } // If eth msg don't match, alert it. go ch.SlackNotify(fmt.Sprintf("deposit eth don't match, message: %v", msg)) - log.Error("the eth deposit hash or amount don't match", "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) + log.Error("the eth deposit count or amount don't match", "start", start, "end", end, "event_type", orm.L2FinalizeDepositETH, "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) } } var erc20Events []msgEvents // check erc20 events. - sql = `select - l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, - l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount -from l2_erc20_events as l2ee full join l1_erc20_events as l1ee - on l1ee.msg_hash = l2ee.msg_hash -where l2ee.number BETWEEN ? AND ? and l2ee.type in (?, ?, ?, ?);` - db = db.Raw(sql, + db = db.Raw(l2erc20SQL, start, end, orm.L2FinalizeDepositDAI, orm.L2FinalizeDepositWETH, @@ -82,19 +135,20 @@ where l2ee.number BETWEEN ? AND ? and l2ee.type in (?, ?, ?, ?);` } // If erc20 msg don't match, alert it. go ch.SlackNotify(fmt.Sprintf("erc20 deposit don't match, message: %v", msg)) - log.Error("the erc20 deposit hash or amount doesn't match", "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) + log.Error( + "the erc20 deposit count or amount doesn't match", + "start", start, + "end", end, + "event_type", []orm.EventType{orm.L2FinalizeDepositDAI, orm.L2FinalizeDepositWETH, orm.L2FinalizeDepositStandardERC20, orm.L2FinalizeDepositCustomERC20}, + "l1_tx_hash", msg.L1TxHash, + "l2_tx_hash", msg.L2TxHash, + ) } } // check erc721 events. var erc721Events []msgEvents - sql = `select - l1ee.tx_hash as l1_tx_hash, l1ee.token_id as l1_token_id, - l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.token_id as l2_token_id -from l2_erc721_events as l2ee full join l1_erc721_events as l1ee - on l1ee.msg_hash = l2ee.msg_hash -where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` - db = db.Raw(sql, start, end, orm.L2FinalizeDepositERC721) + db = db.Raw(l2erc721SQL, start, end, orm.L2FinalizeDepositERC721) if err := db.Scan(&erc721Events).Error; err != nil { return nil, err } @@ -107,19 +161,13 @@ where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` } // If erc721 event don't match, alert it. go ch.SlackNotify(fmt.Sprintf("erc721 event don't match, message: %v", msg)) - log.Error("the erc721 deposit hash or amount doesn't match", "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) + log.Error("the erc721 deposit count or amount doesn't match", "start", start, "end", end, "event_type", orm.L2FinalizeDepositERC721, "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) } } // check erc1155 events. var erc1155Events []msgEvents - sql = `select - l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, l1ee.token_id as l1_token_id, - l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount, l2ee.token_id as l2_token_id -from l2_erc1155_events as l2ee full join l1_erc1155_events as l1ee - on l1ee.msg_hash = l2ee.msg_hash -where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` - db = db.Raw(sql, start, end, orm.L2FinalizeDepositERC1155) + db = db.Raw(l2erc1155SQL, start, end, orm.L2FinalizeDepositERC1155) if err := db.Scan(&erc1155Events).Error; err != nil { return nil, err } @@ -132,9 +180,21 @@ where l2ee.number BETWEEN ? AND ? and l2ee.type = ?;` } // If erc1155 event don't match, alert it. go ch.SlackNotify(fmt.Sprintf("erc1155 event don't match, message: %v", msg)) - log.Error("the erc1155 deposit hash or amount doesn't match", "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) + log.Error("the erc1155 deposit count or amount doesn't match", "start", start, "end", end, "event_type", orm.L2FinalizeDepositERC1155, "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) } } return failedNumbers, nil } + +func (ch *ChainMonitor) getDepositStartAndEndNumber() (uint64, uint64) { + ch.depositSafeNumber = ch.l2watcher.CurrentNumber() + var ( + start = ch.depositStartNumber + 1 + end = mathutil.MinUint64(start+batchSize-1, ch.depositSafeNumber) + ) + if start <= end { + return start, end + } + return start, start +} diff --git a/internal/controller/monitor/withdraw_confirm.go b/internal/controller/monitor/withdraw_confirm.go new file mode 100644 index 0000000..03fe0ac --- /dev/null +++ b/internal/controller/monitor/withdraw_confirm.go @@ -0,0 +1,202 @@ +package monitor + +import ( + "context" + "fmt" + "time" + + "github.com/scroll-tech/go-ethereum/log" + "gorm.io/gorm" + "modernc.org/mathutil" + + "chain-monitor/internal/orm" +) + +var ( + l1ethSQL = `select + l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, + l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount +from l2_eth_events as l2ee full join l1_eth_events as l1ee + on l1ee.msg_hash = l2ee.msg_hash +where l1ee.number BETWEEN ? AND ? and l1ee.type = ?;` + l1erc20SQL = `select + l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, + l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount +from l2_erc20_events as l2ee full join l1_erc20_events as l1ee + on l1ee.msg_hash = l2ee.msg_hash +where l1ee.number BETWEEN ? AND ? and l1ee.type in (?, ?, ?, ?);` + l1erc721SQL = `select + l1ee.tx_hash as l1_tx_hash, l1ee.token_id as l1_token_id, + l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.token_id as l2_token_id +from l2_erc721_events as l2ee full join l1_erc721_events as l1ee + on l1ee.msg_hash = l2ee.msg_hash +where l1ee.number BETWEEN ? AND ? and l1ee.type = ?;` + l1erc1155SQL = `select + l1ee.tx_hash as l1_tx_hash, l1ee.amount as l1_amount, l1ee.token_id as l1_token_id, + l2ee.tx_hash as l2_tx_hash, l2ee.number as l2_number, l2ee.amount as l2_amount, l2ee.token_id as l2_token_id +from l2_erc1155_events as l2ee full join l1_erc1155_events as l1ee + on l1ee.msg_hash = l2ee.msg_hash +where l1ee.number BETWEEN ? AND ? and l1ee.type = ?;` +) + +// WithdrawConfirm the loop in order to confirm withdraw events. +func (ch *ChainMonitor) WithdrawConfirm(ctx context.Context) { + // Make sure the l2Watcher is ready to use. + if !ch.l2watcher.IsReady() { + log.Debug("l2watcher is not ready, sleep 3 seconds") + time.Sleep(time.Second * 3) + return + } + start, end := ch.getWithdrawStartAndEndNumber() + if end > ch.withdrawSafeNumber { + log.Debug("l1watcher is not ready", "l1_start_number", ch.withdrawSafeNumber) + time.Sleep(time.Second * 3) + return + } + + // Get unmatched withdraw event numbers. + failedNumbers, err := ch.confirmWithdrawEvents(ctx, start, end) + if err != nil { + log.Error("failed to get unmatched withdraw events", "start", start, "end", end, "err", err) + return + } + err = ch.db.Transaction(func(tx *gorm.DB) error { + // Update withdraw records. + sTx := tx.Model(&orm.L1ChainConfirm{}).Select("withdraw_status", "confirm"). + Where("number BETWEEN ? AND ?", start, end) + sTx = sTx.Update("withdraw_status", true).Update("confirm", true) + if sTx.Error != nil { + return sTx.Error + } + + // Update failed withdraw records. + if len(failedNumbers) > 0 { + fTx := tx.Model(&orm.L1ChainConfirm{}).Select("withdraw_status"). + Where("number in ?", failedNumbers) + fTx = fTx.Update("withdraw_status", false) + if fTx.Error != nil { + return fTx.Error + } + } + + return nil + }) + if err != nil { + log.Error("failed to check withdraw events", "start", start, "end", end, "err", err) + time.Sleep(time.Second * 5) + return + } + ch.withdrawStartNumber = end + + log.Info("confirm layer1 withdraw transactions", "start", start, "end", end) +} + +func (ch *ChainMonitor) confirmWithdrawEvents(ctx context.Context, start, end uint64) ([]uint64, error) { + var ( + db = ch.db.WithContext(ctx) + failedNumbers []uint64 + flagNumbers = map[uint64]bool{} + ) + + // check eth events. + var ethEvents []msgEvents + db = db.Raw(l1ethSQL, start, end, orm.L1FinalizeWithdrawETH) + if err := db.Scan(ðEvents).Error; err != nil { + return nil, err + } + for i := 0; i < len(ethEvents); i++ { + msg := ethEvents[i] + if msg.L1Amount != msg.L2Amount { + if !flagNumbers[msg.L2Number] { + flagNumbers[msg.L2Number] = true + failedNumbers = append(failedNumbers, msg.L2Number) + } + // If eth msg don't match, alert it. + go ch.SlackNotify(fmt.Sprintf("eth withdraw doesn't match, message: %v", msg)) + log.Error("the eth withdraw count or amount doesn't match", "start", start, "end", end, "event_type", orm.L1FinalizeWithdrawETH, "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) + } + } + + var erc20Events []msgEvents + // check erc20 events. + db = db.Raw(l1erc20SQL, + start, end, + orm.L1FinalizeWithdrawDAI, + orm.L1FinalizeWithdrawWETH, + orm.L1FinalizeWithdrawStandardERC20, + orm.L1FinalizeWithdrawCustomERC20) + if err := db.Scan(&erc20Events).Error; err != nil { + return nil, err + } + for i := 0; i < len(erc20Events); i++ { + msg := erc20Events[i] + if msg.L1Amount != msg.L2Amount { + if !flagNumbers[msg.L2Number] { + flagNumbers[msg.L2Number] = true + failedNumbers = append(failedNumbers, msg.L2Number) + } + // If erc20 msg don't match, alert it. + go ch.SlackNotify(fmt.Sprintf("erc20 withdraw doesn't match, message: %v", msg)) + log.Error( + "the erc20 withdraw count or amount doesn't match", + "start", start, + "end", end, + "event_type", []orm.EventType{orm.L1FinalizeWithdrawDAI, orm.L1FinalizeWithdrawWETH, orm.L1FinalizeWithdrawStandardERC20, orm.L1FinalizeWithdrawCustomERC20}, + "l1_tx_hash", msg.L1TxHash, + "l2_tx_hash", msg.L2TxHash, + ) + } + } + + // check erc721 events. + var erc721Events []msgEvents + db = db.Raw(l1erc721SQL, start, end, orm.L1FinalizeWithdrawERC721) + if err := db.Scan(&erc721Events).Error; err != nil { + return nil, err + } + for i := 0; i < len(erc721Events); i++ { + msg := erc721Events[i] + if msg.L1TokenID != msg.L2TokenID { + if !flagNumbers[msg.L2Number] { + flagNumbers[msg.L2Number] = true + failedNumbers = append(failedNumbers, msg.L2Number) + } + // If erc721 event don't match, alert it. + go ch.SlackNotify(fmt.Sprintf("erc721 withdraw doesn't match, message: %v", msg)) + log.Error("the erc721 withdraw count or amount doesn't match", "start", start, "end", end, "event_type", orm.L1FinalizeWithdrawERC721, "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) + } + } + + // check erc1155 events. + var erc1155Events []msgEvents + db = db.Raw(l1erc1155SQL, start, end, orm.L1FinalizeWithdrawERC1155) + if err := db.Scan(&erc1155Events).Error; err != nil { + return nil, err + } + for i := 0; i < len(erc1155Events); i++ { + msg := erc1155Events[i] + if msg.L1TokenID != msg.L2TokenID || msg.L1Amount != msg.L2Amount { + if !flagNumbers[msg.L2Number] { + flagNumbers[msg.L2Number] = true + failedNumbers = append(failedNumbers, msg.L2Number) + } + // If erc1155 event don't match, alert it. + go ch.SlackNotify(fmt.Sprintf("erc1155 withdraw doesn't match, message: %v", msg)) + log.Error("the erc1155 withdraw count or amount doesn't match", "start", start, "end", end, "event_type", orm.L1FinalizeWithdrawERC1155, "l1_tx_hash", msg.L1TxHash, "l2_tx_hash", msg.L2TxHash) + } + } + + return failedNumbers, nil +} + +func (ch *ChainMonitor) getWithdrawStartAndEndNumber() (uint64, uint64) { + ch.withdrawSafeNumber = ch.l1watcher.CurrentNumber() + var ( + start = ch.withdrawStartNumber + 1 + end = mathutil.MinUint64(start+batchSize-1, ch.withdrawSafeNumber) + ) + if start <= end { + return start, end + } + return start, start +} diff --git a/internal/orm/messenger.go b/internal/orm/messenger.go index 8cd1fe3..b670811 100644 --- a/internal/orm/messenger.go +++ b/internal/orm/messenger.go @@ -40,8 +40,8 @@ func SaveL1Messenger(db *gorm.DB, eventType EventType, vLog *types.Log, msgHash type L2MessengerEvent struct { Number uint64 `gorm:"index; comment: block number"` Type EventType `gorm:"index; comment: tx type"` - MsgNonce uint64 `gorm:"primaryKey"` - MsgHash string `gorm:"index"` + MsgNonce uint64 `gorm:"type: msg_nonce"` + MsgHash string `gorm:"primaryKey"` MsgProof string Confirm bool `gorm:"index"` } diff --git a/internal/orm/migrate.go b/internal/orm/migrate.go index 64d7afd..c42a660 100644 --- a/internal/orm/migrate.go +++ b/internal/orm/migrate.go @@ -27,7 +27,7 @@ var ( &L2MessengerEvent{}, } tables = []interface{}{ - &ChainConfirm{}, + &L2ChainConfirm{}, } once sync.Once ) diff --git a/internal/orm/migrate/migrate.go b/internal/orm/migrate/migrate.go index 9240b2d..77a2c5f 100644 --- a/internal/orm/migrate/migrate.go +++ b/internal/orm/migrate/migrate.go @@ -18,7 +18,7 @@ const MigrationsDir string = "migrations" func init() { goose.SetBaseFS(embedMigrations) goose.SetSequential(true) - goose.SetTableName("bridge_history_migrations") + goose.SetTableName("chain_monitor_migrations") verbose, _ := strconv.ParseBool(os.Getenv("LOG_SQL_MIGRATIONS")) goose.SetVerbose(verbose) diff --git a/internal/orm/migrate/migrations/000015_l1_chain_confirms.sql b/internal/orm/migrate/migrations/000015_l1_chain_confirms.sql new file mode 100644 index 0000000..0e2f3d7 --- /dev/null +++ b/internal/orm/migrate/migrations/000015_l1_chain_confirms.sql @@ -0,0 +1,15 @@ +-- +goose Up +-- +goose L1ChainConfirmBegin +create table l1_chain_confirms +( + number bigserial + primary key, + withdraw_status boolean, + confirm boolean +); +-- +goose L1ChainConfirmEnd + +-- +goose Down +-- +goose L1ChainConfirmBegin +drop table if exists l1_chain_confirms; +-- +goose L1ChainConfirmEnd diff --git a/internal/orm/migrate/migrations/000016_upgrade_chain_confirms.sql b/internal/orm/migrate/migrations/000016_upgrade_chain_confirms.sql new file mode 100644 index 0000000..9c2125a --- /dev/null +++ b/internal/orm/migrate/migrations/000016_upgrade_chain_confirms.sql @@ -0,0 +1,15 @@ +-- +goose Up +-- +goose UpgradeChainMonitorBegin +alter table chain_confirms + rename to l2_chain_confirms; +alter table l2_chain_confirms + rename column withdraw_status to withdraw_root_status; +-- +goose UpgradeChainMonitorEnd + +-- +goose Down +-- +goose UpgradeChainMonitorBegin +alter table l2_chain_confirms + rename to chain_confirms; +alter table chain_confirms + rename column withdraw_root_status to withdraw_status; +-- +goose UpgradeChainMonitorEnd diff --git a/internal/orm/migrate/migrations/000017_change_messenger_primary_key.sql b/internal/orm/migrate/migrations/000017_change_messenger_primary_key.sql new file mode 100644 index 0000000..b9d31be --- /dev/null +++ b/internal/orm/migrate/migrations/000017_change_messenger_primary_key.sql @@ -0,0 +1,15 @@ +-- +goose Up +-- +goose L2ScrollMessengerBegin +alter table l2_messenger_events +drop constraint l2_messenger_events_pkey; +alter table l2_messenger_events + add primary key (msg_hash); +-- +goose L2ScrollMessengerEnd + +-- +goose Down +-- +goose L2ScrollMessengerBegin +alter table l2_messenger_events +drop constraint l2_messenger_events_pkey; +alter table l2_messenger_events + add primary key (msg_nonce); +-- +goose L2ScrollMessengerEnd diff --git a/internal/orm/monitor.go b/internal/orm/monitor.go index aab7dda..b1bc7e6 100644 --- a/internal/orm/monitor.go +++ b/internal/orm/monitor.go @@ -5,29 +5,48 @@ import ( "gorm.io/gorm" ) -// ChainConfirm represents the confirmation status of various events in the blockchain. +// L1ChainConfirm represents the confirmation status of various events in the blockchain. // It keeps track of the confirmation status for deposits, withdrawals, and overall confirmations for a given block number. -type ChainConfirm struct { - Number uint64 `gorm:"primaryKey"` - WithdrawRoot common.Hash `gorm:"-"` - WithdrawStatus bool - DepositStatus bool - Confirm bool +type L1ChainConfirm struct { + Number uint64 `gorm:"primaryKey"` + WithdrawStatus bool `gorm:"type: withdraw_status"` + Confirm bool `gorm:"type: confirm"` } -// GetLatestConfirmedNumber retrieves the latest block number with confirmation status set to true. -func GetLatestConfirmedNumber(db *gorm.DB) (uint64, error) { - var monitor ChainConfirm - err := db.Where("confirm = true").Last(&monitor).Error +// L2ChainConfirm represents the confirmation status of various events in the blockchain. +// It keeps track of the confirmation status for deposits, withdrawals, and overall confirmations for a given block number. +type L2ChainConfirm struct { + Number uint64 `gorm:"primaryKey"` + WithdrawRoot common.Hash `gorm:"-"` + WithdrawRootStatus bool `gorm:"type: withdraw_status"` + + DepositStatus bool `gorm:"type: deposit_status"` + Confirm bool `gorm:"type: confirm"` +} + +// GetL2DepositNumber retrieves the latest block number with confirmation status set to true. +func GetL2DepositNumber(db *gorm.DB) (uint64, error) { + var monitor L2ChainConfirm + err := db.Model(&L2ChainConfirm{}).Where("confirm = true").Last(&monitor).Error + if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { + return 0, err + } + return monitor.Number, nil +} + +// GetL1WithdrawNumber retrieves the latest block number with confirmation status set to true. +func GetL1WithdrawNumber(db *gorm.DB) (uint64, error) { + var monitor L1ChainConfirm + err := db.Model(&L1ChainConfirm{}).Where("confirm = true").Last(&monitor).Error if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { return 0, err } return monitor.Number, nil } -// GetConfirmMsgByNumber fetches the ChainConfirm message for a given block number. -func GetConfirmMsgByNumber(db *gorm.DB, number uint64) (*ChainConfirm, error) { - var confirmBatch ChainConfirm +// GetL2ConfirmMsgByNumber fetches the L2ChainConfirm message for a given block number. +func GetL2ConfirmMsgByNumber(db *gorm.DB, number uint64) (*L2ChainConfirm, error) { + var confirmBatch L2ChainConfirm res := db.Where("number = ?", number).First(&confirmBatch) return &confirmBatch, res.Error } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index f8555ba..d5a8e36 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -2,6 +2,7 @@ package utils import ( "context" + "fmt" "math/big" "time" @@ -58,6 +59,47 @@ func ComputeMessageHash(ABI *abi.ABI, return common.BytesToHash(crypto.Keccak256(data)) } +// GetLatestConfirmedBlockNumber get confirmed block number by rpc.BlockNumber type. +func GetLatestConfirmedBlockNumber(ctx context.Context, client *ethclient.Client, confirm rpc.BlockNumber) (uint64, error) { + switch true { + case confirm == rpc.SafeBlockNumber || confirm == rpc.FinalizedBlockNumber: + var tag *big.Int + if confirm == rpc.FinalizedBlockNumber { + tag = big.NewInt(int64(rpc.FinalizedBlockNumber)) + } else { + tag = big.NewInt(int64(rpc.SafeBlockNumber)) + } + + header, err := client.HeaderByNumber(ctx, tag) + if err != nil { + return 0, err + } + if !header.Number.IsInt64() { + return 0, fmt.Errorf("received invalid block confirm: %v", header.Number) + } + return header.Number.Uint64(), nil + case confirm == rpc.LatestBlockNumber: + number, err := client.BlockNumber(ctx) + if err != nil { + return 0, err + } + return number, nil + case confirm.Int64() >= 0: // If it's positive integer, consider it as a certain confirm value. + number, err := client.BlockNumber(ctx) + if err != nil { + return 0, err + } + cfmNum := uint64(confirm.Int64()) + + if number >= cfmNum { + return number - cfmNum, nil + } + return 0, nil + default: + return 0, fmt.Errorf("unknown confirmation type: %v", confirm) + } +} + func toBlockNumArg(number *big.Int) string { if number == nil { return "latest"