Skip to content

Commit

Permalink
Fast mode (#127)
Browse files Browse the repository at this point in the history
Fast endpoint implementation
  • Loading branch information
TymKh authored Oct 16, 2023
1 parent 9652d51 commit a8331b2
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 47 deletions.
42 changes: 42 additions & 0 deletions adapters/webfile/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package webfile

import (
"context"
"fmt"
"io"
"net/http"
)

var ErrRequest = fmt.Errorf("request failed")

//https://raw.githubusercontent.com/flashbots/dowg/main/builder-registrations.json

type Fetcher struct {
url string
cl http.Client
}

func NewFetcher(url string) *Fetcher {
return &Fetcher{url: url, cl: http.Client{}}
}

func (f *Fetcher) Fetch(ctx context.Context) ([]byte, error) {
//execute http request and load bytes
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, f.url, nil)
if err != nil {
return nil, err
}
resp, err := f.cl.Do(httpReq)
if err != nil {
return nil, err
}
bts, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("err: %w status code %d", ErrRequest, resp.StatusCode)
}
return bts, nil
}
21 changes: 21 additions & 0 deletions adapters/webfile/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package webfile

import (
"context"
"fmt"
"net/http"
"testing"
)

func TestFetch(t *testing.T) {
f := Fetcher{
url: "https://raw.githubusercontent.com/flashbots/dowg/main/builder-registrations.json",
cl: http.Client{},
}
bts, err := f.Fetch(context.Background())
if err != nil {
panic(err)
}

fmt.Println(string(bts))
}
76 changes: 76 additions & 0 deletions application/builder_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package application

import (
"context"
"encoding/json"
"strings"
"time"

"github.com/ethereum/go-ethereum/log"
)

type BuilderInfo struct {
Name string `json:"name"`
RPC string `json:"rpc"`
SupportedApis []string `json:"supported-apis"`
}
type Fetcher interface {
Fetch(ctx context.Context) ([]byte, error)
}
type BuilderInfoService struct {
fetcher Fetcher
builderInfos []BuilderInfo
}

func StartBuilderInfoService(ctx context.Context, fetcher Fetcher, fetchInterval time.Duration) (*BuilderInfoService, error) {
bis := BuilderInfoService{
fetcher: fetcher,
}
if fetcher != nil {
err := bis.fetchBuilderInfo(ctx)
if err != nil {
return nil, err
}
go bis.syncLoop(fetchInterval)

}
return &bis, nil
}
func (bis *BuilderInfoService) Builders() []BuilderInfo {
return bis.builderInfos
}

func (bis *BuilderInfoService) BuilderNames() []string {
var names = make([]string, 0, len(bis.builderInfos))
for _, builderInfo := range bis.builderInfos {
names = append(names, strings.ToLower(builderInfo.Name))
}
return names
}

func (bis *BuilderInfoService) syncLoop(fetchInterval time.Duration) {
ticker := time.NewTicker(fetchInterval)
for range ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
err := bis.fetchBuilderInfo(ctx)
if err != nil {
//TODO: probably panic on multiple consequent errors, though it's not critical in nature
log.Error("failed to fetch builder info", "err", err)
}
cancel()
}
}

