
feat(zbchaos): query current cluster topology #459

Merged · 7 commits · Dec 12, 2023
190 changes: 190 additions & 0 deletions go-chaos/cmd/cluster.go
@@ -0,0 +1,190 @@
// Copyright 2023 Camunda Services GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/spf13/cobra"
"github.com/zeebe-io/zeebe-chaos/go-chaos/internal"
)

func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) {
var clusterCommand = &cobra.Command{
Use: "cluster",
Short: "Interact with the Cluster API",
Long: "Can be used query cluster topology and to request dynamic scaling",
lenaschoenburg marked this conversation as resolved.
Show resolved Hide resolved
}
var statusCommand = &cobra.Command{
Use: "status",
Short: "Queries the current cluster topology",
RunE: func(cmd *cobra.Command, args []string) error {
return printCurrentTopology(flags)
},
}
var waitCommand = &cobra.Command{
Use: "wait",
Short: "Waits for a topology change to complete",
RunE: func(cmd *cobra.Command, args []string) error {
return waitForChange(flags)
},
}

rootCmd.AddCommand(clusterCommand)
clusterCommand.AddCommand(statusCommand)
clusterCommand.AddCommand(waitCommand)
waitCommand.Flags().IntVar(&flags.changeId, "changeId", 0, "The id of the change to wait for")
}
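
For orientation (not part of the diff), the commands registered above would be invoked roughly like this; the binary name zbchaos comes from the PR title and the changeId value is made up:

    zbchaos cluster status
    zbchaos cluster wait --changeId 3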

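// printCurrentTopology port-forwards to the gateway's management port,
// queries the current topology, and pretty-prints it as JSON.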
func printCurrentTopology(flags *Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}

err = k8Client.AwaitReadiness()
if err != nil {
return err
}

port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
defer closePortForward()

topology, err := queryTopology(port)
if err != nil {
return err
}
formatted, err := json.MarshalIndent(topology, "", " ")
lenaschoenburg marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
fmt.Println(string(formatted))
return nil
}

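// waitForChange polls the topology until the change identified by
// flags.changeId completes, fails, or is superseded by a newer change.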
func waitForChange(flags *Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}

err = k8Client.AwaitReadiness()
if err != nil {
return err
}

port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
defer closePortForward()

for {
topology, err := queryTopology(port)
if err != nil {
return err
}
if topology.LastChange != nil && topology.LastChange.Id == int64(flags.changeId) {
if topology.LastChange.Status == "COMPLETED" {
internal.LogInfo("Change %d completed successfully", flags.changeId)
return nil
} else {
return fmt.Errorf("change %d failed with status %s", flags.changeId, topology.LastChange.Status)
}
} else if topology.LastChange != nil && topology.LastChange.Id > int64(flags.changeId) {
internal.LogInfo("Change %d is outdated but was most likely completed successfully, latest change is %d", flags.changeId, topology.LastChange.Id)
return nil
} else if topology.PendingChange != nil && topology.PendingChange.Id == int64(flags.changeId) {
completed := len(topology.PendingChange.Completed)
pending := len(topology.PendingChange.Pending)
total := completed + pending
internal.LogInfo("Change %d is %s with %d/%d operations complete", flags.changeId, topology.PendingChange.Status, completed, total)
} else {
internal.LogInfo("Change %d not yet started", flags.changeId)
}

time.Sleep(5 * time.Second)

Member: 🔧 I would add some maximum timeout or something; this looks to me like it could potentially loop forever. I know we assume the change will eventually complete, but still... 😅

Member: Yes, we should definitely fail at some point.

Member (Author): Yeah, good point 👍 Picking a good value is tricky though; I'd expect scaling to take anywhere from 5 minutes to >30 minutes.

Using that in an automated chaos test is problematic anyway, because we'll run into job timeouts... 🤔

Member: W00t, 30 minutes 😆 😅

Member (Author): I've set a timeout of 25 minutes now.

}
}
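
The 25-minute timeout the reviewers settled on above is not part of this hunk. A minimal sketch of how the polling loop could be bounded with a deadline; the helper name waitForChangeWithTimeout, the 25*time.Minute value, and the collapsed status handling are illustrative assumptions, not the merged code:

	// Sketch: same polling idea as waitForChange, but bounded by a deadline.
	func waitForChangeWithTimeout(flags *Flags, port int) error {
		deadline := time.Now().Add(25 * time.Minute) // assumed value, per the review thread
		for time.Now().Before(deadline) {
			topology, err := queryTopology(port)
			if err != nil {
				return err
			}
			// Simplified: treat any change with an id >= ours as done.
			if topology.LastChange != nil && topology.LastChange.Id >= int64(flags.changeId) {
				return nil
			}
			time.Sleep(5 * time.Second)
		}
		return fmt.Errorf("change %d did not complete within 25 minutes", flags.changeId)
	}

Checking time.Now().Before(deadline) once per iteration keeps the change small; wiring a context.WithTimeout through queryTopology would work equally well.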

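// queryTopology fetches the current cluster topology from the gateway's
// /actuator/cluster management endpoint on the given local port.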
func queryTopology(port int) (*CurrentTopology, error) {

Member: 🔧 Would be great if you could add an IT for it.

Member (Author): I've added a minimal one now. None of the other commands have integration tests, so I hope I did it in a way that is acceptable: ffd0a20

Member: No integration tests for other commands, you say? shock


url := fmt.Sprintf("http://localhost:%d/actuator/cluster", port)
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

var topology CurrentTopology
err = json.Unmarshal(body, &topology)
if err != nil {
return nil, err
}
return &topology, nil
}
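
The integration test mentioned in the thread above (ffd0a20) is not shown in this diff. As a sketch of one way to exercise queryTopology without a live cluster, using the standard library's httptest to stub the management endpoint; the test name and canned payload are assumptions:

	// A test file would import: "net", "net/http", "net/http/httptest", "testing".
	func TestQueryTopology(t *testing.T) {
		// Serve a canned response in place of the real /actuator/cluster endpoint.
		server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			_, _ = w.Write([]byte(`{"version": 1, "lastChange": {"id": 3, "status": "COMPLETED"}}`))
		}))
		defer server.Close()

		// queryTopology only takes a port, so reuse the one the stub is bound to.
		port := server.Listener.Addr().(*net.TCPAddr).Port

		topology, err := queryTopology(port)
		if err != nil {
			t.Fatal(err)
		}
		if topology.LastChange == nil || topology.LastChange.Id != 3 {
			t.Fatalf("unexpected topology: %+v", topology)
		}
	}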

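// CurrentTopology and the types below mirror the /actuator/cluster response;
// encoding/json matches untagged fields case-insensitively, so no JSON tags are needed.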
type CurrentTopology struct {
Version int32
Brokers []BrokerState
LastChange *LastChange
PendingChange *TopologyChange
}

type BrokerState struct {
Id int32
State string
Version int64
LastUpdatedAt string
Partitions []PartitionState
}

type PartitionState struct {
Id int32
State string
Priority int32
}

type LastChange struct {
Id int64
Status string
StartedAt string
CompletedAt string
}

type TopologyChange struct {
Id int64
Status string
StartedAt string
CompletedAt string
InternalVersion int64
Completed []Operation
Pending []Operation
}

type Operation struct {
Operation string
BrokerId int32
PartitionId int32
Priority int32
}
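
Illustrative only, not from the PR: invented values showing the shape these structs decode, i.e. roughly what `cluster status` pretty-prints:

	// Hypothetical topology as it would unmarshal into the types above.
	var example = CurrentTopology{
		Version: 1,
		Brokers: []BrokerState{
			{Id: 0, State: "ACTIVE", Version: 2, LastUpdatedAt: "2023-12-12T10:00:00Z",
				Partitions: []PartitionState{{Id: 1, State: "LEADER", Priority: 3}}},
		},
		LastChange:    &LastChange{Id: 3, Status: "COMPLETED"},
		PendingChange: nil, // no change in progress
	}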
4 changes: 4 additions & 0 deletions go-chaos/cmd/root.go
@@ -76,6 +76,9 @@ type Flags struct {
instanceCount int
jobCount int
jobType string

// cluster
changeId int
}

var Version = "development"
@@ -123,6 +126,7 @@ func NewCmd() *cobra.Command {
AddVerifyCommands(rootCmd, &flags)
AddVersionCmd(rootCmd)
AddWorkerCmd(rootCmd)
AddClusterCommands(rootCmd, &flags)

return rootCmd
}