Skip to content

Commit

Permalink
feat: add usage of new bservability API from chainbridge-core to make…
Browse files Browse the repository at this point in the history
… coe less bulky
  • Loading branch information
P1sar committed Sep 8, 2023
1 parent d08b0e2 commit ddc7eb9
Show file tree
Hide file tree
Showing 17 changed files with 297 additions and 326 deletions.
11 changes: 5 additions & 6 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
coreListener "github.com/ChainSafe/chainbridge-core/chains/evm/listener"
"github.com/ChainSafe/chainbridge-core/crypto/secp256k1"
"github.com/ChainSafe/chainbridge-core/flags"
"github.com/ChainSafe/chainbridge-core/logger"
"github.com/ChainSafe/chainbridge-core/lvldb"
"github.com/ChainSafe/chainbridge-core/opentelemetry"
"github.com/ChainSafe/chainbridge-core/observability"
"github.com/ChainSafe/chainbridge-core/relayer"
"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/ChainSafe/chainbridge-core/store"
Expand Down Expand Up @@ -76,7 +75,7 @@ func Run() error {
panicOnError(err)
}

logger.ConfigureLogger(configuration.RelayerConfig.LogLevel, os.Stdout)
observability.ConfigureLogger(configuration.RelayerConfig.LogLevel, os.Stdout)

log.Info().Msg("Successfully loaded configuration")

Expand Down Expand Up @@ -134,8 +133,8 @@ func Run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

