Skip to content

Commit

Permalink
ActionQueue retry on failure (#1944)
Browse files Browse the repository at this point in the history
Co-authored-by: James Pickett <James-Pickett@users.noreply.github.com>
  • Loading branch information
cesarfda and James-Pickett authored Nov 8, 2024
1 parent 4fa558a commit 8f090c9
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 8 deletions.
5 changes: 4 additions & 1 deletion ee/control/actionqueue/actionqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (aq *ActionQueue) Update(data io.Reader) error {
return fmt.Errorf("failed to decode actions data: %w", err)
}

var processError error = nil

for _, rawAction := range rawActionsToProcess {
var action action
if err := json.Unmarshal(rawAction, &action); err != nil {
Expand Down Expand Up @@ -128,6 +130,7 @@ func (aq *ActionQueue) Update(data io.Reader) error {
"failed to do action with action, not marking action complete",
"err", err,
)
processError = fmt.Errorf("actor.Do, action type: %s, failed: %w", action.Type, err)
continue
}

Expand All @@ -136,7 +139,7 @@ func (aq *ActionQueue) Update(data io.Reader) error {
aq.storeActionRecord(action)
}

return nil
return processError
}

func (aq *ActionQueue) RegisterActor(actorType string, actorToRegister actor) {
Expand Down
9 changes: 7 additions & 2 deletions ee/control/actionqueue/actionqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func TestActionQueue_HandlesDuplicatesWhenFirstActionCouldNotBeSent(t *testing.T
}
testActionsRaw, err := json.Marshal(actions)
require.NoError(t, err)
testActionsData := bytes.NewReader(testActionsRaw)

// Expect that the actor is called twice: once to unsuccessfully send the first action, and again to send the duplicate successfully
mockActor := mocks.NewActor(t)
Expand All @@ -194,7 +193,13 @@ func TestActionQueue_HandlesDuplicatesWhenFirstActionCouldNotBeSent(t *testing.T
// Call Do and assert our expectations about completed actions
actionqueue := New(mockKnapsack)
actionqueue.RegisterActor(testActorType, mockActor)
require.NoError(t, actionqueue.Update(testActionsData))
// First attempt fails
err = actionqueue.Update(bytes.NewReader(testActionsRaw))
require.Error(t, err)

// Second attempt succeeds
err = actionqueue.Update(bytes.NewReader(testActionsRaw))
require.NoError(t, err)
}

func TestCleanup(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions ee/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,12 @@ func (cs *ControlService) fetchAndUpdate(subsystem, hash string) error {

// Consumer and subscriber(s) notified now
if err := cs.update(subsystem, data); err != nil {
// Although we failed to update, the payload may be bad and there's no
// sense in repeatedly attempting to apply a bad update.
// A new update will have a new hash, so continue and remember this hash.
// Returning the error so we don't store the hash and we can try again next time
slogger.Log(context.TODO(), slog.LevelError,
"failed to update consumers and subscribers",
"err", err,
)
return err
}

// Remember the hash of the last fetched version of this subsystem's data
Expand Down
90 changes: 88 additions & 2 deletions ee/control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (

type (
mockConsumer struct {
updates int
updates int
updateErr error
}
mockSubscriber struct {
pings int
Expand All @@ -35,7 +36,7 @@ type (

func (mc *mockConsumer) Update(io.Reader) error {
mc.updates++
return nil
return mc.updateErr
}

func (ms *mockSubscriber) Ping() {
Expand Down Expand Up @@ -198,6 +199,91 @@ func TestControlServiceUpdate(t *testing.T) {
}
}

func TestControlServiceUpdateErr(t *testing.T) {
t.Parallel()

// Create mock consumer that returns error on update
errConsumer := &mockConsumer{
updateErr: errors.New("simulated update failure"),
}

mockKnapsack := typesMocks.NewKnapsack(t)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ControlRequestInterval)
mockKnapsack.On("ControlRequestInterval").Return(60 * time.Second)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())

// Set up test data with known hash
subsystems := map[string]string{"actions": "abc123"}
hashData := map[string]any{"abc123": "test-data"}
data, _ := NewControlTestClient(subsystems, hashData)

// Create control service and register error-producing consumer
store := &mockStore{keyValues: make(map[string]string)}
controlOpts := []Option{WithStore(store)}
cs := New(mockKnapsack, data, controlOpts...)
err := cs.RegisterConsumer("actions", errConsumer)
require.NoError(t, err)

// Trigger fetch which should cause update error
err = cs.Fetch()
require.NoError(t, err)

// Verify error consumer was called
assert.Equal(t, 1, errConsumer.updates)

// Verify hash was not recorded due to error
val, err := store.Get([]byte("actions"))
require.NoError(t, err)
assert.Empty(t, string(val))
}

func TestControlServiceRetryAfterUpdateErr(t *testing.T) {
t.Parallel()

errConsumer := &mockConsumer{
updateErr: errors.New("simulated update failure"),
}

mockKnapsack := typesMocks.NewKnapsack(t)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ControlRequestInterval)
mockKnapsack.On("ControlRequestInterval").Return(60 * time.Second)
mockKnapsack.On("Slogger").Return(multislogger.NewNopLogger())

// First fetch data
subsystems := map[string]string{"actions": "abc123"}
hashData := map[string]any{"abc123": "test-data-1"}
data, _ := NewControlTestClient(subsystems, hashData)

store := &mockStore{keyValues: make(map[string]string)}
controlOpts := []Option{WithStore(store)}
cs := New(mockKnapsack, data, controlOpts...)

err := cs.RegisterConsumer("actions", errConsumer)
require.NoError(t, err)

// First fetch - should fail and not store hash
err = cs.Fetch()
require.NoError(t, err)
assert.Equal(t, 1, errConsumer.updates)

// Create new test client with updated hash
subsystems = map[string]string{"actions": "def456"}
hashData = map[string]any{"def456": "test-data-2"}
newData, _ := NewControlTestClient(subsystems, hashData)
cs.fetcher = newData

// Second fetch with new hash - should succeed
errConsumer.updateErr = nil
err = cs.Fetch()
require.NoError(t, err)
assert.Equal(t, 2, errConsumer.updates)

// Verify final hash was recorded
val, err := store.Get([]byte("actions"))
require.NoError(t, err)
assert.Equal(t, "def456", string(val))
}

func TestControlServiceFetch(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 8f090c9

Please sign in to comment.