diff --git a/beaconclient/beacon_client_test.go b/beaconclient/beacon_client_test.go index 0f13d922..e9f668d9 100644 --- a/beaconclient/beacon_client_test.go +++ b/beaconclient/beacon_client_test.go @@ -48,7 +48,7 @@ func newTestBackend(t require.TestingT, numBeaconNodes int) *testBackend { func TestBeaconInstance(t *testing.T) { r := mux.NewRouter() srv := httptest.NewServer(r) - bc := NewProdBeaconInstance(common.TestLog, srv.URL) + bc := NewProdBeaconInstance(common.TestLog, srv.URL, srv.URL) r.HandleFunc("/eth/v1/beacon/states/1/validators", func(w http.ResponseWriter, _ *http.Request) { resp := []byte(`{ @@ -206,7 +206,7 @@ func TestFetchValidators(t *testing.T) { func TestGetForkSchedule(t *testing.T) { r := mux.NewRouter() srv := httptest.NewServer(r) - bc := NewProdBeaconInstance(common.TestLog, srv.URL) + bc := NewProdBeaconInstance(common.TestLog, srv.URL, srv.URL) r.HandleFunc("/eth/v1/config/fork_schedule", func(w http.ResponseWriter, _ *http.Request) { resp := []byte(`{ diff --git a/beaconclient/mock_beacon_instance.go b/beaconclient/mock_beacon_instance.go index d2d7df3c..6d551ed2 100644 --- a/beaconclient/mock_beacon_instance.go +++ b/beaconclient/mock_beacon_instance.go @@ -100,6 +100,10 @@ func (c *MockBeaconInstance) GetURI() string { return "" } +func (c *MockBeaconInstance) GetPublishURI() string { + return "" +} + func (c *MockBeaconInstance) addDelay() { if c.ResponseDelay > 0 { time.Sleep(c.ResponseDelay) diff --git a/beaconclient/multi_beacon_client.go b/beaconclient/multi_beacon_client.go index 63d4e271..e7feb5c0 100644 --- a/beaconclient/multi_beacon_client.go +++ b/beaconclient/multi_beacon_client.go @@ -55,6 +55,7 @@ type IBeaconInstance interface { GetStateValidators(stateID string) (*GetStateValidatorsResponse, error) GetProposerDuties(epoch uint64) (*ProposerDutiesResponse, error) GetURI() string + GetPublishURI() string PublishBlock(block *common.VersionedSignedProposal, broadcastMode BroadcastMode) (code int, err error) GetGenesis() (*GetGenesisResponse, error) GetSpec() (spec *GetSpecResponse, err error) @@ -271,7 +272,7 @@ func (c *MultiBeaconClient) PublishBlock(block *common.VersionedSignedProposal) resChans := make(chan publishResp, len(clients)) for i, client := range clients { - log := log.WithField("uri", client.GetURI()) + log := log.WithField("uri", client.GetPublishURI()) log.Debug("publishing block") go func(index int, client IBeaconInstance) { code, err := client.PublishBlock(block, c.broadcastMode) @@ -286,7 +287,7 @@ func (c *MultiBeaconClient) PublishBlock(block *common.VersionedSignedProposal) var lastErrPublishResp publishResp for i := 0; i < len(clients); i++ { res := <-resChans - log = log.WithField("beacon", clients[res.index].GetURI()) + log = log.WithField("beacon", clients[res.index].GetPublishURI()) if res.err != nil { log.WithField("statusCode", res.code).WithError(res.err).Warn("failed to publish block") lastErrPublishResp = res diff --git a/beaconclient/prod_beacon_instance.go b/beaconclient/prod_beacon_instance.go index d50b86c6..695629cf 100644 --- a/beaconclient/prod_beacon_instance.go +++ b/beaconclient/prod_beacon_instance.go @@ -15,21 +15,26 @@ import ( ) type ProdBeaconInstance struct { - log *logrus.Entry - beaconURI string + log *logrus.Entry + beaconURI string + beaconPublishURI string // feature flags ffUseV1PublishBlockEndpoint bool ffUseSSZEncodingPublishBlock bool + + // http clients + publishingClient *http.Client } -func NewProdBeaconInstance(log *logrus.Entry, beaconURI string) *ProdBeaconInstance { +func NewProdBeaconInstance(log *logrus.Entry, beaconURI, beaconPublishURI string) *ProdBeaconInstance { _log := log.WithFields(logrus.Fields{ - "component": "beaconInstance", - "beaconURI": beaconURI, + "component": "beaconInstance", + "beaconURI": beaconURI, + "beaconPublishURI": beaconPublishURI, }) - client := &ProdBeaconInstance{_log, beaconURI, false, false} + client := &ProdBeaconInstance{_log, beaconURI, beaconPublishURI, false, false, &http.Client{}} // feature flags if os.Getenv("USE_V1_PUBLISH_BLOCK_ENDPOINT") != "" { @@ -179,7 +184,7 @@ func (c *ProdBeaconInstance) SyncStatus() (*SyncStatusPayloadData, error) { uri := c.beaconURI + "/eth/v1/node/syncing" timeout := 5 * time.Second resp := new(SyncStatusPayload) - _, err := fetchBeacon(http.MethodGet, uri, nil, resp, &timeout, http.Header{}, false) + _, err := fetchBeacon(http.MethodGet, uri, nil, resp, &http.Client{Timeout: timeout}, http.Header{}, false) if err != nil { return nil, err } @@ -248,12 +253,16 @@ func (c *ProdBeaconInstance) GetURI() string { return c.beaconURI } +func (c *ProdBeaconInstance) GetPublishURI() string { + return c.beaconPublishURI +} + func (c *ProdBeaconInstance) PublishBlock(block *common.VersionedSignedProposal, broadcastMode BroadcastMode) (code int, err error) { var uri string if c.ffUseV1PublishBlockEndpoint { - uri = fmt.Sprintf("%s/eth/v1/beacon/blocks", c.beaconURI) + uri = fmt.Sprintf("%s/eth/v1/beacon/blocks", c.beaconPublishURI) } else { - uri = fmt.Sprintf("%s/eth/v2/beacon/blocks?broadcast_validation=%s", c.beaconURI, broadcastMode) + uri = fmt.Sprintf("%s/eth/v2/beacon/blocks?broadcast_validation=%s", c.beaconPublishURI, broadcastMode) } headers := http.Header{} headers.Add("Eth-Consensus-Version", strings.ToLower(block.Version.String())) // optional in v1, required in v2 @@ -279,7 +288,7 @@ func (c *ProdBeaconInstance) PublishBlock(block *common.VersionedSignedProposal, } publishingStartTime := time.Now().UTC() encodeDurationMs := publishingStartTime.Sub(encodeStartTime).Milliseconds() - code, err = fetchBeacon(http.MethodPost, uri, payloadBytes, nil, nil, headers, useSSZ) + code, err = fetchBeacon(http.MethodPost, uri, payloadBytes, nil, c.publishingClient, headers, useSSZ) publishDurationMs := time.Now().UTC().Sub(publishingStartTime).Milliseconds() log.WithFields(logrus.Fields{ "slot": slot, diff --git a/beaconclient/util.go b/beaconclient/util.go index 623b1047..f80d1631 100644 --- a/beaconclient/util.go +++ b/beaconclient/util.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "strings" - "time" ) var ( @@ -31,7 +30,7 @@ func parseBroadcastModeString(s string) (BroadcastMode, bool) { return b, ok } -func fetchBeacon(method, url string, payload []byte, dst any, timeout *time.Duration, headers http.Header, ssz bool) (code int, err error) { +func fetchBeacon(method, url string, payload []byte, dst any, httpClient *http.Client, headers http.Header, ssz bool) (code int, err error) { var req *http.Request if payload == nil { @@ -55,10 +54,11 @@ func fetchBeacon(method, url string, payload []byte, dst any, timeout *time.Dura } req.Header.Set("accept", "application/json") - client := &http.Client{} - if timeout != nil && timeout.Milliseconds() > 0 { - client.Timeout = *timeout + client := http.DefaultClient + if httpClient != nil { + client = httpClient } + resp, err := client.Do(req) if err != nil { return 0, fmt.Errorf("client refused for %s: %w", url, err) diff --git a/cmd/api.go b/cmd/api.go index 89abbf85..16f127bd 100644 --- a/cmd/api.go +++ b/cmd/api.go @@ -53,6 +53,7 @@ func init() { apiCmd.Flags().StringVar(&apiListenAddr, "listen-addr", apiDefaultListenAddr, "listen address for webserver") apiCmd.Flags().StringSliceVar(&beaconNodeURIs, "beacon-uris", defaultBeaconURIs, "beacon endpoints") + apiCmd.Flags().StringSliceVar(&beaconNodePublishURIs, "beacon-publish-uris", defaultBeaconPublishURIs, "beacon publish endpoints") apiCmd.Flags().StringVar(&redisURI, "redis-uri", defaultRedisURI, "redis uri") apiCmd.Flags().StringVar(&redisReadonlyURI, "redis-readonly-uri", defaultRedisReadonlyURI, "redis readonly uri") apiCmd.Flags().StringVar(&postgresDSN, "db", defaultPostgresDSN, "PostgreSQL DSN") @@ -100,9 +101,18 @@ var apiCmd = &cobra.Command{ log.Fatalf("no beacon endpoints specified") } log.Infof("Using beacon endpoints: %s", strings.Join(beaconNodeURIs, ", ")) + if len(beaconNodePublishURIs) == 0 { + // default to same endpoint as the beacon endpoints + beaconNodePublishURIs = beaconNodeURIs + } else if len(beaconNodePublishURIs) != len(beaconNodeURIs) { + log.Fatalf("beacon publish endpoints do not match the number of beacon endpoints") + } else { + log.Infof("Using beacon publish endpoints: %s", strings.Join(beaconNodePublishURIs, ", ")) + } + var beaconInstances []beaconclient.IBeaconInstance - for _, uri := range beaconNodeURIs { - beaconInstances = append(beaconInstances, beaconclient.NewProdBeaconInstance(log, uri)) + for i, uri := range beaconNodeURIs { + beaconInstances = append(beaconInstances, beaconclient.NewProdBeaconInstance(log, uri, beaconNodePublishURIs[i])) } beaconClient := beaconclient.NewMultiBeaconClient(log, beaconInstances) diff --git a/cmd/housekeeper.go b/cmd/housekeeper.go index 9b03d8be..b307a2a0 100644 --- a/cmd/housekeeper.go +++ b/cmd/housekeeper.go @@ -63,7 +63,7 @@ var housekeeperCmd = &cobra.Command{ log.Infof("Using beacon endpoints: %s", strings.Join(beaconNodeURIs, ", ")) var beaconInstances []beaconclient.IBeaconInstance for _, uri := range beaconNodeURIs { - beaconInstances = append(beaconInstances, beaconclient.NewProdBeaconInstance(log, uri)) + beaconInstances = append(beaconInstances, beaconclient.NewProdBeaconInstance(log, uri, uri)) } beaconClient := beaconclient.NewMultiBeaconClient(log, beaconInstances) diff --git a/cmd/variables.go b/cmd/variables.go index 5f49e7e3..58d6b59b 100644 --- a/cmd/variables.go +++ b/cmd/variables.go @@ -7,20 +7,22 @@ import ( ) var ( - defaultNetwork = common.GetEnv("NETWORK", "") - defaultBeaconURIs = common.GetSliceEnv("BEACON_URIS", []string{"http://localhost:3500"}) - defaultRedisURI = common.GetEnv("REDIS_URI", "localhost:6379") - defaultRedisReadonlyURI = common.GetEnv("REDIS_READONLY_URI", "") - defaultPostgresDSN = common.GetEnv("POSTGRES_DSN", "") - defaultMemcachedURIs = common.GetSliceEnv("MEMCACHED_URIS", nil) - defaultLogJSON = os.Getenv("LOG_JSON") != "" - defaultLogLevel = common.GetEnv("LOG_LEVEL", "info") + defaultNetwork = common.GetEnv("NETWORK", "") + defaultBeaconURIs = common.GetSliceEnv("BEACON_URIS", []string{"http://localhost:3500"}) + defaultBeaconPublishURIs = common.GetSliceEnv("BEACON_PUBLISH_URIS", []string{}) + defaultRedisURI = common.GetEnv("REDIS_URI", "localhost:6379") + defaultRedisReadonlyURI = common.GetEnv("REDIS_READONLY_URI", "") + defaultPostgresDSN = common.GetEnv("POSTGRES_DSN", "") + defaultMemcachedURIs = common.GetSliceEnv("MEMCACHED_URIS", nil) + defaultLogJSON = os.Getenv("LOG_JSON") != "" + defaultLogLevel = common.GetEnv("LOG_LEVEL", "info") - beaconNodeURIs []string - redisURI string - redisReadonlyURI string - postgresDSN string - memcachedURIs []string + beaconNodeURIs []string + beaconNodePublishURIs []string + redisURI string + redisReadonlyURI string + postgresDSN string + memcachedURIs []string logJSON bool logLevel string diff --git a/scripts/sse-event-logger/main.go b/scripts/sse-event-logger/main.go index ed009fb3..854f9c9a 100644 --- a/scripts/sse-event-logger/main.go +++ b/scripts/sse-event-logger/main.go @@ -22,7 +22,7 @@ func main() { log.Infof("Using beacon endpoints: %s", strings.Join(beaconURIs, ", ")) for _, uri := range beaconURIs { - beaconInstance := beaconclient.NewProdBeaconInstance(log, uri) + beaconInstance := beaconclient.NewProdBeaconInstance(log, uri, uri) go subscribeHead(beaconInstance) go subscribePayloadAttr(beaconInstance) } diff --git a/services/api/service.go b/services/api/service.go index 402ae06c..3ed7dc9a 100644 --- a/services/api/service.go +++ b/services/api/service.go @@ -2007,13 +2007,6 @@ func (api *RelayAPI) handleSubmitNewBlock(w http.ResponseWriter, req *http.Reque simResultC := make(chan *blockSimResult, 1) var eligibleAt time.Time // will be set once the bid is ready - submission, err = common.GetBlockSubmissionInfo(payload) - if err != nil { - log.WithError(err).Warn("missing fields in submit block request") - api.RespondError(w, http.StatusBadRequest, err.Error()) - return - } - bfOpts := bidFloorOpts{ w: w, tx: tx,