Skip to content

Commit

Permalink
Use dedicated http client for publishing (#608)
Browse files Browse the repository at this point in the history
* Use dedicated http client for publishing

* Add publish beacon URI
  • Loading branch information
avalonche authored Apr 24, 2024
1 parent f1781db commit 256fd3c
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 43 deletions.
4 changes: 2 additions & 2 deletions beaconclient/beacon_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func newTestBackend(t require.TestingT, numBeaconNodes int) *testBackend {
func TestBeaconInstance(t *testing.T) {
r := mux.NewRouter()
srv := httptest.NewServer(r)
bc := NewProdBeaconInstance(common.TestLog, srv.URL)
bc := NewProdBeaconInstance(common.TestLog, srv.URL, srv.URL)

r.HandleFunc("/eth/v1/beacon/states/1/validators", func(w http.ResponseWriter, _ *http.Request) {
resp := []byte(`{
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestFetchValidators(t *testing.T) {
func TestGetForkSchedule(t *testing.T) {
r := mux.NewRouter()
srv := httptest.NewServer(r)
bc := NewProdBeaconInstance(common.TestLog, srv.URL)
bc := NewProdBeaconInstance(common.TestLog, srv.URL, srv.URL)

r.HandleFunc("/eth/v1/config/fork_schedule", func(w http.ResponseWriter, _ *http.Request) {
resp := []byte(`{
Expand Down
4 changes: 4 additions & 0 deletions beaconclient/mock_beacon_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (c *MockBeaconInstance) GetURI() string {
return ""
}

func (c *MockBeaconInstance) GetPublishURI() string {
return ""
}

func (c *MockBeaconInstance) addDelay() {
if c.ResponseDelay > 0 {
time.Sleep(c.ResponseDelay)
Expand Down
5 changes: 3 additions & 2 deletions beaconclient/multi_beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type IBeaconInstance interface {
GetStateValidators(stateID string) (*GetStateValidatorsResponse, error)
GetProposerDuties(epoch uint64) (*ProposerDutiesResponse, error)
GetURI() string
GetPublishURI() string
PublishBlock(block *common.VersionedSignedProposal, broadcastMode BroadcastMode) (code int, err error)
GetGenesis() (*GetGenesisResponse, error)
GetSpec() (spec *GetSpecResponse, err error)
Expand Down Expand Up @@ -271,7 +272,7 @@ func (c *MultiBeaconClient) PublishBlock(block *common.VersionedSignedProposal)
resChans := make(chan publishResp, len(clients))

for i, client := range clients {
log := log.WithField("uri", client.GetURI())
log := log.WithField("uri", client.GetPublishURI())
log.Debug("publishing block")
go func(index int, client IBeaconInstance) {
code, err := client.PublishBlock(block, c.broadcastMode)
Expand All @@ -286,7 +287,7 @@ func (c *MultiBeaconClient) PublishBlock(block *common.VersionedSignedProposal)
var lastErrPublishResp publishResp
for i := 0; i < len(clients); i++ {
res := <-resChans
log = log.WithField("beacon", clients[res.index].GetURI())
log = log.WithField("beacon", clients[res.index].GetPublishURI())
if res.err != nil {
log.WithField("statusCode", res.code).WithError(res.err).Warn("failed to publish block")
lastErrPublishResp = res
Expand Down
29 changes: 19 additions & 10 deletions beaconclient/prod_beacon_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,26 @@ import (
)

type ProdBeaconInstance struct {
log *logrus.Entry
beaconURI string
log *logrus.Entry
beaconURI string
beaconPublishURI string

// feature flags
ffUseV1PublishBlockEndpoint bool
ffUseSSZEncodingPublishBlock bool

// http clients
publishingClient *http.Client
}

func NewProdBeaconInstance(log *logrus.Entry, beaconURI string) *ProdBeaconInstance {
func NewProdBeaconInstance(log *logrus.Entry, beaconURI, beaconPublishURI string) *ProdBeaconInstance {
_log := log.WithFields(logrus.Fields{
"component": "beaconInstance",
"beaconURI": beaconURI,
"component": "beaconInstance",
"beaconURI": beaconURI,
"beaconPublishURI": beaconPublishURI,
})

client := &ProdBeaconInstance{_log, beaconURI, false, false}
client := &ProdBeaconInstance{_log, beaconURI, beaconPublishURI, false, false, &http.Client{}}

// feature flags
if os.Getenv("USE_V1_PUBLISH_BLOCK_ENDPOINT") != "" {
Expand Down Expand Up @@ -179,7 +184,7 @@ func (c *ProdBeaconInstance) SyncStatus() (*SyncStatusPayloadData, error) {
uri := c.beaconURI + "/eth/v1/node/syncing"
timeout := 5 * time.Second
resp := new(SyncStatusPayload)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, &timeout, http.Header{}, false)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, &http.Client{Timeout: timeout}, http.Header{}, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -248,12 +253,16 @@ func (c *ProdBeaconInstance) GetURI() string {
return c.beaconURI
}

func (c *ProdBeaconInstance) GetPublishURI() string {
return c.beaconPublishURI
}

func (c *ProdBeaconInstance) PublishBlock(block *common.VersionedSignedProposal, broadcastMode BroadcastMode) (code int, err error) {
var uri string
if c.ffUseV1PublishBlockEndpoint {
uri = fmt.Sprintf("%s/eth/v1/beacon/blocks", c.beaconURI)
uri = fmt.Sprintf("%s/eth/v1/beacon/blocks", c.beaconPublishURI)
} else {
uri = fmt.Sprintf("%s/eth/v2/beacon/blocks?broadcast_validation=%s", c.beaconURI, broadcastMode)
uri = fmt.Sprintf("%s/eth/v2/beacon/blocks?broadcast_validation=%s", c.beaconPublishURI, broadcastMode)
}
headers := http.Header{}
headers.Add("Eth-Consensus-Version", strings.ToLower(block.Version.String())) // optional in v1, required in v2
Expand All @@ -279,7 +288,7 @@ func (c *ProdBeaconInstance) PublishBlock(block *common.VersionedSignedProposal,
}
publishingStartTime := time.Now().UTC()
encodeDurationMs := publishingStartTime.Sub(encodeStartTime).Milliseconds()
code, err = fetchBeacon(http.MethodPost, uri, payloadBytes, nil, nil, headers, useSSZ)
code, err = fetchBeacon(http.MethodPost, uri, payloadBytes, nil, c.publishingClient, headers, useSSZ)
publishDurationMs := time.Now().UTC().Sub(publishingStartTime).Milliseconds()
log.WithFields(logrus.Fields{
"slot": slot,
Expand Down
10 changes: 5 additions & 5 deletions beaconclient/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"net/http"
"strings"
"time"
)

var (
Expand All @@ -31,7 +30,7 @@ func parseBroadcastModeString(s string) (BroadcastMode, bool) {
return b, ok
}

func fetchBeacon(method, url string, payload []byte, dst any, timeout *time.Duration, headers http.Header, ssz bool) (code int, err error) {
func fetchBeacon(method, url string, payload []byte, dst any, httpClient *http.Client, headers http.Header, ssz bool) (code int, err error) {
var req *http.Request

if payload == nil {
Expand All @@ -55,10 +54,11 @@ func fetchBeacon(method, url string, payload []byte, dst any, timeout *time.Dura
}
req.Header.Set("accept", "application/json")

client := &http.Client{}
if timeout != nil && timeout.Milliseconds() > 0 {
client.Timeout = *timeout
client := http.DefaultClient
if httpClient != nil {
client = httpClient
}

resp, err := client.Do(req)
if err != nil {
return 0, fmt.Errorf("client refused for %s: %w", url, err)
Expand Down
14 changes: 12 additions & 2 deletions cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func init() {

apiCmd.Flags().StringVar(&apiListenAddr, "listen-addr", apiDefaultListenAddr, "listen address for webserver")
apiCmd.Flags().StringSliceVar(&beaconNodeURIs, "beacon-uris", defaultBeaconURIs, "beacon endpoints")
apiCmd.Flags().StringSliceVar(&beaconNodePublishURIs, "beacon-publish-uris", defaultBeaconPublishURIs, "beacon publish endpoints")
apiCmd.Flags().StringVar(&redisURI, "redis-uri", defaultRedisURI, "redis uri")
apiCmd.Flags().StringVar(&redisReadonlyURI, "redis-readonly-uri", defaultRedisReadonlyURI, "redis readonly uri")
apiCmd.Flags().StringVar(&postgresDSN, "db", defaultPostgresDSN, "PostgreSQL DSN")
Expand Down Expand Up @@ -100,9 +101,18 @@ var apiCmd = &cobra.Command{
log.Fatalf("no beacon endpoints specified")
}
log.Infof("Using beacon endpoints: %s", strings.Join(beaconNodeURIs, ", "))
if len(beaconNodePublishURIs) == 0 {
// default to same endpoint as the beacon endpoints
beaconNodePublishURIs = beaconNodeURIs
} else if len(beaconNodePublishURIs) != len(beaconNodeURIs) {
log.Fatalf("beacon publish endpoints do not match the number of beacon endpoints")
} else {
log.Infof("Using beacon publish endpoints: %s", strings.Join(beaconNodePublishURIs, ", "))
}

var beaconInstances []beaconclient.IBeaconInstance
for _, uri := range beaconNodeURIs {
beaconInstances = append(beaconInstances, beaconclient.NewProdBeaconInstance(log, uri))
for i, uri := range beaconNodeURIs {
beaconInstances = append(beaconInstances, beaconclient.NewProdBeaconInstance(log, uri, beaconNodePublishURIs[i]))
}
beaconClient := beaconclient.NewMultiBeaconClient(log, beaconInstances)

Expand Down
2 changes: 1 addition & 1 deletion cmd/housekeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var housekeeperCmd = &cobra.Command{
log.Infof("Using beacon endpoints: %s", strings.Join(beaconNodeURIs, ", "))
var beaconInstances []beaconclient.IBeaconInstance
for _, uri := range beaconNodeURIs {
beaconInstances = append(beaconInstances, beaconclient.NewProdBeaconInstance(log, uri))
beaconInstances = append(beaconInstances, beaconclient.NewProdBeaconInstance(log, uri, uri))
}
beaconClient := beaconclient.NewMultiBeaconClient(log, beaconInstances)

Expand Down
28 changes: 15 additions & 13 deletions cmd/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ import (
)

var (
defaultNetwork = common.GetEnv("NETWORK", "")
defaultBeaconURIs = common.GetSliceEnv("BEACON_URIS", []string{"http://localhost:3500"})
defaultRedisURI = common.GetEnv("REDIS_URI", "localhost:6379")
defaultRedisReadonlyURI = common.GetEnv("REDIS_READONLY_URI", "")
defaultPostgresDSN = common.GetEnv("POSTGRES_DSN", "")
defaultMemcachedURIs = common.GetSliceEnv("MEMCACHED_URIS", nil)
defaultLogJSON = os.Getenv("LOG_JSON") != ""
defaultLogLevel = common.GetEnv("LOG_LEVEL", "info")
defaultNetwork = common.GetEnv("NETWORK", "")
defaultBeaconURIs = common.GetSliceEnv("BEACON_URIS", []string{"http://localhost:3500"})
defaultBeaconPublishURIs = common.GetSliceEnv("BEACON_PUBLISH_URIS", []string{})
defaultRedisURI = common.GetEnv("REDIS_URI", "localhost:6379")
defaultRedisReadonlyURI = common.GetEnv("REDIS_READONLY_URI", "")
defaultPostgresDSN = common.GetEnv("POSTGRES_DSN", "")
defaultMemcachedURIs = common.GetSliceEnv("MEMCACHED_URIS", nil)
defaultLogJSON = os.Getenv("LOG_JSON") != ""
defaultLogLevel = common.GetEnv("LOG_LEVEL", "info")

beaconNodeURIs []string
redisURI string
redisReadonlyURI string
postgresDSN string
memcachedURIs []string
beaconNodeURIs []string
beaconNodePublishURIs []string
redisURI string
redisReadonlyURI string
postgresDSN string
memcachedURIs []string

logJSON bool
logLevel string
Expand Down
2 changes: 1 addition & 1 deletion scripts/sse-event-logger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {

log.Infof("Using beacon endpoints: %s", strings.Join(beaconURIs, ", "))
for _, uri := range beaconURIs {
beaconInstance := beaconclient.NewProdBeaconInstance(log, uri)
beaconInstance := beaconclient.NewProdBeaconInstance(log, uri, uri)
go subscribeHead(beaconInstance)
go subscribePayloadAttr(beaconInstance)
}
Expand Down
7 changes: 0 additions & 7 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2007,13 +2007,6 @@ func (api *RelayAPI) handleSubmitNewBlock(w http.ResponseWriter, req *http.Reque
simResultC := make(chan *blockSimResult, 1)
var eligibleAt time.Time // will be set once the bid is ready

submission, err = common.GetBlockSubmissionInfo(payload)
if err != nil {
log.WithError(err).Warn("missing fields in submit block request")
api.RespondError(w, http.StatusBadRequest, err.Error())
return
}

bfOpts := bidFloorOpts{
w: w,
tx: tx,
Expand Down

0 comments on commit 256fd3c

Please sign in to comment.