func (bis *BuilderInfoService) fetchBuilderInfo(ctx context.Context) error {
bts, err := bis.fetcher.Fetch(ctx)
if err != nil {
return err
}
var builderInfos []BuilderInfo
err = json.Unmarshal(bts, &builderInfos)
if err != nil {
return err
}
bis.builderInfos = builderInfos
return nil
}
51 changes: 28 additions & 23 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,34 @@ var (
version = "dev" // is set during build process

// defaults
defaultDebug = os.Getenv("DEBUG") == "1"
defaultLogJSON = os.Getenv("LOG_JSON") == "1"
defaultListenAddress = "127.0.0.1:9000"
defaultDrainAddress = "127.0.0.1:9001"
defaultDrainSeconds = 60
defaultProxyUrl = "http://127.0.0.1:8545"
defaultProxyTimeoutSeconds = 10
defaultRelayUrl = "https://relay.flashbots.net"
defaultRedisUrl = "localhost:6379"
defaultServiceName = os.Getenv("SERVICE_NAME")
defaultDebug = os.Getenv("DEBUG") == "1"
defaultLogJSON = os.Getenv("LOG_JSON") == "1"
defaultListenAddress = "127.0.0.1:9000"
defaultDrainAddress = "127.0.0.1:9001"
defaultDrainSeconds = 60
defaultProxyUrl = "http://127.0.0.1:8545"
defaultProxyTimeoutSeconds = 10
defaultRelayUrl = "https://relay.flashbots.net"
defaultRedisUrl = "localhost:6379"
defaultServiceName = os.Getenv("SERVICE_NAME")
defaultFetchInfoIntervalSeconds = 600

// cli flags
versionPtr = flag.Bool("version", false, "just print the program version")
listenAddress = flag.String("listen", getEnvAsStrOrDefault("LISTEN_ADDR", defaultListenAddress), "Listen address")
drainAddress = flag.String("drain", getEnvAsStrOrDefault("DRAIN_ADDR", defaultDrainAddress), "Drain address")
drainSeconds = flag.Int("drainSeconds", getEnvAsIntOrDefault("DRAIN_SECONDS", defaultDrainSeconds), "seconds to wait for graceful shutdown")
proxyUrl = flag.String("proxy", getEnvAsStrOrDefault("PROXY_URL", defaultProxyUrl), "URL for default JSON-RPC proxy target (eth node, Infura, etc.)")
proxyTimeoutSeconds = flag.Int("proxyTimeoutSeconds", getEnvAsIntOrDefault("PROXY_TIMEOUT_SECONDS", defaultProxyTimeoutSeconds), "proxy client timeout in seconds")
redisUrl = flag.String("redis", getEnvAsStrOrDefault("REDIS_URL", defaultRedisUrl), "URL for Redis (use 'dev' to use integrated in-memory redis)")
relayUrl = flag.String("relayUrl", getEnvAsStrOrDefault("RELAY_URL", defaultRelayUrl), "URL for relay")
relaySigningKey = flag.String("signingKey", os.Getenv("RELAY_SIGNING_KEY"), "Signing key for relay requests")
psqlDsn = flag.String("psql", os.Getenv("POSTGRES_DSN"), "Postgres DSN")
debugPtr = flag.Bool("debug", defaultDebug, "print debug output")
logJSONPtr = flag.Bool("logJSON", defaultLogJSON, "log in JSON")
serviceName = flag.String("serviceName", defaultServiceName, "name of the service which will be used in the logs")
versionPtr = flag.Bool("version", false, "just print the program version")
listenAddress = flag.String("listen", getEnvAsStrOrDefault("LISTEN_ADDR", defaultListenAddress), "Listen address")
drainAddress = flag.String("drain", getEnvAsStrOrDefault("DRAIN_ADDR", defaultDrainAddress), "Drain address")
drainSeconds = flag.Int("drainSeconds", getEnvAsIntOrDefault("DRAIN_SECONDS", defaultDrainSeconds), "seconds to wait for graceful shutdown")
fetchIntervalSeconds = flag.Int("fetchIntervalSeconds", getEnvAsIntOrDefault("FETCH_INFO_INTERVAL_SECONDS", defaultFetchInfoIntervalSeconds), "seconds between builder info fetches")
builderInfoSource = flag.String("builderInfoSource", getEnvAsStrOrDefault("BUILDER_INFO_SOURCE", ""), "URL for json source of actual builder info")
proxyUrl = flag.String("proxy", getEnvAsStrOrDefault("PROXY_URL", defaultProxyUrl), "URL for default JSON-RPC proxy target (eth node, Infura, etc.)")
proxyTimeoutSeconds = flag.Int("proxyTimeoutSeconds", getEnvAsIntOrDefault("PROXY_TIMEOUT_SECONDS", defaultProxyTimeoutSeconds), "proxy client timeout in seconds")
redisUrl = flag.String("redis", getEnvAsStrOrDefault("REDIS_URL", defaultRedisUrl), "URL for Redis (use 'dev' to use integrated in-memory redis)")
relayUrl = flag.String("relayUrl", getEnvAsStrOrDefault("RELAY_URL", defaultRelayUrl), "URL for relay")
relaySigningKey = flag.String("signingKey", os.Getenv("RELAY_SIGNING_KEY"), "Signing key for relay requests")
psqlDsn = flag.String("psql", os.Getenv("POSTGRES_DSN"), "Postgres DSN")
debugPtr = flag.Bool("debug", defaultDebug, "print debug output")
logJSONPtr = flag.Bool("logJSON", defaultLogJSON, "log in JSON")
serviceName = flag.String("serviceName", defaultServiceName, "name of the service which will be used in the logs")
)

