diff --git a/docs/sql/postgres.sql b/docs/sql/postgres.sql index b08c60f..2d30d6c 100644 --- a/docs/sql/postgres.sql +++ b/docs/sql/postgres.sql @@ -24,6 +24,29 @@ CREATE INDEX if not exists status_index ON sync_blocks (status); CREATE INDEX if not exists tx_count_index ON sync_blocks (tx_count); CREATE INDEX if not exists check_count_index ON sync_blocks (check_count); +-- Create sync_events table +DROP TABLE sync_events; +CREATE TABLE IF NOT EXISTS sync_events +( + id SERIAL PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + sync_block_id BIGINT NOT NULL, + blockchain VARCHAR(32) NOT NULL, + block_time BIGINT NOT NULL, + block_number BIGINT NOT NULL, + block_hash VARCHAR(66) NOT NULL, + block_log_indexed BIGINT NOT NULL, + tx_index BIGINT NOT NULL, + tx_hash VARCHAR(66) NOT NULL, + event_name VARCHAR(32) NOT NULL, + event_hash VARCHAR(66) NOT NULL, + contract_address VARCHAR(42) NOT NULL, + data JSONB NOT NULL, + status VARCHAR(32) NOT NULL, + retry_count BIGINT DEFAULT 0 +); + -- ---------------------------- -- Table structure for dispute_game -- ---------------------------- @@ -49,6 +72,7 @@ CREATE TABLE IF NOT EXISTS dispute_game l2_block_number bigint NOT NULL, status int NOT NULL ); +CREATE INDEX if not exists dispute_game_index ON dispute_game (contract_address, game_contract); -- ---------------------------- -- Table structure for game_claim_data @@ -68,4 +92,5 @@ CREATE TABLE IF NOT EXISTS game_claim_data claim varchar(64) NOT NULL, position bigint NOT NULL, clock bigint NOT NULL -) +); +CREATE INDEX if not exists dispute_game_data_index ON game_claim_data (game_contract, data_index); diff --git a/internal/blockchain/blockchain_test.go b/internal/blockchain/blockchain_test.go index e7f9363..f3fad1e 100644 --- a/internal/blockchain/blockchain_test.go +++ b/internal/blockchain/blockchain_test.go @@ -8,10 +8,10 @@ import ( func TestRemoveContract(t *testing.T) { contracts = GetContracts() fmt.Println(GetContracts()) - AddContract("BB") + AddContract("0x05F9613aDB30026FFd634f38e5C4dFd30a197Ba1") AddContract("CC") fmt.Println(GetContracts()) - RemoveContract("AA") + RemoveContract("0x05f9613adB30026FFd634f38e5C4dFd30a197ba1") RemoveContract("BB") RemoveContract("CC") fmt.Println(GetContracts()) diff --git a/internal/handler/disputeGame.go b/internal/handler/disputeGame.go index 4f30d58..db66316 100644 --- a/internal/handler/disputeGame.go +++ b/internal/handler/disputeGame.go @@ -1,109 +1,148 @@ package handler import ( + "context" "encoding/hex" "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/optimism-java/dispute-explorer/pkg/log" + "golang.org/x/time/rate" "math/big" + "strings" "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" "github.com/optimism-java/dispute-explorer/internal/blockchain" "github.com/optimism-java/dispute-explorer/internal/schema" - "github.com/optimism-java/dispute-explorer/internal/svc" "github.com/optimism-java/dispute-explorer/pkg/contract" "github.com/optimism-java/dispute-explorer/pkg/event" - "github.com/optimism-java/dispute-explorer/pkg/log" - "golang.org/x/time/rate" "gorm.io/gorm" ) -func BatchFilterAddAndRemove(ctx *svc.ServiceContext, events []*schema.SyncEvent) error { - for _, evt := range events { - err := filterAddAndRemove(ctx, evt) - if err != nil { - return fmt.Errorf("[BatchFilterAddAndRemove] filterAddAndRemove: %s", err) - } +type RetryDisputeGameClient struct { + Client *contract.RateAndRetryDisputeGameClient + DB *gorm.DB +} + +func NewRetryDisputeGameClient(db *gorm.DB, address common.Address, rpc *ethclient.Client, limit rate.Limit, + burst int) (*RetryDisputeGameClient, error) { + newDisputeGame, err := contract.NewDisputeGame(address, rpc) + if err != nil { + return nil, err } - return nil + retryLimitGame := contract.NewRateAndRetryDisputeGameClient(newDisputeGame, limit, burst) + return &RetryDisputeGameClient{Client: retryLimitGame, DB: db}, nil } -func filterAddAndRemove(ctx *svc.ServiceContext, evt *schema.SyncEvent) error { +func (r *RetryDisputeGameClient) ProcessDisputeGameCreated(ctx context.Context, evt *schema.SyncEvent) error { dispute := event.DisputeGameCreated{} - if evt.EventName == dispute.Name() && evt.EventHash == dispute.EventHash().String() { - err := dispute.ToObj(evt.Data) + err := dispute.ToObj(evt.Data) + if err != nil { + return fmt.Errorf("[processDisputeGameCreated] event data to DisputeGameCreated err: %s", err) + } + err = r.addDisputeGame(ctx, evt) + if err != nil { + return fmt.Errorf("[processDisputeGameCreated] addDisputeGame err: %s", err) + } + blockchain.AddContract(dispute.DisputeProxy) + return nil +} + +func (r *RetryDisputeGameClient) ProcessDisputeGameMove(ctx context.Context, evt *schema.SyncEvent) error { + disputeGameMove := event.DisputeGameMove{} + err := disputeGameMove.ToObj(evt.Data) + if err != nil { + return fmt.Errorf("[processDisputeGameMove] event data to disputeGameMove err: %s", err) + } + index := disputeGameMove.ParentIndex.Add(disputeGameMove.ParentIndex, big.NewInt(1)) + data, err := r.Client.RetryClaimData(ctx, &bind.CallOpts{}, index) + if err != nil { + return fmt.Errorf("[processDisputeGameMove] contract: %s, index: %d move event get claim data err: %s", evt.ContractAddress, index, err) + } + + claimData := &schema.GameClaimData{ + GameContract: evt.ContractAddress, + DataIndex: index.Int64(), + ParentIndex: data.ParentIndex, + CounteredBy: data.CounteredBy.Hex(), + Claimant: data.Claimant.Hex(), + Bond: data.Bond.Uint64(), + Claim: hex.EncodeToString(data.Claim[:]), + Position: data.Position.Uint64(), + Clock: data.Clock.Int64(), + } + err = r.DB.Transaction(func(tx *gorm.DB) error { + err = tx.Save(claimData).Error if err != nil { - return fmt.Errorf("[FilterDisputeContractAndAdd] event data to DisputeGameCreated err: %s", err) + return fmt.Errorf("[processDisputeGameMove] save dispute game err: %s\n ", err) } - err = addDisputeGame(ctx, evt) + evt.Status = schema.EventValid + err = tx.Save(evt).Error if err != nil { - return fmt.Errorf("[FilterDisputeContractAndAdd] addDisputeGame err: %s", err) + return fmt.Errorf("[processDisputeGameMove] save event err: %s\n ", err) } - blockchain.AddContract(dispute.DisputeProxy) + return nil + }) + if err != nil { + panic(err) } + return nil +} + +func (r *RetryDisputeGameClient) ProcessDisputeGameResolve(ctx context.Context, evt *schema.SyncEvent) error { disputeResolved := event.DisputeGameResolved{} - if evt.EventName == disputeResolved.Name() && evt.EventHash == disputeResolved.EventHash().String() { - blockchain.RemoveContract(evt.ContractAddress) - log.Infof("resolve event remove %s", evt.ContractAddress) + err := disputeResolved.ToObj(evt.Data) + if err != nil { + return fmt.Errorf("[processDisputeGameResolve] event data to disputeResolved err: %s", err) } - disputeGameMove := event.DisputeGameMove{} - if evt.EventName == disputeGameMove.Name() && evt.EventHash == disputeGameMove.EventHash().String() { - err := disputeGameMove.ToObj(evt.Data) - if err != nil { - return fmt.Errorf("[FilterDisputeContractAndAdd] event data to disputeGameMove err: %s", err) - } - newDisputeGame, err := contract.NewDisputeGame(common.HexToAddress(evt.ContractAddress), ctx.L1RPC) + disputeGame := &schema.DisputeGame{} + err = r.DB.Where("game_contract=?", evt.ContractAddress).First(disputeGame).Error + if err != nil && err != gorm.ErrRecordNotFound { + return fmt.Errorf("[processDisputeGameResolve] resolve event can not find dispute game err: %s", err) + } + disputeGame.Status = disputeResolved.Status + err = r.DB.Transaction(func(tx *gorm.DB) error { + err = tx.Save(disputeGame).Error if err != nil { - return fmt.Errorf("[addDisputeGame] init dispute game contract client err: %s", err) + return fmt.Errorf("[processDisputeGameMove] update dispute game status err: %s\n ", err) } - index := disputeGameMove.ParentIndex.Add(disputeGameMove.ParentIndex, big.NewInt(1)) - data, err := newDisputeGame.ClaimData(&bind.CallOpts{}, index) + evt.Status = schema.EventValid + err = tx.Save(evt).Error if err != nil { - return fmt.Errorf("[addDisputeGame] contract: %s, index: %d move event get claim data err: %s", evt.ContractAddress, index, err) + return fmt.Errorf("[processDisputeGameMove] update event err: %s\n ", err) } - claimData := &schema.GameClaimData{ - GameContract: evt.ContractAddress, - DataIndex: index.Int64(), - ParentIndex: data.ParentIndex, - CounteredBy: data.CounteredBy.Hex(), - Claimant: data.Claimant.Hex(), - Bond: data.Bond.Uint64(), - Claim: hex.EncodeToString(data.Claim[:]), - Position: data.Position.Uint64(), - Clock: data.Clock.Int64(), - } - ctx.DB.Save(claimData) + return nil + }) + if err != nil { + panic(err) } - + blockchain.RemoveContract(evt.ContractAddress) + log.Infof("remove contract: %s", evt.ContractAddress) return nil } -func addDisputeGame(ctx *svc.ServiceContext, evt *schema.SyncEvent) error { +func (r *RetryDisputeGameClient) addDisputeGame(ctx context.Context, evt *schema.SyncEvent) error { disputeGame := event.DisputeGameCreated{} err := disputeGame.ToObj(evt.Data) if err != nil { return fmt.Errorf("[addDisputeGame] event data to DisputeGameCreated err: %s", err) } - newDisputeGame, err := contract.NewDisputeGame(common.HexToAddress(disputeGame.DisputeProxy), ctx.L1RPC) - if err != nil { - return fmt.Errorf("[addDisputeGame] init dispute game contract client err: %s", err) - } - retryLimitGame := contract.NewRateAndRetryDisputeGameClient(newDisputeGame, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) - l2Block, err := retryLimitGame.RetryL2BlockNumber(ctx.Context, &bind.CallOpts{}) + l2Block, err := r.Client.RetryL2BlockNumber(ctx, &bind.CallOpts{}) if err != nil { return fmt.Errorf("[addDisputeGame] GET game L2BlockNumber err: %s", err) } - status, err := retryLimitGame.RetryStatus(ctx.Context, &bind.CallOpts{}) + status, err := r.Client.RetryStatus(ctx, &bind.CallOpts{}) if err != nil { return fmt.Errorf("[addDisputeGame] GET game status err: %s", err) } - claimData, err := retryLimitGame.RetryClaimData(ctx.Context, &bind.CallOpts{}, big.NewInt(0)) + claimData, err := r.Client.RetryClaimData(ctx, &bind.CallOpts{}, big.NewInt(0)) if err != nil { return fmt.Errorf("[addDisputeGame] GET index 0 ClaimData err: %s", err) } gameClaim := &schema.GameClaimData{ - GameContract: disputeGame.DisputeProxy, + GameContract: strings.ToLower(disputeGame.DisputeProxy), DataIndex: 0, ParentIndex: claimData.ParentIndex, CounteredBy: claimData.CounteredBy.Hex(), @@ -126,19 +165,25 @@ func addDisputeGame(ctx *svc.ServiceContext, evt *schema.SyncEvent) error { EventName: evt.EventName, EventHash: evt.EventHash, ContractAddress: evt.ContractAddress, - GameContract: disputeGame.DisputeProxy, + GameContract: strings.ToLower(disputeGame.DisputeProxy), GameType: disputeGame.GameType, L2BlockNumber: l2Block.Int64(), Status: status, + InitStatus: schema.DisputeGameInit, } - err = ctx.DB.Transaction(func(tx *gorm.DB) error { + err = r.DB.Transaction(func(tx *gorm.DB) error { + err = tx.Save(gameClaim).Error + if err != nil { + return fmt.Errorf("[addDisputeGame] save dispute game claim: %s", err) + } err = tx.Save(game).Error if err != nil { - return fmt.Errorf("[addDisputeGame] save dispute game err: %s\n ", err) + return fmt.Errorf("[addDisputeGame] save dispute game err: %s ", err) } - err = tx.Save(gameClaim).Error + evt.Status = schema.EventValid + err = tx.Save(evt).Error if err != nil { - return fmt.Errorf("[addDisputeGame] save game claim err: %s\n ", err) + return fmt.Errorf("[addDisputeGame] save event err: %s ", err) } return nil }) diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 88967d9..b2b88ef 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -11,4 +11,6 @@ func Run(ctx *svc.ServiceContext) { go SyncBlock(ctx) // sync events go SyncEvent(ctx) + // sync dispute game + go SyncDispute(ctx) } diff --git a/internal/handler/syncDispute.go b/internal/handler/syncDispute.go new file mode 100644 index 0000000..ca3f3d7 --- /dev/null +++ b/internal/handler/syncDispute.go @@ -0,0 +1,64 @@ +package handler + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/optimism-java/dispute-explorer/internal/schema" + "github.com/optimism-java/dispute-explorer/internal/svc" + "github.com/optimism-java/dispute-explorer/pkg/event" + "github.com/optimism-java/dispute-explorer/pkg/log" + "golang.org/x/time/rate" + "time" +) + +func SyncDispute(ctx *svc.ServiceContext) { + for { + var events []schema.SyncEvent + err := ctx.DB.Where("status=?", schema.EventPending).Limit(20).Find(&events).Error + if err != nil { + time.Sleep(3 * time.Second) + continue + } + for _, evt := range events { + disputeCreated := event.DisputeGameCreated{} + disputeMove := event.DisputeGameMove{} + disputeResolved := event.DisputeGameResolved{} + if evt.EventName == disputeCreated.Name() && evt.EventHash == disputeCreated.EventHash().String() { + err = disputeCreated.ToObj(evt.Data) + if err != nil { + log.Errorf("[handle.SyncDispute] event data to DisputeGameCreated err: %s", err) + } + disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy), + ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) + if err != nil { + log.Errorf("[handle.SyncDispute] init client err: %s", err) + } + err = disputeClient.ProcessDisputeGameCreated(ctx.Context, &evt) + if err != nil { + log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err) + } + } + if evt.EventName == disputeMove.Name() && evt.EventHash == disputeMove.EventHash().String() { + disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress), + ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) + if err != nil { + log.Errorf("[handle.SyncDispute] init client err: %s", err) + } + err = disputeClient.ProcessDisputeGameMove(ctx.Context, &evt) + if err != nil { + log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err) + } + } + if evt.EventName == disputeResolved.Name() && evt.EventHash == disputeResolved.EventHash().String() { + disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress), + ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst) + if err != nil { + log.Errorf("[handle.SyncDispute] init client err: %s", err) + } + err = disputeClient.ProcessDisputeGameResolve(ctx.Context, &evt) + if err != nil { + log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err) + } + } + } + } +} diff --git a/internal/handler/syncEvent.go b/internal/handler/syncEvent.go index fb44a69..1d83297 100644 --- a/internal/handler/syncEvent.go +++ b/internal/handler/syncEvent.go @@ -70,24 +70,38 @@ func HandlePendingBlock(ctx *svc.ServiceContext, block schema.SyncBlock) error { log.Infof("[Handler.SyncEvent.PendingBlock]Don't match block hash\n") return nil } else if eventCount > 0 && events[0].BlockHash == block.BlockHash { - if len(events) > 0 { + BatchEvents := make([]*schema.SyncEvent, 0) + for _, event := range events { + var one schema.SyncEvent + log.Infof("[Handler.SyncEvent.PendingBlock]BlockLogIndexed %d ,TxHash %s,EventHash %s", event.BlockLogIndexed, event.TxHash, event.EventHash) + err = ctx.DB.Select("id").Where("sync_block_id=? AND block_log_indexed=? AND tx_hash=? AND event_hash=? ", + block.ID, event.BlockLogIndexed, event.TxHash, event.EventHash).First(&one).Error + if err != nil && err != gorm.ErrRecordNotFound { + log.Errorf("[Handler.SyncEvent.PendingBlock]Query SyncEvent err: %s\n ", err) + return errors.WithStack(err) + } else if err == gorm.ErrRecordNotFound { + BatchEvents = append(BatchEvents, event) + log.Infof("[Handler.SyncEvent.PendingBlock]block %d, BatchEvents len is %d:", block.BlockNumber, len(BatchEvents)) + } + } + if len(BatchEvents) > 0 { err = ctx.DB.Transaction(func(tx *gorm.DB) error { - err = BatchFilterAddAndRemove(ctx, events) + err = tx.CreateInBatches(&BatchEvents, 200).Error if err != nil { - log.Errorf("[Handler.SyncEvent.PendingBlock] BatchFilterAddAndRemove err: %s\n ", err) + log.Errorf("[Handler.SyncEvent.PendingBlock]CreateInBatches err: %s\n ", err) return errors.WithStack(err) } block.Status = schema.BlockValid block.EventCount = int64(eventCount) err = tx.Save(&block).Error if err != nil { - log.Errorf("[Handler.SyncEvent.PendingBlock] Batch Events Update SyncBlock Status err: %s\n ", err) + log.Errorf("[Handler.SyncEvent.PendingBlock]Batch Events Update SyncBlock Status err: %s\n ", err) return errors.WithStack(err) } return nil }) if err != nil { - panic(err) + return err } return nil } diff --git a/internal/schema/dispute_game.go b/internal/schema/dispute_game.go index 1a55502..9d5c01e 100644 --- a/internal/schema/dispute_game.go +++ b/internal/schema/dispute_game.go @@ -3,9 +3,12 @@ package schema // 0-In progress 1- Challenger wins 2- Defender wins const ( - InProgress = "0" - ChallengerWin = "1" - DefenderWin = "2" + DisputeGameStatusInProgress = 0 + DisputeGameStatusChallengerWin = 1 + DisputeGameStatusDefenderWin = 2 + + DisputeGameInit = 0 + DisputeGameComplete = 1 ) type DisputeGame struct { @@ -25,6 +28,7 @@ type DisputeGame struct { GameType uint32 `json:"game_type"` L2BlockNumber int64 `json:"l_2_block_number"` Status uint8 `json:"status"` + InitStatus uint8 `json:""` } func (DisputeGame) TableName() string {