Skip to content

Commit

Permalink
expand metrics and improve http client configuration (#24)
Browse files Browse the repository at this point in the history
* expand metrics and improve http client configuration

* update x/crypto dependency

* expand metrics

* open pprof endpoint

* cache sbundle fetching for spikes

* fix worker bug & improve default bundle lifetime

* increase failure metrics only on real errors

* fix lint

* go mod tidy
  • Loading branch information
TymKh authored Jan 23, 2024
1 parent f3cd2b4 commit 86001fb
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 40 deletions.
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ linters:
- wsl
- deadcode
- varcheck
- exhaustruct
- nolintlint

#
# Disabled because of generics:
Expand Down
16 changes: 12 additions & 4 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -164,12 +165,12 @@ func main() {

cachingEthBackend := mevshare.NewCachingEthClient(ethBackend)

api := mevshare.NewAPI(logger, simQueue, dbBackend, cachingEthBackend, signer, simBackends, rate.Limit(rateLimit), buildersBackend, cancelCache)
api := mevshare.NewAPI(logger, simQueue, dbBackend, cachingEthBackend, signer, simBackends, rate.Limit(rateLimit), buildersBackend, cancelCache, time.Millisecond*60)

jsonRPCServer, err := jsonrpcserver.NewHandler(jsonrpcserver.Methods{
"mev_sendBundle": api.SendBundle,
"mev_simBundle": api.SimBundle,
"mev_cancelBundleByHash": api.CancelBundleByHash,
mevshare.SendBundleEndpointName: api.SendBundle,
mevshare.SimBundleEndpointName: api.SimBundle,
mevshare.CancelBundleByHashEndpointName: api.CancelBundleByHash,
})
if err != nil {
logger.Fatal("Failed to create jsonrpc server", zap.Error(err))
Expand All @@ -186,11 +187,18 @@ func main() {
metrics.WritePrometheus(w, true)
})
go func() {
metricsMux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
metricsMux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
metricsMux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
metricsMux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
metricsMux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))

metricsServer := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%s", defaultMetricsPort),
ReadHeaderTimeout: 5 * time.Second,
Handler: metricsMux,
}

