Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #59 from 0xPolygon/vcastellm/fix-context
Browse files Browse the repository at this point in the history
Fix context lifetime
  • Loading branch information
vcastellm authored Jan 31, 2024
2 parents 6fc8017 + 1506161 commit 9e2ff44
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 28 deletions.
5 changes: 1 addition & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ func start(cliCtx *cli.Context) error {
log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), c.RPC.ReadTimeout.Duration)
defer cancel()

executor := interop.New(
log.WithFields("module", "executor"),
c,
Expand All @@ -141,7 +138,7 @@ func start(cliCtx *cli.Context) error {
[]jRPC.Service{
{
Name: rpc.INTEROP,
Service: rpc.NewInteropEndpoints(ctx, executor, storage),
Service: rpc.NewInteropEndpoints(executor, storage, c),
},
},
)
Expand Down
31 changes: 19 additions & 12 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/ethereum/go-ethereum/common"

"github.com/0xPolygon/agglayer/config"
"github.com/0xPolygon/agglayer/interop"
"github.com/0xPolygon/agglayer/tx"
"github.com/0xPolygon/agglayer/types"
Expand All @@ -21,53 +22,56 @@ const (

// InteropEndpoints contains implementations for the "interop" RPC endpoints
type InteropEndpoints struct {
ctx context.Context
executor *interop.Executor
db types.IDB
config *config.Config
}

// NewInteropEndpoints returns InteropEndpoints
func NewInteropEndpoints(
ctx context.Context,
executor *interop.Executor,
db types.IDB,
conf *config.Config,
) *InteropEndpoints {
return &InteropEndpoints{
ctx: ctx,
executor: executor,
db: db,
config: conf,
}
}

func (i *InteropEndpoints) SendTx(signedTx tx.SignedTx) (interface{}, jRPC.Error) {
ctx, cancel := context.WithTimeout(context.Background(), i.config.RPC.WriteTimeout.Duration)
defer cancel()

// Check if the RPC is actually registered, if not it won't be possible to assert soundness (in the future once we are stateless won't be needed)
if err := i.executor.CheckTx(signedTx); err != nil {
return "0x0", jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("there is no RPC registered for %d", signedTx.Tx.RollupID))
}

// Verify ZKP using eth_call
if err := i.executor.Verify(i.ctx, signedTx); err != nil {
if err := i.executor.Verify(ctx, signedTx); err != nil {
return "0x0", jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("failed to verify tx: %s", err))
}

if err := i.executor.Execute(i.ctx, signedTx); err != nil {
if err := i.executor.Execute(ctx, signedTx); err != nil {
return "0x0", jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("failed to execute tx: %s", err))
}

// Send L1 tx
dbTx, err := i.db.BeginStateTransaction(i.ctx)
dbTx, err := i.db.BeginStateTransaction(ctx)
if err != nil {
return "0x0", jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("failed to begin dbTx, error: %s", err))
}

_, err = i.executor.Settle(i.ctx, signedTx, dbTx)
_, err = i.executor.Settle(ctx, signedTx, dbTx)
if err != nil {
if errRollback := dbTx.Rollback(i.ctx); errRollback != nil {
if errRollback := dbTx.Rollback(ctx); errRollback != nil {
log.Error("rollback err: ", errRollback)
}
return "0x0", jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("failed to add tx to ethTxMan, error: %s", err))
}
if err := dbTx.Commit(i.ctx); err != nil {
if err := dbTx.Commit(ctx); err != nil {
return "0x0", jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("failed to commit dbTx, error: %s", err))
}
log.Debugf("successfuly added tx %s to ethTxMan", signedTx.Tx.Hash().Hex())
Expand All @@ -76,7 +80,10 @@ func (i *InteropEndpoints) SendTx(signedTx tx.SignedTx) (interface{}, jRPC.Error
}

func (i *InteropEndpoints) GetTxStatus(hash common.Hash) (result interface{}, err jRPC.Error) {
dbTx, innerErr := i.db.BeginStateTransaction(i.ctx)
ctx, cancel := context.WithTimeout(context.Background(), i.config.RPC.ReadTimeout.Duration)
defer cancel()

dbTx, innerErr := i.db.BeginStateTransaction(ctx)
if innerErr != nil {
result = "0x0"
err = jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("failed to begin dbTx, error: %s", innerErr))
Expand All @@ -85,13 +92,13 @@ func (i *InteropEndpoints) GetTxStatus(hash common.Hash) (result interface{}, er
}

defer func() {
if innerErr := dbTx.Rollback(i.ctx); innerErr != nil {
if innerErr := dbTx.Rollback(ctx); innerErr != nil {
result = "0x0"
err = jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("failed to rollback dbTx, error: %s", innerErr))
}
}()

result, innerErr = i.executor.GetTxStatus(i.ctx, hash, dbTx)
result, innerErr = i.executor.GetTxStatus(ctx, hash, dbTx)
if innerErr != nil {
result = "0x0"
err = jRPC.NewRPCError(jRPC.DefaultErrorCode, fmt.Sprintf("failed to get tx, error: %s", innerErr))
Expand Down
28 changes: 16 additions & 12 deletions rpc/rpc_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rpc

import (
"context"
"errors"
"math/big"
"testing"
Expand Down Expand Up @@ -32,14 +31,15 @@ func TestInteropEndpointsGetTxStatus(t *testing.T) {
dbMock := mocks.NewDBMock(t)
dbMock.On("BeginStateTransaction", mock.Anything).Return(nil, errors.New("error")).Once()

cfg := &config.Config{}
e := interop.New(
log.WithFields("module", "test"),
&config.Config{},
cfg,
common.HexToAddress("0xadmin"),
mocks.NewEthermanMock(t),
mocks.NewEthTxManagerMock(t),
)
i := NewInteropEndpoints(context.Background(), e, dbMock)
i := NewInteropEndpoints(e, dbMock, cfg)

result, err := i.GetTxStatus(common.HexToHash("0xsomeTxHash"))

Expand All @@ -64,14 +64,15 @@ func TestInteropEndpointsGetTxStatus(t *testing.T) {
txManagerMock.On("Result", mock.Anything, ethTxManOwner, txHash.Hex(), txMock).
Return(ethtxmanager.MonitoredTxResult{}, errors.New("error")).Once()

cfg := &config.Config{}
e := interop.New(
log.WithFields("module", "test"),
&config.Config{},
cfg,
common.HexToAddress("0xadmin"),
mocks.NewEthermanMock(t),
txManagerMock,
)
i := NewInteropEndpoints(context.Background(), e, dbMock)
i := NewInteropEndpoints(e, dbMock, cfg)

result, err := i.GetTxStatus(txHash)

Expand Down Expand Up @@ -108,14 +109,15 @@ func TestInteropEndpointsGetTxStatus(t *testing.T) {
txManagerMock.On("Result", mock.Anything, ethTxManOwner, txHash.Hex(), txMock).
Return(result, nil).Once()

cfg := &config.Config{}
e := interop.New(
log.WithFields("module", "test"),
&config.Config{},
cfg,
common.HexToAddress("0xadmin"),
mocks.NewEthermanMock(t),
txManagerMock,
)
i := NewInteropEndpoints(context.Background(), e, dbMock)
i := NewInteropEndpoints(e, dbMock, cfg)

status, err := i.GetTxStatus(txHash)

Expand Down Expand Up @@ -169,17 +171,19 @@ func TestInteropEndpointsSendTx(t *testing.T) {
ethTxManagerMock := mocks.NewEthTxManagerMock(t)

executeTestFn := func() {
c := &config.Config{
FullNodeRPCs: fullNodeRPCs,
L1: config.L1Config{RollupManagerContract: common.HexToAddress("0xdeadbeef")},
}

e := interop.New(
log.WithFields("module", "test"),
&config.Config{
FullNodeRPCs: fullNodeRPCs,
L1: config.L1Config{RollupManagerContract: common.HexToAddress("0xdeadbeef")},
},
c,
common.HexToAddress("0xadmin"),
ethermanMock,
ethTxManagerMock,
)
i := NewInteropEndpoints(context.Background(), e, dbMock)
i := NewInteropEndpoints(e, dbMock, c)
i.executor.ZkEVMClientCreator = zkEVMClientCreatorMock

result, err := i.SendTx(*signedTx)
Expand Down

0 comments on commit 9e2ff44

Please sign in to comment.