Skip to content

Commit

Permalink
[tmpnet] Bootstrap subnets with a single node
Browse files Browse the repository at this point in the history
Speed up subnet creation for a network of multiple nodes by:

 - starting a single node with sybil protection disabled
 - creating subnets and chains
 - restarting the node with sybil protection enabled
 - starting the remaining nodes
  • Loading branch information
marun committed May 13, 2024
1 parent b5c6acc commit fe898b2
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 56 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
FILTER_BY_OWNER: avalanchego-e2e
- name: Run e2e tests
shell: bash
run: E2E_SERIAL=1 ./scripts/tests.e2e.sh
run: E2E_SERIAL=1 ./scripts/tests.e2e.sh --delay-network-shutdown
env:
GH_REPO: ${{ github.repository }}
GH_WORKFLOW: ${{ github.workflow }}
Expand Down Expand Up @@ -132,7 +132,7 @@ jobs:
GH_JOB_ID: ${{ github.job }}
- name: Run e2e tests with existing network
shell: bash
run: E2E_SERIAL=1 ./scripts/tests.e2e.existing.sh
run: E2E_SERIAL=1 ./scripts/tests.e2e.existing.sh --delay-network-shutdown
env:
GH_REPO: ${{ github.repository }}
GH_WORKFLOW: ${{ github.workflow }}
Expand Down
5 changes: 3 additions & 2 deletions scripts/tests.e2e.existing.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set -euo pipefail

# e.g.,
# ./scripts/build.sh
# ./scripts/tests.e2e.existing.sh --ginkgo.label-filter=x # All arguments are supplied to ginkgo
# AVALANCHEGO_PATH=./build/avalanchego ./scripts/tests.e2e.existing.sh # Customization of avalanchego path
if ! [[ "$0" =~ scripts/tests.e2e.existing.sh ]]; then
echo "must be run from repository root"
Expand All @@ -27,7 +28,7 @@ trap cleanup EXIT

print_separator
echo "starting initial test run that should create the reusable network"
./scripts/tests.e2e.sh --reuse-network --ginkgo.focus-file=xsvm.go
./scripts/tests.e2e.sh --reuse-network --ginkgo.focus-file=xsvm.go "${@}"

print_separator
echo "determining the network path of the reusable network created by the first test run"
Expand All @@ -36,7 +37,7 @@ INITIAL_NETWORK_DIR="$(realpath "${SYMLINK_PATH}")"

print_separator
echo "starting second test run that should reuse the network created by the first run"
./scripts/tests.e2e.sh --reuse-network --ginkgo.focus-file=xsvm.go
./scripts/tests.e2e.sh --reuse-network --ginkgo.focus-file=xsvm.go "${@}"