OTLPResource := opentelemetry.InitResource(fmt.Sprintf("Relayer-%s", configuration.RelayerConfig.Id), configuration.RelayerConfig.Env)
mp, err := opentelemetry.InitMetricProvider(ctx, OTLPResource, configuration.RelayerConfig.OpenTelemetryCollectorURL)
OTLPResource := observability.InitResource(fmt.Sprintf("Relayer-%s", configuration.RelayerConfig.Id), configuration.RelayerConfig.Env)
mp, err := observability.InitMetricProvider(ctx, OTLPResource, configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
Expand All @@ -149,7 +148,7 @@ func Run() error {
panic(err)
}

tp, err := opentelemetry.InitTracesProvider(ctx, OTLPResource, configuration.RelayerConfig.OpenTelemetryCollectorURL)
tp, err := observability.InitTracesProvider(ctx, OTLPResource, configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
Expand Down
20 changes: 15 additions & 5 deletions chains/evm/calls/contracts/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,23 @@ func NewBridgeContract(
}

func (c *BridgeContract) deposit(
ctx context.Context,
resourceID types.ResourceID,
destDomainID uint8,
data []byte,
feeData []byte,
opts transactor.TransactOptions,
) (*common.Hash, error) {
return c.ExecuteTransaction(
ctx,
"deposit",
opts,
destDomainID, resourceID, data, feeData,
)
}

func (c *BridgeContract) Erc20Deposit(
ctx context.Context,
recipient []byte,
amount *big.Int,
resourceID types.ResourceID,
Expand All @@ -89,7 +92,7 @@ func (c *BridgeContract) Erc20Deposit(
data = deposit.ConstructErc20DepositDataWithPriority(recipient, amount, opts.Priority)
}

txHash, err := c.deposit(resourceID, destDomainID, data, feeData, opts)
txHash, err := c.deposit(ctx, resourceID, destDomainID, data, feeData, opts)
if err != nil {
log.Error().Err(err)
return nil, err
Expand All @@ -98,6 +101,7 @@ func (c *BridgeContract) Erc20Deposit(
}

func (c *BridgeContract) Erc721Deposit(
ctx context.Context,
tokenId *big.Int,
metadata string,
recipient common.Address,
Expand All @@ -120,7 +124,7 @@ func (c *BridgeContract) Erc721Deposit(
data = deposit.ConstructErc721DepositDataWithPriority(recipient.Bytes(), tokenId, []byte(metadata), opts.Priority)
}

txHash, err := c.deposit(resourceID, destDomainID, data, feeData, opts)
txHash, err := c.deposit(ctx, resourceID, destDomainID, data, feeData, opts)
if err != nil {
log.Error().Err(err)
return nil, err
Expand All @@ -129,6 +133,7 @@ func (c *BridgeContract) Erc721Deposit(
}

func (c *BridgeContract) GenericDeposit(
ctx context.Context,
metadata []byte,
resourceID types.ResourceID,
destDomainID uint8,
Expand All @@ -142,7 +147,7 @@ func (c *BridgeContract) GenericDeposit(
Msgf("Generic deposit")
data := deposit.ConstructGenericDepositData(metadata)

txHash, err := c.deposit(resourceID, destDomainID, data, feeData, opts)
txHash, err := c.deposit(ctx, resourceID, destDomainID, data, feeData, opts)
if err != nil {
log.Error().Err(err)
return nil, err
Expand All @@ -151,6 +156,7 @@ func (c *BridgeContract) GenericDeposit(
}

func (c *BridgeContract) PermissionlessGenericDeposit(
ctx context.Context,
metadata []byte,
executeFunctionSig string,
executeContractAddress *common.Address,
Expand All @@ -168,6 +174,7 @@ func (c *BridgeContract) PermissionlessGenericDeposit(
Msgf("Permissionless Generic deposit")
data := ConstructPermissionlessGenericDepositData(metadata, []byte(executeFunctionSig), executeContractAddress.Bytes(), depositor.Bytes(), maxFee)
txHash, err := c.deposit(
ctx,
resourceID,
destDomainID,
data,
Expand All @@ -182,6 +189,7 @@ func (c *BridgeContract) PermissionlessGenericDeposit(
}

func (c *BridgeContract) ExecuteProposal(
ctx context.Context,
proposal *chains.Proposal,
signature []byte,
opts transactor.TransactOptions,
Expand All @@ -191,6 +199,7 @@ func (c *BridgeContract) ExecuteProposal(
Str("resourceID", hexutil.Encode(proposal.ResourceID[:])).
Msgf("Execute proposal")
return c.ExecuteTransaction(
ctx,
"executeProposal",
opts,
proposal.OriginDomainID, proposal.DepositNonce, proposal.Data, proposal.ResourceID, signature,
Expand All @@ -214,6 +223,7 @@ func (c *BridgeContract) ExecuteProposals(
}

return c.ExecuteTransaction(
ctx,
"executeProposals",
opts,
bridgeProposals,
Expand Down Expand Up @@ -254,7 +264,7 @@ func (c *BridgeContract) GetHandlerAddressForResourceID(
return out, nil
}

func (c *BridgeContract) Retry(hash common.Hash, opts transactor.TransactOptions) (*common.Hash, error) {
func (c *BridgeContract) Retry(ctx context.Context, hash common.Hash, opts transactor.TransactOptions) (*common.Hash, error) {
log.Debug().Msgf("Retrying deposit from transaction: %s", hash.Hex())
return c.ExecuteTransaction("retry", opts, hash.Hex())
return c.ExecuteTransaction(ctx, "retry", opts, hash.Hex())
}
52 changes: 17 additions & 35 deletions chains/evm/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package executor
import (
"context"
"fmt"
"github.com/ChainSafe/chainbridge-core/observability"
"math/big"
"sync"
"time"
Expand All @@ -20,12 +21,8 @@ import (
"github.com/binance-chain/tss-lib/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"
"github.com/sourcegraph/conc/pool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
traceapi "go.opentelemetry.io/otel/trace"
)

var (
Expand Down Expand Up @@ -77,31 +74,26 @@ func NewExecutor(
func (e *Executor) Execute(ctx context.Context, msgs []*message.Message) error {
e.exitLock.RLock()
defer e.exitLock.RUnlock()
ctx, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.Execute")
ctx, span, logger := observability.CreateSpanAndLoggerFromContext(ctx, "sygma-relayer", "relayer.sygma.evm.Execute")
defer span.End()
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()

proposals := make([]*chains.Proposal, 0)
for _, m := range msgs {
logger.Debug().Str("msg.id", m.ID()).Msgf("Message to execute %s", m.String())
span.AddEvent("Message to execute received", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.full", m.String())))
observability.LogAndEvent(logger.Debug(), span, "Message to execute received", attribute.String("msg.id", m.ID()), attribute.String("msg.full", m.String()))
prop, err := e.mh.HandleMessage(m)
if err != nil {
return fmt.Errorf("failed to handle message %s with error: %w", m.String(), err)
return observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to handle message")
}
evmProposal := chains.NewProposal(prop.Source, prop.Destination, prop.DepositNonce, prop.ResourceId, prop.Data, prop.Metadata)
isExecuted, err := e.bridge.IsProposalExecuted(evmProposal)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
return observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to call IsProposalExecuted")
}
if isExecuted {
logger.Info().Str("msg.id", m.ID()).Msgf("Message already executed %s", m.String())
span.AddEvent("Message already executed", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.full", m.String())))
observability.LogAndEvent(logger.Info(), span, "Message already executed")
continue
}
logger.Info().Str("msg.id", m.ID()).Msgf("Executing message %s", m.String())
span.AddEvent("Executing message", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.full", m.String())))
observability.LogAndEvent(logger.Info(), span, "Executing message")
proposals = append(proposals, evmProposal)
}
if len(proposals) == 0 {
Expand All @@ -110,13 +102,11 @@ func (e *Executor) Execute(ctx context.Context, msgs []*message.Message) error {

propHash, err := e.bridge.ProposalsHash(proposals)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
return observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to build ProposalsHash")
}

sessionID := e.sessionID(propHash)

span.AddEvent("SessionID created", traceapi.WithAttributes(attribute.String("tss.session.id", sessionID)))
observability.SetAttrsToSpanAnLogger(&logger, span, attribute.String("tss.session.id", sessionID))

msg := big.NewInt(0)
msg.SetBytes(propHash)
Expand All @@ -127,8 +117,7 @@ func (e *Executor) Execute(ctx context.Context, msgs []*message.Message) error {
e.comm,
e.fetcher)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
return observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to create NewSigning")
}

sigChn := make(chan interface{})
Expand All @@ -147,9 +136,8 @@ func (e *Executor) Execute(ctx context.Context, msgs []*message.Message) error {
}

func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, proposals []*chains.Proposal, sigChn chan interface{}, sessionID string) error {
ctx, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.watchExecution")
ctx, span, logger := observability.CreateSpanAndLoggerFromContext(ctx, "sygma-relayer", "relayer.sygma.evm.watchExecution")
defer span.End()
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()
ticker := time.NewTicker(executionCheckPeriod)
timeout := time.NewTicker(signingTimeout)
defer ticker.Stop()
Expand All @@ -169,26 +157,22 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
hash, err := e.executeProposal(ctx, proposals, signatureData)
if err != nil {
_ = e.comm.Broadcast(ctx, e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
span.SetStatus(codes.Error, fmt.Errorf("executing proposel has failed %w", err).Error())
return err
return observability.LogAndRecordErrorWithStatus(nil, span, err, "executing proposal has failed")
}

logger.Info().Str("SessionID", sessionID).Msgf("Sent proposals execution with hash: %s", hash)
observability.LogAndEvent(logger.Info(), span, "Sent proposals execution with", attribute.String("tx.hash", hash.String()), attribute.String("tss.session.id", sessionID))
}
case <-ticker.C:
{
if !e.areProposalsExecuted(proposals, sessionID) {
continue
}

logger.Debug().Str("SessionID", sessionID).Msgf("Successfully executed proposals")
span.AddEvent("Proposals executed", traceapi.WithAttributes(attribute.String("tss.session.id", sessionID)))
observability.LogAndEvent(logger.Info(), span, "Proposals executed", attribute.String("tss.session.id", sessionID))
return nil
}
case <-timeout.C:
{
span.SetStatus(codes.Error, fmt.Errorf("execution timed out in %s", signingTimeout).Error())
return fmt.Errorf("execution timed out in %s", signingTimeout)
return observability.LogAndRecordErrorWithStatus(nil, span, fmt.Errorf("execution timed out in %s", signingTimeout), "failed to watchExecution")
}
case <-ctx.Done():
{
Expand All @@ -199,7 +183,7 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}

func (e *Executor) executeProposal(ctx context.Context, proposals []*chains.Proposal, signatureData *common.SignatureData) (*ethCommon.Hash, error) {
ctx, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.executeProposal")
ctx, span, _ := observability.CreateSpanAndLoggerFromContext(ctx, "sygma-relayer", "relayer.sygma.evm.executeProposal")
defer span.End()
sig := []byte{}
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.R, 32)...)
Expand All @@ -217,10 +201,8 @@ func (e *Executor) executeProposal(ctx context.Context, proposals []*chains.Prop
GasLimit: gasLimit,
})
if err != nil {
span.SetStatus(codes.Error, err.Error())
return nil, err
return nil, observability.LogAndRecordErrorWithStatus(nil, span, err, "failed to ExecuteProposals")
}
span.AddEvent("Proposal execution sent", traceapi.WithAttributes(attribute.String("tx.hash", hash.String())))
return hash, err
}

Expand Down
Loading

0 comments on commit ddc7eb9

Please sign in to comment.