Skip to content

Commit 146f8d2

Browse files
faecmergify[bot]
authored andcommitted
Rework runtime manager updates to block the coordinator less (#3747)
(cherry picked from commit 112f618) # Conflicts: # internal/pkg/agent/application/coordinator/coordinator_state.go
1 parent 2302264 commit 146f8d2

File tree

6 files changed

+288
-198
lines changed

6 files changed

+288
-198
lines changed

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,7 @@ type RuntimeManager interface {
9393
Runner
9494

9595
// Update updates the current components model.
96-
Update(model component.Model) error
97-
98-
// State returns the current components model state.
99-
State() []runtime.ComponentComponentState
96+
Update(model component.Model)
10097

10198
// PerformAction executes an action on a unit.
10299
PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error)
@@ -219,17 +216,19 @@ type Coordinator struct {
219216
// into the reported state before broadcasting -- State() will report
220217
// agentclient.Failed if one of these is set, even if the underlying
221218
// coordinator state is agentclient.Healthy.
222-
runtimeMgrErr error // Currently unused
223-
configMgrErr error
224-
actionsErr error
225-
varsMgrErr error
219+
// Errors from the runtime manager report policy update failures and are
220+
// stored in runtimeUpdateErr below.
221+
configMgrErr error
222+
actionsErr error
223+
varsMgrErr error
226224

227225
// Errors resulting from different possible failure modes when setting a
228226
// new policy. Right now there are three different stages where a policy
229227
// update can fail:
230228
// - in generateAST, converting the policy to an AST
231229
// - in process, converting the AST and vars into a full component model
232-
// - while sending the final component model to the runtime manager
230+
// - while applying the final component model in the runtime manager
231+
// (reported asynchronously via the runtime manager error channel)
233232
//
234233
// The plan is to improve our preprocessing so we can always detect
235234
// failures immediately https://github.com/elastic/elastic-agent/issues/2887.
@@ -851,7 +850,13 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
851850
return
852851

853852
case runtimeErr := <-c.managerChans.runtimeManagerError:
854-
c.setRuntimeManagerError(runtimeErr)
853+
// runtime manager errors report the result of a policy update.
854+
// Coordinator transitions from starting to healthy when a policy update
855+
// is successful.
856+
c.setRuntimeUpdateError(runtimeErr)
857+
if runtimeErr == nil {
858+
c.setCoordinatorState(agentclient.Healthy, "Running")
859+
}
855860

856861
case configErr := <-c.managerChans.configManagerError:
857862
if c.isManaged {
@@ -1070,12 +1075,7 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
10701075

10711076
c.logger.Info("Updating running component model")
10721077
c.logger.With("components", model.Components).Debug("Updating running component model")
1073-
err = c.runtimeMgr.Update(model)
1074-
c.setRuntimeUpdateError(err)
1075-
if err != nil {
1076-
return fmt.Errorf("updating runtime: %w", err)
1077-
}
1078-
c.setCoordinatorState(agentclient.Healthy, "Running")
1078+
c.runtimeMgr.Update(model)
10791079
return nil
10801080
}
10811081

internal/pkg/agent/application/coordinator/coordinator_state.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,19 @@ func (c *Coordinator) ClearOverrideState() {
5454
c.overrideStateChan <- nil
5555
}
5656

57+
<<<<<<< HEAD
5758
// setRuntimeManagerError updates the error state for the runtime manager.
59+
=======
60+
// SetUpgradeDetails sets upgrade details. This is used during upgrades.
61+
func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) {
62+
c.upgradeDetailsChan <- upgradeDetails
63+
}
64+
65+
// setRuntimeUpdateError reports a failed policy update in the runtime manager.
66+
>>>>>>> 112f618969 (Rework runtime manager updates to block the coordinator less (#3747))
5867
// Called on the main Coordinator goroutine.
59-
func (c *Coordinator) setRuntimeManagerError(err error) {
60-
c.runtimeMgrErr = err
68+
func (c *Coordinator) setRuntimeUpdateError(err error) {
69+
c.runtimeUpdateErr = err
6170
c.stateNeedsRefresh = true
6271
}
6372

@@ -98,14 +107,6 @@ func (c *Coordinator) setComponentGenError(err error) {
98107
c.stateNeedsRefresh = true
99108
}
100109

101-
// setRuntimeUpdateError updates the error state for sending a component model
102-
// update to the runtime manager.
103-
// Called on the main Coordinator goroutine.
104-
func (c *Coordinator) setRuntimeUpdateError(err error) {
105-
c.runtimeUpdateErr = err
106-
c.stateNeedsRefresh = true
107-
}
108-
109110
// setOverrideState is the internal helper to set the override state and
110111
// set stateNeedsRefresh.
111112
// Must be called on the main Coordinator goroutine.
@@ -184,9 +185,6 @@ func (c *Coordinator) generateReportableState() (s State) {
184185
} else if c.runtimeUpdateErr != nil {
185186
s.State = agentclient.Failed
186187
s.Message = fmt.Sprintf("Runtime update failed: %s", c.runtimeUpdateErr.Error())
187-
} else if c.runtimeMgrErr != nil {
188-
s.State = agentclient.Failed
189-
s.Message = fmt.Sprintf("Runtime manager: %s", c.runtimeMgrErr.Error())
190188
} else if c.configMgrErr != nil {
191189
s.State = agentclient.Failed
192190
s.Message = fmt.Sprintf("Config manager: %s", c.configMgrErr.Error())

internal/pkg/agent/application/coordinator/coordinator_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,8 @@ func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) {
715715
type fakeRuntimeManager struct {
716716
state []runtime.ComponentComponentState
717717
updateCallback func([]component.Component) error
718+
result error
719+
errChan chan error
718720
}
719721

720722
func (r *fakeRuntimeManager) Run(ctx context.Context) error {
@@ -724,11 +726,15 @@ func (r *fakeRuntimeManager) Run(ctx context.Context) error {
724726

725727
func (r *fakeRuntimeManager) Errors() <-chan error { return nil }
726728

727-
func (r *fakeRuntimeManager) Update(model component.Model) error {
729+
func (r *fakeRuntimeManager) Update(model component.Model) {
730+
r.result = nil
728731
if r.updateCallback != nil {
729-
return r.updateCallback(model.Components)
732+
r.result = r.updateCallback(model.Components)
733+
}
734+
if r.errChan != nil {
735+
// If a reporting channel is set, send the result to it
736+
r.errChan <- r.result
730737
}
731-
return nil
732738
}
733739

734740
// State returns the current components model state.

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -618,13 +618,15 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
618618
logger := logp.NewLogger("testing")
619619

620620
configChan := make(chan ConfigChange, 1)
621+
updateErrChan := make(chan error, 1)
621622

622623
const errorStr = "update failed for testing reasons"
623624
// Create a mocked runtime manager that always reports an error
624625
runtimeManager := &fakeRuntimeManager{
625626
updateCallback: func(comp []component.Component) error {
626627
return fmt.Errorf(errorStr)
627628
},
629+
errChan: updateErrChan,
628630
}
629631

630632
coord := &Coordinator{
@@ -633,6 +635,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
633635
stateBroadcaster: broadcaster.New(State{}, 0, 0),
634636
managerChans: managerChans{
635637
configManagerUpdate: configChan,
638+
// Give coordinator the same error channel we set on the runtime
639+
// manager, so it receives the update result.
640+
runtimeManagerError: updateErrChan,
636641
},
637642
runtimeMgr: runtimeManager,
638643
vars: emptyVars(t),
@@ -645,16 +650,19 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
645650
configChan <- configChange
646651
coord.runLoopIteration(ctx)
647652

648-
// Make sure the failure was reported to the config manager
649-
assert.True(t, configChange.failed, "Config change should report failure if the runtime manager returns an error")
650-
require.Error(t, configChange.err, "Config change should get an error if runtime manager update fails")
651-
assert.Contains(t, configChange.err.Error(), errorStr)
653+
// Make sure the config change was acknowledged to the config manager
654+
// (the failure is not reported here since it happens asynchronously; it
655+
// will appear in the coordinator state afterwards.)
656+
assert.True(t, configChange.acked, "Config change should be acknowledged to the config manager")
657+
assert.NoError(t, configChange.err, "Config change with async error should succeed")
652658

653-
// Make sure the error is saved in Coordinator.runtimeUpdateErr
659+
// Now do another run loop iteration to let the update error propagate,
660+
// and make sure it is reported correctly.
661+
coord.runLoopIteration(ctx)
654662
require.Error(t, coord.runtimeUpdateErr, "Runtime update failure should be saved in runtimeUpdateErr")
655663
assert.Equal(t, errorStr, coord.runtimeUpdateErr.Error(), "runtimeUpdateErr should match the error reported by the runtime manager")
656664

657-
// Make sure the error is reported in Coordinator state.
665+
// Make sure the error appears in the Coordinator state.
658666
state := coord.State()
659667
assert.Equal(t, agentclient.Failed, state.State, "Failed policy update should cause failed Coordinator")
660668
assert.Contains(t, state.Message, errorStr, "Failed policy update should be reported in Coordinator state message")

0 commit comments

Comments
 (0)