Skip to content

Commit

Permalink
cmd: add load test for beacon node (#3182)
Browse files Browse the repository at this point in the history
Add beacon node load test, which is triggered by a flag. In contrast to the other load tests, this one should be manually triggered, as people might use external beacon nodes against which they might not want to run load tests (infra not in their control either way and risking getting blacklisted).

Small refactors in other files.

category: feature
ticket: none
  • Loading branch information
KaloyanTanev authored Jul 25, 2024
1 parent 4249725 commit 8b1c251
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 23 deletions.
16 changes: 15 additions & 1 deletion cmd/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ const (

// failed tests
testVerdictFail testVerdict = "Fail"

// skipped tests
testVerdictSkipped testVerdict = "Skip"
)

type categoryScore string
Expand Down Expand Up @@ -278,7 +281,7 @@ func calculateScore(results []testResult) categoryScore {
}

continue
case testVerdictOk:
case testVerdictOk, testVerdictSkipped:
continue
}
}
Expand Down Expand Up @@ -325,3 +328,14 @@ func blockAndWait(ctx context.Context, awaitTime time.Duration) {
log.Info(ctx, "Forcefully stopped")
}
}

func sleepWithContext(ctx context.Context, d time.Duration) {
timer := time.NewTimer(d)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
}
}
107 changes: 97 additions & 10 deletions cmd/testbeacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"net/http/httptrace"
"strconv"
"sync"
"time"

eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
Expand All @@ -18,19 +21,24 @@ import (
"golang.org/x/sync/errgroup"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)

type testBeaconConfig struct {
testConfig
Endpoints []string
Endpoints []string
EnableLoadTest bool
LoadTestDuration time.Duration
}

type testCaseBeacon func(context.Context, *testBeaconConfig, string) testResult

const (
thresholdBeaconMeasureAvg = 40 * time.Millisecond
thresholdBeaconMeasureBad = 100 * time.Millisecond
thresholdBeaconLoadAvg = 40 * time.Millisecond
thresholdBeaconLoadBad = 100 * time.Millisecond
thresholdBeaconPeersAvg = 50
thresholdBeaconPeersBad = 20
)
Expand Down Expand Up @@ -61,6 +69,8 @@ func bindTestBeaconFlags(cmd *cobra.Command, config *testBeaconConfig) {
const endpoints = "endpoints"
cmd.Flags().StringSliceVar(&config.Endpoints, endpoints, nil, "[REQUIRED] Comma separated list of one or more beacon node endpoint URLs.")
mustMarkFlagRequired(cmd, endpoints)
cmd.Flags().BoolVar(&config.EnableLoadTest, "enable-load-test", false, "Enable load test, not advisable when testing towards external beacon nodes.")
cmd.Flags().DurationVar(&config.LoadTestDuration, "load-test-duration", 5*time.Second, "Time to keep running the load tests in seconds. For each second a new continuous ping instance is spawned.")
}

func supportedBeaconTestCases() map[testCaseName]testCaseBeacon {
Expand All @@ -69,6 +79,7 @@ func supportedBeaconTestCases() map[testCaseName]testCaseBeacon {
{name: "pingMeasure", order: 2}: beaconPingMeasureTest,
{name: "isSynced", order: 3}: beaconIsSyncedTest,
{name: "peerCount", order: 4}: beaconPeerCountTest,
{name: "pingLoad", order: 5}: beaconPingLoadTest,
}
}

Expand Down Expand Up @@ -229,9 +240,7 @@ func beaconPingTest(ctx context.Context, _ *testBeaconConfig, target string) tes
return testRes
}

func beaconPingMeasureTest(ctx context.Context, _ *testBeaconConfig, target string) testResult {
testRes := testResult{Name: "PingMeasure"}

func beaconPingOnce(ctx context.Context, target string) (time.Duration, error) {
var start time.Time
var firstByte time.Duration

Expand All @@ -245,27 +254,105 @@ func beaconPingMeasureTest(ctx context.Context, _ *testBeaconConfig, target stri
targetEndpoint := fmt.Sprintf("%v/eth/v1/node/health", target)
req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, trace), http.MethodGet, targetEndpoint, nil)
if err != nil {
return failedTestResult(testRes, err)
return 0, errors.Wrap(err, "create new request with trace and context")
}

resp, err := http.DefaultTransport.RoundTrip(req)
if err != nil {
return failedTestResult(testRes, err)
return 0, err
}
defer resp.Body.Close()

if resp.StatusCode > 399 {
return failedTestResult(testRes, errors.New("status code %v", z.Int("status_code", resp.StatusCode)))
return 0, errors.New("status code %v", z.Int("status_code", resp.StatusCode))
}

if firstByte > thresholdBeaconMeasureBad {
return firstByte, nil
}

func beaconPingMeasureTest(ctx context.Context, _ *testBeaconConfig, target string) testResult {
testRes := testResult{Name: "PingMeasure"}

rtt, err := beaconPingOnce(ctx, target)
if err != nil {
return failedTestResult(testRes, err)
}

if rtt > thresholdBeaconMeasureBad {
testRes.Verdict = testVerdictBad
} else if rtt > thresholdBeaconMeasureAvg {
testRes.Verdict = testVerdictAvg
} else {
testRes.Verdict = testVerdictGood
}
testRes.Measurement = Duration{rtt}.String()

return testRes
}

func pingBeaconContinuously(ctx context.Context, target string, resCh chan<- time.Duration) {
for {
rtt, err := beaconPingOnce(ctx, target)
if err != nil {
return
}
select {
case <-ctx.Done():
return
case resCh <- rtt:
awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here
sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond)
}
}
}

