Skip to content

Commit 91002a9

Browse files
committed
use a queue to avoid overwriting events
Assisted-by: Cursor Signed-off-by: Chetan Banavikalmutt <chetanrns1997@gmail.com>
1 parent f5a89ef commit 91002a9

File tree

8 files changed

+690
-249
lines changed

8 files changed

+690
-249
lines changed

internal/event/event.go

Lines changed: 306 additions & 95 deletions
Large diffs are not rendered by default.

internal/event/event_test.go

Lines changed: 301 additions & 96 deletions
Large diffs are not rendered by default.

internal/kube/client.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,13 @@ func IsRetryableError(err error) bool {
122122
if err == nil {
123123
return false
124124
}
125+
126+
// Errors about immutable fields (especially deletionTimestamp) are NOT retryable
127+
// Once a resource is being deleted, these fields cannot be changed no matter how many times we retry
128+
if isDeletionImmutableError(err) {
129+
return false
130+
}
131+
125132
return kerrors.IsInternalError(err) ||
126133
kerrors.IsInvalid(err) ||
127134
kerrors.IsTooManyRequests(err) ||
@@ -137,6 +144,14 @@ func IsRetryableError(err error) bool {
137144
errors.Is(err, syscall.ECONNRESET)
138145
}
139146

147+
func isDeletionImmutableError(err error) bool {
148+
// Detect errors about immutable deletionTimestamp or deletionGracePeriodSeconds
149+
// These occur when trying to update a resource that's being deleted
150+
errMsg := err.Error()
151+
return (strings.Contains(errMsg, "deletionTimestamp") || strings.Contains(errMsg, "deletionGracePeriodSeconds")) &&
152+
strings.Contains(errMsg, "field is immutable")
153+
}
154+
140155
func isHTTP2GoawayErr(err error) bool {
141156
return strings.Contains(err.Error(), "http2: server sent GOAWAY and closed the connection")
142157
}

internal/manager/application/application.go

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -205,18 +205,7 @@ func (m *ApplicationManager) UpdateManagedApp(ctx context.Context, incoming *v1a
205205
existing.Labels = incoming.Labels
206206
existing.Finalizers = incoming.Finalizers
207207
existing.Spec = *incoming.Spec.DeepCopy()
208-
// Don't update Operation if the app is terminating
209-
if incoming.Operation != nil {
210-
allowMirrorOp := true
211-
if existing.Status.OperationState != nil {
212-
if existing.Status.OperationState.Phase == synccommon.OperationTerminating {
213-
allowMirrorOp = false
214-
}
215-
}
216-
if allowMirrorOp {
217-
existing.Operation = incoming.Operation.DeepCopy()
218-
}
219-
}
208+
existing.Operation = operationToUse(existing, incoming)
220209

221210
if incoming.DeletionTimestamp != nil && existing.DeletionTimestamp == nil {
222211
deletionTimestampChanged = true
@@ -242,31 +231,14 @@ func (m *ApplicationManager) UpdateManagedApp(ctx context.Context, incoming *v1a
242231
deletionTimestampChanged = true
243232
}
244233

245-
var desiredOp *v1alpha1.Operation
246-
if incoming.Operation != nil {
247-
allowMirrorOp := true
248-
if existing.Status.OperationState != nil {
249-
if existing.Status.OperationState.Phase == synccommon.OperationTerminating {
250-
allowMirrorOp = false
251-
}
252-
}
253-
if allowMirrorOp {
254-
desiredOp = incoming.Operation
255-
} else {
256-
desiredOp = existing.Operation
257-
}
258-
} else {
259-
desiredOp = existing.Operation
260-
}
261-
262234
target := &v1alpha1.Application{
263235
ObjectMeta: v1.ObjectMeta{
264236
Annotations: incoming.Annotations,
265237
Labels: incoming.Labels,
266238
Finalizers: incoming.Finalizers,
267239
},
268240
Spec: incoming.Spec,
269-
Operation: desiredOp,
241+
Operation: operationToUse(existing, incoming),
270242
}
271243
source := &v1alpha1.Application{
272244
ObjectMeta: v1.ObjectMeta{
@@ -363,8 +335,12 @@ func (m *ApplicationManager) UpdateAutonomousApp(ctx context.Context, namespace
363335

364336
existing.Annotations = incoming.Annotations
365337
existing.Labels = incoming.Labels
366-
existing.DeletionTimestamp = incoming.DeletionTimestamp
367-
existing.DeletionGracePeriodSeconds = incoming.DeletionGracePeriodSeconds
338+
if existing.DeletionTimestamp == nil {
339+
existing.DeletionTimestamp = incoming.DeletionTimestamp
340+
}
341+
if existing.DeletionGracePeriodSeconds == nil {
342+
existing.DeletionGracePeriodSeconds = incoming.DeletionGracePeriodSeconds
343+
}
368344
existing.Finalizers = incoming.Finalizers
369345
existing.Spec = incoming.Spec
370346
existing.Status = *incoming.Status.DeepCopy()
@@ -380,16 +356,18 @@ func (m *ApplicationManager) UpdateAutonomousApp(ctx context.Context, namespace
380356

381357
target := &v1alpha1.Application{
382358
ObjectMeta: v1.ObjectMeta{
383-
Labels: incoming.Labels,
384-
Annotations: incoming.Annotations,
385-
DeletionTimestamp: incoming.DeletionTimestamp,
386-
DeletionGracePeriodSeconds: incoming.DeletionGracePeriodSeconds,
387-
Finalizers: incoming.Finalizers,
359+
Labels: incoming.Labels,
360+
Annotations: incoming.Annotations,
361+
Finalizers: incoming.Finalizers,
388362
},
389363
Spec: incoming.Spec,
390364
Status: incoming.Status,
391365
Operation: incoming.Operation,
392366
}
367+
if existing.DeletionTimestamp == nil && incoming.DeletionTimestamp != nil {
368+
target.DeletionTimestamp = incoming.DeletionTimestamp
369+
target.DeletionGracePeriodSeconds = incoming.DeletionGracePeriodSeconds
370+
}
393371
source := &v1alpha1.Application{
394372
ObjectMeta: v1.ObjectMeta{
395373
DeletionTimestamp: existing.DeletionTimestamp,
@@ -511,6 +489,7 @@ func (m *ApplicationManager) UpdateOperation(ctx context.Context, incoming *v1al
511489
updated, err = m.update(ctx, false, incoming, func(existing, incoming *v1alpha1.Application) {
512490
existing.Annotations = incoming.Annotations
513491
existing.Labels = incoming.Labels
492+
existing.Operation = operationToUse(existing, incoming)
514493
existing.Status = *incoming.Status.DeepCopy()
515494
}, func(existing, incoming *v1alpha1.Application) (jsondiff.Patch, error) {
516495
annotations := make(map[string]string)
@@ -523,7 +502,7 @@ func (m *ApplicationManager) UpdateOperation(ctx context.Context, incoming *v1al
523502
ObjectMeta: v1.ObjectMeta{
524503
Annotations: incoming.Annotations,
525504
},
526-
Operation: incoming.Operation,
505+
Operation: operationToUse(existing, incoming),
527506
}
528507
source := &v1alpha1.Application{
529508
ObjectMeta: v1.ObjectMeta{
@@ -543,6 +522,22 @@ func (m *ApplicationManager) UpdateOperation(ctx context.Context, incoming *v1al
543522
return updated, err
544523
}
545524

525+
func operationToUse(existing, incoming *v1alpha1.Application) *v1alpha1.Operation {
526+
// Preserve existing operation if the incoming operation is nil
527+
if incoming.Operation == nil {
528+
return existing.Operation
529+
}
530+
531+
// Don't update Operation if the app is terminating
532+
if existing.Status.OperationState != nil {
533+
if existing.Status.OperationState.Phase == synccommon.OperationTerminating {
534+
return existing.Operation
535+
}
536+
}
537+
538+
return incoming.Operation.DeepCopy()
539+
}
540+
546541
// TerminateOperation aborts a running sync operation by setting .status.operationState.phase to Terminating.
547542
func (m *ApplicationManager) TerminateOperation(ctx context.Context, incoming *v1alpha1.Application) (*v1alpha1.Application, error) {
548543
incoming.SetNamespace(m.namespace)

principal/callbacks.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
package principal
1616

1717
import (
18-
"reflect"
19-
2018
"github.com/argoproj-labs/argocd-agent/internal/event"
2119
"github.com/argoproj-labs/argocd-agent/internal/manager"
2220
"github.com/argoproj-labs/argocd-agent/internal/manager/appproject"
@@ -117,12 +115,12 @@ func (s *Server) updateAppCallback(old *v1alpha1.Application, new *v1alpha1.Appl
117115
if isTerminateOperation(old, new) {
118116
ev = s.events.ApplicationEvent(event.TerminateOperation, new)
119117
} else {
120-
// For managed agents: prevent sending operation back on regular spec updates.
118+
// Prevent sending operation back on regular spec updates.
121119
// Allow only nil->non-nil transitions to carry operation (i.e. principal-initiated sync).
122120

123121
// DeepCopy to avoid mutating the informer object
124122
out := new.DeepCopy()
125-
if reflect.DeepEqual(old.Operation, new.Operation) && !isResourceFromAutonomousAgent(new) {
123+
if !(old.Operation == nil && new.Operation != nil) {
126124
out.Operation = nil
127125
}
128126

principal/callbacks_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,7 +1367,7 @@ func TestServer_updateAppCallback(t *testing.T) {
13671367
assert.Equal(t, 1, sendQ.Len())
13681368
})
13691369

1370-
t.Run("include operation in event if it is different from the old one", func(t *testing.T) {
1370+
t.Run("include operation in event if it is initiated for the first time", func(t *testing.T) {
13711371
mockBackend := &mocks.Application{}
13721372

13731373
appManager, err := application.NewApplicationManager(mockBackend, "argocd")
@@ -1386,11 +1386,6 @@ func TestServer_updateAppCallback(t *testing.T) {
13861386

13871387
oldApp := &v1alpha1.Application{
13881388
ObjectMeta: metav1.ObjectMeta{Name: "test-app", Namespace: "managed-agent", ResourceVersion: "1"},
1389-
Operation: &v1alpha1.Operation{
1390-
Sync: &v1alpha1.SyncOperation{
1391-
Revision: "main",
1392-
},
1393-
},
13941389
}
13951390
newApp := &v1alpha1.Application{
13961391
ObjectMeta: metav1.ObjectMeta{Name: "test-app", Namespace: "managed-agent", ResourceVersion: "2"},

principal/event.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,19 @@ func (s *Server) processApplicationEvent(ctx context.Context, agentName string,
187187
return fmt.Errorf("could not create application %s: %w", incoming.QualifiedName(), err)
188188
}
189189

190+
// Check if the existing application is marked for deletion before attempting update
191+
existing, err := s.appManager.Get(ctx, incoming.Name, agentName)
192+
if err != nil {
193+
logCtx.WithError(err).Errorf("Application should exist before attempting to update")
194+
return nil
195+
}
196+
if existing.DeletionTimestamp != nil {
197+
logCtx.Trace("Application is marked for deletion, skipping update to avoid immutable field error")
198+
return nil
199+
}
200+
190201
// Update the application if it already exists
191-
_, err := s.appManager.UpdateAutonomousApp(ctx, agentName, incoming)
202+
_, err = s.appManager.UpdateAutonomousApp(ctx, agentName, incoming)
192203
if err != nil {
193204
return fmt.Errorf("could not update application spec for %s: %w", incoming.QualifiedName(), err)
194205
}
@@ -200,9 +211,20 @@ func (s *Server) processApplicationEvent(ctx context.Context, agentName string,
200211
return event.NewEventNotAllowedErr("event type not allowed when mode is not autonomous")
201212
}
202213

214+
// Check if the existing application is marked for deletion before attempting update
215+
existing, err := s.appManager.Get(ctx, incoming.Name, agentName)
216+
if err != nil {
217+
logCtx.WithError(err).Errorf("Application should exist before attempting to update")
218+
return nil
219+
}
220+
if existing.DeletionTimestamp != nil {
221+
logCtx.Trace("Application is marked for deletion, skipping update to avoid immutable field error")
222+
return nil
223+
}
224+
203225
s.sourceCache.Application.Set(incoming.UID, incoming.Spec)
204226

205-
_, err := s.appManager.UpdateAutonomousApp(ctx, agentName, incoming)
227+
_, err = s.appManager.UpdateAutonomousApp(ctx, agentName, incoming)
206228
if err != nil {
207229
return fmt.Errorf("could not update application spec for %s: %w", incoming.QualifiedName(), err)
208230
}

test/e2e/sync_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -340,15 +340,15 @@ func (suite *SyncTestSuite) Test_TerminateOperationManaged() {
340340
requires.Eventually(func() bool {
341341
app := argoapp.Application{}
342342
err := suite.ManagedAgentClient.Get(suite.Ctx, agentKey, &app, metav1.GetOptions{})
343-
return err == nil && app.Operation != nil && app.Status.OperationState != nil &&
343+
return err == nil && app.Status.OperationState != nil &&
344344
app.Status.OperationState.Phase == synccommon.OperationRunning
345345
}, 60*time.Second, 1*time.Second)
346346

347347
// Wait for the sync operation to start on the principal
348348
requires.Eventually(func() bool {
349349
app := argoapp.Application{}
350350
err := suite.PrincipalClient.Get(suite.Ctx, principalKey, &app, metav1.GetOptions{})
351-
return err == nil && app.Operation != nil && app.Status.OperationState != nil &&
351+
return err == nil && app.Status.OperationState != nil &&
352352
app.Status.OperationState.Phase == synccommon.OperationRunning
353353
}, 60*time.Second, 1*time.Second)
354354

@@ -401,7 +401,7 @@ func (suite *SyncTestSuite) Test_TerminateOperationManaged() {
401401
requires.Eventually(func() bool {
402402
app := argoapp.Application{}
403403
err := suite.ManagedAgentClient.Get(suite.Ctx, agentKey, &app, metav1.GetOptions{})
404-
if err == nil && app.Operation == nil && app.Status.OperationState != nil {
404+
if err == nil && app.Status.OperationState != nil {
405405
suite.T().Logf("Operation phase: %v", app.Status.OperationState.Phase)
406406
}
407407
return err == nil && app.Status.OperationState != nil && app.Status.OperationState.Phase == synccommon.OperationFailed &&
@@ -457,17 +457,17 @@ func (suite *SyncTestSuite) Test_TerminateOperationAutonomous() {
457457
requires.Eventually(func() bool {
458458
app := argoapp.Application{}
459459
err := suite.PrincipalClient.Get(suite.Ctx, principalKey, &app, metav1.GetOptions{})
460-
return err == nil && app.Operation != nil && app.Status.OperationState != nil &&
460+
return err == nil && app.Status.OperationState != nil &&
461461
app.Status.OperationState.Phase == synccommon.OperationRunning
462-
}, 60*time.Second, 1*time.Second)
462+
}, 90*time.Second, 1*time.Second)
463463

464464
// Wait for the sync operation to start on the autonomous-agent as well
465465
requires.Eventually(func() bool {
466466
app := argoapp.Application{}
467467
err := suite.AutonomousAgentClient.Get(suite.Ctx, agentKey, &app, metav1.GetOptions{})
468-
return err == nil && app.Operation != nil && app.Status.OperationState != nil &&
468+
return err == nil && app.Status.OperationState != nil &&
469469
app.Status.OperationState.Phase == synccommon.OperationRunning
470-
}, 60*time.Second, 1*time.Second)
470+
}, 90*time.Second, 1*time.Second)
471471

472472
// Get the Argo server endpoint to use
473473
argoEndpoint, err := fixture.GetArgoCDServerEndpoint(suite.PrincipalClient)
@@ -517,7 +517,7 @@ func (suite *SyncTestSuite) Test_TerminateOperationAutonomous() {
517517
requires.Eventually(func() bool {
518518
app := argoapp.Application{}
519519
err := suite.AutonomousAgentClient.Get(suite.Ctx, agentKey, &app, metav1.GetOptions{})
520-
return err == nil && app.Operation == nil && app.Status.OperationState != nil &&
520+
return err == nil && app.Status.OperationState != nil &&
521521
app.Status.OperationState.Phase == synccommon.OperationFailed &&
522522
app.Status.OperationState.Message == "Operation terminated"
523523
}, 90*time.Second, 1*time.Second)

0 commit comments

Comments
 (0)