From 86001fb5547b2d1121b9252d66b6a4fd326c5f4a Mon Sep 17 00:00:00 2001 From: TymKh Date: Tue, 23 Jan 2024 16:06:39 +0100 Subject: [PATCH] expand metrics and improve http client configuration (#24) * expand metrics and improve http client configuration * update x/crypto dependency * expand metrics * open pprof endpoint * cache sbundle fetching for spikes * fix worker bug & improve default bundle lifetime * increase failure metrics only on real errors * fix lint * go mod tidy --- .golangci.yaml | 2 + cmd/node/main.go | 16 +++-- go.mod | 3 +- go.sum | 8 ++- metrics/metrics.go | 70 ++++++++++++++++-- mevshare/api.go | 66 ++++++++++++----- mevshare/backend.go | 1 + mevshare/builders.go | 27 ++++++- mevshare/sim_queue.go | 25 ++++--- mevshare/types.go | 6 ++ spike/manager.go | 163 ++++++++++++++++++++++++++++++++++++++++++ spike/manager_test.go | 130 +++++++++++++++++++++++++++++++++ 12 files changed, 477 insertions(+), 40 deletions(-) create mode 100644 spike/manager.go create mode 100644 spike/manager_test.go diff --git a/.golangci.yaml b/.golangci.yaml index cb08659..22a479d 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -26,6 +26,8 @@ linters: - wsl - deadcode - varcheck + - exhaustruct + - nolintlint # # Disabled because of generics: diff --git a/cmd/node/main.go b/cmd/node/main.go index 7ae0e3a..9b2e255 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "net/http" + "net/http/pprof" "os" "os/signal" "strconv" @@ -164,12 +165,12 @@ func main() { cachingEthBackend := mevshare.NewCachingEthClient(ethBackend) - api := mevshare.NewAPI(logger, simQueue, dbBackend, cachingEthBackend, signer, simBackends, rate.Limit(rateLimit), buildersBackend, cancelCache) + api := mevshare.NewAPI(logger, simQueue, dbBackend, cachingEthBackend, signer, simBackends, rate.Limit(rateLimit), buildersBackend, cancelCache, time.Millisecond*60) jsonRPCServer, err := jsonrpcserver.NewHandler(jsonrpcserver.Methods{ - "mev_sendBundle": api.SendBundle, - "mev_simBundle": api.SimBundle, - "mev_cancelBundleByHash": api.CancelBundleByHash, + mevshare.SendBundleEndpointName: api.SendBundle, + mevshare.SimBundleEndpointName: api.SimBundle, + mevshare.CancelBundleByHashEndpointName: api.CancelBundleByHash, }) if err != nil { logger.Fatal("Failed to create jsonrpc server", zap.Error(err)) @@ -186,11 +187,18 @@ func main() { metrics.WritePrometheus(w, true) }) go func() { + metricsMux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + metricsMux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + metricsMux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + metricsMux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + metricsMux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + metricsServer := &http.Server{ Addr: fmt.Sprintf("0.0.0.0:%s", defaultMetricsPort), ReadHeaderTimeout: 5 * time.Second, Handler: metricsMux, } + err := metricsServer.ListenAndServe() if err != nil { logger.Fatal("Failed to start metrics server", zap.Error(err)) diff --git a/go.mod b/go.mod index 2007470..1d479d6 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,12 @@ require ( github.com/flashbots/go-utils v0.4.11 github.com/jmoiron/sqlx v1.3.5 github.com/lib/pq v1.2.0 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/redis/go-redis/v9 v9.0.2 github.com/stretchr/testify v1.8.1 github.com/ybbus/jsonrpc/v3 v3.1.4 go.uber.org/zap v1.23.0 - golang.org/x/crypto v0.9.0 + golang.org/x/crypto v0.17.0 golang.org/x/time v0.3.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index f3b96e4..b04113a 100644 --- a/go.sum +++ b/go.sum @@ -83,6 +83,8 @@ github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxd github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A= github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -130,15 +132,15 @@ go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= -golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= -golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU= golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= diff --git a/metrics/metrics.go b/metrics/metrics.go index 62ade70..5309bd8 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -1,16 +1,43 @@ // Package metrics contains all application-logic metrics package metrics -import "github.com/VictoriaMetrics/metrics" +import ( + "fmt" + + "github.com/VictoriaMetrics/metrics" +) var ( - sbundlesReceived = metrics.NewCounter("sbundles_received_total") - sbundlesReceivedValid = metrics.NewCounter("sbundles_received_valid_total") - sbundlesReceivedStale = metrics.NewCounter("sbundles_received_stale_total") - queueFullSbundles = metrics.NewCounter("sbundles_queue_full_total") - queuePopstaleItemSbundles = metrics.NewCounter("sbundles_queue_pop_stale_item_total") + sbundlesReceived = metrics.NewCounter("sbundles_received_total") + sbundlesReceivedValid = metrics.NewCounter("sbundles_received_valid_total") + sbundlesReceivedStale = metrics.NewCounter("sbundles_received_stale_total") + queueFullSbundles = metrics.NewCounter("sbundles_queue_full_total") + queuePopstaleItemSbundles = metrics.NewCounter("sbundles_queue_pop_stale_item_total") + sbundleProcessDurationSummary = metrics.NewSummary("sbundle_process_duration_milliseconds") + sbundleValidationDurationSummary = metrics.NewSummary("sbundle_validation_duration_milliseconds") + sbundleFetchUnmatchedDurationSummary = metrics.NewSummary("sbundle_fetch_unmatched_duration_milliseconds") + sbundleAddQueueDurationSummary = metrics.NewSummary("sbundle_add_queue_milliseconds") +) + +const ( + sbundleRPCCallDurationLabel = `sbundle_rpc_call_duration_milliseconds{method="%s"}` + sbundleRPCCallErrorCounterLabel = `sbundle_rpc_call_error_total{method="%s"}` + + sbundleSentToBuilderLabel = `bundle_sent_to_builder_total{builder="%s"}` + sbundleSentToBuilderFailureLabel = `bundle_sent_to_builder_failure_total{builder="%s"}` + sbundleSentToBuilderDurationSummaryLabel = `bundle_sent_to_builder_duration_milliseconds{builder="%s"}` ) +func RecordRPCCallDuration(method string, duration int64) { + l := fmt.Sprintf(sbundleRPCCallDurationLabel, method) + metrics.GetOrCreateSummary(l).Update(float64(duration)) +} + +func IncRPCCallFailure(method string) { + l := fmt.Sprintf(sbundleRPCCallErrorCounterLabel, method) + metrics.GetOrCreateCounter(l).Inc() +} + func IncSbundlesReceived() { sbundlesReceived.Inc() } @@ -30,3 +57,34 @@ func IncQueueFullSbundles() { func IncQueuePopStaleItemSbundles() { queuePopstaleItemSbundles.Inc() } + +func IncBundleSentToBuilder(builder string) { + l := fmt.Sprintf(sbundleSentToBuilderLabel, builder) + metrics.GetOrCreateCounter(l).Inc() +} + +func IncBundleSentToBuilderFailure(builder string) { + l := fmt.Sprintf(sbundleSentToBuilderFailureLabel, builder) + metrics.GetOrCreateCounter(l).Inc() +} + +func RecordBundleSentToBuilderTime(endpoint string, duration int64) { + l := fmt.Sprintf(sbundleSentToBuilderDurationSummaryLabel, endpoint) + metrics.GetOrCreateSummary(l).Update(float64(duration)) +} + +func RecordBundleProcessDuration(duration int64) { + sbundleProcessDurationSummary.Update(float64(duration)) +} + +func RecordBundleValidationDuration(duration int64) { + sbundleValidationDurationSummary.Update(float64(duration)) +} + +func RecordBundleFetchUnmatchedDuration(duration int64) { + sbundleFetchUnmatchedDurationSummary.Update(float64(duration)) +} + +func RecordBundleAddQueueDuration(duration int64) { + sbundleAddQueueDurationSummary.Update(float64(duration)) +} diff --git a/mevshare/api.go b/mevshare/api.go index b582d4d..90bac5b 100644 --- a/mevshare/api.go +++ b/mevshare/api.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/flashbots/mev-share-node/jsonrpcserver" "github.com/flashbots/mev-share-node/metrics" + "github.com/flashbots/mev-share-node/spike" "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -55,6 +56,7 @@ type API struct { simRateLimiter *rate.Limiter builders BuildersBackend + spikeManager *spike.Manager[*SendMevBundleArgs] knownBundleCache *lru.Cache[common.Hash, SendMevBundleArgs] cancellationCache *RedisCancellationCache } @@ -63,18 +65,23 @@ func NewAPI( log *zap.Logger, scheduler SimScheduler, bundleStorage BundleStorage, eth EthClient, signer types.Signer, simBackends []SimulationBackend, simRateLimit rate.Limit, builders BuildersBackend, cancellationCache *RedisCancellationCache, + sbundleValidDuration time.Duration, ) *API { + sm := spike.NewManager(func(ctx context.Context, k string) (*SendMevBundleArgs, error) { + return bundleStorage.GetBundleByMatchingHash(ctx, common.HexToHash(k)) + }, sbundleValidDuration) + return &API{ log: log, - scheduler: scheduler, - bundleStorage: bundleStorage, - eth: eth, - signer: signer, - simBackends: simBackends, - simRateLimiter: rate.NewLimiter(simRateLimit, 1), - builders: builders, - + scheduler: scheduler, + bundleStorage: bundleStorage, + eth: eth, + signer: signer, + simBackends: simBackends, + simRateLimiter: rate.NewLimiter(simRateLimit, 1), + builders: builders, + spikeManager: sm, knownBundleCache: lru.NewCache[common.Hash, SendMevBundleArgs](bundleCacheSize), cancellationCache: cancellationCache, } @@ -91,12 +98,18 @@ func findAndReplace(strs []common.Hash, old, replacer common.Hash) bool { return found } -func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMevBundleResponse, error) { +func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (_ SendMevBundleResponse, err error) { logger := m.log - + startAt := time.Now() + defer func() { + metrics.RecordRPCCallDuration(SendBundleEndpointName, time.Since(startAt).Milliseconds()) + }() metrics.IncSbundlesReceived() + + validateBundleTime := time.Now() currentBlock, err := m.eth.BlockNumber(ctx) if err != nil { + metrics.IncRPCCallFailure(SendBundleEndpointName) logger.Error("failed to get current block", zap.Error(err)) return SendMevBundleResponse{}, ErrInternalServiceError } @@ -124,6 +137,8 @@ func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMev bundle.Metadata.OriginID = origin bundle.Metadata.Prematched = !hasUnmatchedHash + metrics.RecordBundleValidationDuration(time.Since(validateBundleTime).Milliseconds()) + if hasUnmatchedHash { var unmatchedHash common.Hash if len(bundle.Body) > 0 && bundle.Body[0].Hash != nil { @@ -131,9 +146,12 @@ func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMev } else { return SendMevBundleResponse{}, ErrInternalServiceError } - - unmatchedBundle, err := m.bundleStorage.GetBundleByMatchingHash(ctx, unmatchedHash) + fetchUnmatchedTime := time.Now() + unmatchedBundle, err := m.spikeManager.GetResult(ctx, unmatchedHash.String()) + metrics.RecordBundleFetchUnmatchedDuration(time.Since(fetchUnmatchedTime).Milliseconds()) if err != nil { + logger.Error("Failed to fetch unmatched bundle", zap.Error(err), zap.String("matching_hash", unmatchedHash.Hex())) + metrics.IncRPCCallFailure(SendBundleEndpointName) return SendMevBundleResponse{}, ErrBackrunNotFound } if privacy := unmatchedBundle.Privacy; privacy == nil && privacy.Hints.HasHint(HintHash) { @@ -162,6 +180,7 @@ func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMev highPriority := jsonrpcserver.GetPriority(ctx) err = m.scheduler.ScheduleBundleSimulation(ctx, &bundle, highPriority) if err != nil { + metrics.IncRPCCallFailure(SendBundleEndpointName) logger.Error("Failed to schedule bundle simulation", zap.Error(err)) return SendMevBundleResponse{}, ErrInternalServiceError } @@ -171,7 +190,15 @@ func (m *API) SendBundle(ctx context.Context, bundle SendMevBundleArgs) (SendMev }, nil } -func (m *API) SimBundle(ctx context.Context, bundle SendMevBundleArgs, aux SimMevBundleAuxArgs) (*SimMevBundleResponse, error) { +func (m *API) SimBundle(ctx context.Context, bundle SendMevBundleArgs, aux SimMevBundleAuxArgs) (_ *SimMevBundleResponse, err error) { + startAt := time.Now() + defer func() { + metrics.RecordRPCCallDuration(SimBundleEndpointName, time.Since(startAt).Milliseconds()) + if err != nil { + metrics.IncRPCCallFailure(SimBundleEndpointName) + } + }() + if len(m.simBackends) == 0 { return nil, ErrInternalServiceError } @@ -181,7 +208,7 @@ func (m *API) SimBundle(ctx context.Context, bundle SendMevBundleArgs, aux SimMe simTimeout := int64(simBundleTimeout / time.Millisecond) aux.Timeout = &simTimeout - err := m.simRateLimiter.Wait(ctx) + err = m.simRateLimiter.Wait(ctx) if err != nil { return nil, err } @@ -195,12 +222,19 @@ func (m *API) SimBundle(ctx context.Context, bundle SendMevBundleArgs, aux SimMe // CancelBundleByHash cancels a bundle by hash // This method is not exposed on the bundle relay. // However, it is used by the Flashbots bundle relay for now to handle the cancellation of private transactions. -func (m *API) CancelBundleByHash(ctx context.Context, hash common.Hash) error { +func (m *API) CancelBundleByHash(ctx context.Context, hash common.Hash) (err error) { + startAt := time.Now() + defer func() { + metrics.RecordRPCCallDuration(CancelBundleByHashEndpointName, time.Since(startAt).Milliseconds()) + if err != nil { + metrics.IncRPCCallFailure(CancelBundleByHashEndpointName) + } + }() logger := m.log.With(zap.String("bundle", hash.Hex())) ctx, cancel := context.WithTimeout(ctx, cancelBundleTimeout) defer cancel() signerAddress := jsonrpcserver.GetSigner(ctx) - err := m.bundleStorage.CancelBundleByHash(ctx, hash, signerAddress) + err = m.bundleStorage.CancelBundleByHash(ctx, hash, signerAddress) if err != nil { if !errors.Is(err, ErrBundleNotCancelled) { logger.Warn("Failed to cancel bundle", zap.Error(err)) diff --git a/mevshare/backend.go b/mevshare/backend.go index c02f0f4..e1dd02a 100644 --- a/mevshare/backend.go +++ b/mevshare/backend.go @@ -32,6 +32,7 @@ type JSONRPCSimulationBackend struct { func NewJSONRPCSimulationBackend(url string) *JSONRPCSimulationBackend { return &JSONRPCSimulationBackend{ client: jsonrpc.NewClient(url), + // todo here use optsx } } diff --git a/mevshare/builders.go b/mevshare/builders.go index 00f82df..200503d 100644 --- a/mevshare/builders.go +++ b/mevshare/builders.go @@ -3,6 +3,8 @@ package mevshare import ( "context" "errors" + "net" + "net/http" "os" "strings" "sync" @@ -10,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/flashbots/mev-share-node/metrics" "github.com/ybbus/jsonrpc/v3" "go.uber.org/zap" "gopkg.in/yaml.v3" @@ -80,7 +83,19 @@ func LoadBuilderConfig(file string) (BuildersBackend, error) { } cl := jsonrpc.NewClientWithOpts(builder.URL, &jsonrpc.RPCClientOpts{ - HTTPClient: nil, + HTTPClient: &http.Client{ + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConns: 20, // since we have one client per host we may keep it pretty low + MaxIdleConnsPerHost: 20, + IdleConnTimeout: 30 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + }, CustomHeaders: customHeaders, AllowUnknownFields: false, DefaultRequestID: 0, @@ -115,7 +130,15 @@ type JSONRPCBuilderBackend struct { API BuilderAPI } -func (b *JSONRPCBuilderBackend) SendBundle(ctx context.Context, bundle *SendMevBundleArgs) error { +func (b *JSONRPCBuilderBackend) SendBundle(ctx context.Context, bundle *SendMevBundleArgs) (err error) { + startAt := time.Now() + metrics.IncBundleSentToBuilder(b.Name) + defer func() { + metrics.RecordBundleSentToBuilderTime(b.Name, time.Since(startAt).Milliseconds()) + if err != nil { + metrics.IncBundleSentToBuilderFailure(b.Name) + } + }() switch b.API { case BuilderAPIRefundRecipient: refRec, err := ConvertBundleToRefundRecipient(bundle) diff --git a/mevshare/sim_queue.go b/mevshare/sim_queue.go index bf0ef61..e1f940e 100644 --- a/mevshare/sim_queue.go +++ b/mevshare/sim_queue.go @@ -10,6 +10,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/common" + "github.com/flashbots/mev-share-node/metrics" "github.com/flashbots/mev-share-node/simqueue" "go.uber.org/zap" "golang.org/x/time/rate" @@ -41,10 +42,10 @@ func NewQueue( workersPerNode: workersPerNode, } - for i, s := range sim { + for i := range sim { worker := SimulationWorker{ log: log.Named("worker").With(zap.Int("worker-id", i)), - simulationBackend: s, + simulationBackend: sim[i], simRes: simRes, cancelCache: cancelCache, backgroundWg: backgroundWg, @@ -56,12 +57,12 @@ func NewQueue( func (q *SimQueue) Start(ctx context.Context) *sync.WaitGroup { process := make([]simqueue.ProcessFunc, 0, len(q.workers)*q.workersPerNode) - for _, w := range q.workers { + for i := range q.workers { if q.workersPerNode > 1 { - workers := simqueue.MultipleWorkers(w.Process, q.workersPerNode, rate.Inf, 1)[0] - process = append(process, workers) + workers := simqueue.MultipleWorkers(q.workers[i].Process, q.workersPerNode, rate.Inf, 1) + process = append(process, workers...) } else { - process = append(process, w.Process) + process = append(process, q.workers[i].Process) } } blockNumber, err := q.eth.BlockNumber(ctx) @@ -104,6 +105,10 @@ func (q *SimQueue) Start(ctx context.Context) *sync.WaitGroup { } func (q *SimQueue) ScheduleBundleSimulation(ctx context.Context, bundle *SendMevBundleArgs, highPriority bool) error { + startAt := time.Now() + defer func() { + metrics.RecordBundleAddQueueDuration(time.Since(startAt).Milliseconds()) + }() data, err := json.Marshal(bundle) if err != nil { return err @@ -119,9 +124,13 @@ type SimulationWorker struct { backgroundWg *sync.WaitGroup } -func (w *SimulationWorker) Process(ctx context.Context, data []byte, info simqueue.QueueItemInfo) error { +func (w *SimulationWorker) Process(ctx context.Context, data []byte, info simqueue.QueueItemInfo) (err error) { + startAt := time.Now() + defer func() { + metrics.RecordBundleProcessDuration(time.Since(startAt).Milliseconds()) + }() var bundle SendMevBundleArgs - err := json.Unmarshal(data, &bundle) + err = json.Unmarshal(data, &bundle) if err != nil { w.log.Error("Failed to unmarshal bundle simulation data", zap.Error(err)) return err diff --git a/mevshare/types.go b/mevshare/types.go index 3cade91..596ede2 100644 --- a/mevshare/types.go +++ b/mevshare/types.go @@ -15,6 +15,12 @@ var ( ErrNilBundleMetadata = errors.New("bundle metadata is nil") ) +const ( + SendBundleEndpointName = "mev_sendBundle" + SimBundleEndpointName = "mev_simBundle" + CancelBundleByHashEndpointName = "mev_cancelBundleByHash" +) + // HintIntent is a set of hint intents // its marshalled as an array of strings type HintIntent uint8 diff --git a/spike/manager.go b/spike/manager.go new file mode 100644 index 0000000..0f24f03 --- /dev/null +++ b/spike/manager.go @@ -0,0 +1,163 @@ +// Package spike provides a primitive to handle spike-like load on retrieving external resources +package spike + +import ( + "context" + "sync" + "time" + + gocache "github.com/patrickmn/go-cache" +) + +const ( + taskQueueLen = 60 + currentlyExecutedSize = 50 + defaultCleanupInterval = 5 * time.Millisecond +) + +// TODO: cache errors and allow common.Hash as key + +type Manager[T any] struct { + mu sync.RWMutex + handler Handler[T] + taskQueue chan task[T] + currentlyExecuted map[string][]chan<- result[T] +} + +// NewCustomManager creates a new Manager with a custom cache implementation controlled by client code +// it should be used for non-trivial flows or non-default cache implementations +func NewCustomManager[T any](h Handler[T]) *Manager[T] { + cm := &Manager[T]{ + handler: h, + taskQueue: make(chan task[T], taskQueueLen), + currentlyExecuted: make(map[string][]chan<- result[T], currentlyExecutedSize), + } + go cm.start() + return cm +} + +// NewManager creates a new Manager with a default cache implementation +// it is preferred way of creating a new Manager +func NewManager[T any](fetch func(ctx context.Context, k string) (T, error), cacheTime time.Duration) *Manager[T] { + g := gocache.New(cacheTime, defaultCleanupInterval) + return NewCustomManager[T](Handler[T]{ + Fetch: fetch, + Set: func(k string, v T) { + g.Set(k, v, cacheTime) + }, + Get: func(k string) (T, bool) { + v, ok := g.Get(k) + if !ok { + var rt T + return rt, false + } + //nolint:forcetypeassert + return v.(T), true + }, + }) +} + +type Handler[T any] struct { + Fetch func(ctx context.Context, k string) (T, error) + Set func(k string, v T) + Get func(k string) (T, bool) +} + +type task[T any] struct { + key string + res chan<- result[T] +} + +type result[T any] struct { + v T + e error +} + +func (m *Manager[T]) start() { + for t := range m.taskQueue { + m.mu.Lock() + v, ok := m.handler.Get(t.key) + if ok { + t.res <- result[T]{v: v} + close(t.res) + m.mu.Unlock() + continue + } + + chans, ok := m.currentlyExecuted[t.key] + if ok { + chans = append(chans, t.res) + m.currentlyExecuted[t.key] = chans + m.mu.Unlock() + continue + } + m.mu.Unlock() + + go func(currentTask task[T]) { + m.mu.Lock() + v, ok := m.handler.Get(currentTask.key) + if ok { + currentTask.res <- result[T]{v: v} + close(currentTask.res) + m.mu.Unlock() + return + } + chans, ok := m.currentlyExecuted[currentTask.key] + if ok { + chans = append(chans, currentTask.res) + m.currentlyExecuted[currentTask.key] = chans + m.mu.Unlock() + return + } + + m.currentlyExecuted[currentTask.key] = []chan<- result[T]{currentTask.res} + m.mu.Unlock() + + res, err := m.handler.Fetch(context.Background(), currentTask.key) + if err != nil { + m.mu.Lock() + chans = m.currentlyExecuted[currentTask.key] + for _, ch := range chans { + ch <- result[T]{e: err} + close(ch) + } + + delete(m.currentlyExecuted, currentTask.key) + m.mu.Unlock() + return + } + m.handler.Set(currentTask.key, res) + + m.mu.Lock() + chans = m.currentlyExecuted[currentTask.key] + for _, ch := range chans { + ch <- result[T]{v: res} + close(ch) + } + delete(m.currentlyExecuted, currentTask.key) + m.mu.Unlock() + }(t) + } +} + +func (m *Manager[T]) GetResult(ctx context.Context, k string) (T, error) { //nolint:ireturn + r, ok := m.handler.Get(k) + if ok { + return r, nil + } + + resChan := make(chan result[T], 1) + + t := task[T]{ + key: k, + res: resChan, + } + m.taskQueue <- t + select { + case <-ctx.Done(): + var tr T + return tr, ctx.Err() + case completed := <-resChan: + return completed.v, completed.e + } +} diff --git a/spike/manager_test.go b/spike/manager_test.go new file mode 100644 index 0000000..3e9c659 --- /dev/null +++ b/spike/manager_test.go @@ -0,0 +1,130 @@ +// nolint +package spike + +import ( + "context" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + gocache "github.com/patrickmn/go-cache" + "github.com/stretchr/testify/assert" +) + +func TestManager(t *testing.T) { + tokens := []string{"1", "2", "3", "4", "1", "2", "3", "4"} + response := map[string]*big.Int{ + "1": big.NewInt(9031161740652627), + "2": big.NewInt(336199114644976), + "3": big.NewInt(336578093626181), + "4": big.NewInt(10), + } + cacheControl := new(int32) + m := NewManager(func(ctx context.Context, k string) (*big.Int, error) { + atomic.AddInt32(cacheControl, 1) + return response[k], nil + }, time.Second*3) + + wg := sync.WaitGroup{} + wg.Add(len(tokens) * 11) + for i := 0; i <= 10; i++ { + for _, token := range tokens { + go func(token string) { + defer wg.Done() + res, err := m.GetResult(context.Background(), token) + + assert.NoError(t, err) + assert.Equal(t, res, response[token]) + }(token) + } + <-time.After(time.Millisecond * 100) + } + wg.Wait() + assert.Equal(t, int(atomic.LoadInt32(cacheControl)), 4) + <-time.After(time.Second * 3) + + atomic.StoreInt32(cacheControl, 0) + wg.Add(len(tokens) * 11) + for i := 0; i <= 10; i++ { + for _, token := range tokens { + go func(token string) { + defer wg.Done() + res, err := m.GetResult(context.Background(), token) + + assert.NoError(t, err) + assert.Equal(t, res, response[token]) + }(token) + } + <-time.After(time.Millisecond * 100) + } + wg.Wait() + assert.Equal(t, int(atomic.LoadInt32(cacheControl)), 4) +} + +func TestCustomManager(t *testing.T) { + tokens := []string{"1", "2", "3", "4", "1", "2", "3", "4"} + response := map[string]*big.Int{ + "1": big.NewInt(9031161740652627), + "2": big.NewInt(336199114644976), + "3": big.NewInt(336578093626181), + "4": big.NewInt(10), + } + + g := gocache.New(gocache.NoExpiration, gocache.DefaultExpiration) + cacheControl := new(int32) + handler := Handler[*big.Int]{ + Fetch: func(ctx context.Context, t string) (*big.Int, error) { + atomic.AddInt32(cacheControl, 1) + return response[t], nil + }, + Set: func(k string, v *big.Int) { + g.Set(k, v, time.Second*2) + }, + Get: func(k string) (*big.Int, bool) { + v, ok := g.Get(k) + if !ok { + return nil, false + } + return v.(*big.Int), true + }, + } + + manager := NewCustomManager(handler) + wg := sync.WaitGroup{} + wg.Add(len(tokens) * 11) + for i := 0; i <= 10; i++ { + for _, token := range tokens { + go func(token string) { + defer wg.Done() + res, err := manager.GetResult(context.Background(), token) + assert.NoError(t, err) + assert.Equal(t, res, response[token]) + }(token) + } + <-time.After(time.Millisecond * 100) + } + wg.Wait() + assert.Equal(t, int(atomic.LoadInt32(cacheControl)), 4) + <-time.After(time.Second * 2) + + _, ok := g.Get(tokens[0]) + assert.Equal(t, ok, false) + + atomic.StoreInt32(cacheControl, 0) + wg.Add(len(tokens) * 11) + for i := 0; i <= 10; i++ { + for _, token := range tokens { + go func(token string) { + defer wg.Done() + res, err := manager.GetResult(context.Background(), token) + assert.NoError(t, err) + assert.Equal(t, res, response[token]) + }(token) + } + <-time.After(time.Millisecond * 100) + } + wg.Wait() + assert.Equal(t, int(atomic.LoadInt32(cacheControl)), 4) +}