SUBSEQUENT_NETWORK_DIR="$(realpath "${SYMLINK_PATH}")"
echo "checking that the symlink path remains the same, indicating that the network was reused"
Expand Down
15 changes: 13 additions & 2 deletions tests/fixture/e2e/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type FlagVars struct {
pluginDir string
networkDir string
reuseNetwork bool
delayNetworkShutdown bool
networkShutdownDelay time.Duration
stopNetwork bool
nodeCount int
Expand Down Expand Up @@ -45,7 +46,11 @@ func (v *FlagVars) ReuseNetwork() bool {
}

// NetworkShutdownDelay returns how long to wait before shutting down the test
// network at the end of a test run.
//
// The configured --network-shutdown-delay duration is only honored when
// --delay-network-shutdown is set; otherwise zero is returned so that callers
// do not wait at all.
func (v *FlagVars) NetworkShutdownDelay() time.Duration {
	if !v.delayNetworkShutdown {
		// The delay is opt-in: ignore the configured duration unless enabled.
		return 0
	}
	return v.networkShutdownDelay
}

func (v *FlagVars) StopNetwork() bool {
Expand Down Expand Up @@ -82,11 +87,17 @@ func RegisterFlags() *FlagVars {
false,
"[optional] reuse an existing network. If an existing network is not already running, create a new one and leave it running for subsequent usage.",
)
flag.BoolVar(
&vars.delayNetworkShutdown,
"delay-network-shutdown",
false,
"[optional] whether to delay network shutdown to allow a final metrics scrape. The duration is set with --network-shutdown-delay.",
)
flag.DurationVar(
&vars.networkShutdownDelay,
"network-shutdown-delay",
12*time.Second, // Make sure this value takes into account the scrape_interval defined in scripts/run_prometheus.sh
"[optional] the duration to wait before shutting down the test network at the end of the test run. A value greater than the scrape interval is suggested. 0 avoids waiting for shutdown.",
"[optional] the duration to wait before shutting down the test network at the end of the test run. A value greater than the scrape interval is suggested.",
)
flag.BoolVar(
&vars.stopNetwork,
Expand Down
2 changes: 1 addition & 1 deletion tests/fixture/e2e/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func StartNetwork(
require := require.New(ginkgo.GinkgoT())

require.NoError(
tmpnet.StartNewNetwork(
tmpnet.BootstrapNewNetwork(
DefaultContext(),
ginkgo.GinkgoWriter,
network,
Expand Down
2 changes: 1 addition & 1 deletion tests/fixture/tmpnet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ network := &tmpnet.Network{ // Configure non-default values fo
},
}

_ := tmpnet.StartNewNetwork( // Start the network
_ := tmpnet.BootstrapNewNetwork( // Bootstrap the network
ctx, // Context used to limit duration of waiting for network health
ginkgo.GinkgoWriter, // Writer to report progress of initialization
network,
Expand Down
2 changes: 1 addition & 1 deletion tests/fixture/tmpnet/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {

ctx, cancel := context.WithTimeout(context.Background(), networkStartTimeout)
defer cancel()
err := tmpnet.StartNewNetwork(
err := tmpnet.BootstrapNewNetwork(
ctx,
os.Stdout,
network,
Expand Down
144 changes: 97 additions & 47 deletions tests/fixture/tmpnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func toCanonicalDir(dir string) (string, error) {
return filepath.EvalSymlinks(absDir)
}

func StartNewNetwork(
func BootstrapNewNetwork(
ctx context.Context,
w io.Writer,
network *Network,
Expand All @@ -144,10 +144,7 @@ func StartNewNetwork(
if err := network.Create(rootNetworkDir); err != nil {
return err
}
if err := network.Start(ctx, w); err != nil {
return err
}
return network.CreateSubnets(ctx, w)
return network.Bootstrap(ctx, w)
}

// Stops the nodes of the network configured in the provided directory.
Expand Down Expand Up @@ -325,26 +322,33 @@ func (n *Network) Create(rootDir string) error {
return n.Write()
}

// Starts all nodes in the network
func (n *Network) Start(ctx context.Context, w io.Writer) error {
if _, err := fmt.Fprintf(w, "Starting network %s (UUID: %s)\n", n.Dir, n.UUID); err != nil {
return err
// Starts the specified nodes
func (n *Network) Start(ctx context.Context, w io.Writer, nodesToStart ...*Node) error {
nodesToWaitFor := nodesToStart
if len(nodesToStart) < len(n.Nodes) && nodesToStart[0] != n.Nodes[0] {
// Wait for all nodes to ensure health is logged for the bootstrap node
nodesToWaitFor = n.Nodes
} else {
// Simplify output by only logging network start for the first node or when all nodes are starting at once
if _, err := fmt.Fprintf(w, "Starting network %s (UUID: %s)\n", n.Dir, n.UUID); err != nil {
return err
}
}

// Record the time before nodes are started to ensure visibility of subsequently collected metrics via the emitted link
startTime := time.Now()

// Configure the networking for each node and start
for _, node := range n.Nodes {
for _, node := range nodesToStart {
if err := n.StartNode(ctx, w, node); err != nil {
return err
}
}

if _, err := fmt.Fprint(w, "Waiting for all nodes to report healthy...\n\n"); err != nil {
if _, err := fmt.Fprint(w, "Waiting for nodes to report healthy...\n\n"); err != nil {
return err
}
if err := n.WaitForHealthy(ctx, w); err != nil {
if err := waitForHealthy(ctx, w, nodesToWaitFor); err != nil {
return err
}
if _, err := fmt.Fprintf(w, "\nStarted network %s (UUID: %s)\n", n.Dir, n.UUID); err != nil {
Expand All @@ -358,6 +362,48 @@ func (n *Network) Start(ctx context.Context, w io.Writer) error {
return nil
}

// Bootstrap starts the network for the first time and creates any configured
// subnets.
//
//   - Without subnets, all nodes are started at once.
//   - With subnets and a single node, the node is started and the subnets are
//     then created against it.
//   - With subnets and multiple nodes, a staged start is used: the first node
//     is started alone with sybil protection disabled, subnets are created,
//     the node is restarted with sybil protection enabled, and finally the
//     remaining nodes are started.
//
// Progress is written to w. An error is returned if writing to w fails, if
// subnets are configured but the network has no nodes, or if any start, stop
// or subnet creation step fails.
func (n *Network) Bootstrap(ctx context.Context, w io.Writer) error {
	if len(n.Subnets) == 0 {
		// Without subnets there is nothing to stage: start everything at once.
		return n.Start(ctx, w, n.Nodes...)
	}

	switch len(n.Nodes) {
	case 0:
		// Subnet creation needs a node; fail explicitly rather than
		// panicking on n.Nodes[0] below.
		return fmt.Errorf("unable to bootstrap network %s: subnets require at least one node", n.Dir)
	case 1:
		// A staged start is not worth it for a single node, but the
		// configured subnets must still be created once it is running.
		if err := n.Start(ctx, w, n.Nodes...); err != nil {
			return err
		}
		return n.CreateSubnets(ctx, w)
	}

	// Reduce the cost of subnet creation for a network of multiple nodes by
	// creating subnets with a single node with sybil protection
	// disabled. This allows the creation of initial subnet state without
	// requiring coordination between multiple nodes.

	bootstrapNode := n.Nodes[0]
	if _, err := fmt.Fprintf(w, "Starting a single-node network with %s for quicker subnet creation\n", bootstrapNode.NodeID); err != nil {
		return err
	}
	bootstrapNode.Flags[config.SybilProtectionEnabledKey] = false
	if err := n.Start(ctx, w, bootstrapNode); err != nil {
		return err
	}
	if err := n.CreateSubnets(ctx, w); err != nil {
		return err
	}

	if _, err := fmt.Fprintf(w, "Restarting %s with sybil protection enabled to serve as bootstrap node\n", bootstrapNode.NodeID); err != nil {
		return err
	}
	// Removing the override leaves sybil protection at whatever the node
	// would otherwise be configured with.
	delete(bootstrapNode.Flags, config.SybilProtectionEnabledKey)
	// Avoid using RestartNode since the node won't be able to report healthy until other nodes are started.
	if err := bootstrapNode.Stop(ctx); err != nil {
		return fmt.Errorf("failed to stop node %s: %w", bootstrapNode.NodeID, err)
	}
	if err := n.StartNode(ctx, w, bootstrapNode); err != nil {
		return fmt.Errorf("failed to start node %s: %w", bootstrapNode.NodeID, err)
	}

	if _, err := fmt.Fprint(w, "Starting remaining nodes...\n"); err != nil {
		return err
	}
	return n.Start(ctx, w, n.Nodes[1:]...)
}

// Starts the provided node after configuring it for the network.
func (n *Network) StartNode(ctx context.Context, w io.Writer, node *Node) error {
if err := n.EnsureNodeConfig(node); err != nil {
Expand Down Expand Up @@ -415,41 +461,6 @@ func (n *Network) RestartNode(ctx context.Context, w io.Writer, node *Node) erro
return WaitForHealthy(ctx, node)
}

// WaitForHealthy polls every node in the network until each has reported
// healthy, writing a line to w the first time a node is seen healthy.
//
// Health check errors matching ErrNotRunning are tolerated so that nodes that
// have not started yet keep being polled; any other error is returned
// immediately. If ctx is done before all nodes are healthy, an error wrapping
// ctx.Err() is returned.
func (n *Network) WaitForHealthy(ctx context.Context, w io.Writer) error {
	ticker := time.NewTicker(networkHealthCheckInterval)
	defer ticker.Stop()

	healthyNodes := set.NewSet[ids.NodeID](len(n.Nodes))
	for {
		for _, node := range n.Nodes {
			if healthyNodes.Contains(node.NodeID) {
				// Already reported; avoid re-checking and re-logging.
				continue
			}

			healthy, err := node.IsHealthy(ctx)
			if err != nil && !errors.Is(err, ErrNotRunning) {
				return err
			}
			if !healthy {
				continue
			}

			healthyNodes.Add(node.NodeID)
			if _, err := fmt.Fprintf(w, "%s is healthy @ %s\n", node.NodeID, node.URI); err != nil {
				return err
			}
		}

		// Check for completion before waiting so that success is reported
		// as soon as the last node is healthy, rather than after another
		// tick or, worse, as a timeout if the context expires first.
		if healthyNodes.Len() >= len(n.Nodes) {
			return nil
		}

		select {
		case <-ctx.Done():
			return fmt.Errorf("failed to see all nodes healthy before timeout: %w", ctx.Err())
		case <-ticker.C:
		}
	}
}

// Stops all nodes in the network.
func (n *Network) Stop(ctx context.Context) error {
// Target all nodes, including the ephemeral ones
Expand Down Expand Up @@ -662,6 +673,10 @@ func (n *Network) CreateSubnets(ctx context.Context, w io.Writer) error {
reconfiguredNodes = append(reconfiguredNodes, node)
}
for _, node := range reconfiguredNodes {
if len(node.URI) == 0 {
// Only running nodes should be restarted
continue
}
if err := n.RestartNode(ctx, w, node); err != nil {
return err
}
Expand Down Expand Up @@ -781,6 +796,41 @@ func (n *Network) getBootstrapIPsAndIDs(skippedNode *Node) ([]string, []string,
return bootstrapIPs, bootstrapIDs, nil
}

// waitForHealthy polls the provided nodes until each has reported healthy,
// writing a line to w the first time a node is seen healthy.
//
// Health check errors matching ErrNotRunning are tolerated so that nodes that
// have not started yet keep being polled; any other error is returned
// immediately. If ctx is done before all nodes are healthy, an error wrapping
// ctx.Err() is returned. An empty node list returns nil without waiting.
func waitForHealthy(ctx context.Context, w io.Writer, nodes []*Node) error {
	ticker := time.NewTicker(networkHealthCheckInterval)
	defer ticker.Stop()

	healthyNodes := set.NewSet[ids.NodeID](len(nodes))
	for {
		for _, node := range nodes {
			if healthyNodes.Contains(node.NodeID) {
				// Already reported; avoid re-checking and re-logging.
				continue
			}

			healthy, err := node.IsHealthy(ctx)
			if err != nil && !errors.Is(err, ErrNotRunning) {
				return err
			}
			if !healthy {
				continue
			}

			healthyNodes.Add(node.NodeID)
			if _, err := fmt.Fprintf(w, "%s is healthy @ %s\n", node.NodeID, node.URI); err != nil {
				return err
			}
		}

		// Check for completion before waiting so that success is reported
		// as soon as the last node is healthy, rather than after another
		// tick or, worse, as a timeout if the context expires first.
		if healthyNodes.Len() >= len(nodes) {
			return nil
		}

		select {
		case <-ctx.Done():
			return fmt.Errorf("failed to see all nodes healthy before timeout: %w", ctx.Err())
		case <-ticker.C:
		}
	}
}

// Retrieves the root dir for tmpnet data.
func getTmpnetPath() (string, error) {
homeDir, err := os.UserHomeDir()
Expand Down
4 changes: 4 additions & 0 deletions tests/fixture/tmpnet/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ func (n *Node) EnsureNodeID() error {
// labeling of metrics.
func (n *Node) SaveAPIPort() error {
hostPort := strings.TrimPrefix(n.URI, "http://")
if len(hostPort) == 0 {
// Without an API URI there is no port to save
return nil
}
_, port, err := net.SplitHostPort(hostPort)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions tests/fixture/tmpnet/node_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func (p *NodeProcess) Start(w io.Writer) error {
// A node writes a process context file on start. If the file is not
// found in a reasonable amount of time, the node is unlikely to have
// started successfully.
//
// TODO(marun) Consider waiting for the process context only for a
// bootstrap node (i.e. len(bootstrapIDs) == 0) to minimize
// startup time for larger numbers of nodes.
if err := p.waitForProcessContext(context.Background()); err != nil {
return fmt.Errorf("failed to start local node: %w", err)
}
Expand Down

0 comments on commit fe898b2

Please sign in to comment.