func main() {
Expand Down Expand Up @@ -108,6 +111,8 @@ func main() {
RelaySigningKey: key,
RelayUrl: *relayUrl,
Version: version,
BuilderInfoSource: *builderInfoSource,
FetchInfoInterval: *fetchIntervalSeconds,
})
if err != nil {
logger.Crit("Server init error", "error", err)
Expand Down
2 changes: 2 additions & 0 deletions server/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ type Configuration struct {
RelaySigningKey *ecdsa.PrivateKey
RelayUrl string
Version string
BuilderInfoSource string
FetchInfoInterval int
}
6 changes: 4 additions & 2 deletions server/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ type RpcRequestHandler struct {
relayUrl string
uid uuid.UUID
requestRecord *requestRecord
builderNames []string
}

func NewRpcRequestHandler(logger log.Logger, respw *http.ResponseWriter, req *http.Request, proxyUrl string, proxyTimeoutSeconds int, relaySigningKey *ecdsa.PrivateKey, relayUrl string, db database.Store) *RpcRequestHandler {
func NewRpcRequestHandler(logger log.Logger, respw *http.ResponseWriter, req *http.Request, proxyUrl string, proxyTimeoutSeconds int, relaySigningKey *ecdsa.PrivateKey, relayUrl string, db database.Store, builderNames []string) *RpcRequestHandler {
return &RpcRequestHandler{
logger: logger,
respw: respw,
Expand All @@ -39,6 +40,7 @@ func NewRpcRequestHandler(logger log.Logger, respw *http.ResponseWriter, req *ht
relayUrl: relayUrl,
uid: uuid.New(),
requestRecord: NewRequestRecord(db),
builderNames: builderNames,
}
}

Expand Down Expand Up @@ -96,7 +98,7 @@ func (r *RpcRequestHandler) process() {
}

// mev-share parameters
urlParams, err := ExtractParametersFromUrl(r.req.URL)
urlParams, err := ExtractParametersFromUrl(r.req.URL, r.builderNames)
if err != nil {
r.logger.Warn("[process] Invalid auction preference", "error", err)
res := AuctionPreferenceErrorToJSONRPCResponse(jsonReq, err)
Expand Down
18 changes: 17 additions & 1 deletion server/request_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,29 @@ func (r *RpcRequest) sendTxToRelay() {
sendPrivateTxArgs := types.SendPrivateTxRequestWithPreferences{}
sendPrivateTxArgs.Tx = r.rawTxHex
sendPrivateTxArgs.Preferences = &r.urlParams.pref
if r.urlParams.fast {
if len(sendPrivateTxArgs.Preferences.Validity.Refund) == 0 {
addr, err := GetSenderAddressFromTx(r.tx)
if err != nil {
r.logger.Error("[sendTxToRelay] GetSenderAddressFromTx failed", "error", err)
r.writeRpcError(err.Error(), types.JsonRpcInternalError)
return
}
sendPrivateTxArgs.Preferences.Validity.Refund = []types.RefundConfig{
{
Address: addr,
Percent: 50,
},
}
}
}

fbRpc := flashbotsrpc.New(r.relayUrl, func(rpc *flashbotsrpc.FlashbotsRPC) {
if r.urlParams.originId != "" {
rpc.Headers["X-Flashbots-Origin"] = r.urlParams.originId
}
})

r.logger.Info("[sendTxToRelay] sending transaction", "builders count", len(sendPrivateTxArgs.Preferences.Privacy.Builders), "is_fast", r.urlParams.fast)
_, err = fbRpc.CallWithFlashbotsSignature("eth_sendPrivateTransaction", r.relaySigningKey, sendPrivateTxArgs)
if err != nil {
if errors.Is(err, flashbotsrpc.ErrRelayErrorResponse) {
Expand Down
18 changes: 16 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"syscall"
"time"

"github.com/flashbots/rpc-endpoint/adapters/webfile"
"github.com/flashbots/rpc-endpoint/application"
"github.com/flashbots/rpc-endpoint/database"

"github.com/ethereum/go-ethereum/log"
Expand All @@ -30,6 +32,9 @@ var DebugDontSendTx = os.Getenv("DEBUG_DONT_SEND_RAWTX") != ""
// Metamask fix helper
var RState *RedisState

type BuilderNameProvider interface {
BuilderNames() []string
}
type RpcEndPointServer struct {
server *http.Server
drain *http.Server
Expand All @@ -47,6 +52,7 @@ type RpcEndPointServer struct {
relayUrl string
startTime time.Time
version string
builderNameProvider BuilderNameProvider
}

func NewRpcEndPointServer(cfg Configuration) (*RpcEndPointServer, error) {
Expand All @@ -69,7 +75,14 @@ func NewRpcEndPointServer(cfg Configuration) (*RpcEndPointServer, error) {
if err != nil {
return nil, errors.Wrap(err, "Redis init error")
}

var builderInfoFetcher application.Fetcher
if cfg.BuilderInfoSource != "" {
builderInfoFetcher = webfile.NewFetcher(cfg.BuilderInfoSource)
}
bis, err := application.StartBuilderInfoService(context.Background(), builderInfoFetcher, time.Second*time.Duration(cfg.FetchInfoInterval))
if err != nil {
return nil, errors.Wrap(err, "BuilderInfoService init error")
}
return &RpcEndPointServer{
db: cfg.DB,
drainAddress: cfg.DrainAddress,
Expand All @@ -83,6 +96,7 @@ func NewRpcEndPointServer(cfg Configuration) (*RpcEndPointServer, error) {
relayUrl: cfg.RelayUrl,
startTime: Now(),
version: cfg.Version,
builderNameProvider: bis,
}, nil
}

Expand Down Expand Up @@ -189,7 +203,7 @@ func (s *RpcEndPointServer) HandleHttpRequest(respw http.ResponseWriter, req *ht
return
}

request := NewRpcRequestHandler(s.logger, &respw, req, s.proxyUrl, s.proxyTimeoutSeconds, s.relaySigningKey, s.relayUrl, s.db)
request := NewRpcRequestHandler(s.logger, &respw, req, s.proxyUrl, s.proxyTimeoutSeconds, s.relaySigningKey, s.relayUrl, s.db, s.builderNameProvider.BuilderNames())
request.process()
}

Expand Down
17 changes: 13 additions & 4 deletions server/url_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package server

import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/flashbots/rpc-endpoint/types"
"github.com/pkg/errors"
"net/url"
"strconv"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/flashbots/rpc-endpoint/types"
"github.com/pkg/errors"
)

var (
Expand All @@ -27,6 +28,7 @@ type URLParameters struct {
pref types.PrivateTxPreferences
prefWasSet bool
originId string
fast bool
}

// ExtractParametersFromUrl extracts the auction preference from the url query
Expand All @@ -36,7 +38,10 @@ type URLParameters struct {
// - builder: target builder, can be set multiple times, default: empty (only send to flashbots builders)
// - refund: refund in the form of 0xaddress:percentage, default: empty (will be set by default when backrun is produced)
// example: 0x123:80 - will refund 80% of the backrun profit to 0x123
func ExtractParametersFromUrl(url *url.URL) (params URLParameters, err error) {
func ExtractParametersFromUrl(url *url.URL, allBuilders []string) (params URLParameters, err error) {
if strings.HasPrefix(url.Path, "/fast") {
params.fast = true
}
var hint []string
hintQuery, ok := url.Query()["hint"]
if ok {
Expand Down Expand Up @@ -71,6 +76,10 @@ func ExtractParametersFromUrl(url *url.URL) (params URLParameters, err error) {
}
params.pref.Privacy.Builders = targetBuildersQuery
}
if params.fast {
// set all builders no matter what's in the url
params.pref.Privacy.Builders = allBuilders
}

refundAddressQuery, ok := url.Query()["refund"]
if ok {
Expand Down
Loading

0 comments on commit a8331b2

Please sign in to comment.