Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ActionQueue retry on failure #1944

Merged
merged 6 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ee/control/actionqueue/actionqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (aq *ActionQueue) Update(data io.Reader) error {
return fmt.Errorf("failed to decode actions data: %w", err)
}

var processError error = nil

for _, rawAction := range rawActionsToProcess {
var action action
if err := json.Unmarshal(rawAction, &action); err != nil {
Expand Down Expand Up @@ -128,6 +130,7 @@ func (aq *ActionQueue) Update(data io.Reader) error {
"failed to do action with action, not marking action complete",
"err", err,
)
processError = fmt.Errorf("actor.Do failed: %w", err)
cesarfda marked this conversation as resolved.
Show resolved Hide resolved
continue
}

Expand All @@ -136,7 +139,7 @@ func (aq *ActionQueue) Update(data io.Reader) error {
aq.storeActionRecord(action)
}

return nil
return processError
}

func (aq *ActionQueue) RegisterActor(actorType string, actorToRegister actor) {
Expand Down
9 changes: 7 additions & 2 deletions ee/control/actionqueue/actionqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func TestActionQueue_HandlesDuplicatesWhenFirstActionCouldNotBeSent(t *testing.T
}
testActionsRaw, err := json.Marshal(actions)
require.NoError(t, err)
testActionsData := bytes.NewReader(testActionsRaw)

// Expect that the actor is called twice: once to unsuccessfully send the first action, and again to send the duplicate successfully
mockActor := mocks.NewActor(t)
Expand All @@ -194,7 +193,13 @@ func TestActionQueue_HandlesDuplicatesWhenFirstActionCouldNotBeSent(t *testing.T
// Call Do and assert our expectations about completed actions
actionqueue := New(mockKnapsack)
actionqueue.RegisterActor(testActorType, mockActor)
require.NoError(t, actionqueue.Update(testActionsData))
// First attempt fails
err = actionqueue.Update(bytes.NewReader(testActionsRaw))
require.Error(t, err)

// Second attempt succeeds
err = actionqueue.Update(bytes.NewReader(testActionsRaw))
require.NoError(t, err)
}

func TestCleanup(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions ee/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,12 @@ func (cs *ControlService) fetchAndUpdate(subsystem, hash string) error {

// Consumer and subscriber(s) notified now
if err := cs.update(subsystem, data); err != nil {
// Although we failed to update, the payload may be bad and there's no
// sense in repeatedly attempting to apply a bad update.
// A new update will have a new hash, so continue and remember this hash.
// Returning the error so we don't store the hash and we can try again next time
slogger.Log(context.TODO(), slog.LevelError,
"failed to update consumers and subscribers",
"err", err,
)
return err
}

// Remember the hash of the last fetched version of this subsystem's data
Expand Down
90 changes: 88 additions & 2 deletions ee/control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (

type (
mockConsumer struct {
updates int
updates int
updateErr error
}
mockSubscriber struct {
pings int
Expand All @@ -35,7 +36,7 @@ type (

func (mc *mockConsumer) Update(io.Reader) error {
mc.updates++
return nil
return mc.updateErr
}

func (ms *mockSubscriber) Ping() {
Expand Down Expand Up @@ -198,6 +199,91 @@ func TestControlServiceUpdate(t *testing.T) {
}
}

func TestControlServiceUpdateErr(t *testing.T) {
t.Parallel()

// Create mock consumer that returns error on update
errConsumer := &mockConsumer{
updateErr: errors.New("simulated update failure"),
}

mockKnapsack := typesMocks.NewKnapsack(t)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ControlRequestInterval)
mockKnapsack.On("ControlRequestInterval").Return(60 * time.Second)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())

// Set up test data with known hash
subsystems := map[string]string{"actions": "abc123"}
hashData := map[string]any{"abc123": "test-data"}
data, _ := NewControlTestClient(subsystems, hashData)

// Create control service and register error-producing consumer
store := &mockStore{keyValues: make(map[string]string)}
controlOpts := []Option{WithStore(store)}
cs := New(mockKnapsack, data, controlOpts...)
err := cs.RegisterConsumer("actions", errConsumer)
require.NoError(t, err)

// Trigger fetch which should cause update error
err = cs.Fetch()
require.NoError(t, err)

// Verify error consumer was called
assert.Equal(t, 1, errConsumer.updates)

// Verify hash was not recorded due to error
val, err := store.Get([]byte("actions"))
require.NoError(t, err)
assert.Empty(t, string(val))
}

func TestControlServiceRetryAfterUpdateErr(t *testing.T) {
t.Parallel()

errConsumer := &mockConsumer{
updateErr: errors.New("simulated update failure"),
}

mockKnapsack := typesMocks.NewKnapsack(t)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ControlRequestInterval)
mockKnapsack.On("ControlRequestInterval").Return(60 * time.Second)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())

// First fetch data
subsystems := map[string]string{"actions": "abc123"}
hashData := map[string]any{"abc123": "test-data-1"}
data, _ := NewControlTestClient(subsystems, hashData)

store := &mockStore{keyValues: make(map[string]string)}
controlOpts := []Option{WithStore(store)}
cs := New(mockKnapsack, data, controlOpts...)

err := cs.RegisterConsumer("actions", errConsumer)
require.NoError(t, err)

// First fetch - should fail and not store hash
err = cs.Fetch()
require.NoError(t, err)
assert.Equal(t, 1, errConsumer.updates)

// Create new test client with updated hash
subsystems = map[string]string{"actions": "def456"}
hashData = map[string]any{"def456": "test-data-2"}
newData, _ := NewControlTestClient(subsystems, hashData)
cs.fetcher = newData

// Second fetch with new hash - should succeed
errConsumer.updateErr = nil
err = cs.Fetch()
require.NoError(t, err)
assert.Equal(t, 2, errConsumer.updates)

// Verify final hash was recorded
val, err := store.Get([]byte("actions"))
require.NoError(t, err)
assert.Equal(t, "def456", string(val))
}

func TestControlServiceFetch(t *testing.T) {
t.Parallel()

Expand Down
Loading