diff --git a/xset/api/well_knowns.go b/xset/api/well_knowns.go index 2ace1048..1830b78f 100644 --- a/xset/api/well_knowns.go +++ b/xset/api/well_knowns.go @@ -94,9 +94,6 @@ const ( // SubResourcePvcTemplateHashLabelKey is used to attach hash of pvc template to pvc subresource SubResourcePvcTemplateHashLabelKey - // LastXStatusAnnotationKey is used to record the last status of a target by xset - LastXStatusAnnotationKey - // wellKnownCount is the number of XSetLabelAnnotationEnum wellKnownCount ) @@ -130,8 +127,6 @@ var defaultXSetLabelAnnotationManager = map[XSetLabelAnnotationEnum]string{ XExcludeIndicationLabelKey: appsv1alpha1.PodExcludeIndicationLabelKey, SubResourcePvcTemplateLabelKey: appsv1alpha1.PvcTemplateLabelKey, SubResourcePvcTemplateHashLabelKey: appsv1alpha1.PvcTemplateHashLabelKey, - - LastXStatusAnnotationKey: appsv1alpha1.LastPodStatusAnnotationKey, } func NewXSetLabelAnnotationManager() XSetLabelAnnotationManager { diff --git a/xset/api/xset_controller_types.go b/xset/api/xset_controller_types.go index ee2ca2f1..88384d93 100644 --- a/xset/api/xset_controller_types.go +++ b/xset/api/xset_controller_types.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" ) type XSetController interface { @@ -95,3 +96,21 @@ type SubResourcePvcAdapter interface { // SetXSpecVolumes sets spec.volumes to X object. SetXSpecVolumes(object client.Object, pvcs []corev1.Volume) } + +// DecorationAdapter is used to manage decoration for XSet. Decoration should be a workload to manage patcher on X target. +// Once adapter is implemented, XSetController will (1) watch for decoration change, (2) patch effective decorations on +// X target when creating, (3) manage decoration update when decoration changed. +type DecorationAdapter interface { + // WatchDecoration allows controller to watch decoration change. + WatchDecoration(c controller.Controller) error + // GetDecorationGroupVersionKind returns decoration gvk. + GetDecorationGroupVersionKind() metav1.GroupVersionKind + // GetTargetCurrentDecorationRevisions returns decoration revision on target. + GetTargetCurrentDecorationRevisions(ctx context.Context, c client.Client, target client.Object) (string, error) + // GetTargetUpdatedDecorationRevisions returns decoration revision on target. + GetTargetUpdatedDecorationRevisions(ctx context.Context, c client.Client, target client.Object) (string, error) + // GetDecorationPatcherByRevisions returns patcher for decoration from revisions. + GetDecorationPatcherByRevisions(ctx context.Context, c client.Client, target client.Object, revision string) (func(client.Object) error, error) + // IsTargetDecorationChanged returns true if decoration on target is changed. + IsTargetDecorationChanged(currentRevision, updatedRevision string) (bool, error) +} diff --git a/xset/api/xset_types.go b/xset/api/xset_types.go index 4e4bca62..40c5b9ae 100644 --- a/xset/api/xset_types.go +++ b/xset/api/xset_types.go @@ -55,6 +55,10 @@ type XSetSpec struct { // +optional ScaleStrategy ScaleStrategy `json:"scaleStrategy,omitempty"` + // NamigPolicy indicates the strategy detail that will be used for replica naming + // +optional + NamingStrategy *NamingStrategy `json:"namingStrategy,omitempty"` + // Indicate the number of histories to be conserved // If unspecified, defaults to 20 // +optional @@ -120,6 +124,25 @@ type ScaleStrategy struct { OperationDelaySeconds *int32 `json:"operationDelaySeconds,omitempty"` } +// TargetNamingSuffixPolicy indicates how a new pod name suffix part is generated. +type TargetNamingSuffixPolicy string + +const ( + // TargetNamingSuffixPolicyPersistentSequence uses persistent sequential numbers as pod name suffix. + TargetNamingSuffixPolicyPersistentSequence TargetNamingSuffixPolicy = "PersistentSequence" + // TargetNamingSuffixPolicyRandom uses collaset name as pod generateName, which is the prefix + // of pod name. Kubernetes then adds a random string as suffix after the generateName. + // This is defaulting policy. + TargetNamingSuffixPolicyRandom TargetNamingSuffixPolicy = "Random" +) + +type NamingStrategy struct { + // TargetNamingSuffixPolicy is a string enumeration that determaines how pod name suffix will be generated. + // A collaset pod name contains two parts to be placed in a string formation %s-%s; the prefix is collaset + // name, and the suffix is determined by TargetNamingSuffixPolicy. + TargetNamingSuffixPolicy TargetNamingSuffixPolicy `json:"TargetNamingSuffixPolicy,omitempty"` +} + // UpdateStrategyType is a string enumeration type that enumerates // all possible ways we can update a Target when updating application type UpdateStrategyType string diff --git a/xset/revisionowner/revision_owner.go b/xset/revisionowner/revision_owner.go index 96aff8a4..bfed3453 100644 --- a/xset/revisionowner/revision_owner.go +++ b/xset/revisionowner/revision_owner.go @@ -66,7 +66,7 @@ func (r *revisionOwner) GetInUsedRevisions(parent metav1.Object) (sets.String, e res.Insert(status.UpdatedRevision) res.Insert(status.CurrentRevision) - targets, err := r.TargetControl.GetFilteredTargets(context.TODO(), spec.Selector, xSetObject) + targets, _, err := r.TargetControl.GetFilteredTargets(context.TODO(), spec.Selector, xSetObject) if err != nil { return nil, err } diff --git a/xset/synccontrols/sync_control.go b/xset/synccontrols/sync_control.go index aa7b4ac0..39347643 100644 --- a/xset/synccontrols/sync_control.go +++ b/xset/synccontrols/sync_control.go @@ -77,17 +77,17 @@ func NewRealSyncControl(reconcileMixIn *mixin.ReconcilerMixin, updateLifecycleAdapter, scaleInOpsLifecycleAdapter := opslifecycle.GetLifecycleAdapters(xsetController, xsetLabelAnnoManager, xsetMeta) updateConfig := &UpdateConfig{ - xsetController: xsetController, - xsetLabelAnnoMgr: xsetLabelAnnoManager, - client: reconcileMixIn.Client, - targetControl: xControl, - resourceContextControl: resourceContexts, - recorder: reconcileMixIn.Recorder, + XsetController: xsetController, + XsetLabelAnnoMgr: xsetLabelAnnoManager, + Client: reconcileMixIn.Client, + TargetControl: xControl, + ResourceContextControl: resourceContexts, + Recorder: reconcileMixIn.Recorder, scaleInLifecycleAdapter: scaleInOpsLifecycleAdapter, updateLifecycleAdapter: updateLifecycleAdapter, - cacheExpectations: cacheExpectations, - targetGVK: targetGVK, + CacheExpectations: cacheExpectations, + TargetGVK: targetGVK, } return &RealSyncControl{ ReconcilerMixin: *reconcileMixIn, @@ -135,12 +135,19 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje return false, fmt.Errorf("fail to get XSetSpec") } - var err error - syncContext.FilteredTarget, err = r.xControl.GetFilteredTargets(ctx, xspec.Selector, instance) + filteredTargets, allTargets, err := r.xControl.GetFilteredTargets(ctx, xspec.Selector, instance) if err != nil { return false, fmt.Errorf("fail to get filtered Targets: %w", err) } + if IsTargetNamingSuffixPolicyPersistentSequence(xspec) { + // for naming with persistent sequences suffix, targets with same name should not exist at same time + syncContext.FilteredTarget = allTargets + } else { + // for naming with random suffix, targets with random names can be created at same time + syncContext.FilteredTarget = filteredTargets + } + // sync subresource // 1. list pvcs using ownerReference // 2. adopt and retain orphaned pvcs according to PVC retention policy @@ -173,7 +180,7 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje } // stateless case - var targetWrappers []*targetWrapper + var targetWrappers []*TargetWrapper syncContext.CurrentIDs = sets.Int{} idToReclaim := sets.Int{} toDeleteTargetNames := sets.NewString(xspec.ScaleStrategy.TargetToDelete...) @@ -199,7 +206,8 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje } } - if target.GetDeletionTimestamp() != nil { + // for naming with persistent sequences suffix, targets with same name should not exist at same time + if target.GetDeletionTimestamp() != nil && !IsTargetNamingSuffixPolicyPersistentSequence(xspec) { // 1. Reclaim ID from Target which is scaling in and terminating. if contextDetail, exist := ownedIDs[id]; exist && r.resourceContextControl.Contains(contextDetail, api.EnumScaleInContextDataKey, "true") { idToReclaim.Insert(id) @@ -220,7 +228,27 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje } } - targetWrappers = append(targetWrappers, &targetWrapper{ + // sync decoration revisions + var decorationInfo DecorationInfo + if decorationAdapter, enabled := r.xsetController.(api.DecorationAdapter); enabled { + if decorationInfo.DecorationCurrentRevisions, err = decorationAdapter.GetTargetCurrentDecorationRevisions(ctx, r.Client, target); err != nil { + return false, err + } + if decorationInfo.DecorationUpdatedRevisions, err = decorationAdapter.GetTargetUpdatedDecorationRevisions(ctx, r.Client, target); err != nil { + return false, err + } + if decorationInfo.DecorationChanged, err = decorationAdapter.IsTargetDecorationChanged(decorationInfo.DecorationCurrentRevisions, decorationInfo.DecorationUpdatedRevisions); err != nil { + return false, err + } + } + + // sync target ops priority + var opsPriority *api.OpsPriority + if opsPriority, err = r.xsetController.GetXOpsPriority(ctx, r.Client, target); err != nil { + return false, err + } + + targetWrappers = append(targetWrappers, &TargetWrapper{ Object: target, ID: id, ContextDetail: ownedIDs[id], @@ -229,8 +257,11 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje ToDelete: toDelete, ToExclude: toExclude, - IsDuringScaleInOps: opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, target), - IsDuringUpdateOps: opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.updateLifecycleAdapter, target), + IsDuringScaleInOps: opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.scaleInLifecycleAdapter, target), + IsDuringUpdateOps: opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.updateLifecycleAdapter, target), + + DecorationInfo: decorationInfo, + OpsPriority: opsPriority, }) if id >= 0 { @@ -395,7 +426,7 @@ func (r *RealSyncControl) Replace(ctx context.Context, xsetObject api.XSetObject syncContext.replacingMap = classifyTargetReplacingMapping(r.xsetLabelAnnoMgr, syncContext.activeTargets) }() - needReplaceOriginTargets, needCleanLabelTargets, targetsNeedCleanLabels, needDeleteTargets := r.dealReplaceTargets(ctx, syncContext.FilteredTarget) + needReplaceOriginTargets, needCleanLabelTargets, targetsNeedCleanLabels, needDeleteTargets := r.dealReplaceTargets(ctx, syncContext.TargetWrappers) // delete origin targets for replace err = r.BatchDeleteTargetsByLabel(ctx, r.xControl, needDeleteTargets) @@ -438,7 +469,7 @@ func (r *RealSyncControl) Replace(ctx context.Context, xsetObject api.XSetObject if _, inUsed := syncContext.CurrentIDs[id]; inUsed { continue } - syncContext.TargetWrappers = append(syncContext.TargetWrappers, &targetWrapper{ + syncContext.TargetWrappers = append(syncContext.TargetWrappers, &TargetWrapper{ ID: id, Object: nil, ContextDetail: contextDetail, @@ -504,7 +535,6 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject, } // scale out new Targets with updatedRevision // TODO use cache - // TODO decoration for target template target, err := NewTargetFrom(r.xsetController, r.xsetLabelAnnoMgr, xsetObject, revision, availableIDContext.ID, func(object client.Object) error { if _, exist := r.resourceContextControl.Get(availableIDContext, api.EnumJustCreateContextDataKey); exist { @@ -512,6 +542,25 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject, } else { r.xsetLabelAnnoMgr.Set(object, api.XCompletingLabel, strconv.FormatInt(time.Now().UnixNano(), 10)) } + + // decoration for target template + if decorationAdapter, ok := r.xsetController.(api.DecorationAdapter); ok { + revisionsInfo, ok := r.resourceContextControl.Get(availableIDContext, api.EnumTargetDecorationRevisionKey) + if !ok { + // get updated decoration revisions from target and write to resource context + if revisionsInfo, err = decorationAdapter.GetTargetUpdatedDecorationRevisions(ctx, r.Client, object); err != nil { + return err + } + r.resourceContextControl.Put(availableIDContext, api.EnumTargetDecorationRevisionKey, revisionsInfo) + needUpdateContext.Store(true) + } + // get patcher from decoration revisions and patch target + if fn, err := decorationAdapter.GetDecorationPatcherByRevisions(ctx, r.Client, object, revisionsInfo); err != nil { + return err + } else { + return fn(object) + } + } return nil }, r.xsetController.GetXSetTemplatePatcher(xsetObject), @@ -553,14 +602,10 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject, } if diff <= 0 { - // get targets ops priority - if err := r.getTargetsOpsPriority(ctx, r.Client, syncContext.activeTargets); err != nil { - return false, recordedRequeueAfter, err - } // chose the targets to scale in targetsToScaleIn := r.getTargetsToDelete(xsetObject, syncContext.activeTargets, syncContext.replacingMap, diff*-1) // filter out Targets need to trigger TargetOpsLifecycle - wrapperCh := make(chan *targetWrapper, len(targetsToScaleIn)) + wrapperCh := make(chan *TargetWrapper, len(targetsToScaleIn)) for i := range targetsToScaleIn { if targetsToScaleIn[i].IsDuringScaleInOps { continue @@ -575,7 +620,7 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject, // trigger TargetOpsLifecycle with scaleIn OperationType logger.V(1).Info("try to begin TargetOpsLifecycle for scaling in Target in XSet", "wrapper", ObjectKeyString(object)) - if updated, err := opslifecycle.Begin(r.updateConfig.xsetLabelAnnoMgr, r.Client, r.scaleInLifecycleAdapter, object); err != nil { + if updated, err := opslifecycle.Begin(r.updateConfig.XsetLabelAnnoMgr, r.Client, r.scaleInLifecycleAdapter, object); err != nil { return fmt.Errorf("fail to begin TargetOpsLifecycle for Scaling in Target %s/%s: %w", object.GetNamespace(), object.GetName(), err) } else if updated { wrapper.IsDuringScaleInOps = true @@ -599,7 +644,7 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject, needUpdateContext := false for i, targetWrapper := range targetsToScaleIn { - requeueAfter, allowed := opslifecycle.AllowOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, ptr.Deref(spec.ScaleStrategy.OperationDelaySeconds, 0), targetWrapper.Object) + requeueAfter, allowed := opslifecycle.AllowOps(r.updateConfig.XsetLabelAnnoMgr, r.scaleInLifecycleAdapter, ptr.Deref(spec.ScaleStrategy.OperationDelaySeconds, 0), targetWrapper.Object) if !allowed && targetWrapper.Object.GetDeletionTimestamp() == nil { r.Recorder.Eventf(targetWrapper.Object, corev1.EventTypeNormal, "TargetScaleInLifecycle", "Target is not allowed to scale in") continue @@ -707,13 +752,8 @@ func (r *RealSyncControl) Update(ctx context.Context, xsetObject api.XSetObject, var err error var recordedRequeueAfter *time.Duration - // 0. get targets ops priority - if err := r.getTargetsOpsPriority(ctx, r.Client, syncContext.TargetWrappers); err != nil { - return false, recordedRequeueAfter, err - } - // 1. scan and analysis targets update info for active targets and PlaceHolder targets - targetUpdateInfos, err := r.attachTargetUpdateInfo(xsetObject, syncContext) + targetUpdateInfos, err := r.attachTargetUpdateInfo(ctx, xsetObject, syncContext) if err != nil { return false, recordedRequeueAfter, err } @@ -721,14 +761,14 @@ func (r *RealSyncControl) Update(ctx context.Context, xsetObject api.XSetObject, // 2. decide Target update candidates candidates := r.decideTargetToUpdate(r.xsetController, xsetObject, targetUpdateInfos) targetToUpdate := filterOutPlaceHolderUpdateInfos(candidates) - targetCh := make(chan *targetUpdateInfo, len(targetToUpdate)) + targetCh := make(chan *TargetUpdateInfo, len(targetToUpdate)) updater := r.newTargetUpdater(xsetObject) updating := false // 3. filter already updated revision, for i, targetInfo := range targetToUpdate { // TODO check decoration and pvc template changed - if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged { + if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged && !targetInfo.DecorationChanged { continue } @@ -865,7 +905,8 @@ func (r *RealSyncControl) CalculateStatus(_ context.Context, instance api.XSetOb activeTargets := FilterOutActiveTargetWrappers(syncContext.TargetWrappers) for _, targetWrapper := range activeTargets { - if targetWrapper.GetDeletionTimestamp() != nil { + // for naming with persistent sequences suffix, terminating targets can be shown in status + if targetWrapper.GetDeletionTimestamp() != nil && !IsTargetNamingSuffixPolicyPersistentSequence(r.xsetController.GetXSetSpec(instance)) { continue } @@ -876,8 +917,8 @@ func (r *RealSyncControl) CalculateStatus(_ context.Context, instance api.XSetOb updatedReplicas++ } - if opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, targetWrapper) || - opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.updateLifecycleAdapter, targetWrapper) { + if opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.scaleInLifecycleAdapter, targetWrapper) || + opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.updateLifecycleAdapter, targetWrapper) { operatingReplicas++ } @@ -989,25 +1030,9 @@ func (r *RealSyncControl) reclaimOwnedIDs( return nil } -// getTargetsOpsPriority try to set targets' ops priority -func (r *RealSyncControl) getTargetsOpsPriority(ctx context.Context, c client.Client, targets []*targetWrapper) error { - _, err := controllerutils.SlowStartBatch(len(targets), controllerutils.SlowStartInitialBatchSize, true, func(i int, _ error) error { - if targets[i].PlaceHolder || targets[i].Object == nil || targets[i].OpsPriority != nil { - return nil - } - var iErr error - targets[i].OpsPriority, iErr = r.xsetController.GetXOpsPriority(ctx, c, targets[i].Object) - if iErr != nil { - return fmt.Errorf("failed to get target %s/%s ops priority: %w", targets[i].Object.GetNamespace(), targets[i].Object.GetName(), iErr) - } - return nil - }) - return err -} - // FilterOutActiveTargetWrappers filter out non placeholder targets -func FilterOutActiveTargetWrappers(targets []*targetWrapper) []*targetWrapper { - var filteredTargetWrappers []*targetWrapper +func FilterOutActiveTargetWrappers(targets []*TargetWrapper) []*TargetWrapper { + var filteredTargetWrappers []*TargetWrapper for i, target := range targets { if target.PlaceHolder { continue diff --git a/xset/synccontrols/types.go b/xset/synccontrols/types.go index 23542f02..35580e60 100644 --- a/xset/synccontrols/types.go +++ b/xset/synccontrols/types.go @@ -34,9 +34,9 @@ type SyncContext struct { ExistingSubResource []client.Object FilteredTarget []client.Object - TargetWrappers []*targetWrapper - activeTargets []*targetWrapper - replacingMap map[string]*targetWrapper + TargetWrappers []*TargetWrapper + activeTargets []*TargetWrapper + replacingMap map[string]*TargetWrapper CurrentIDs sets.Int OwnedIds map[int]*api.ContextDetail @@ -50,7 +50,7 @@ type SubResources struct { ExistingPvcs []*corev1.PersistentVolumeClaim } -type targetWrapper struct { +type TargetWrapper struct { // parameters must be set during creation client.Object ID int @@ -63,11 +63,22 @@ type targetWrapper struct { IsDuringScaleInOps bool IsDuringUpdateOps bool + DecorationInfo + OpsPriority *api.OpsPriority } -type targetUpdateInfo struct { - *targetWrapper +type DecorationInfo struct { + // indicate if the decoration changed + DecorationChanged bool + // updated revisions of decoration + DecorationUpdatedRevisions string + // current revisions of decoration + DecorationCurrentRevisions string +} + +type TargetUpdateInfo struct { + *TargetWrapper UpdatedTarget client.Object @@ -81,7 +92,6 @@ type targetUpdateInfo struct { // carry the desired update revision UpdateRevision *appsv1.ControllerRevision - // TODO decoration revisions SubResourcesChanged // indicates operate is allowed for TargetOpsLifecycle. @@ -95,7 +105,7 @@ type targetUpdateInfo struct { IsInReplaceUpdate bool // replace new created target - ReplacePairNewTargetInfo *targetUpdateInfo + ReplacePairNewTargetInfo *TargetUpdateInfo // replace origin target ReplacePairOriginTargetName string diff --git a/xset/synccontrols/x_replace.go b/xset/synccontrols/x_replace.go index 1d02e577..119f43c8 100644 --- a/xset/synccontrols/x_replace.go +++ b/xset/synccontrols/x_replace.go @@ -114,14 +114,15 @@ func (r *RealSyncControl) replaceOriginTargets( ctx context.Context, instance api.XSetObject, syncContext *SyncContext, - needReplaceOriginTargets []client.Object, + needReplaceOriginTargets []*TargetWrapper, ownedIDs map[int]*api.ContextDetail, availableContexts []*api.ContextDetail, ) (int, error) { logger := logr.FromContext(ctx) mapNewToOriginTargetContext := r.mapReplaceNewToOriginTargetContext(ownedIDs) successCount, err := controllerutils.SlowStartBatch(len(needReplaceOriginTargets), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error { - originTarget := needReplaceOriginTargets[i] + originWrapper := needReplaceOriginTargets[i] + originTarget := needReplaceOriginTargets[i].Object originTargetId, _ := GetInstanceID(r.xsetLabelAnnoMgr, originTarget) if ownedIDs[originTargetId] == nil { @@ -131,12 +132,6 @@ func (r *RealSyncControl) replaceOriginTargets( replaceRevision := r.getReplaceRevision(originTarget, syncContext) - // create target using update revision if replaced by update, otherwise using current revision - newTarget, err := NewTargetFrom(r.xsetController, r.xsetLabelAnnoMgr, instance, replaceRevision, originTargetId, - r.xsetController.GetXSetTemplatePatcher(instance)) - if err != nil { - return err - } // add instance id and replace pair label var newInstanceId string var newTargetContext *api.ContextDetail @@ -144,7 +139,6 @@ func (r *RealSyncControl) replaceOriginTargets( newTargetContext = contextDetail // reuse targetContext ID if pair-relation exists newInstanceId = fmt.Sprintf("%d", newTargetContext.ID) - r.xsetLabelAnnoMgr.Set(newTarget, api.XInstanceIdLabelKey, newInstanceId) logger.Info("replaceOriginTargets", "try to reuse new pod resourceContext id", newInstanceId) } else { if availableContexts[i] == nil { @@ -154,11 +148,30 @@ func (r *RealSyncControl) replaceOriginTargets( newTargetContext = availableContexts[i] // add replace pair-relation to targetContexts for originTarget and newTarget newInstanceId = fmt.Sprintf("%d", newTargetContext.ID) - r.xsetLabelAnnoMgr.Set(newTarget, api.XInstanceIdLabelKey, newInstanceId) r.resourceContextControl.Put(ownedIDs[originTargetId], api.EnumReplaceNewTargetIDContextDataKey, newInstanceId) r.resourceContextControl.Put(ownedIDs[newTargetContext.ID], api.EnumReplaceOriginTargetIDContextDataKey, strconv.Itoa(originTargetId)) r.resourceContextControl.Remove(ownedIDs[newTargetContext.ID], api.EnumJustCreateContextDataKey) } + + // create target using update revision if replaced by update, otherwise using current revision + newTarget, err := NewTargetFrom(r.xsetController, r.xsetLabelAnnoMgr, instance, replaceRevision, newTargetContext.ID, + r.xsetController.GetXSetTemplatePatcher(instance), + func(object client.Object) error { + if decorationAdapter, ok := r.xsetController.(api.DecorationAdapter); ok { + // get current decoration patcher from origin target, and patch new target + if fn, err := decorationAdapter.GetDecorationPatcherByRevisions(ctx, r.Client, originTarget, originWrapper.DecorationUpdatedRevisions); err != nil { + return err + } else { + return fn(object) + } + } + return nil + }, + ) + if err != nil { + return err + } + r.xsetLabelAnnoMgr.Set(newTarget, api.XReplacePairOriginName, originTarget.GetName()) r.xsetLabelAnnoMgr.Set(newTarget, api.XCreatingLabel, strconv.FormatInt(time.Now().UnixNano(), 10)) r.resourceContextControl.Put(newTargetContext, api.EnumRevisionContextDataKey, replaceRevision.GetName()) @@ -206,14 +219,15 @@ func (r *RealSyncControl) replaceOriginTargets( return successCount, err } -func (r *RealSyncControl) dealReplaceTargets(ctx context.Context, targets []client.Object) ( - needReplaceTargets, needCleanLabelTargets []client.Object, targetNeedCleanLabels [][]string, needDeleteTargets []client.Object, +func (r *RealSyncControl) dealReplaceTargets(ctx context.Context, targets []*TargetWrapper) ( + needReplaceTargets []*TargetWrapper, needCleanLabelTargets []client.Object, targetNeedCleanLabels [][]string, needDeleteTargets []client.Object, ) { logger := logr.FromContext(ctx) targetInstanceIdMap := make(map[string]client.Object) - targetNameMap := make(map[string]client.Object) + targetNameMap := make(map[string]*TargetWrapper) + filteredTargets := FilterOutActiveTargetWrappers(targets) - for _, target := range targets { + for _, target := range filteredTargets { targetLabels := target.GetLabels() if instanceId, ok := r.xsetLabelAnnoMgr.Get(targetLabels, api.XInstanceIdLabelKey); ok { @@ -223,7 +237,7 @@ func (r *RealSyncControl) dealReplaceTargets(ctx context.Context, targets []clie } // deal need replace targets - for _, target := range targets { + for _, target := range filteredTargets { targetLabels := target.GetLabels() // no replace indication label @@ -232,7 +246,7 @@ func (r *RealSyncControl) dealReplaceTargets(ctx context.Context, targets []clie } // origin target is about to scaleIn, skip replace - if opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, target) { + if opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.scaleInLifecycleAdapter, target) { logger.Info("dealReplaceTargets", "target is during scaleIn ops lifecycle, skip replacing", target.GetName()) continue } @@ -254,7 +268,8 @@ func (r *RealSyncControl) dealReplaceTargets(ctx context.Context, targets []clie needReplaceTargets = append(needReplaceTargets, target) } - for _, target := range targets { + for _, wrapper := range filteredTargets { + target := wrapper.Object targetLabels := target.GetLabels() _, replaceByUpdate := r.xsetLabelAnnoMgr.Get(targetLabels, api.XReplaceByReplaceUpdateLabelKey) var needCleanLabels []string @@ -272,7 +287,7 @@ func (r *RealSyncControl) dealReplaceTargets(ctx context.Context, targets []clie } else if !replaceByUpdate { // not replace update, delete origin target when new created target is service available if r.xsetController.CheckAvailable(target) { - needDeleteTargets = append(needDeleteTargets, originTarget) + needDeleteTargets = append(needDeleteTargets, originTarget.Object) } } } @@ -296,7 +311,7 @@ func updateReplaceOriginTarget( c client.Client, recorder record.EventRecorder, xsetLabelAnnoMgr api.XSetLabelAnnotationManager, - originTargetUpdateInfo, newTargetUpdateInfo *targetUpdateInfo, + originTargetUpdateInfo, newTargetUpdateInfo *TargetUpdateInfo, ) error { originTarget := originTargetUpdateInfo.Object @@ -371,16 +386,16 @@ func (r *RealSyncControl) getReplaceRevision(originTarget client.Object, syncCon } // classify the pair relationship for Target replacement. -func classifyTargetReplacingMapping(xsetLabelAnnoMgr api.XSetLabelAnnotationManager, targetWrappers []*targetWrapper) map[string]*targetWrapper { - targetNameMap := make(map[string]*targetWrapper) - targetIdMap := make(map[string]*targetWrapper) +func classifyTargetReplacingMapping(xsetLabelAnnoMgr api.XSetLabelAnnotationManager, targetWrappers []*TargetWrapper) map[string]*TargetWrapper { + targetNameMap := make(map[string]*TargetWrapper) + targetIdMap := make(map[string]*TargetWrapper) for _, targetWrapper := range targetWrappers { targetNameMap[targetWrapper.GetName()] = targetWrapper targetIdMap[strconv.Itoa(targetWrapper.ID)] = targetWrapper } // old target name => new target wrapper - replaceTargetMapping := make(map[string]*targetWrapper) + replaceTargetMapping := make(map[string]*TargetWrapper) for _, targetWrapper := range targetWrappers { if targetWrapper.Object == nil { continue diff --git a/xset/synccontrols/x_scale.go b/xset/synccontrols/x_scale.go index 3901e49b..9b94762a 100644 --- a/xset/synccontrols/x_scale.go +++ b/xset/synccontrols/x_scale.go @@ -41,8 +41,8 @@ import ( // getTargetsToDelete // 1. finds number of diff targets from filteredTargets to do scaleIn // 2. finds targets allowed to scale in out of diff -func (r *RealSyncControl) getTargetsToDelete(xsetObject api.XSetObject, filteredTargets []*targetWrapper, replaceMapping map[string]*targetWrapper, diff int) []*targetWrapper { - var countedTargets []*targetWrapper +func (r *RealSyncControl) getTargetsToDelete(xsetObject api.XSetObject, filteredTargets []*TargetWrapper, replaceMapping map[string]*TargetWrapper, diff int) []*TargetWrapper { + var countedTargets []*TargetWrapper for _, target := range filteredTargets { if _, exist := replaceMapping[target.GetName()]; exist { countedTargets = append(countedTargets, target) @@ -56,11 +56,11 @@ func (r *RealSyncControl) getTargetsToDelete(xsetObject api.XSetObject, filtered } // 2. select targets to delete in second round according to replace, delete, exclude - var needDeleteTargets []*targetWrapper + var needDeleteTargets []*TargetWrapper for i, target := range countedTargets { // find targets to be scaleIn out of diff, is allowed to ops spec := r.xsetController.GetXSetSpec(xsetObject) - _, allowed := opslifecycle.AllowOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, ptr.Deref(spec.ScaleStrategy.OperationDelaySeconds, 0), target) + _, allowed := opslifecycle.AllowOps(r.updateConfig.XsetLabelAnnoMgr, r.scaleInLifecycleAdapter, ptr.Deref(spec.ScaleStrategy.OperationDelaySeconds, 0), target) if i >= diff && !allowed { continue } @@ -87,12 +87,12 @@ func (r *RealSyncControl) getTargetsToDelete(xsetObject api.XSetObject, filtered } type ActiveTargetsForDeletion struct { - targets []*targetWrapper + targets []*TargetWrapper checkReadyFunc func(object client.Object) (bool, *metav1.Time) } func newActiveTargetsForDeletion( - targets []*targetWrapper, + targets []*TargetWrapper, checkReadyFunc func(object client.Object) (bool, *metav1.Time), ) *ActiveTargetsForDeletion { return &ActiveTargetsForDeletion{ diff --git a/xset/synccontrols/x_update.go b/xset/synccontrols/x_update.go index 01df9abc..c7678661 100644 --- a/xset/synccontrols/x_update.go +++ b/xset/synccontrols/x_update.go @@ -18,8 +18,6 @@ package synccontrols import ( "context" - "encoding/json" - "errors" "fmt" "sort" "time" @@ -36,7 +34,6 @@ import ( clientutil "kusionstack.io/kube-utils/client" "kusionstack.io/kube-utils/controller/expectations" - "kusionstack.io/kube-utils/controller/merge" controllerutils "kusionstack.io/kube-utils/controller/utils" "kusionstack.io/kube-utils/xset/api" "kusionstack.io/kube-utils/xset/opslifecycle" @@ -47,16 +44,15 @@ import ( const UnknownRevision = "__unknownRevision__" -func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, syncContext *SyncContext) ([]*targetUpdateInfo, error) { +func (r *RealSyncControl) attachTargetUpdateInfo(_ context.Context, xsetObject api.XSetObject, syncContext *SyncContext) ([]*TargetUpdateInfo, error) { activeTargets := FilterOutActiveTargetWrappers(syncContext.TargetWrappers) - targetUpdateInfoList := make([]*targetUpdateInfo, len(activeTargets)) + targetUpdateInfoList := make([]*TargetUpdateInfo, len(activeTargets)) for i, target := range activeTargets { - updateInfo := &targetUpdateInfo{ - targetWrapper: syncContext.TargetWrappers[i], + updateInfo := &TargetUpdateInfo{ + TargetWrapper: syncContext.TargetWrappers[i], } - // TODO decoration for target template updateInfo.UpdateRevision = syncContext.UpdatedRevision // decide this target current revision, or nil if not indicated if target.GetLabels() != nil { @@ -92,7 +88,7 @@ func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, sync var err error spec := r.xsetController.GetXSetSpec(xsetObject) // decide whether the TargetOpsLifecycle is during ops or not - updateInfo.RequeueForOperationDelay, updateInfo.IsAllowUpdateOps = opslifecycle.AllowOps(r.updateConfig.xsetLabelAnnoMgr, r.updateLifecycleAdapter, ptr.Deref(spec.UpdateStrategy.OperationDelaySeconds, 0), target) + updateInfo.RequeueForOperationDelay, updateInfo.IsAllowUpdateOps = opslifecycle.AllowOps(r.updateConfig.XsetLabelAnnoMgr, r.updateLifecycleAdapter, ptr.Deref(spec.UpdateStrategy.OperationDelaySeconds, 0), target) // check subresource pvc template changed if _, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { updateInfo.PvcTmpHashChanged, err = r.pvcControl.IsTargetPvcTmpChanged(xsetObject, target.Object, syncContext.ExistingPvcs) @@ -104,7 +100,7 @@ func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, sync } // attach replace info - targetUpdateInfoMap := make(map[string]*targetUpdateInfo) + targetUpdateInfoMap := make(map[string]*TargetUpdateInfo) for _, targetUpdateInfo := range targetUpdateInfoList { targetUpdateInfoMap[targetUpdateInfo.GetName()] = targetUpdateInfo } @@ -140,8 +136,8 @@ func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, sync if !target.PlaceHolder { continue } - updateInfo := &targetUpdateInfo{ - targetWrapper: target, + updateInfo := &TargetUpdateInfo{ + TargetWrapper: target, UpdateRevision: syncContext.UpdatedRevision, } if revision, exist := r.resourceContextControl.Get(target.ContextDetail, api.EnumRevisionContextDataKey); exist && @@ -154,8 +150,8 @@ func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, sync return targetUpdateInfoList, nil } -func filterOutPlaceHolderUpdateInfos(targets []*targetUpdateInfo) []*targetUpdateInfo { - var filteredTargetUpdateInfos []*targetUpdateInfo +func filterOutPlaceHolderUpdateInfos(targets []*TargetUpdateInfo) []*TargetUpdateInfo { + var filteredTargetUpdateInfos []*TargetUpdateInfo for _, target := range targets { if target.PlaceHolder { continue @@ -165,7 +161,7 @@ func filterOutPlaceHolderUpdateInfos(targets []*targetUpdateInfo) []*targetUpdat return filteredTargetUpdateInfos } -func (r *RealSyncControl) decideTargetToUpdate(xsetController api.XSetController, xset api.XSetObject, targetInfos []*targetUpdateInfo) []*targetUpdateInfo { +func (r *RealSyncControl) decideTargetToUpdate(xsetController api.XSetController, xset api.XSetObject, targetInfos []*TargetUpdateInfo) []*TargetUpdateInfo { spec := xsetController.GetXSetSpec(xset) filteredPodInfos := r.getTargetsUpdateTargets(targetInfos) @@ -177,22 +173,30 @@ func (r *RealSyncControl) decideTargetToUpdate(xsetController api.XSetController return r.decideTargetToUpdateByPartition(xsetController, xset, filteredPodInfos) } -func (r *RealSyncControl) decideTargetToUpdateByLabel(targetInfos []*targetUpdateInfo) (targetToUpdate []*targetUpdateInfo) { +func (r *RealSyncControl) decideTargetToUpdateByLabel(targetInfos []*TargetUpdateInfo) (targetToUpdate []*TargetUpdateInfo) { for i := range targetInfos { if _, exist := r.xsetLabelAnnoMgr.Get(targetInfos[i].GetLabels(), api.XSetUpdateIndicationLabelKey); exist { targetToUpdate = append(targetToUpdate, targetInfos[i]) continue } - // TODO separate decoration and xset update progress + // separate decoration and xset update progress + if targetInfos[i].DecorationChanged { + if targetInfos[i].IsInReplace { + continue + } + targetInfos[i].IsUpdatedRevision = true + targetInfos[i].UpdateRevision = targetInfos[i].CurrentRevision + targetToUpdate = append(targetToUpdate, targetInfos[i]) + } } return targetToUpdate } -func (r *RealSyncControl) decideTargetToUpdateByPartition(xsetController api.XSetController, xset api.XSetObject, filteredTargetInfos []*targetUpdateInfo) []*targetUpdateInfo { +func (r *RealSyncControl) decideTargetToUpdateByPartition(xsetController api.XSetController, xset api.XSetObject, filteredTargetInfos []*TargetUpdateInfo) []*TargetUpdateInfo { spec := xsetController.GetXSetSpec(xset) replicas := ptr.Deref(spec.Replicas, 0) - + currentTargetCount := int32(len(filteredTargetInfos)) partition := int32(0) if spec.UpdateStrategy.RollingUpdate != nil && spec.UpdateStrategy.RollingUpdate.ByPartition != nil { @@ -211,11 +215,19 @@ func (r *RealSyncControl) decideTargetToUpdateByPartition(xsetController api.XSe ordered := newOrderedTargetUpdateInfos(filteredTargetInfos, xsetController.CheckReadyTime) sort.Sort(ordered) targetToUpdate := ordered.targets[:replicas-partition] + // separate decoration and xset update progress + for i := replicas - partition; i < int32Min(replicas, currentTargetCount); i++ { + if ordered.targets[i].DecorationChanged { + ordered.targets[i].IsUpdatedRevision = true + ordered.targets[i].UpdateRevision = ordered.targets[i].CurrentRevision + targetToUpdate = append(targetToUpdate, ordered.targets[i]) + } + } return targetToUpdate } // when sort targets to choose update, only sort (1) replace origin targets, (2) non-exclude targets -func (r *RealSyncControl) getTargetsUpdateTargets(targetInfos []*targetUpdateInfo) (filteredTargetInfos []*targetUpdateInfo) { +func (r *RealSyncControl) getTargetsUpdateTargets(targetInfos []*TargetUpdateInfo) (filteredTargetInfos []*TargetUpdateInfo) { for _, targetInfo := range targetInfos { if targetInfo.ReplacePairOriginTargetName != "" { continue @@ -231,7 +243,7 @@ func (r *RealSyncControl) getTargetsUpdateTargets(targetInfos []*targetUpdateInf } func newOrderedTargetUpdateInfos( - targetInfos []*targetUpdateInfo, + targetInfos []*TargetUpdateInfo, checkReadyFunc func(object client.Object) (bool, *metav1.Time), ) *orderByDefault { return &orderByDefault{ @@ -241,7 +253,7 @@ func newOrderedTargetUpdateInfos( } type orderByDefault struct { - targets []*targetUpdateInfo + targets []*TargetUpdateInfo checkReadyFunc func(object client.Object) (bool, *metav1.Time) } @@ -257,6 +269,14 @@ func (o *orderByDefault) Less(i, j int) bool { return l.IsUpdatedRevision } + if l.IsDuringUpdateOps != r.IsDuringUpdateOps { + return l.IsDuringUpdateOps + } + + if l.IsInReplaceUpdate != r.IsInReplaceUpdate { + return l.IsInReplaceUpdate + } + if l.PlaceHolder != r.PlaceHolder { return r.PlaceHolder } @@ -265,16 +285,16 @@ func (o *orderByDefault) Less(i, j int) bool { return true } - if l.IsDuringUpdateOps != r.IsDuringUpdateOps { - return l.IsDuringUpdateOps - } - lReady, _ := o.checkReadyFunc(l.Object) rReady, _ := o.checkReadyFunc(r.Object) if lReady != rReady { return lReady } + if l.DecorationChanged != r.DecorationChanged { + return l.DecorationChanged + } + if l.OpsPriority != nil && r.OpsPriority != nil { if l.OpsPriority.PriorityClass != r.OpsPriority.PriorityClass { return l.OpsPriority.PriorityClass < r.OpsPriority.PriorityClass @@ -288,28 +308,28 @@ func (o *orderByDefault) Less(i, j int) bool { } type UpdateConfig struct { - xsetController api.XSetController - xsetLabelAnnoMgr api.XSetLabelAnnotationManager - client client.Client - targetControl xcontrol.TargetControl - resourceContextControl resourcecontexts.ResourceContextControl - recorder record.EventRecorder + XsetController api.XSetController + XsetLabelAnnoMgr api.XSetLabelAnnotationManager + Client client.Client + TargetControl xcontrol.TargetControl + ResourceContextControl resourcecontexts.ResourceContextControl + Recorder record.EventRecorder scaleInLifecycleAdapter api.LifecycleAdapter updateLifecycleAdapter api.LifecycleAdapter - cacheExpectations expectations.CacheExpectationsInterface - targetGVK schema.GroupVersionKind + CacheExpectations expectations.CacheExpectationsInterface + TargetGVK schema.GroupVersionKind } type TargetUpdater interface { Setup(config *UpdateConfig, xset api.XSetObject) - FulfillTargetUpdatedInfo(ctx context.Context, revision *appsv1.ControllerRevision, targetUpdateInfo *targetUpdateInfo) error - BeginUpdateTarget(ctx context.Context, syncContext *SyncContext, targetCh chan *targetUpdateInfo) (bool, error) - FilterAllowOpsTargets(ctx context.Context, targetToUpdate []*targetUpdateInfo, ownedIDs map[int]*api.ContextDetail, syncContext *SyncContext, targetCh chan *targetUpdateInfo) (*time.Duration, error) - UpgradeTarget(ctx context.Context, targetInfo *targetUpdateInfo) error - GetTargetUpdateFinishStatus(ctx context.Context, targetUpdateInfo *targetUpdateInfo) (bool, string, error) - FinishUpdateTarget(ctx context.Context, targetInfo *targetUpdateInfo, finishByCancelUpdate bool) error + FulfillTargetUpdatedInfo(ctx context.Context, revision *appsv1.ControllerRevision, targetUpdateInfo *TargetUpdateInfo) error + BeginUpdateTarget(ctx context.Context, syncContext *SyncContext, targetCh chan *TargetUpdateInfo) (bool, error) + FilterAllowOpsTargets(ctx context.Context, targetToUpdate []*TargetUpdateInfo, ownedIDs map[int]*api.ContextDetail, syncContext *SyncContext, targetCh chan *TargetUpdateInfo) (*time.Duration, error) + UpgradeTarget(ctx context.Context, targetInfo *TargetUpdateInfo) error + GetTargetUpdateFinishStatus(ctx context.Context, targetUpdateInfo *TargetUpdateInfo) (bool, string, error) + FinishUpdateTarget(ctx context.Context, targetInfo *TargetUpdateInfo, finishByCancelUpdate bool) error } type GenericTargetUpdater struct { @@ -323,21 +343,21 @@ func (u *GenericTargetUpdater) Setup(config *UpdateConfig, xset api.XSetObject) u.OwnerObject = xset } -func (u *GenericTargetUpdater) BeginUpdateTarget(_ context.Context, syncContext *SyncContext, targetCh chan *targetUpdateInfo) (bool, error) { +func (u *GenericTargetUpdater) BeginUpdateTarget(_ context.Context, syncContext *SyncContext, targetCh chan *TargetUpdateInfo) (bool, error) { succCount, err := controllerutils.SlowStartBatch(len(targetCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error { targetInfo := <-targetCh - u.recorder.Eventf(targetInfo.Object, corev1.EventTypeNormal, "TargetUpdateLifecycle", "try to begin TargetOpsLifecycle for updating Target of XSet") + u.Recorder.Eventf(targetInfo.Object, corev1.EventTypeNormal, "TargetUpdateLifecycle", "try to begin TargetOpsLifecycle for updating Target of XSet") - if updated, err := opslifecycle.BeginWithCleaningOld(u.xsetLabelAnnoMgr, u.client, u.updateLifecycleAdapter, targetInfo.Object, func(obj client.Object) (bool, error) { + if updated, err := opslifecycle.BeginWithCleaningOld(u.XsetLabelAnnoMgr, u.Client, u.updateLifecycleAdapter, targetInfo.Object, func(obj client.Object) (bool, error) { if !targetInfo.OnlyMetadataChanged && !targetInfo.InPlaceUpdateSupport { - return opslifecycle.WhenBeginDelete(u.xsetLabelAnnoMgr, obj) + return opslifecycle.WhenBeginDelete(u.XsetLabelAnnoMgr, obj) } return false, nil }); err != nil { return fmt.Errorf("fail to begin TargetOpsLifecycle for updating Target %s/%s: %s", targetInfo.GetNamespace(), targetInfo.GetName(), err.Error()) } else if updated { // add an expectation for this target update, before next reconciling - if err := u.cacheExpectations.ExpectUpdation(clientutil.ObjectKeyString(u.OwnerObject), u.targetGVK, targetInfo.GetNamespace(), targetInfo.GetName(), targetInfo.GetResourceVersion()); err != nil { + if err := u.CacheExpectations.ExpectUpdation(clientutil.ObjectKeyString(u.OwnerObject), u.TargetGVK, targetInfo.GetNamespace(), targetInfo.GetName(), targetInfo.GetResourceVersion()); err != nil { return err } } @@ -354,7 +374,7 @@ func (u *GenericTargetUpdater) BeginUpdateTarget(_ context.Context, syncContext return updating, nil } -func (u *GenericTargetUpdater) FilterAllowOpsTargets(ctx context.Context, candidates []*targetUpdateInfo, ownedIDs map[int]*api.ContextDetail, _ *SyncContext, targetCh chan *targetUpdateInfo) (*time.Duration, error) { +func (u *GenericTargetUpdater) FilterAllowOpsTargets(ctx context.Context, candidates []*TargetUpdateInfo, ownedIDs map[int]*api.ContextDetail, _ *SyncContext, targetCh chan *TargetUpdateInfo) (*time.Duration, error) { var recordedRequeueAfter *time.Duration needUpdateContext := false for i := range candidates { @@ -365,7 +385,7 @@ func (u *GenericTargetUpdater) FilterAllowOpsTargets(ctx context.Context, candid continue } if targetInfo.RequeueForOperationDelay != nil { - u.recorder.Eventf(targetInfo, corev1.EventTypeNormal, "TargetUpdateLifecycle", "delay Target update for %f seconds", targetInfo.RequeueForOperationDelay.Seconds()) + u.Recorder.Eventf(targetInfo, corev1.EventTypeNormal, "TargetUpdateLifecycle", "delay Target update for %f seconds", targetInfo.RequeueForOperationDelay.Seconds()) if recordedRequeueAfter == nil || *targetInfo.RequeueForOperationDelay < *recordedRequeueAfter { recordedRequeueAfter = targetInfo.RequeueForOperationDelay } @@ -375,26 +395,34 @@ func (u *GenericTargetUpdater) FilterAllowOpsTargets(ctx context.Context, candid targetInfo.IsAllowUpdateOps = true - if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged { + if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged && !targetInfo.DecorationChanged { continue } if _, exist := ownedIDs[targetInfo.ID]; !exist { - u.recorder.Eventf(u.OwnerObject, corev1.EventTypeWarning, "TargetBeforeUpdate", "target %s/%s is not allowed to update because cannot find context id %s in resourceContext", targetInfo.GetNamespace(), targetInfo.GetName(), targetInfo.GetLabels()[u.xsetLabelAnnoMgr.Value(api.XInstanceIdLabelKey)]) + u.Recorder.Eventf(u.OwnerObject, corev1.EventTypeWarning, "TargetBeforeUpdate", "target %s/%s is not allowed to update because cannot find context id %s in resourceContext", targetInfo.GetNamespace(), targetInfo.GetName(), targetInfo.GetLabels()[u.XsetLabelAnnoMgr.Value(api.XInstanceIdLabelKey)]) continue } - if !u.resourceContextControl.Contains(ownedIDs[targetInfo.ID], api.EnumRevisionContextDataKey, targetInfo.UpdateRevision.GetName()) { + if !u.ResourceContextControl.Contains(ownedIDs[targetInfo.ID], api.EnumRevisionContextDataKey, targetInfo.UpdateRevision.GetName()) { needUpdateContext = true - u.resourceContextControl.Put(ownedIDs[targetInfo.ID], api.EnumRevisionContextDataKey, targetInfo.UpdateRevision.GetName()) + u.ResourceContextControl.Put(ownedIDs[targetInfo.ID], api.EnumRevisionContextDataKey, targetInfo.UpdateRevision.GetName()) } - spec := u.xsetController.GetXSetSpec(u.OwnerObject) + spec := u.XsetController.GetXSetSpec(u.OwnerObject) // mark targetContext "TargetRecreateUpgrade" if upgrade by recreate isRecreateUpdatePolicy := spec.UpdateStrategy.UpdatePolicy == api.XSetRecreateTargetUpdateStrategyType if (!targetInfo.OnlyMetadataChanged && !targetInfo.InPlaceUpdateSupport) || isRecreateUpdatePolicy { - u.resourceContextControl.Put(ownedIDs[targetInfo.ID], api.EnumRecreateUpdateContextDataKey, "true") + u.ResourceContextControl.Put(ownedIDs[targetInfo.ID], api.EnumRecreateUpdateContextDataKey, "true") + } + + // add decoration revision to target context + if targetInfo.DecorationChanged { + if val, ok := u.ResourceContextControl.Get(ownedIDs[targetInfo.ID], api.EnumTargetDecorationRevisionKey); !ok || val != targetInfo.DecorationUpdatedRevisions { + needUpdateContext = true + u.ResourceContextControl.Put(ownedIDs[targetInfo.ID], api.EnumTargetDecorationRevisionKey, targetInfo.DecorationUpdatedRevisions) + } } if targetInfo.PlaceHolder { @@ -406,30 +434,30 @@ func (u *GenericTargetUpdater) FilterAllowOpsTargets(ctx context.Context, candid } // mark Target to use updated revision before updating it. if needUpdateContext { - u.recorder.Eventf(u.OwnerObject, corev1.EventTypeNormal, "UpdateToTargetContext", "try to update ResourceContext for XSet") + u.Recorder.Eventf(u.OwnerObject, corev1.EventTypeNormal, "UpdateToTargetContext", "try to update ResourceContext for XSet") err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - return u.resourceContextControl.UpdateToTargetContext(ctx, u.OwnerObject, ownedIDs) + return u.ResourceContextControl.UpdateToTargetContext(ctx, u.OwnerObject, ownedIDs) }) return recordedRequeueAfter, err } return recordedRequeueAfter, nil } -func (u *GenericTargetUpdater) FinishUpdateTarget(_ context.Context, targetInfo *targetUpdateInfo, finishByCancelUpdate bool) error { +func (u *GenericTargetUpdater) FinishUpdateTarget(_ context.Context, targetInfo *TargetUpdateInfo, finishByCancelUpdate bool) error { if finishByCancelUpdate { // cancel update lifecycle - return opslifecycle.CancelOpsLifecycle(u.xsetLabelAnnoMgr, u.client, u.updateLifecycleAdapter, targetInfo.Object) + return opslifecycle.CancelOpsLifecycle(u.XsetLabelAnnoMgr, u.Client, u.updateLifecycleAdapter, targetInfo.Object) } // target is ops finished, finish the lifecycle gracefully - if updated, err := opslifecycle.Finish(u.xsetLabelAnnoMgr, u.client, u.updateLifecycleAdapter, targetInfo.Object); err != nil { + if updated, err := opslifecycle.Finish(u.XsetLabelAnnoMgr, u.Client, u.updateLifecycleAdapter, targetInfo.Object); err != nil { return fmt.Errorf("failed to finish TargetOpsLifecycle for updating Target %s/%s: %s", targetInfo.GetNamespace(), targetInfo.GetName(), err.Error()) } else if updated { // add an expectation for this target update, before next reconciling - if err := u.cacheExpectations.ExpectUpdation(clientutil.ObjectKeyString(u.OwnerObject), u.targetGVK, targetInfo.GetNamespace(), targetInfo.GetName(), targetInfo.GetResourceVersion()); err != nil { + if err := u.CacheExpectations.ExpectUpdation(clientutil.ObjectKeyString(u.OwnerObject), u.TargetGVK, targetInfo.GetNamespace(), targetInfo.GetName(), targetInfo.GetResourceVersion()); err != nil { return err } - u.recorder.Eventf(targetInfo.Object, + u.Recorder.Eventf(targetInfo.Object, corev1.EventTypeNormal, "UpdateReady", "target %s/%s update finished", targetInfo.GetNamespace(), targetInfo.GetName()) } @@ -443,6 +471,13 @@ func RegisterInPlaceOnlyUpdater(targetUpdater TargetUpdater) { inPlaceOnlyTargetUpdater = targetUpdater } +// Support users to define inPlaceIfPossibleUpdater and register through RegistryInPlaceIfPossibleUpdater +var inPlaceIfPossibleUpdater TargetUpdater + +func RegisterInPlaceIfPossibleUpdater(targetUpdater TargetUpdater) { + inPlaceIfPossibleUpdater = targetUpdater +} + func (r *RealSyncControl) newTargetUpdater(xset api.XSetObject) TargetUpdater { spec := r.xsetController.GetXSetSpec(xset) var targetUpdater TargetUpdater @@ -452,88 +487,33 @@ func (r *RealSyncControl) newTargetUpdater(xset api.XSetObject) TargetUpdater { case api.XSetInPlaceOnlyTargetUpdateStrategyType: if inPlaceOnlyTargetUpdater != nil { targetUpdater = inPlaceOnlyTargetUpdater - } else { + } else if inPlaceIfPossibleUpdater != nil { // In case of using native K8s, Target is only allowed to update with container image, so InPlaceOnly policy is // implemented with InPlaceIfPossible policy as default for compatibility. - targetUpdater = &inPlaceIfPossibleUpdater{} + targetUpdater = inPlaceIfPossibleUpdater + } else { + // if none of InplaceOnly and InplaceIfPossible updater is registered, use default Recreate updater + targetUpdater = &recreateTargetUpdater{} } case api.XSetReplaceTargetUpdateStrategyType: targetUpdater = &replaceUpdateTargetUpdater{} default: - targetUpdater = &inPlaceIfPossibleUpdater{} + if inPlaceIfPossibleUpdater != nil { + targetUpdater = inPlaceIfPossibleUpdater + } else { + targetUpdater = &recreateTargetUpdater{} + } } targetUpdater.Setup(r.updateConfig, xset) return targetUpdater } -type TargetStatus struct { - ContainerStates map[string]*ContainerStatus `json:"containerStates,omitempty"` -} - -type ContainerStatus struct { - LatestImage string `json:"latestImage,omitempty"` - LastImageID string `json:"lastImageID,omitempty"` -} - -type inPlaceIfPossibleUpdater struct { - GenericTargetUpdater -} - -func (u *inPlaceIfPossibleUpdater) FulfillTargetUpdatedInfo(_ context.Context, revision *appsv1.ControllerRevision, targetUpdateInfo *targetUpdateInfo) error { - // 1. build target from current and updated revision - // TODO: use cache - currentTarget, err := NewTargetFrom(u.xsetController, u.xsetLabelAnnoMgr, u.OwnerObject, targetUpdateInfo.CurrentRevision, targetUpdateInfo.ID) - if err != nil { - return fmt.Errorf("fail to build Target from current revision %s: %v", targetUpdateInfo.CurrentRevision.GetName(), err.Error()) - } - - // TODO: use cache - - UpdatedTarget, err := NewTargetFrom(u.xsetController, u.xsetLabelAnnoMgr, u.OwnerObject, targetUpdateInfo.UpdateRevision, targetUpdateInfo.ID) - if err != nil { - return fmt.Errorf("fail to build Target from updated revision %s: %v", targetUpdateInfo.UpdateRevision.GetName(), err.Error()) - } - - if targetUpdateInfo.PvcTmpHashChanged { - targetUpdateInfo.InPlaceUpdateSupport, targetUpdateInfo.OnlyMetadataChanged = false, false - } - - newUpdatedTarget := targetUpdateInfo.targetWrapper.Object.DeepCopyObject().(client.Object) - if err = merge.ThreeWayMergeToTarget(currentTarget, UpdatedTarget, newUpdatedTarget, u.xsetController.NewXObject()); err != nil { - return fmt.Errorf("fail to patch Target %s/%s: %v", targetUpdateInfo.GetNamespace(), targetUpdateInfo.GetName(), err.Error()) - } - targetUpdateInfo.UpdatedTarget = newUpdatedTarget - - return nil -} - -func (u *inPlaceIfPossibleUpdater) UpgradeTarget(ctx context.Context, targetInfo *targetUpdateInfo) error { - if targetInfo.OnlyMetadataChanged || targetInfo.InPlaceUpdateSupport { - // if target template changes only include metadata or support in-place update, just apply these changes to target directly - if err := u.targetControl.UpdateTarget(ctx, targetInfo.UpdatedTarget); err != nil { - return fmt.Errorf("fail to update Target %s/%s when updating by in-place: %s", targetInfo.GetNamespace(), targetInfo.GetName(), err.Error()) - } - targetInfo.Object = targetInfo.UpdatedTarget - u.recorder.Eventf(targetInfo.Object, - corev1.EventTypeNormal, - "UpdateTarget", - "succeed to update Target %s/%s to from revision %s to revision %s by in-place", - targetInfo.GetNamespace(), targetInfo.GetName(), - targetInfo.CurrentRevision.GetName(), - targetInfo.UpdateRevision.GetName()) - return u.cacheExpectations.ExpectUpdation(clientutil.ObjectKeyString(u.OwnerObject), u.targetGVK, targetInfo.Object.GetNamespace(), targetInfo.Object.GetName(), targetInfo.Object.GetResourceVersion()) - } else { - // if target has changes not in-place supported, recreate it - return u.GenericTargetUpdater.RecreateTarget(ctx, targetInfo) - } -} - -func (u *GenericTargetUpdater) RecreateTarget(ctx context.Context, targetInfo *targetUpdateInfo) error { - if err := u.targetControl.DeleteTarget(ctx, targetInfo.Object); err != nil { +func (u *GenericTargetUpdater) RecreateTarget(ctx context.Context, targetInfo *TargetUpdateInfo) error { + if err := u.TargetControl.DeleteTarget(ctx, targetInfo.Object); err != nil { return fmt.Errorf("fail to delete Target %s/%s when updating by recreate: %v", targetInfo.GetNamespace(), targetInfo.GetName(), err.Error()) } - u.recorder.Eventf(targetInfo.Object, + u.Recorder.Eventf(targetInfo.Object, corev1.EventTypeNormal, "UpdateTarget", "succeed to update Target %s/%s to from revision %s to revision %s by recreate", @@ -545,41 +525,21 @@ func (u *GenericTargetUpdater) RecreateTarget(ctx context.Context, targetInfo *t return nil } -func (u *inPlaceIfPossibleUpdater) GetTargetUpdateFinishStatus(_ context.Context, targetUpdateInfo *targetUpdateInfo) (finished bool, msg string, err error) { - if targetUpdateInfo.GetAnnotations() == nil { - return false, "no annotations for last container status", nil - } - - targetLastState := &TargetStatus{} - if lastStateJson, exist := u.xsetLabelAnnoMgr.Get(targetUpdateInfo.GetAnnotations(), api.LastXStatusAnnotationKey); !exist { - return false, "no target last state annotation", nil - } else if err := json.Unmarshal([]byte(lastStateJson), targetLastState); err != nil { - msg := fmt.Sprintf("malformat target last state annotation [%s]: %s", lastStateJson, err.Error()) - return false, msg, errors.New(msg) - } - - if targetLastState.ContainerStates == nil { - return true, "empty last container state recorded", nil - } - - return true, "", nil -} - type recreateTargetUpdater struct { GenericTargetUpdater } -func (u *recreateTargetUpdater) FulfillTargetUpdatedInfo(_ context.Context, _ *appsv1.ControllerRevision, _ *targetUpdateInfo) error { +func (u *recreateTargetUpdater) FulfillTargetUpdatedInfo(_ context.Context, _ *appsv1.ControllerRevision, _ *TargetUpdateInfo) error { return nil } -func (u *recreateTargetUpdater) UpgradeTarget(ctx context.Context, targetInfo *targetUpdateInfo) error { +func (u *recreateTargetUpdater) UpgradeTarget(ctx context.Context, targetInfo *TargetUpdateInfo) error { return u.GenericTargetUpdater.RecreateTarget(ctx, targetInfo) } -func (u *recreateTargetUpdater) GetTargetUpdateFinishStatus(_ context.Context, targetInfo *targetUpdateInfo) (finished bool, msg string, err error) { +func (u *recreateTargetUpdater) GetTargetUpdateFinishStatus(_ context.Context, targetInfo *TargetUpdateInfo) (finished bool, msg string, err error) { // Recreate policy always treat Target as update not finished - return targetInfo.IsUpdatedRevision, "", nil + return targetInfo.IsUpdatedRevision && !targetInfo.DecorationChanged, "", nil } type replaceUpdateTargetUpdater struct { @@ -590,7 +550,7 @@ func (u *replaceUpdateTargetUpdater) Setup(config *UpdateConfig, xset api.XSetOb u.GenericTargetUpdater.Setup(config, xset) } -func (u *replaceUpdateTargetUpdater) BeginUpdateTarget(ctx context.Context, syncContext *SyncContext, targetCh chan *targetUpdateInfo) (bool, error) { +func (u *replaceUpdateTargetUpdater) BeginUpdateTarget(ctx context.Context, syncContext *SyncContext, targetCh chan *TargetUpdateInfo) (bool, error) { succCount, err := controllerutils.SlowStartBatch(len(targetCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error { targetInfo := <-targetCh if targetInfo.ReplacePairNewTargetInfo != nil { @@ -599,11 +559,11 @@ func (u *replaceUpdateTargetUpdater) BeginUpdateTarget(ctx context.Context, sync if exist && newTargetRevision == targetInfo.UpdateRevision.GetName() { return nil } - if _, exist := u.xsetLabelAnnoMgr.Get(replacePairNewTarget.GetLabels(), api.XDeletionIndicationLabelKey); exist { + if _, exist := u.XsetLabelAnnoMgr.Get(replacePairNewTarget.GetLabels(), api.XDeletionIndicationLabelKey); exist { return nil } - u.recorder.Eventf(targetInfo.Object, + u.Recorder.Eventf(targetInfo.Object, corev1.EventTypeNormal, "ReplaceUpdateTarget", "label to-delete on new pair target %s/%s because it is not updated revision, current revision: %s, updated revision: %s", @@ -611,8 +571,8 @@ func (u *replaceUpdateTargetUpdater) BeginUpdateTarget(ctx context.Context, sync replacePairNewTarget.GetName(), newTargetRevision, syncContext.UpdatedRevision.GetName()) - patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{%q:"%d"}}}`, u.xsetLabelAnnoMgr.Value(api.XDeletionIndicationLabelKey), time.Now().UnixNano()))) - if patchErr := u.client.Patch(ctx, targetInfo.ReplacePairNewTargetInfo.Object, patch); patchErr != nil { + patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{%q:"%d"}}}`, u.XsetLabelAnnoMgr.Value(api.XDeletionIndicationLabelKey), time.Now().UnixNano()))) + if patchErr := u.Client.Patch(ctx, targetInfo.ReplacePairNewTargetInfo.Object, patch); patchErr != nil { err := fmt.Errorf("failed to delete replace pair new target %s/%s %s", targetInfo.ReplacePairNewTargetInfo.GetNamespace(), targetInfo.ReplacePairNewTargetInfo.GetName(), patchErr.Error()) return err @@ -624,10 +584,10 @@ func (u *replaceUpdateTargetUpdater) BeginUpdateTarget(ctx context.Context, sync return succCount > 0, err } -func (u *replaceUpdateTargetUpdater) FilterAllowOpsTargets(_ context.Context, candidates []*targetUpdateInfo, _ map[int]*api.ContextDetail, _ *SyncContext, targetCh chan *targetUpdateInfo) (requeueAfter *time.Duration, err error) { +func (u *replaceUpdateTargetUpdater) FilterAllowOpsTargets(_ context.Context, candidates []*TargetUpdateInfo, _ map[int]*api.ContextDetail, _ *SyncContext, targetCh chan *TargetUpdateInfo) (requeueAfter *time.Duration, err error) { activeTargetToUpdate := filterOutPlaceHolderUpdateInfos(candidates) for i, targetInfo := range activeTargetToUpdate { - if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged { + if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged && !targetInfo.DecorationChanged { continue } @@ -636,15 +596,15 @@ func (u *replaceUpdateTargetUpdater) FilterAllowOpsTargets(_ context.Context, ca return nil, err } -func (u *replaceUpdateTargetUpdater) FulfillTargetUpdatedInfo(_ context.Context, _ *appsv1.ControllerRevision, _ *targetUpdateInfo) (err error) { +func (u *replaceUpdateTargetUpdater) FulfillTargetUpdatedInfo(_ context.Context, _ *appsv1.ControllerRevision, _ *TargetUpdateInfo) (err error) { return } -func (u *replaceUpdateTargetUpdater) UpgradeTarget(ctx context.Context, targetInfo *targetUpdateInfo) error { - return updateReplaceOriginTarget(ctx, u.client, u.recorder, u.xsetLabelAnnoMgr, targetInfo, targetInfo.ReplacePairNewTargetInfo) +func (u *replaceUpdateTargetUpdater) UpgradeTarget(ctx context.Context, targetInfo *TargetUpdateInfo) error { + return updateReplaceOriginTarget(ctx, u.Client, u.Recorder, u.XsetLabelAnnoMgr, targetInfo, targetInfo.ReplacePairNewTargetInfo) } -func (u *replaceUpdateTargetUpdater) GetTargetUpdateFinishStatus(_ context.Context, targetUpdateInfo *targetUpdateInfo) (finished bool, msg string, err error) { +func (u *replaceUpdateTargetUpdater) GetTargetUpdateFinishStatus(_ context.Context, targetUpdateInfo *TargetUpdateInfo) (finished bool, msg string, err error) { replaceNewTargetInfo := targetUpdateInfo.ReplacePairNewTargetInfo if replaceNewTargetInfo == nil { return @@ -653,12 +613,12 @@ func (u *replaceUpdateTargetUpdater) GetTargetUpdateFinishStatus(_ context.Conte return u.isTargetUpdatedServiceAvailable(replaceNewTargetInfo) } -func (u *replaceUpdateTargetUpdater) FinishUpdateTarget(ctx context.Context, targetInfo *targetUpdateInfo, finishByCancelUpdate bool) error { +func (u *replaceUpdateTargetUpdater) FinishUpdateTarget(ctx context.Context, targetInfo *TargetUpdateInfo, finishByCancelUpdate bool) error { if finishByCancelUpdate { // cancel replace update by removing to-replace and replace-by-update label from origin target if targetInfo.IsInReplace { - patch := client.RawPatch(types.MergePatchType, fmt.Appendf(nil, `{"metadata":{"labels":{"%s":null, "%s":null}}}`, u.xsetLabelAnnoMgr.Value(api.XReplaceIndicationLabelKey), u.xsetLabelAnnoMgr.Value(api.XReplaceByReplaceUpdateLabelKey))) - if err := u.targetControl.PatchTarget(ctx, targetInfo.Object, patch); err != nil { + patch := client.RawPatch(types.MergePatchType, fmt.Appendf(nil, `{"metadata":{"labels":{"%s":null, "%s":null}}}`, u.XsetLabelAnnoMgr.Value(api.XReplaceIndicationLabelKey), u.XsetLabelAnnoMgr.Value(api.XReplaceByReplaceUpdateLabelKey))) + if err := u.TargetControl.PatchTarget(ctx, targetInfo.Object, patch); err != nil { return fmt.Errorf("failed to patch replace pair target %s/%s %w when cancel replace update", targetInfo.GetNamespace(), targetInfo.GetName(), err) } } @@ -667,9 +627,9 @@ func (u *replaceUpdateTargetUpdater) FinishUpdateTarget(ctx context.Context, tar ReplacePairNewTargetInfo := targetInfo.ReplacePairNewTargetInfo if ReplacePairNewTargetInfo != nil { - if _, exist := u.xsetLabelAnnoMgr.Get(targetInfo.GetLabels(), api.XDeletionIndicationLabelKey); !exist { - patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{%q:"%d"}}}`, u.xsetLabelAnnoMgr.Value(api.XDeletionIndicationLabelKey), time.Now().UnixNano()))) - if err := u.targetControl.PatchTarget(ctx, targetInfo.Object, patch); err != nil { + if _, exist := u.XsetLabelAnnoMgr.Get(targetInfo.GetLabels(), api.XDeletionIndicationLabelKey); !exist { + patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{%q:"%d"}}}`, u.XsetLabelAnnoMgr.Value(api.XDeletionIndicationLabelKey), time.Now().UnixNano()))) + if err := u.TargetControl.PatchTarget(ctx, targetInfo.Object, patch); err != nil { return fmt.Errorf("failed to delete replace pair origin target %s/%s %s", targetInfo.GetNamespace(), targetInfo.ReplacePairNewTargetInfo.GetName(), err.Error()) } } @@ -677,8 +637,12 @@ func (u *replaceUpdateTargetUpdater) FinishUpdateTarget(ctx context.Context, tar return nil } -func (u *GenericTargetUpdater) isTargetUpdatedServiceAvailable(targetInfo *targetUpdateInfo) (finished bool, msg string, err error) { - // TODO check decoration changed +func (u *GenericTargetUpdater) isTargetUpdatedServiceAvailable(targetInfo *TargetUpdateInfo) (finished bool, msg string, err error) { + // check decoration changed + if targetInfo.DecorationChanged { + return false, "decoration changed", nil + } + if targetInfo.GetLabels() == nil { return false, "no labels on target", nil } @@ -686,9 +650,17 @@ func (u *GenericTargetUpdater) isTargetUpdatedServiceAvailable(targetInfo *targe return false, "replace origin target", nil } - if u.xsetController.CheckAvailable(targetInfo.Object) { + if u.XsetController.CheckAvailable(targetInfo.Object) { return true, "", nil } return false, "target not service available", nil } + +func int32Min(l, r int32) int32 { + if l < r { + return l + } + + return r +} diff --git a/xset/synccontrols/x_utils.go b/xset/synccontrols/x_utils.go index 4e0f9767..3fc72718 100644 --- a/xset/synccontrols/x_utils.go +++ b/xset/synccontrols/x_utils.go @@ -65,6 +65,10 @@ func NewTargetFrom(setController api.XSetController, xsetLabelAnnoMgr api.XSetLa targetObj.SetNamespace(owner.GetNamespace()) targetObj.SetGenerateName(GetTargetsPrefix(owner.GetName())) + if IsTargetNamingSuffixPolicyPersistentSequence(setController.GetXSetSpec(owner)) { + targetObj.SetName(fmt.Sprintf("%s%d", targetObj.GetGenerateName(), id)) + } + xsetLabelAnnoMgr.Set(targetObj, api.XInstanceIdLabelKey, fmt.Sprintf("%d", id)) targetObj.GetLabels()[appsv1.ControllerRevisionHashLabelKey] = revision.GetName() controlByXSet(xsetLabelAnnoMgr, targetObj) @@ -96,6 +100,15 @@ func AddOrUpdateCondition(status *api.XSetStatus, conditionType api.XSetConditio cond := condition.NewCondition(string(conditionType), condStatus, reason, message) status.Conditions = condition.SetCondition(status.Conditions, *cond) + + // update condition last transition time + for i := range status.Conditions { + c := status.Conditions[i] + if c.Type == string(conditionType) { + status.Conditions[i].LastTransitionTime = metav1.Now() + return + } + } } func GetTargetsPrefix(controllerName string) string { @@ -140,7 +153,7 @@ func IsControlledByXSet(xsetLabelManager api.XSetLabelAnnotationManager, obj cli return ok && v == "true" } -func ApplyTemplatePatcher(ctx context.Context, xsetController api.XSetController, c client.Client, xset api.XSetObject, targets []*targetWrapper) error { +func ApplyTemplatePatcher(ctx context.Context, xsetController api.XSetController, c client.Client, xset api.XSetObject, targets []*TargetWrapper) error { _, patchErr := controllerutils.SlowStartBatch(len(targets), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error { if targets[i].Object == nil || targets[i].PlaceHolder { return nil @@ -172,3 +185,10 @@ func afterOrZero(t1, t2 *metav1.Time) bool { } return t1.After(t2.Time) } + +func IsTargetNamingSuffixPolicyPersistentSequence(xsetSpec *api.XSetSpec) bool { + if xsetSpec == nil || xsetSpec.NamingStrategy == nil { + return false + } + return xsetSpec.NamingStrategy.TargetNamingSuffixPolicy == api.TargetNamingSuffixPolicyPersistentSequence +} diff --git a/xset/xcontrol/target_control.go b/xset/xcontrol/target_control.go index a2a9fe33..1d5378e3 100644 --- a/xset/xcontrol/target_control.go +++ b/xset/xcontrol/target_control.go @@ -39,7 +39,7 @@ const ( ) type TargetControl interface { - GetFilteredTargets(ctx context.Context, selector *metav1.LabelSelector, owner api.XSetObject) ([]client.Object, error) + GetFilteredTargets(ctx context.Context, selector *metav1.LabelSelector, owner api.XSetObject) ([]client.Object, []client.Object, error) CreateTarget(ctx context.Context, target client.Object) (client.Object, error) DeleteTarget(ctx context.Context, target client.Object) error UpdateTarget(ctx context.Context, target client.Object) error @@ -71,19 +71,19 @@ func NewTargetControl(mixin *mixin.ReconcilerMixin, xsetController api.XSetContr }, nil } -func (r *targetControl) GetFilteredTargets(ctx context.Context, selector *metav1.LabelSelector, owner api.XSetObject) ([]client.Object, error) { +func (r *targetControl) GetFilteredTargets(ctx context.Context, selector *metav1.LabelSelector, owner api.XSetObject) ([]client.Object, []client.Object, error) { targetList := r.xsetController.NewXObjectList() if err := r.client.List(ctx, targetList, &client.ListOptions{ Namespace: owner.GetNamespace(), FieldSelector: fields.OneTermEqualSelector(FieldIndexOwnerRefUID, string(owner.GetUID())), }); err != nil { - return nil, err + return nil, nil, err } targetListVal := reflect.Indirect(reflect.ValueOf(targetList)) itemsVal := targetListVal.FieldByName("Items") if !itemsVal.IsValid() { - return nil, fmt.Errorf("target list items is invalid") + return nil, nil, fmt.Errorf("target list items is invalid") } var items []client.Object @@ -94,12 +94,18 @@ func (r *targetControl) GetFilteredTargets(ctx context.Context, selector *metav1 items[i] = itemVal.(client.Object) } } else { - return nil, fmt.Errorf("target list items is invalid") + return nil, nil, fmt.Errorf("target list items is invalid") + } + + allTargets, err := r.getTargets(items, selector, owner) + if err != nil { + return nil, nil, err } items = filterOutInactiveTargets(r.xsetController, items) - targets, err := r.getTargets(items, selector, owner) - return targets, err + filteredTargets, err := r.getTargets(items, selector, owner) + + return filteredTargets, allTargets, err } func (r *targetControl) CreateTarget(ctx context.Context, target client.Object) (client.Object, error) { diff --git a/xset/xset_controller.go b/xset/xset_controller.go index 0cf30b09..20f72ec7 100644 --- a/xset/xset_controller.go +++ b/xset/xset_controller.go @@ -146,6 +146,14 @@ func SetUpWithManager(mgr ctrl.Manager, xsetController api.XSetController) error return fmt.Errorf("failed to watch %s: %s", targetMeta.Kind, err.Error()) } + // watch for decoration changed + if adapter, ok := xsetController.(api.DecorationAdapter); ok { + err = adapter.WatchDecoration(c) + if err != nil { + return err + } + } + return nil } @@ -174,16 +182,20 @@ func (r *xSetCommonReconciler) Reconcile(ctx context.Context, req reconcile.Requ if instance.GetDeletionTimestamp() != nil { if controllerutil.ContainsFinalizer(instance, r.finalizerName) { - // reclaim owner IDs in ResourceContextControl - if err := r.resourceContextControl.UpdateToTargetContext(ctx, instance, nil); err != nil { + // reclaim target sub resources before remove finalizers + if err := r.ensureReclaimTargetSubResources(ctx, instance); err != nil { + return ctrl.Result{}, err + } + // reclaim decoration ownerReferences before remove finalizers + if err := r.ensureReclaimOwnerReferences(ctx, instance); err != nil { return ctrl.Result{}, err } if cleaned, err := r.ensureReclaimTargetsDeletion(ctx, instance); !cleaned || err != nil { // reclaim targets deletion before remove finalizers return ctrl.Result{}, err } - // reclaim target sub resources before remove finalizers - if err := r.ensureReclaimTargetSubResources(ctx, instance); err != nil { + // reclaim owner IDs in ResourceContextControl + if err := r.resourceContextControl.UpdateToTargetContext(ctx, instance, nil); err != nil { return ctrl.Result{}, err } } @@ -284,7 +296,7 @@ func (r *xSetCommonReconciler) ensureReclaimPvcs(ctx context.Context, xset api.X func (r *xSetCommonReconciler) ensureReclaimTargetsDeletion(ctx context.Context, instance api.XSetObject) (bool, error) { xSetSpec := r.XSetController.GetXSetSpec(instance) - targets, err := r.targetControl.GetFilteredTargets(ctx, xSetSpec.Selector, instance) + _, targets, err := r.targetControl.GetFilteredTargets(ctx, xSetSpec.Selector, instance) if err != nil { return false, fmt.Errorf("fail to get filtered Targets: %s", err.Error()) } @@ -303,6 +315,40 @@ func (r *xSetCommonReconciler) ensureReclaimTargetsDeletion(ctx context.Context, return true, nil } +// ensureReclaimOwnerReferences removes decoration ownerReference from filteredPods if xset is deleting. +func (r *xSetCommonReconciler) ensureReclaimOwnerReferences(ctx context.Context, instance api.XSetObject) error { + decorationAdapter, ok := r.XSetController.(api.DecorationAdapter) + if !ok { + return nil + } + xSetSpec := r.XSetController.GetXSetSpec(instance) + _, filteredTargets, err := r.targetControl.GetFilteredTargets(ctx, xSetSpec.Selector, instance) + if err != nil { + return fmt.Errorf("fail to get filtered Targets: %s", err.Error()) + } + // reclaim decoration ownerReferences on filteredPods + gvk := decorationAdapter.GetDecorationGroupVersionKind() + for i := range filteredTargets { + if len(filteredTargets[i].GetOwnerReferences()) == 0 { + continue + } + var newOwnerRefs []metav1.OwnerReference + for j := range filteredTargets[i].GetOwnerReferences() { + if filteredTargets[i].GetOwnerReferences()[j].Kind == gvk.Kind { + continue + } + newOwnerRefs = append(newOwnerRefs, filteredTargets[i].GetOwnerReferences()[j]) + } + if len(newOwnerRefs) != len(filteredTargets[i].GetOwnerReferences()) { + filteredTargets[i].SetOwnerReferences(newOwnerRefs) + if err := r.targetControl.UpdateTarget(ctx, filteredTargets[i]); err != nil { + return err + } + } + } + return nil +} + func (r *xSetCommonReconciler) updateStatus(ctx context.Context, instance api.XSetObject, status *api.XSetStatus) error { r.XSetController.SetXSetStatus(instance, status) if err := r.Client.Status().Update(ctx, instance); err != nil {