Allow infinite retry of actions until timeout (#519)
To wait for a scaling operation to complete, we port-forward to a
gateway and repeatedly query the status. However, this sometimes failed
because the port-forwarding disconnected and was not re-established
until the action itself was retried. By default, actions are retried 3
times, and we had configured a long timeout (around 30 minutes) for the
query-and-wait. That means that when the port-forwarding disconnected,
the action kept running for up to 30 minutes without a successful query
response. Failing the action early instead might result in an incident,
because the number of retries is hard-coded to 3.
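
In essence, the query-and-wait is a bounded polling loop: poll every few seconds until the change is done or the overall timeout is exhausted. Below is a minimal sketch of that pattern, not the exact `waitForChange` implementation; the generic `check` callback stands in for the actual `QueryTopology` call, and treating errors as "poll again" is an assumption of the sketch.

```go
package chaos

import (
	"fmt"
	"time"
)

// pollUntil repeatedly invokes check until it reports success or the timeout
// is exhausted, mirroring the waitForChange loop below (5s interval,
// iterations = timeout / interval). In this sketch, an error from check
// (e.g. a dropped port-forward) is not fatal; the loop simply polls again.
func pollUntil(timeout time.Duration, check func() (bool, error)) error {
	interval := time.Second * 5
	iterations := int(timeout / interval)
	for i := 0; i < iterations; i++ {
		done, err := check()
		if err == nil && done {
			return nil
		}
		time.Sleep(interval)
	}
	return fmt.Errorf("operation did not complete within %s", timeout)
}
```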

Instead, as a generic solution for retries, this PR allows infinite
retry of all actions until a timeout is reached. This can simplify
operations that have to repeatedly query and wait. For example, for
`verify readiness` or `cluster wait`, we could in theory remove the loop
within the code and instead rely on retries, adjusting only the timeout
in the chaos action provider parameters. The sketch after this paragraph
shows the worker-side pattern.
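
On the worker side, "infinite retry until timeout" means failing the job without decrementing its retries and attaching a small backoff, so the broker keeps handing the job out until the timeout configured in the chaos action provider expires. A hedged sketch using the same Zeebe Go client calls as the diff below; the helper name is illustrative, not part of the change.

```go
package worker

import (
	"context"
	"time"

	"github.com/camunda/zeebe/clients/go/v8/pkg/entities"
	"github.com/camunda/zeebe/clients/go/v8/pkg/worker"
)

// failWithoutConsumingRetries fails a chaos job while keeping its retry count
// unchanged and adding a fixed backoff, so the job can be retried repeatedly
// until the timeout configured in the chaos action provider runs out.
func failWithoutConsumingRetries(ctx context.Context, client worker.JobClient, job entities.Job) {
	backoff := 10 * time.Second
	// Retries(job.Retries) instead of Retries(job.Retries - 1): a transient
	// failure such as a dropped port-forward does not use up a retry.
	_, _ = client.NewFailJobCommand().
		JobKey(job.Key).
		Retries(job.Retries).
		RetryBackoff(backoff).
		Send(ctx)
}
```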

PS: Although these changes would allow the implementation of `verify
readiness` to be simplified, that is not done in this PR to reduce the
impact of the changes.

closes #516
Zelldon authored Apr 4, 2024
2 parents 9dbad5e + 60eeca5 commit 7466f8e
Showing 4 changed files with 28 additions and 14 deletions.
13 changes: 8 additions & 5 deletions go-chaos/cmd/cluster.go
@@ -119,7 +119,8 @@ func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int, replica
 	ensureNoError(err)
 
 	// Wait for brokers to leave before scaling down
-	err = waitForChange(port, changeResponse.ChangeId)
+	timeout := time.Minute * 25
+	err = waitForChange(port, changeResponse.ChangeId, timeout)
 	ensureNoError(err)
 	_, err = k8Client.ScaleZeebeCluster(brokers)
 
@@ -204,12 +205,13 @@ func portForwardAndWaitForChange(flags *Flags) error {
 	port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
 	defer closePortForward()
 
-	return waitForChange(port, flags.changeId)
+	// Wait for shorter time. Retry and longer timeout can be configured in the chaos experiment description
+	timeout := time.Minute * 5
+	return waitForChange(port, flags.changeId, timeout)
 }
 
-func waitForChange(port int, changeId int64) error {
+func waitForChange(port int, changeId int64, timeout time.Duration) error {
 	interval := time.Second * 5
-	timeout := (time.Minute * 25)
 	iterations := int(timeout / interval)
 	for i := 0; i < int(iterations); i++ {
 		topology, err := QueryTopology(port)
@@ -270,7 +272,8 @@ func forceFailover(flags *Flags) error {
 	changeResponse, err := sendScaleRequest(port, brokersInRegion, true, -1)
 	ensureNoError(err)
 
-	err = waitForChange(port, changeResponse.ChangeId)
+	timeout := time.Minute * 5
+	err = waitForChange(port, changeResponse.ChangeId, timeout)
 	ensureNoError(err)
 
 	return nil
4 changes: 3 additions & 1 deletion go-chaos/worker/chaos_worker.go
@@ -107,7 +107,9 @@ func HandleZbChaosJob(client worker.JobClient, job entities.Job, commandRunner C
 	err = commandRunner(commandArgs, commandCtx)
 	if err != nil {
 		internal.LogInfo("Error on running command. [key: %d, args: %s]. Error: %s", job.Key, commandArgs, err.Error())
-		_, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries - 1).Send(ctx)
+		backoffDuration := time.Duration(10) * time.Second
+		// Do not reduce number of retries. The failed job can be retried several times until the configured timeout in chaos action provider
+		_, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries).RetryBackoff(backoffDuration).Send(ctx)
 		return
 	}
 
6 changes: 4 additions & 2 deletions go-chaos/worker/chaos_worker_test.go
@@ -19,6 +19,7 @@ import (
 	"encoding/json"
 	"errors"
 	"testing"
+	"time"
 
 	"github.com/camunda/zeebe/clients/go/v8/pkg/entities"
 	"github.com/camunda/zeebe/clients/go/v8/pkg/pb"
@@ -237,7 +238,9 @@ func Test_ShouldFailJobWhenHandleFails(t *testing.T) {
 	// then
 	assert.True(t, fakeJobClient.Failed)
 	assert.Equal(t, 123, fakeJobClient.Key)
-	assert.Equal(t, 2, fakeJobClient.RetriesVal)
+	// retry count is not decreased
+	assert.Equal(t, 3, fakeJobClient.RetriesVal)
+	assert.Equal(t, time.Duration(10)*time.Second, fakeJobClient.RetryBackoff)
 	var expectedArgs = []string{
 		"--namespace", "clusterId-zeebe",
 		"disconnect", "gateway",
@@ -254,7 +257,6 @@ func createVariablesAsJson() (string, error) {
 
 	marshal, err := json.Marshal(variables)
 	return string(marshal), err
-
 }
 
 func createZbChaosVariables() ZbChaosVariables {
19 changes: 13 additions & 6 deletions go-chaos/worker/fake.go
@@ -16,6 +16,7 @@ package worker
 
 import (
 	"context"
+	"time"
 
 	"github.com/camunda/zeebe/clients/go/v8/pkg/commands"
 	"github.com/camunda/zeebe/clients/go/v8/pkg/pb"
@@ -25,12 +26,13 @@ import (
 type FakeJobClient struct {
 	worker.JobClient
 
-	Key        int
-	RetriesVal int
-	ErrorMsg   string
-	Failed     bool
-	Succeeded  bool
-	Variables  interface{}
+	Key          int
+	RetriesVal   int
+	RetryBackoff time.Duration
+	ErrorMsg     string
+	Failed       bool
+	Succeeded    bool
+	Variables    interface{}
 }
 
 type FakeCompleteClient struct {
@@ -84,6 +86,11 @@ func (f *FakeFailClient) Retries(retries int32) commands.FailJobCommandStep3 {
 	return f
 }
 
+func (f *FakeFailClient) RetryBackoff(retryBackoff time.Duration) commands.FailJobCommandStep3 {
+	f.JobClient.RetryBackoff = retryBackoff
+	return f
+}
+
 func (f *FakeFailClient) ErrorMessage(errorMsg string) commands.FailJobCommandStep3 {
 	f.JobClient.ErrorMsg = errorMsg
 	return f
