Commit d2f521c
fix(zbchaos): ensure scale down waits for changes to complete
lenaschoenburg committed Dec 18, 2023
1 parent 55c7c9a commit d2f521c
Showing 2 changed files with 36 additions and 26 deletions.
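Both scaling paths request the topology change through the gateway's management API first; the fix is about ordering. On scale up, k8Client.ScaleZeebeCluster runs immediately and the change is awaited afterwards; on scale down, the change is awaited first, so the departing brokers have left the cluster before their pods are removed. The command now also refuses to start while another change is pending or when the cluster is already at the requested size, and the --wait flag is gone because scaling always waits. A hypothetical invocation (assuming the CLI binary is named zbchaos; the binary name is not part of this diff):

    zbchaos cluster scale --brokers 1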
61 changes: 36 additions & 25 deletions go-chaos/cmd/cluster.go
@@ -62,45 +62,56 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) {
 	clusterCommand.AddCommand(scaleCommand)
 	scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The amount of brokers to scale to")
 	scaleCommand.MarkFlagRequired("brokers")
-	scaleCommand.Flags().BoolVar(&flags.wait, "wait", false, "Whether to wait for the scaling to complete")
 }
 
 func scaleCluster(flags *Flags) error {
 	k8Client, err := createK8ClientWithFlags(flags)
-	if err != nil {
-		panic(err)
-	}
+	ensureNoError(err)
 
 	err = k8Client.AwaitReadiness()
-	if err != nil {
-		return err
-	}
+	ensureNoError(err)
 
 	port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
 	defer closePortForward()
 
-	changeResponse, err := requestBrokerScaling(port, flags.brokers)
-	if err != nil {
-		return err
-	}
-	formatted, err := json.MarshalIndent(changeResponse, "", "  ")
-	if err != nil {
-		return err
+	currentTopology, err := QueryTopology(port)
+	ensureNoError(err)
+	if currentTopology.PendingChange != nil {
+		return fmt.Errorf("cluster is already scaling")
 	}
-	fmt.Println(string(formatted))
 
-	_, err = k8Client.ScaleZeebeCluster(flags.brokers)
-	if err != nil {
-		return err
-	}
-
-	if flags.wait {
-		waitForChange(port, changeResponse.ChangeId)
+	if len(currentTopology.Brokers) > flags.brokers {
+		_, err = scaleDownBrokers(k8Client, port, flags.brokers)
+	} else if len(currentTopology.Brokers) < flags.brokers {
+		_, err = scaleUpBrokers(k8Client, port, flags.brokers)
+	} else {
+		return fmt.Errorf("cluster is already at size %d", flags.brokers)
 	}
+	ensureNoError(err)
 
 	return nil
 }
 
+func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
+	changeResponse, err := requestBrokerScaling(port, brokers)
+	ensureNoError(err)
+	_, err = k8Client.ScaleZeebeCluster(brokers)
+	ensureNoError(err)
+	waitForChange(port, changeResponse.ChangeId)
+	return changeResponse, nil
+}
+
+func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
+	changeResponse, err := requestBrokerScaling(port, brokers)
+	ensureNoError(err)
+
+	// Wait for brokers to leave before scaling down
+	waitForChange(port, changeResponse.ChangeId)
+	_, err = k8Client.ScaleZeebeCluster(brokers)
+
+	ensureNoError(err)
+	return changeResponse, nil
+}
+
 func requestBrokerScaling(port int, brokers int) (*ChangeResponse, error) {
 	brokerIds := make([]int32, brokers)
 	for i := 0; i < brokers; i++ {
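The new scaleUpBrokers and scaleDownBrokers helpers lean on ensureNoError, which is used throughout but not shown in this diff. A minimal sketch of such a helper, assuming it simply aborts on any error, matching the panic the old inline checks performed:

    func ensureNoError(err error) {
    	if err != nil {
    		panic(err)
    	}
    }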
@@ -158,7 +169,7 @@ func printCurrentTopology(flags *Flags) error {
 	if err != nil {
 		return err
 	}
-	fmt.Println(string(formatted))
+	internal.LogInfo("Current topology: %s", string(formatted))
 	return nil
 }
 
@@ -207,7 +218,7 @@ func waitForChange(port int, changeId int64) error {
 	case ChangeStatusUnknown:
 		internal.LogInfo("Change %d not yet started", changeId)
 	}
-	internal.LogInfo("Waiting %s before checking again. Iteration %d out of %d", interval, i, iterations)
+	internal.LogVerbose("Waiting %s before checking again. Iteration %d out of %d", interval, i, iterations)
 	time.Sleep(interval)
 
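The last hunk demotes the periodic "waiting" message from info to verbose, so the polling loop no longer floods default output. internal.LogInfo and internal.LogVerbose are not defined in this diff; a plausible sketch, assuming printf-style helpers gated by a package-level verbosity switch (the real flag name may differ):

    package internal

    import "fmt"

    // Verbose is an assumed toggle; the actual zbchaos flag may be named differently.
    var Verbose bool

    func LogInfo(format string, args ...interface{}) {
    	fmt.Printf(format+"\n", args...)
    }

    func LogVerbose(format string, args ...interface{}) {
    	if Verbose {
    		fmt.Printf(format+"\n", args...)
    	}
    }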
1 change: 0 additions & 1 deletion go-chaos/cmd/root.go
@@ -80,7 +80,6 @@ type Flags struct {
 	// cluster
 	changeId int64
 	brokers  int
-	wait     bool
 }
 
 var Version = "development"
