From c93dc367e9794386666020c15da14434bd62f7c1 Mon Sep 17 00:00:00 2001 From: allnil Date: Wed, 13 Nov 2024 21:01:33 +0000 Subject: [PATCH] chore: refactor code, add timeouts on operations --- .env.example.holesky | 3 + store/precomputed_key/wvm/cli.go | 18 +- store/precomputed_key/wvm/eth_rpc_client.go | 266 ++++++++---------- .../precomputed_key/wvm/web3_signer_client.go | 22 +- store/precomputed_key/wvm/wvm.go | 67 +---- 5 files changed, 151 insertions(+), 225 deletions(-) diff --git a/.env.example.holesky b/.env.example.holesky index 5e07e976..39f532bb 100644 --- a/.env.example.holesky +++ b/.env.example.holesky @@ -99,6 +99,9 @@ EIGENDA_PROXY_EIGENDA_SERVICE_MANAGER_ADDR=0xD4A7E1Bd8015057293f0D0A557088c28694 # WVM chain id # EIGENDA_PROXY_WVM_CHAIN_ID=9496 +# WVM operations timeout +# EIGENDA_PROXY_WVM_TIMEOUT=5s + # WVM web3signer endpoint # EIGENDA_PROXY_WVM_WEB3_SIGNER_ENDPOINT= diff --git a/store/precomputed_key/wvm/cli.go b/store/precomputed_key/wvm/cli.go index d2ce7b13..32d0e236 100644 --- a/store/precomputed_key/wvm/cli.go +++ b/store/precomputed_key/wvm/cli.go @@ -1,6 +1,8 @@ package wvm import ( + "time" + "github.com/urfave/cli/v2" ) @@ -56,13 +58,13 @@ func CLIFlags(envPrefix, category string) []cli.Flag { EnvVars: withEnvPrefix(envPrefix, "WEB3_SIGNER_ENDPOINT"), Category: category, }, - // &cli.DurationFlag{ - // Name: TimeoutFlagName, - // Usage: "timeout for S3 storage operations (e.g. get, put)", - // Value: 5 * time.Second, - // EnvVars: withEnvPrefix(envPrefix, "TIMEOUT"), - // Category: category, - // }, + &cli.DurationFlag{ + Name: TimeoutFlagName, + Usage: "timeout for WVM requests operations (e.g. get, put)", + Value: 5 * time.Second, + EnvVars: withEnvPrefix(envPrefix, "TIMEOUT"), + Category: category, + }, } } @@ -73,6 +75,6 @@ func ReadConfig(ctx *cli.Context) Config { ArchiverAddress: ctx.String(ArchiverAddressFlagName), Web3SignerEndpoint: ctx.String(Web3SignerEndpoint), Enabled: ctx.Bool(EnabledFlagName), - // Timeout: ctx.Duration(TimeoutFlagName), + Timeout: ctx.Duration(TimeoutFlagName), } } diff --git a/store/precomputed_key/wvm/eth_rpc_client.go b/store/precomputed_key/wvm/eth_rpc_client.go index 6aa92d09..068bd3e7 100644 --- a/store/precomputed_key/wvm/eth_rpc_client.go +++ b/store/precomputed_key/wvm/eth_rpc_client.go @@ -37,25 +37,25 @@ func NewEthRPCClient(log log.Logger, cfg *Config) (*EthRPCClient, error) { return nil, fmt.Errorf("failed to connect to the WVM client: %w", err) } - web3signerClient := NewWeb3SignerClient(cfg.Web3SignerEndpoint, log) - if web3signerClient == nil { - privKey := os.Getenv("WVM_PRIV_KEY") - if privKey == "" { + ethRPCClient := &EthRPCClient{ + log: log, + client: client, + chainID: cfg.ChainID, + } + + privKey := os.Getenv("WVM_PRIV_KEY") + if privKey == "" { + if cfg.Web3SignerEndpoint == "" { return nil, fmt.Errorf("wvm archiver private key is empty and wvm web3 signer is empty") } + ethRPCClient.web3signer = NewWeb3SignerClient(cfg, log) } - return &EthRPCClient{ - log: log, - client: client, - chainID: cfg.ChainID, - // may change it signer interface - web3signer: web3signerClient, - }, nil + return ethRPCClient, nil } // estimateGas tries estimates the suggested amount of gas that required to execute a given transaction. -func (rpc *EthRPCClient) estimateGas(ctx context.Context, from, to string, data []byte) (uint64, error) { +func (rpc *EthRPCClient) EstimateGas(ctx context.Context, from, to string, data []byte) (uint64, error) { var ( fromAddr = common.HexToAddress(from) toAddr = common.HexToAddress(to) @@ -93,102 +93,91 @@ func (rpc *EthRPCClient) estimateGas(ctx context.Context, from, to string, data } // createRawTransaction creates a raw EIP-1559 transaction and returns it as a hex string. -func (rpc *EthRPCClient) createRawTransaction(ctx context.Context, to string, data string, gasLimit uint64) (string, error) { +func (rpc *EthRPCClient) CreateRawTransaction(ctx context.Context, to string, data string, gasLimit uint64) (string, error) { baseFee, err := rpc.client.SuggestGasPrice(ctx) if err != nil { return "", err } gasFeeCap := baseFee - if rpc.web3signer == nil { - // address shenanigans - // Decode the provided private key. - privKey := os.Getenv("WVM_PRIV_KEY") - if privKey == "" { - panic("wvm archiver signer key is empty") - } - pKeyBytes, err := hexutil.Decode("0x" + privKey) - if err != nil { - return "", err - } - // Convert the private key bytes to an ECDSA private key. - ecdsaPrivateKey, err := crypto.ToECDSA(pKeyBytes) - if err != nil { - return "", err - } - // Extract the public key from the ECDSA private key. - publicKey := ecdsaPrivateKey.Public() - publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) - if !ok { - return "", fmt.Errorf("error casting public key to ECDSA") - } + return rpc.signTxWithPrivateKey(ctx, to, data, gasFeeCap, gasLimit) + } + return rpc.signTxWithWeb3Signer(ctx, to, data, gasFeeCap, gasLimit) +} - // Compute the Ethereum address of the signer from the public key. - fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA) +func (rpc *EthRPCClient) SendRawTransaction(ctx context.Context, signedTxHex string) (string, error) { + var err error + var signedTxBytes []byte - // Retrieve the nonce for the signer's account, representing the transaction count. - nonce, err := rpc.client.PendingNonceAt(ctx, fromAddress) + if strings.HasPrefix(signedTxHex, "0x") { + signedTxBytes, err = hexutil.Decode(signedTxHex) if err != nil { - return "", err - } - // Prepare data payload. - var hexData string - if strings.HasPrefix(data, "0x") { - hexData = data - } else { - hexData = hexutil.Encode([]byte(data)) + return "", fmt.Errorf("failed to decode signed transaction: %w", err) } - bytesData, err := hexutil.Decode(hexData) + } else { + signedTxBytes, err = hex.DecodeString(signedTxHex) if err != nil { - return "", err + return "", fmt.Errorf("failed to decode signed transaction: %w", err) } + } - toAddr := common.HexToAddress(to) - txData := types.DynamicFeeTx{ - ChainID: big.NewInt(rpc.chainID), - Nonce: nonce, - GasTipCap: big.NewInt(0), - GasFeeCap: gasFeeCap, - Gas: gasLimit, - To: &toAddr, - Data: bytesData, - } - tx := types.NewTx(&txData) - signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(big.NewInt(rpc.chainID)), ecdsaPrivateKey) + tx := new(types.Transaction) + err = tx.UnmarshalBinary(signedTxBytes) + if err != nil { + err = rlp.DecodeBytes(signedTxBytes, tx) if err != nil { - return "", err + return "", fmt.Errorf("failed to parse signed transaction: %w", err) } + } - // Encode the signed transaction into RLP (Recursive Length Prefix) format for transmission. - var buf bytes.Buffer - err = signedTx.EncodeRLP(&buf) - if err != nil { - return "", err - } + err = rpc.client.SendTransaction(ctx, tx) + if err != nil { + return "", err + } - // Return the RLP-encoded transaction as a hexadecimal string. - rawTxRLPHex := hex.EncodeToString(buf.Bytes()) + rpc.log.Info("wvm: successfully sent transaction", "tx hash", tx.Hash().String()) - return rawTxRLPHex, nil + err = rpc.logReceipt(tx) + if err != nil { + rpc.log.Error("failed to log sent transaction receipt", "error", err) } - rpc.log.Info("sign transaction using web3signer") + return tx.Hash().String(), nil +} - fromAddresses, err := rpc.web3signer.GetAccounts(ctx) +func (rpc *EthRPCClient) signTxWithPrivateKey(ctx context.Context, to string, data string, gasFeeCap *big.Int, gasLimit uint64) (string, error) { + // Getting public address from private key + + // Decode the provided private key. + privKey := os.Getenv("WVM_PRIV_KEY") + if privKey == "" { + panic("wvm archiver signer key is empty") + } + pKeyBytes, err := hexutil.Decode("0x" + privKey) if err != nil { - return "", fmt.Errorf("failed to get an account from web3signer: %w", err) + return "", err + } + // Convert the private key bytes to an ECDSA private key. + ecdsaPrivateKey, err := crypto.ToECDSA(pKeyBytes) + if err != nil { + return "", err + } + // Extract the public key from the ECDSA private key. + publicKey := ecdsaPrivateKey.Public() + publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) + if !ok { + return "", fmt.Errorf("error casting public key to ECDSA") } - fromAddress := fromAddresses[0] + + // Compute the Ethereum address of the signer from the public key. + fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA) // Retrieve the nonce for the signer's account, representing the transaction count. nonce, err := rpc.client.PendingNonceAt(ctx, fromAddress) if err != nil { return "", err } - - toAddr := common.HexToAddress(to) - // Prepare data payload. var hexData string if strings.HasPrefix(data, "0x") { @@ -200,96 +189,83 @@ func (rpc *EthRPCClient) createRawTransaction(ctx context.Context, to string, da if err != nil { return "", err } - // Prepare transaction for Web3Signer - tx := map[string]interface{}{ - "from": fromAddress.String(), - "to": toAddr.String(), - "gas": fmt.Sprintf("0x%x", gasLimit), - "maxFeePerGas": fmt.Sprintf("0x%x", gasFeeCap), - "maxPriorityFeePerGas": "0x0", - "value": "0x0", - "data": bytesData, - "nonce": fmt.Sprintf("0x%x", nonce), - "chainId": fmt.Sprintf("0x%x", rpc.chainID), - } - - // Sign transaction using Web3Signer - signedTx, err := rpc.web3signer.SignTransaction(ctx, tx) - if err != nil { - return "", fmt.Errorf("failed to sign transaction with web3signer: %w", err) - } - - return signedTx, nil -} - -func (rpc *EthRPCClient) sendRawTransaction(ctx context.Context, signedTxHex string) (string, error) { - var err error - signedTxBytes := []byte{} - if strings.HasPrefix(signedTxHex, "0x") { - signedTxBytes, err = hexutil.Decode(signedTxHex) - if err != nil { - return "", fmt.Errorf("failed to decode signed transaction: %w", err) - } - } else { - signedTxBytes, err = hex.DecodeString(signedTxHex) - if err != nil { - return "", fmt.Errorf("failed to decode signed transaction: %w", err) - } + toAddr := common.HexToAddress(to) + txData := types.DynamicFeeTx{ + ChainID: big.NewInt(rpc.chainID), + Nonce: nonce, + GasTipCap: big.NewInt(0), + GasFeeCap: gasFeeCap, + Gas: gasLimit, + To: &toAddr, + Data: bytesData, } - - tx := new(types.Transaction) - err = tx.UnmarshalBinary(signedTxBytes) + tx := types.NewTx(&txData) + signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(big.NewInt(rpc.chainID)), ecdsaPrivateKey) if err != nil { - err = rlp.DecodeBytes(signedTxBytes, tx) - if err != nil { - return "", fmt.Errorf("failed to parse signed transaction: %w", err) - } + return "", err } - err = rpc.client.SendTransaction(ctx, tx) + // Encode the signed transaction into RLP (Recursive Length Prefix) format for transmission. + var buf bytes.Buffer + err = signedTx.EncodeRLP(&buf) if err != nil { return "", err } - rpc.log.Info("wvm: succesfully sent transaction", "tx hash", tx.Hash().String()) + // Return the RLP-encoded transaction as a hexadecimal string. + rawTxRLPHex := hex.EncodeToString(buf.Bytes()) - err = rpc.logReceipt(tx) - if err != nil { - rpc.log.Error("failed to log sent transaction receipt", "error", err) - } - - return tx.Hash().String(), nil + return rawTxRLPHex, nil } -/* -func (rpc *EthRPCClient) sendRawTransaction(ctx context.Context, rawTx string) (string, error) { - rawTxBytes, err := hex.DecodeString(rawTx) +func (rpc *EthRPCClient) signTxWithWeb3Signer(ctx context.Context, to string, data string, gasFeeCap *big.Int, gasLimit uint64) (string, error) { + rpc.log.Info("sign transaction using web3signer") + + fromAddresses, err := rpc.web3signer.GetAccounts(ctx) if err != nil { - return "", err + return "", fmt.Errorf("failed to get an account from web3signer: %w", err) } - - tx := new(types.Transaction) - - err = rlp.DecodeBytes(rawTxBytes, &tx) + fromAddress := fromAddresses[0] + // Retrieve the nonce for the signer's account, representing the transaction count. + nonce, err := rpc.client.PendingNonceAt(ctx, fromAddress) if err != nil { return "", err } - err = rpc.client.SendTransaction(ctx, tx) + toAddr := common.HexToAddress(to) + // Prepare data payload. + var hexData string + if strings.HasPrefix(data, "0x") { + hexData = data + } else { + hexData = hexutil.Encode([]byte(data)) + } + bytesData, err := hexutil.Decode(hexData) if err != nil { return "", err } - rpc.log.Info("wvm: succesfully sent transaction", "tx hash", tx.Hash().String()) + // Prepare transaction for Web3Signer + tx := map[string]interface{}{ + "from": fromAddress.String(), + "to": toAddr.String(), + "gas": fmt.Sprintf("0x%x", gasLimit), + "maxFeePerGas": fmt.Sprintf("0x%x", gasFeeCap), + "maxPriorityFeePerGas": "0x0", + "value": "0x0", + "data": bytesData, + "nonce": fmt.Sprintf("0x%x", nonce), + "chainId": fmt.Sprintf("0x%x", rpc.chainID), + } - err = rpc.logReceipt(tx) + // Sign transaction using Web3Signer + signedTx, err := rpc.web3signer.SignTransaction(ctx, tx) if err != nil { - rpc.log.Error("failed to log sent transaction receipt", "error", err) + return "", fmt.Errorf("failed to sign transaction with web3signer: %w", err) } - return tx.Hash().String(), nil + return signedTx, nil } -*/ func (rpc *EthRPCClient) logReceipt(tx *types.Transaction) error { var txDetails Transaction @@ -344,24 +320,18 @@ type Transaction struct { func convertHexField(tx *Transaction, field string) error { typeOfTx := reflect.TypeOf(*tx) - txValue := reflect.ValueOf(tx).Elem() - hexStr := txValue.FieldByName(field).String() - intValue, err := strconv.ParseUint(hexStr[2:], 16, 64) if err != nil { return err } decimalStr := strconv.FormatUint(intValue, 10) - _, ok := typeOfTx.FieldByName(field) if !ok { return fmt.Errorf("field %s does not exist in Transaction struct", field) } - - // Set the field value to the decimal string txValue.FieldByName(field).SetString(decimalStr) return nil diff --git a/store/precomputed_key/wvm/web3_signer_client.go b/store/precomputed_key/wvm/web3_signer_client.go index d84d6d2e..515d9279 100644 --- a/store/precomputed_key/wvm/web3_signer_client.go +++ b/store/precomputed_key/wvm/web3_signer_client.go @@ -8,7 +8,6 @@ import ( "io" "net" "net/http" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -17,30 +16,23 @@ import ( type Web3SignerClient struct { endpoint string log log.Logger - timeout time.Duration client *http.Client } -func NewWeb3SignerClient(endpoint string, log log.Logger) *Web3SignerClient { - if endpoint == "" { - return nil - } - - // default timeout 3 seconds - timeout := 3 * time.Second +func NewWeb3SignerClient(cfg *Config, log log.Logger) *Web3SignerClient { transport := &http.Transport{ DialContext: (&net.Dialer{ - Timeout: timeout, + Timeout: cfg.Timeout, }).DialContext, - TLSHandshakeTimeout: timeout, + TLSHandshakeTimeout: cfg.Timeout, } client := &http.Client{ Transport: transport, - Timeout: timeout, + Timeout: cfg.Timeout, } return &Web3SignerClient{ - endpoint: endpoint, + endpoint: cfg.Endpoint, log: log, client: client, } @@ -96,7 +88,7 @@ func (signer *Web3SignerClient) GetAccounts(ctx context.Context) ([]common.Addre for i, address := range result { signer.log.Info("addresses", "address", address) if !common.IsHexAddress(address) { - return nil, fmt.Errorf("invalid address format received: %s", string(address)) + return nil, fmt.Errorf("invalid address format received: %s", address) } addresses[i] = common.HexToAddress(address) } @@ -138,7 +130,7 @@ func (signer *Web3SignerClient) doRequest(ctx context.Context, request *SignerJS return nil, fmt.Errorf("failed to marshal request: %w", err) } - req, err := http.NewRequestWithContext(ctx, "POST", signer.endpoint, bytes.NewReader(reqBody)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, signer.endpoint, bytes.NewReader(reqBody)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } diff --git a/store/precomputed_key/wvm/wvm.go b/store/precomputed_key/wvm/wvm.go index 0ed5c828..db485dce 100644 --- a/store/precomputed_key/wvm/wvm.go +++ b/store/precomputed_key/wvm/wvm.go @@ -11,7 +11,6 @@ import ( "time" "github.com/Layr-Labs/eigenda-proxy/common" - "github.com/andybalholm/brotli" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" cache "github.com/patrickmn/go-cache" @@ -31,6 +30,8 @@ type Config struct { ArchiverAddress string // Endpoint for Web3 signer Web3SignerEndpoint string + // Timeout on WVM calls in seconds + Timeout time.Duration } // Store...wraps wvm client, ethclient and concurrent internal cache @@ -71,17 +72,20 @@ func (wvm *Store) Verify(_ context.Context, key []byte, value []byte) error { } func (wvm *Store) Put(ctx context.Context, key []byte, value []byte) error { - gas, err := wvm.rpcClient.estimateGas(ctx, wvm.cfg.ArchiverAddress, ArchivePoolAddress, value) + ctx, cancel := context.WithTimeout(ctx, wvm.cfg.Timeout) + defer cancel() + + gas, err := wvm.rpcClient.EstimateGas(ctx, wvm.cfg.ArchiverAddress, ArchivePoolAddress, value) if err != nil { return fmt.Errorf("failed to store data in wvm: failed estimate gas: %w", err) } - wvmRawTx, err := wvm.rpcClient.createRawTransaction(ctx, ArchivePoolAddress, string(value), gas) + wvmRawTx, err := wvm.rpcClient.CreateRawTransaction(ctx, ArchivePoolAddress, string(value), gas) if err != nil { return fmt.Errorf("failed to store data in wvm: failed create transaction: %w", err) } - wvmTxHash, err := wvm.rpcClient.sendRawTransaction(ctx, wvmRawTx) + wvmTxHash, err := wvm.rpcClient.SendRawTransaction(ctx, wvmRawTx) if err != nil { return fmt.Errorf("failed to store data in wvm: failed to send transaction: %w", err) } @@ -95,7 +99,10 @@ func (wvm *Store) Put(ctx context.Context, key []byte, value []byte) error { } func (wvm *Store) Get(ctx context.Context, key []byte) ([]byte, error) { - wvmTxHash, err := wvm.GetWvmTxHashByCommitment(key) + ctx, cancel := context.WithTimeout(ctx, wvm.cfg.Timeout) + defer cancel() + + wvmTxHash, err := wvm.getWvmTxHashByCommitment(key) if err != nil { return nil, err } @@ -154,7 +161,7 @@ func (wvm *Store) get(ctx context.Context, wvmTxHash string) ([]byte, error) { // GetWvmTxHashByCommitment uses commitment to get wvm tx hash from the internal map(temprorary hack) // and returns it to the caller -func (wvm *Store) GetWvmTxHashByCommitment(key []byte) (string, error) { +func (wvm *Store) getWvmTxHashByCommitment(key []byte) (string, error) { wvmTxHash, found := wvm.txCache.Get(string(key)) if !found { wvm.log.Info("wvm backend: tx hash using provided commitment NOT FOUND", "provided key", string(key)) @@ -165,51 +172,3 @@ func (wvm *Store) GetWvmTxHashByCommitment(key []byte) (string, error) { return wvmTxHash.(string), nil } - -func (wvm *Store) wvmEncode(eigenBlob []byte) ([]byte, error) { - eigenBlobLen := len(eigenBlob) - - wvm.log.Info("wvm: eigen blob received", "eigen blob size", eigenBlobLen) - - brotliOut := bytes.Buffer{} - writer := brotli.NewWriterOptions(&brotliOut, brotli.WriterOptions{Quality: 6}) - - in := bytes.NewReader(eigenBlob) - n, err := io.Copy(writer, in) - if err != nil { - return nil, fmt.Errorf("failed to read buffer to encode eigen blob: %w", err) - } - - if int(n) != len(eigenBlob) { - return nil, fmt.Errorf("size mismatch during brotli compression") - } - - if err := writer.Close(); err != nil { - return nil, fmt.Errorf("brotli writer close fail: %w", err) - } - - wvm.log.Info("wvm backend: compressed by brotli", "eigen blob size", eigenBlobLen, "eigen blob size compressed with brotli", brotliOut.Len()) - - return brotliOut.Bytes(), nil -} - -func (wvm *Store) wvmDecode(calldataBlob string) ([]byte, error) { - // trim calldata - compressedBlob, err := hex.DecodeString(calldataBlob[2:]) - if err != nil { - return nil, err - } - - wvm.log.Info("wvm backend: compressed eigen blob received for decompression", "compressed blob size", len(compressedBlob)) - - brotliReader := brotli.NewReader(bytes.NewReader(compressedBlob)) - - decompressedEncoded, err := io.ReadAll(brotliReader) - if err != nil { - return nil, fmt.Errorf("WVM: failed to decompress brotli data: %w", err) - } - - wvm.log.Info("wvm backend: eigen blob successfully decompressed", "decompressed blob size", len(compressedBlob)) - - return decompressedEncoded, nil -}