Skip to content

Commit

Permalink
feat:add sync event
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouop0 committed Jun 20, 2024
1 parent 3c3060f commit b3128a3
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 71 deletions.
27 changes: 26 additions & 1 deletion docs/sql/postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,29 @@ CREATE INDEX if not exists status_index ON sync_blocks (status);
CREATE INDEX if not exists tx_count_index ON sync_blocks (tx_count);
CREATE INDEX if not exists check_count_index ON sync_blocks (check_count);

-- Create sync_events table
DROP TABLE sync_events;
CREATE TABLE IF NOT EXISTS sync_events
(
id SERIAL PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
sync_block_id BIGINT NOT NULL,
blockchain VARCHAR(32) NOT NULL,
block_time BIGINT NOT NULL,
block_number BIGINT NOT NULL,
block_hash VARCHAR(66) NOT NULL,
block_log_indexed BIGINT NOT NULL,
tx_index BIGINT NOT NULL,
tx_hash VARCHAR(66) NOT NULL,
event_name VARCHAR(32) NOT NULL,
event_hash VARCHAR(66) NOT NULL,
contract_address VARCHAR(42) NOT NULL,
data JSONB NOT NULL,
status VARCHAR(32) NOT NULL,
retry_count BIGINT DEFAULT 0
);

-- ----------------------------
-- Table structure for dispute_game
-- ----------------------------
Expand All @@ -49,6 +72,7 @@ CREATE TABLE IF NOT EXISTS dispute_game
l2_block_number bigint NOT NULL,
status int NOT NULL
);
CREATE INDEX if not exists dispute_game_index ON dispute_game (contract_address, game_contract);

