@@ -13,6 +13,7 @@ import (
13
13
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
14
k8stypes "k8s.io/apimachinery/pkg/types"
15
15
"sigs.k8s.io/controller-runtime/pkg/client"
16
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
16
17
17
18
"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
18
19
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
@@ -30,8 +31,11 @@ const (
30
31
ErrBuildPodTemplate stdErrors.ErrorCode = "POD_TEMPLATE_FAILED"
31
32
ErrReplaceCmdTemplate stdErrors.ErrorCode = "CMD_TEMPLATE_FAILED"
32
33
FlyteK8sArrayIndexVarName string = "FLYTE_K8S_ARRAY_INDEX"
33
- finalizer string = "flyte/array"
34
- JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
34
+ finalizer string = "flyte.org/finalizer-array"
35
+ // Old non-domain-qualified finalizer for backwards compatibility
36
+ // This should eventually be removed
37
+ oldFinalizer string = "flyte/array"
38
+ JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
35
39
)
36
40
37
41
var (
@@ -69,8 +73,7 @@ func addMetadata(stCtx SubTaskExecutionContext, cfg *Config, k8sPluginCfg *confi
69
73
}
70
74
71
75
if k8sPluginCfg .InjectFinalizer {
72
- f := append (pod .GetFinalizers (), finalizer )
73
- pod .SetFinalizers (f )
76
+ _ = controllerutil .AddFinalizer (pod , finalizer )
74
77
}
75
78
76
79
if len (cfg .DefaultScheduler ) > 0 {
@@ -134,25 +137,28 @@ func abortSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Confi
134
137
}
135
138
136
139
if err != nil && ! isK8sObjectNotExists (err ) {
137
- logger .Warningf (ctx , "Failed to clear finalizers for Resource with name: %v/%v. Error: %v" ,
140
+ logger .Warningf (ctx , "Failed to clear finalizer for Resource with name: %v/%v. Error: %v" ,
138
141
resourceToFinalize .GetNamespace (), resourceToFinalize .GetName (), err )
139
142
return err
140
143
}
141
144
142
145
return nil
143
146
}
144
147
145
- // clearFinalizers removes finalizers (if they exist) from the k8s resource
146
- func clearFinalizers (ctx context.Context , o client.Object , kubeClient pluginsCore.KubeClient ) error {
147
- if len (o .GetFinalizers ()) > 0 {
148
- o .SetFinalizers ([]string {})
148
+ // clearFinalizer removes the Flyte finalizer (if it exists) from the k8s resource
149
+ func clearFinalizer (ctx context.Context , o client.Object , kubeClient pluginsCore.KubeClient ) error {
150
+ // Checking for the old finalizer too for backwards compatibility. This should eventually be removed
151
+ // Go does short-circuiting so we have to make sure both are removed
152
+ finalizerRemoved := controllerutil .RemoveFinalizer (o , finalizer )
153
+ oldFinalizerRemoved := controllerutil .RemoveFinalizer (o , oldFinalizer )
154
+ if finalizerRemoved || oldFinalizerRemoved {
149
155
err := kubeClient .GetClient ().Update (ctx , o )
150
156
if err != nil && ! isK8sObjectNotExists (err ) {
151
- logger .Warningf (ctx , "Failed to clear finalizers for Resource with name: %v/%v. Error: %v" , o .GetNamespace (), o .GetName (), err )
157
+ logger .Warningf (ctx , "Failed to clear finalizer for Resource with name: %v/%v. Error: %v" , o .GetNamespace (), o .GetName (), err )
152
158
return err
153
159
}
154
160
} else {
155
- logger .Debugf (ctx , "Finalizers are already empty for Resource with name: %v/%v" , o .GetNamespace (), o .GetName ())
161
+ logger .Debugf (ctx , "Finalizer is already cleared for Resource with name: %v/%v" , o .GetNamespace (), o .GetName ())
156
162
}
157
163
return nil
158
164
}
@@ -211,7 +217,7 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf
211
217
}
212
218
213
219
// finalizeSubtask performs operations to complete the k8s pod defined by the SubTaskExecutionContext
214
- // and Config. These may include removing finalizers and deleting the k8s resource.
220
+ // and Config. These may include removing finalizer and deleting the k8s resource.
215
221
func finalizeSubtask (ctx context.Context , stCtx SubTaskExecutionContext , cfg * Config , kubeClient pluginsCore.KubeClient ) error {
216
222
errs := stdErrors.ErrorCollection {}
217
223
var pod * v1.Pod
@@ -231,10 +237,10 @@ func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Co
231
237
nsName = k8stypes.NamespacedName {Namespace : pod .GetNamespace (), Name : pod .GetName ()}
232
238
}
233
239
234
- // In InjectFinalizer is on, it means we may have added the finalizers when we launched this resource. Attempt to
235
- // clear them to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
240
+ // In InjectFinalizer is on, it means we may have added the finalizer when we launched this resource. Attempt to
241
+ // clear it to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
236
242
// after the resource was created, we will not find any finalizers to clear and the object may have already been
237
- // deleted at this point. Therefore, account for these cases and do not consider them errors.
243
+ // deleted at this point. Therefore, account for these cases and do not consider the errors.
238
244
if k8sPluginCfg .InjectFinalizer {
239
245
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
240
246
if err := kubeClient .GetClient ().Get (ctx , nsName , pod ); err != nil {
@@ -250,7 +256,7 @@ func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Co
250
256
// This must happen after sending admin event. It's safe against partial failures because if the event failed, we will
251
257
// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
252
258
// the same event (idempotent) and then come here again...
253
- err := clearFinalizers (ctx , pod , kubeClient )
259
+ err := clearFinalizer (ctx , pod , kubeClient )
254
260
if err != nil {
255
261
errs .Append (err )
256
262
}
@@ -308,10 +314,10 @@ func getSubtaskPhaseInfo(ctx context.Context, stCtx SubTaskExecutionContext, cfg
308
314
return pluginsCore .PhaseInfoUndefined , err
309
315
}
310
316
311
- if ! phaseInfo .Phase ().IsTerminal () && o .GetDeletionTimestamp () != nil {
317
+ if ! phaseInfo .Phase ().IsTerminal () && ! o .GetDeletionTimestamp (). IsZero () {
312
318
// If the object has been deleted, that is, it has a deletion timestamp, but is not in a terminal state, we should
313
319
// mark the task as a retryable failure. We've seen this happen when a kubelet disappears - all pods running on
314
- // the node are marked with a deletionTimestamp, but our finalizers prevent the pod from being deleted.
320
+ // the node are marked with a deletionTimestamp, but our finalizer prevents the pod from being deleted.
315
321
// This can also happen when a user deletes a Pod directly.
316
322
failureReason := fmt .Sprintf ("object [%s] terminated in the background, manually" , nsName .String ())
317
323
return pluginsCore .PhaseInfoSystemRetryableFailure ("UnexpectedObjectDeletion" , failureReason , nil ), nil
0 commit comments