err := metricsServer.ListenAndServe()
if err != nil {
logger.Fatal("Failed to start metrics server", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ require (
github.com/flashbots/go-utils v0.4.11
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/redis/go-redis/v9 v9.0.2
github.com/stretchr/testify v1.8.1
github.com/ybbus/jsonrpc/v3 v3.1.4
go.uber.org/zap v1.23.0
golang.org/x/crypto v0.9.0
golang.org/x/crypto v0.17.0
golang.org/x/time v0.3.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
8 changes: 5 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxd
github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A=
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -130,15 +132,15 @@ go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU=
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
Expand Down
70 changes: 64 additions & 6 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,43 @@
// Package metrics contains all application-logic metrics
package metrics

import "github.com/VictoriaMetrics/metrics"
import (
"fmt"

"github.com/VictoriaMetrics/metrics"
)

var (
sbundlesReceived = metrics.NewCounter("sbundles_received_total")
sbundlesReceivedValid = metrics.NewCounter("sbundles_received_valid_total")
sbundlesReceivedStale = metrics.NewCounter("sbundles_received_stale_total")
queueFullSbundles = metrics.NewCounter("sbundles_queue_full_total")
queuePopstaleItemSbundles = metrics.NewCounter("sbundles_queue_pop_stale_item_total")
sbundlesReceived = metrics.NewCounter("sbundles_received_total")
sbundlesReceivedValid = metrics.NewCounter("sbundles_received_valid_total")
sbundlesReceivedStale = metrics.NewCounter("sbundles_received_stale_total")
queueFullSbundles = metrics.NewCounter("sbundles_queue_full_total")
queuePopstaleItemSbundles = metrics.NewCounter("sbundles_queue_pop_stale_item_total")
sbundleProcessDurationSummary = metrics.NewSummary("sbundle_process_duration_milliseconds")
sbundleValidationDurationSummary = metrics.NewSummary("sbundle_validation_duration_milliseconds")
sbundleFetchUnmatchedDurationSummary = metrics.NewSummary("sbundle_fetch_unmatched_duration_milliseconds")
sbundleAddQueueDurationSummary = metrics.NewSummary("sbundle_add_queue_milliseconds")
)

const (
sbundleRPCCallDurationLabel = `sbundle_rpc_call_duration_milliseconds{method="%s"}`
sbundleRPCCallErrorCounterLabel = `sbundle_rpc_call_error_total{method="%s"}`

sbundleSentToBuilderLabel = `bundle_sent_to_builder_total{builder="%s"}`
sbundleSentToBuilderFailureLabel = `bundle_sent_to_builder_failure_total{builder="%s"}`
sbundleSentToBuilderDurationSummaryLabel = `bundle_sent_to_builder_duration_milliseconds{builder="%s"}`
)

func RecordRPCCallDuration(method string, duration int64) {
l := fmt.Sprintf(sbundleRPCCallDurationLabel, method)
metrics.GetOrCreateSummary(l).Update(float64(duration))
}

func IncRPCCallFailure(method string) {
l := fmt.Sprintf(sbundleRPCCallErrorCounterLabel, method)
metrics.GetOrCreateCounter(l).Inc()
}

func IncSbundlesReceived() {
sbundlesReceived.Inc()
}
Expand All @@ -30,3 +57,34 @@ func IncQueueFullSbundles() {
func IncQueuePopStaleItemSbundles() {
queuePopstaleItemSbundles.Inc()
}

func IncBundleSentToBuilder(builder string) {
l := fmt.Sprintf(sbundleSentToBuilderLabel, builder)
metrics.GetOrCreateCounter(l).Inc()
}

func IncBundleSentToBuilderFailure(builder string) {
l := fmt.Sprintf(sbundleSentToBuilderFailureLabel, builder)
metrics.GetOrCreateCounter(l).Inc()
}

func RecordBundleSentToBuilderTime(endpoint string, duration int64) {
l := fmt.Sprintf(sbundleSentToBuilderDurationSummaryLabel, endpoint)
metrics.GetOrCreateSummary(l).Update(float64(duration))
}

func RecordBundleProcessDuration(duration int64) {
sbundleProcessDurationSummary.Update(float64(duration))
}

func RecordBundleValidationDuration(duration int64) {
sbundleValidationDurationSummary.Update(float64(duration))
}

func RecordBundleFetchUnmatchedDuration(duration int64) {
sbundleFetchUnmatchedDurationSummary.Update(float64(duration))
}

func RecordBundleAddQueueDuration(duration int64) {
sbundleAddQueueDurationSummary.Update(float64(duration))
}
66 changes: 50 additions & 16 deletions mevshare/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/flashbots/mev-share-node/jsonrpcserver"
"github.com/flashbots/mev-share-node/metrics"
"github.com/flashbots/mev-share-node/spike"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -55,6 +56,7 @@ type API struct {
simRateLimiter *rate.Limiter
builders BuildersBackend

spikeManager *spike.Manager[*SendMevBundleArgs]
knownBundleCache *lru.Cache[common.Hash, SendMevBundleArgs]
cancellationCache *RedisCancellationCache
}
Expand All @@ -63,18 +65,23 @@ func NewAPI(
log *zap.Logger,
scheduler SimScheduler, bundleStorage BundleStorage, eth EthClient, signer types.Signer,
simBackends []SimulationBackend, simRateLimit rate.Limit, builders BuildersBackend, cancellationCache *RedisCancellationCache,
sbundleValidDuration time.Duration,
) *API {
sm := spike.NewManager(func(ctx context.Context, k string) (*SendMevBundleArgs, error) {
return bundleStorage.GetBundleByMatchingHash(ctx, common.HexToHash(k))
}, sbundleValidDuration)

return &API{
log: log,

scheduler: scheduler,
bundleStorage: bundleStorage,
eth: eth,
signer: signer,
simBackends: simBackends,
simRateLimiter: rate.NewLimiter(simRateLimit, 1),
builders: builders,

scheduler: scheduler,
bundleStorage: bundleStorage,
eth: eth,
signer: signer,
simBackends: simBackends,
simRateLimiter: rate.NewLimiter(simRateLimit, 1),
builders: builders,
spikeManager: sm,
knownBundleCache: lru.NewCache[common.Hash, SendMevBundleArgs](bundleCacheSize),
cancellationCache: cancellationCache,
}
Expand All @@ -91,12 +98,18 @@ func findAndReplace(strs []common.Hash, old, replacer common.Hash) bool {
return found
}

func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMevBundleResponse, error) {
func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (_ SendMevBundleResponse, err error) {
logger := m.log

startAt := time.Now()
defer func() {
metrics.RecordRPCCallDuration(SendBundleEndpointName, time.Since(startAt).Milliseconds())
}()
metrics.IncSbundlesReceived()

validateBundleTime := time.Now()
currentBlock, err := m.eth.BlockNumber(ctx)
if err != nil {
metrics.IncRPCCallFailure(SendBundleEndpointName)
logger.Error("failed to get current block", zap.Error(err))
return SendMevBundleResponse{}, ErrInternalServiceError
}
Expand Down Expand Up @@ -124,16 +137,21 @@ func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMev
bundle.Metadata.OriginID = origin
bundle.Metadata.Prematched = !hasUnmatchedHash

metrics.RecordBundleValidationDuration(time.Since(validateBundleTime).Milliseconds())

if hasUnmatchedHash {
var unmatchedHash common.Hash
if len(bundle.Body) > 0 && bundle.Body[0].Hash != nil {
unmatchedHash = *bundle.Body[0].Hash
} else {
return SendMevBundleResponse{}, ErrInternalServiceError
}

unmatchedBundle, err := m.bundleStorage.GetBundleByMatchingHash(ctx, unmatchedHash)
fetchUnmatchedTime := time.Now()
unmatchedBundle, err := m.spikeManager.GetResult(ctx, unmatchedHash.String())
metrics.RecordBundleFetchUnmatchedDuration(time.Since(fetchUnmatchedTime).Milliseconds())
if err != nil {
logger.Error("Failed to fetch unmatched bundle", zap.Error(err), zap.String("matching_hash", unmatchedHash.Hex()))
metrics.IncRPCCallFailure(SendBundleEndpointName)
return SendMevBundleResponse{}, ErrBackrunNotFound
}
if privacy := unmatchedBundle.Privacy; privacy == nil && privacy.Hints.HasHint(HintHash) {
Expand Down Expand Up @@ -162,6 +180,7 @@ func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMev
highPriority := jsonrpcserver.GetPriority(ctx)
err = m.scheduler.ScheduleBundleSimulation(ctx, &bundle, highPriority)
if err != nil {
metrics.IncRPCCallFailure(SendBundleEndpointName)
logger.Error("Failed to schedule bundle simulation", zap.Error(err))
return SendMevBundleResponse{}, ErrInternalServiceError
}
Expand All @@ -171,7 +190,15 @@ func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMev
}, nil
}

func (m *API) SimBundle(ctx context.Context, bundle SendMevBundleArgs, aux SimMevBundleAuxArgs) (*SimMevBundleResponse, error) {
func (m *API) SimBundle(ctx context.Context, bundle SendMevBundleArgs, aux SimMevBundleAuxArgs) (_ *SimMevBundleResponse, err error) {
startAt := time.Now()
defer func() {
metrics.RecordRPCCallDuration(SimBundleEndpointName, time.Since(startAt).Milliseconds())
if err != nil {
metrics.IncRPCCallFailure(SimBundleEndpointName)
}
}()

if len(m.simBackends) == 0 {
return nil, ErrInternalServiceError
}
Expand All @@ -181,7 +208,7 @@ func (m *API) SimBundle(ctx context.Context, bundle SendMevBundleArgs, aux SimMe
simTimeout := int64(simBundleTimeout / time.Millisecond)
aux.Timeout = &simTimeout

err := m.simRateLimiter.Wait(ctx)
err = m.simRateLimiter.Wait(ctx)
if err != nil {
return nil, err
}
Expand All @@ -195,12 +222,19 @@ func (m *API) SimBundle(ctx context.Context, bundle SendMevBundleArgs, aux SimMe
// CancelBundleByHash cancels a bundle by hash
// This method is not exposed on the bundle relay.
// However, it is used by the Flashbots bundle relay for now to handle the cancellation of private transactions.
func (m *API) CancelBundleByHash(ctx context.Context, hash common.Hash) error {
func (m *API) CancelBundleByHash(ctx context.Context, hash common.Hash) (err error) {
startAt := time.Now()
defer func() {
metrics.RecordRPCCallDuration(CancelBundleByHashEndpointName, time.Since(startAt).Milliseconds())
if err != nil {
metrics.IncRPCCallFailure(CancelBundleByHashEndpointName)
}
}()
logger := m.log.With(zap.String("bundle", hash.Hex()))
ctx, cancel := context.WithTimeout(ctx, cancelBundleTimeout)
defer cancel()
signerAddress := jsonrpcserver.GetSigner(ctx)
err := m.bundleStorage.CancelBundleByHash(ctx, hash, signerAddress)
err = m.bundleStorage.CancelBundleByHash(ctx, hash, signerAddress)
if err != nil {
if !errors.Is(err, ErrBundleNotCancelled) {
logger.Warn("Failed to cancel bundle", zap.Error(err))
Expand Down
1 change: 1 addition & 0 deletions mevshare/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type JSONRPCSimulationBackend struct {
func NewJSONRPCSimulationBackend(url string) *JSONRPCSimulationBackend {
return &JSONRPCSimulationBackend{
client: jsonrpc.NewClient(url),
// todo here use optsx
}
}

Expand Down
27 changes: 25 additions & 2 deletions mevshare/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package mevshare
import (
"context"
"errors"
"net"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/flashbots/mev-share-node/metrics"
"github.com/ybbus/jsonrpc/v3"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -80,7 +83,19 @@ func LoadBuilderConfig(file string) (BuildersBackend, error) {
}

cl := jsonrpc.NewClientWithOpts(builder.URL, &jsonrpc.RPCClientOpts{
HTTPClient: nil,
HTTPClient: &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 20, // since we have one client per host we may keep it pretty low
MaxIdleConnsPerHost: 20,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
},
CustomHeaders: customHeaders,
AllowUnknownFields: false,
DefaultRequestID: 0,
Expand Down Expand Up @@ -115,7 +130,15 @@ type JSONRPCBuilderBackend struct {
API BuilderAPI
}

func (b *JSONRPCBuilderBackend) SendBundle(ctx context.Context, bundle *SendMevBundleArgs) error {
func (b *JSONRPCBuilderBackend) SendBundle(ctx context.Context, bundle *SendMevBundleArgs) (err error) {
startAt := time.Now()
metrics.IncBundleSentToBuilder(b.Name)
defer func() {
metrics.RecordBundleSentToBuilderTime(b.Name, time.Since(startAt).Milliseconds())
if err != nil {
metrics.IncBundleSentToBuilderFailure(b.Name)
}
}()
switch b.API {
case BuilderAPIRefundRecipient:
refRec, err := ConvertBundleToRefundRecipient(bundle)
Expand Down
Loading

0 comments on commit 86001fb

Please sign in to comment.