-- ----------------------------
-- Table structure for game_claim_data
Expand All @@ -68,4 +92,5 @@ CREATE TABLE IF NOT EXISTS game_claim_data
claim varchar(64) NOT NULL,
position bigint NOT NULL,
clock bigint NOT NULL
)
);
CREATE INDEX if not exists dispute_game_data_index ON game_claim_data (game_contract, data_index);
4 changes: 2 additions & 2 deletions internal/blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
func TestRemoveContract(t *testing.T) {
contracts = GetContracts()
fmt.Println(GetContracts())
AddContract("BB")
AddContract("0x05F9613aDB30026FFd634f38e5C4dFd30a197Ba1")
AddContract("CC")
fmt.Println(GetContracts())
RemoveContract("AA")
RemoveContract("0x05f9613adB30026FFd634f38e5C4dFd30a197ba1")
RemoveContract("BB")
RemoveContract("CC")
fmt.Println(GetContracts())
Expand Down
165 changes: 105 additions & 60 deletions internal/handler/disputeGame.go
Original file line number Diff line number Diff line change
@@ -1,109 +1,148 @@
package handler

import (
"context"
"encoding/hex"
"fmt"

Check failure on line 6 in internal/handler/disputeGame.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

File is not `gofumpt`-ed (gofumpt)
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/optimism-java/dispute-explorer/pkg/log"
"golang.org/x/time/rate"
"math/big"

Check failure on line 11 in internal/handler/disputeGame.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

File is not `gofumpt`-ed (gofumpt)
"strings"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/optimism-java/dispute-explorer/internal/blockchain"
"github.com/optimism-java/dispute-explorer/internal/schema"
"github.com/optimism-java/dispute-explorer/internal/svc"
"github.com/optimism-java/dispute-explorer/pkg/contract"
"github.com/optimism-java/dispute-explorer/pkg/event"
"github.com/optimism-java/dispute-explorer/pkg/log"
"golang.org/x/time/rate"
"gorm.io/gorm"
)

func BatchFilterAddAndRemove(ctx *svc.ServiceContext, events []*schema.SyncEvent) error {
for _, evt := range events {
err := filterAddAndRemove(ctx, evt)
if err != nil {
return fmt.Errorf("[BatchFilterAddAndRemove] filterAddAndRemove: %s", err)
}
type RetryDisputeGameClient struct {
Client *contract.RateAndRetryDisputeGameClient
DB *gorm.DB
}

func NewRetryDisputeGameClient(db *gorm.DB, address common.Address, rpc *ethclient.Client, limit rate.Limit,
burst int) (*RetryDisputeGameClient, error) {

Check failure on line 28 in internal/handler/disputeGame.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

File is not `gofumpt`-ed (gofumpt)
newDisputeGame, err := contract.NewDisputeGame(address, rpc)
if err != nil {
return nil, err
}
return nil
retryLimitGame := contract.NewRateAndRetryDisputeGameClient(newDisputeGame, limit, burst)
return &RetryDisputeGameClient{Client: retryLimitGame, DB: db}, nil
}

func filterAddAndRemove(ctx *svc.ServiceContext, evt *schema.SyncEvent) error {
func (r *RetryDisputeGameClient) ProcessDisputeGameCreated(ctx context.Context, evt *schema.SyncEvent) error {
dispute := event.DisputeGameCreated{}
if evt.EventName == dispute.Name() && evt.EventHash == dispute.EventHash().String() {
err := dispute.ToObj(evt.Data)
err := dispute.ToObj(evt.Data)
if err != nil {
return fmt.Errorf("[processDisputeGameCreated] event data to DisputeGameCreated err: %s", err)
}
err = r.addDisputeGame(ctx, evt)
if err != nil {
return fmt.Errorf("[processDisputeGameCreated] addDisputeGame err: %s", err)
}
blockchain.AddContract(dispute.DisputeProxy)
return nil
}

func (r *RetryDisputeGameClient) ProcessDisputeGameMove(ctx context.Context, evt *schema.SyncEvent) error {
disputeGameMove := event.DisputeGameMove{}
err := disputeGameMove.ToObj(evt.Data)
if err != nil {
return fmt.Errorf("[processDisputeGameMove] event data to disputeGameMove err: %s", err)
}
index := disputeGameMove.ParentIndex.Add(disputeGameMove.ParentIndex, big.NewInt(1))
data, err := r.Client.RetryClaimData(ctx, &bind.CallOpts{}, index)
if err != nil {
return fmt.Errorf("[processDisputeGameMove] contract: %s, index: %d move event get claim data err: %s", evt.ContractAddress, index, err)
}

claimData := &schema.GameClaimData{
GameContract: evt.ContractAddress,
DataIndex: index.Int64(),
ParentIndex: data.ParentIndex,
CounteredBy: data.CounteredBy.Hex(),
Claimant: data.Claimant.Hex(),
Bond: data.Bond.Uint64(),
Claim: hex.EncodeToString(data.Claim[:]),
Position: data.Position.Uint64(),
Clock: data.Clock.Int64(),
}
err = r.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Save(claimData).Error
if err != nil {
return fmt.Errorf("[FilterDisputeContractAndAdd] event data to DisputeGameCreated err: %s", err)
return fmt.Errorf("[processDisputeGameMove] save dispute game err: %s\n ", err)
}
err = addDisputeGame(ctx, evt)
evt.Status = schema.EventValid
err = tx.Save(evt).Error
if err != nil {
return fmt.Errorf("[FilterDisputeContractAndAdd] addDisputeGame err: %s", err)
return fmt.Errorf("[processDisputeGameMove] save event err: %s\n ", err)
}
blockchain.AddContract(dispute.DisputeProxy)
return nil
})
if err != nil {
panic(err)
}
return nil
}

func (r *RetryDisputeGameClient) ProcessDisputeGameResolve(ctx context.Context, evt *schema.SyncEvent) error {

Check warning on line 92 in internal/handler/disputeGame.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
disputeResolved := event.DisputeGameResolved{}
if evt.EventName == disputeResolved.Name() && evt.EventHash == disputeResolved.EventHash().String() {
blockchain.RemoveContract(evt.ContractAddress)
log.Infof("resolve event remove %s", evt.ContractAddress)
err := disputeResolved.ToObj(evt.Data)
if err != nil {
return fmt.Errorf("[processDisputeGameResolve] event data to disputeResolved err: %s", err)
}
disputeGameMove := event.DisputeGameMove{}
if evt.EventName == disputeGameMove.Name() && evt.EventHash == disputeGameMove.EventHash().String() {
err := disputeGameMove.ToObj(evt.Data)
if err != nil {
return fmt.Errorf("[FilterDisputeContractAndAdd] event data to disputeGameMove err: %s", err)
}
newDisputeGame, err := contract.NewDisputeGame(common.HexToAddress(evt.ContractAddress), ctx.L1RPC)
disputeGame := &schema.DisputeGame{}
err = r.DB.Where("game_contract=?", evt.ContractAddress).First(disputeGame).Error
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("[processDisputeGameResolve] resolve event can not find dispute game err: %s", err)
}
disputeGame.Status = disputeResolved.Status
err = r.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Save(disputeGame).Error
if err != nil {
return fmt.Errorf("[addDisputeGame] init dispute game contract client err: %s", err)
return fmt.Errorf("[processDisputeGameMove] update dispute game status err: %s\n ", err)
}
index := disputeGameMove.ParentIndex.Add(disputeGameMove.ParentIndex, big.NewInt(1))
data, err := newDisputeGame.ClaimData(&bind.CallOpts{}, index)
evt.Status = schema.EventValid
err = tx.Save(evt).Error
if err != nil {
return fmt.Errorf("[addDisputeGame] contract: %s, index: %d move event get claim data err: %s", evt.ContractAddress, index, err)
return fmt.Errorf("[processDisputeGameMove] update event err: %s\n ", err)
}
claimData := &schema.GameClaimData{
GameContract: evt.ContractAddress,
DataIndex: index.Int64(),
ParentIndex: data.ParentIndex,
CounteredBy: data.CounteredBy.Hex(),
Claimant: data.Claimant.Hex(),
Bond: data.Bond.Uint64(),
Claim: hex.EncodeToString(data.Claim[:]),
Position: data.Position.Uint64(),
Clock: data.Clock.Int64(),
}
ctx.DB.Save(claimData)
return nil
})
if err != nil {
panic(err)
}

blockchain.RemoveContract(evt.ContractAddress)
log.Infof("remove contract: %s", evt.ContractAddress)
return nil
}

func addDisputeGame(ctx *svc.ServiceContext, evt *schema.SyncEvent) error {
func (r *RetryDisputeGameClient) addDisputeGame(ctx context.Context, evt *schema.SyncEvent) error {
disputeGame := event.DisputeGameCreated{}
err := disputeGame.ToObj(evt.Data)
if err != nil {
return fmt.Errorf("[addDisputeGame] event data to DisputeGameCreated err: %s", err)
}

newDisputeGame, err := contract.NewDisputeGame(common.HexToAddress(disputeGame.DisputeProxy), ctx.L1RPC)
if err != nil {
return fmt.Errorf("[addDisputeGame] init dispute game contract client err: %s", err)
}
retryLimitGame := contract.NewRateAndRetryDisputeGameClient(newDisputeGame, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
l2Block, err := retryLimitGame.RetryL2BlockNumber(ctx.Context, &bind.CallOpts{})
l2Block, err := r.Client.RetryL2BlockNumber(ctx, &bind.CallOpts{})
if err != nil {
return fmt.Errorf("[addDisputeGame] GET game L2BlockNumber err: %s", err)
}
status, err := retryLimitGame.RetryStatus(ctx.Context, &bind.CallOpts{})
status, err := r.Client.RetryStatus(ctx, &bind.CallOpts{})
if err != nil {
return fmt.Errorf("[addDisputeGame] GET game status err: %s", err)
}
claimData, err := retryLimitGame.RetryClaimData(ctx.Context, &bind.CallOpts{}, big.NewInt(0))
claimData, err := r.Client.RetryClaimData(ctx, &bind.CallOpts{}, big.NewInt(0))
if err != nil {
return fmt.Errorf("[addDisputeGame] GET index 0 ClaimData err: %s", err)
}

gameClaim := &schema.GameClaimData{
GameContract: disputeGame.DisputeProxy,
GameContract: strings.ToLower(disputeGame.DisputeProxy),
DataIndex: 0,
ParentIndex: claimData.ParentIndex,
CounteredBy: claimData.CounteredBy.Hex(),
Expand All @@ -126,19 +165,25 @@ func addDisputeGame(ctx *svc.ServiceContext, evt *schema.SyncEvent) error {
EventName: evt.EventName,
EventHash: evt.EventHash,
ContractAddress: evt.ContractAddress,
GameContract: disputeGame.DisputeProxy,
GameContract: strings.ToLower(disputeGame.DisputeProxy),
GameType: disputeGame.GameType,
L2BlockNumber: l2Block.Int64(),
Status: status,
InitStatus: schema.DisputeGameInit,
}
err = ctx.DB.Transaction(func(tx *gorm.DB) error {
err = r.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Save(gameClaim).Error
if err != nil {
return fmt.Errorf("[addDisputeGame] save dispute game claim: %s", err)
}
err = tx.Save(game).Error
if err != nil {
return fmt.Errorf("[addDisputeGame] save dispute game err: %s\n ", err)
return fmt.Errorf("[addDisputeGame] save dispute game err: %s ", err)
}
err = tx.Save(gameClaim).Error
evt.Status = schema.EventValid
err = tx.Save(evt).Error
if err != nil {
return fmt.Errorf("[addDisputeGame] save game claim err: %s\n ", err)
return fmt.Errorf("[addDisputeGame] save event err: %s ", err)
}
return nil
})
Expand Down
2 changes: 2 additions & 0 deletions internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ func Run(ctx *svc.ServiceContext) {
go SyncBlock(ctx)
// sync events
go SyncEvent(ctx)
// sync dispute game
go SyncDispute(ctx)
}
64 changes: 64 additions & 0 deletions internal/handler/syncDispute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package handler

import (

Check failure on line 3 in internal/handler/syncDispute.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

File is not `gofumpt`-ed (gofumpt)
"github.com/ethereum/go-ethereum/common"
"github.com/optimism-java/dispute-explorer/internal/schema"
"github.com/optimism-java/dispute-explorer/internal/svc"
"github.com/optimism-java/dispute-explorer/pkg/event"
"github.com/optimism-java/dispute-explorer/pkg/log"
"golang.org/x/time/rate"
"time"

Check failure on line 10 in internal/handler/syncDispute.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

File is not `gofumpt`-ed (gofumpt)
)

func SyncDispute(ctx *svc.ServiceContext) {
for {
var events []schema.SyncEvent
err := ctx.DB.Where("status=?", schema.EventPending).Limit(20).Find(&events).Error
if err != nil {
time.Sleep(3 * time.Second)
continue
}
for _, evt := range events {
disputeCreated := event.DisputeGameCreated{}
disputeMove := event.DisputeGameMove{}
disputeResolved := event.DisputeGameResolved{}
if evt.EventName == disputeCreated.Name() && evt.EventHash == disputeCreated.EventHash().String() {
err = disputeCreated.ToObj(evt.Data)
if err != nil {
log.Errorf("[handle.SyncDispute] event data to DisputeGameCreated err: %s", err)
}
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client err: %s", err)
}
err = disputeClient.ProcessDisputeGameCreated(ctx.Context, &evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
}
}
if evt.EventName == disputeMove.Name() && evt.EventHash == disputeMove.EventHash().String() {
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client err: %s", err)
}
err = disputeClient.ProcessDisputeGameMove(ctx.Context, &evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
}
}
if evt.EventName == disputeResolved.Name() && evt.EventHash == disputeResolved.EventHash().String() {
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client err: %s", err)
}
err = disputeClient.ProcessDisputeGameResolve(ctx.Context, &evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
}
}
}
}
}
Loading

0 comments on commit b3128a3

Please sign in to comment.