func beaconPingLoadTest(ctx context.Context, conf *testBeaconConfig, target string) testResult {
testRes := testResult{Name: "BeaconLoad"}
if !conf.EnableLoadTest {
testRes.Verdict = testVerdictSkipped
return testRes
}
log.Info(ctx, "Running ping load tests...",
z.Any("duration", conf.LoadTestDuration),
z.Any("target", target),
)

testResCh := make(chan time.Duration, math.MaxInt16)
pingCtx, cancel := context.WithTimeout(ctx, conf.LoadTestDuration)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

var wg sync.WaitGroup
for pingCtx.Err() == nil {
select {
case <-ticker.C:
wg.Add(1)
go func() {
pingBeaconContinuously(pingCtx, target, testResCh)
wg.Done()
}()
case <-pingCtx.Done():
}
}
wg.Wait()
close(testResCh)
log.Info(ctx, "Ping load tests finished", z.Any("target", target))

highestRTT := time.Duration(0)
for rtt := range testResCh {
if rtt > highestRTT {
highestRTT = rtt
}
}
if highestRTT > thresholdBeaconLoadBad {
testRes.Verdict = testVerdictBad
} else if firstByte > thresholdBeaconMeasureAvg {
} else if highestRTT > thresholdBeaconLoadAvg {
testRes.Verdict = testVerdictAvg
} else {
testRes.Verdict = testVerdictGood
}
testRes.Measurement = Duration{firstByte}.String()
testRes.Measurement = Duration{highestRTT}.String()

return testRes
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/testbeacon_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestBeaconTest(t *testing.T) {
{Name: "pingMeasure", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}},
{Name: "isSynced", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}},
{Name: "peerCount", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}},
{Name: "pingLoad", Verdict: testVerdictSkipped, Measurement: "", Suggestion: "", Error: testResultError{}},
},
},
},
Expand All @@ -78,12 +79,14 @@ func TestBeaconTest(t *testing.T) {
{Name: "pingMeasure", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "isSynced", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "peerCount", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "pingLoad", Verdict: testVerdictSkipped, Measurement: "", Suggestion: "", Error: testResultError{}},
},
endpoint2: {
{Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "pingMeasure", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "isSynced", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "peerCount", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "pingLoad", Verdict: testVerdictSkipped, Measurement: "", Suggestion: "", Error: testResultError{}},
},
},
},
Expand Down Expand Up @@ -130,12 +133,14 @@ func TestBeaconTest(t *testing.T) {
{Name: "pingMeasure", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "isSynced", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "peerCount", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "pingLoad", Verdict: testVerdictSkipped, Measurement: "", Suggestion: "", Error: testResultError{}},
},
endpoint2: {
{Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "pingMeasure", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "isSynced", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "peerCount", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "pingLoad", Verdict: testVerdictSkipped, Measurement: "", Suggestion: "", Error: testResultError{}},
},
},
},
Expand Down Expand Up @@ -196,12 +201,14 @@ func TestBeaconTest(t *testing.T) {
{Name: "pingMeasure", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "isSynced", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "peerCount", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port1))}},
{Name: "pingLoad", Verdict: testVerdictSkipped, Measurement: "", Suggestion: "", Error: testResultError{}},
},
endpoint2: {
{Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "pingMeasure", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "isSynced", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "peerCount", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: testResultError{errors.New(fmt.Sprintf(`%v: connect: connection refused`, port2))}},
{Name: "pingLoad", Verdict: testVerdictSkipped, Measurement: "", Suggestion: "", Error: testResultError{}},
},
},
Score: categoryScoreC,
Expand Down
11 changes: 0 additions & 11 deletions cmd/testpeers.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,6 @@ func pingPeerContinuously(ctx context.Context, tcpNode host.Host, peer p2p.Peer,
}
}

func sleepWithContext(ctx context.Context, d time.Duration) {
timer := time.NewTimer(d)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
}
}

func runTestPeers(ctx context.Context, w io.Writer, conf testPeersConfig) error {
relayTestCases := supportedRelayTestCases()
queuedTestsRelay := filterTests(maps.Keys(relayTestCases), conf.testConfig)
Expand Down
3 changes: 2 additions & 1 deletion cmd/testvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func pingValidatorContinuously(ctx context.Context, address string, resCh chan<-
}

func validatorPingLoadTest(ctx context.Context, conf *testValidatorConfig) testResult {
log.Info(ctx, "Running validator load tests...",
log.Info(ctx, "Running ping load tests...",
z.Any("duration", conf.LoadTestDuration),
z.Any("target", conf.APIAddress),
)
Expand All @@ -256,6 +256,7 @@ func validatorPingLoadTest(ctx context.Context, conf *testValidatorConfig) testR
}
wg.Wait()
close(testResCh)
log.Info(ctx, "Ping load tests finished", z.Any("target", conf.APIAddress))

highestRTT := time.Duration(0)
for rtt := range testResCh {
Expand Down

0 comments on commit 8b1c251

Please sign in to comment.