Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions xset/api/well_knowns.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ const (
// SubResourcePvcTemplateHashLabelKey is used to attach hash of pvc template to pvc subresource
SubResourcePvcTemplateHashLabelKey

// LastXStatusAnnotationKey is used to record the last status of a target by xset
LastXStatusAnnotationKey

// wellKnownCount is the number of XSetLabelAnnotationEnum
wellKnownCount
)
Expand Down Expand Up @@ -130,8 +127,6 @@ var defaultXSetLabelAnnotationManager = map[XSetLabelAnnotationEnum]string{
XExcludeIndicationLabelKey: appsv1alpha1.PodExcludeIndicationLabelKey,
SubResourcePvcTemplateLabelKey: appsv1alpha1.PvcTemplateLabelKey,
SubResourcePvcTemplateHashLabelKey: appsv1alpha1.PvcTemplateHashLabelKey,

LastXStatusAnnotationKey: appsv1alpha1.LastPodStatusAnnotationKey,
}

func NewXSetLabelAnnotationManager() XSetLabelAnnotationManager {
Expand Down
19 changes: 19 additions & 0 deletions xset/api/xset_controller_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

type XSetController interface {
Expand Down Expand Up @@ -95,3 +96,21 @@ type SubResourcePvcAdapter interface {
// SetXSpecVolumes sets spec.volumes to X object.
SetXSpecVolumes(object client.Object, pvcs []corev1.Volume)
}

// DecorationAdapter is used to manage decoration for XSet. Decoration should be a workload to manage patcher on X target.
// Once adapter is implemented, XSetController will (1) watch for decoration change, (2) patch effective decorations on
// X target when creating, (3) manage decoration update when decoration changed.
type DecorationAdapter interface {
// WatchDecoration allows controller to watch decoration change.
WatchDecoration(c controller.Controller) error
// GetDecorationGroupVersionKind returns decoration gvk.
GetDecorationGroupVersionKind() metav1.GroupVersionKind
// GetTargetCurrentDecorationRevisions returns decoration revision on target.
GetTargetCurrentDecorationRevisions(ctx context.Context, c client.Client, target client.Object) (string, error)
// GetTargetUpdatedDecorationRevisions returns decoration revision on target.
GetTargetUpdatedDecorationRevisions(ctx context.Context, c client.Client, target client.Object) (string, error)
// GetDecorationPatcherByRevisions returns patcher for decoration from revisions.
GetDecorationPatcherByRevisions(ctx context.Context, c client.Client, target client.Object, revision string) (func(client.Object) error, error)
// IsTargetDecorationChanged returns true if decoration on target is changed.
IsTargetDecorationChanged(currentRevision, updatedRevision string) (bool, error)
}
23 changes: 23 additions & 0 deletions xset/api/xset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type XSetSpec struct {
// +optional
ScaleStrategy ScaleStrategy `json:"scaleStrategy,omitempty"`

// NamigPolicy indicates the strategy detail that will be used for replica naming
// +optional
NamingStrategy *NamingStrategy `json:"namingStrategy,omitempty"`

// Indicate the number of histories to be conserved
// If unspecified, defaults to 20
// +optional
Expand Down Expand Up @@ -120,6 +124,25 @@ type ScaleStrategy struct {
OperationDelaySeconds *int32 `json:"operationDelaySeconds,omitempty"`
}

// TargetNamingSuffixPolicy indicates how a new pod name suffix part is generated.
type TargetNamingSuffixPolicy string

const (
// TargetNamingSuffixPolicyPersistentSequence uses persistent sequential numbers as pod name suffix.
TargetNamingSuffixPolicyPersistentSequence TargetNamingSuffixPolicy = "PersistentSequence"
// TargetNamingSuffixPolicyRandom uses collaset name as pod generateName, which is the prefix
// of pod name. Kubernetes then adds a random string as suffix after the generateName.
// This is defaulting policy.
TargetNamingSuffixPolicyRandom TargetNamingSuffixPolicy = "Random"
)

type NamingStrategy struct {
// TargetNamingSuffixPolicy is a string enumeration that determaines how pod name suffix will be generated.
// A collaset pod name contains two parts to be placed in a string formation %s-%s; the prefix is collaset
// name, and the suffix is determined by TargetNamingSuffixPolicy.
TargetNamingSuffixPolicy TargetNamingSuffixPolicy `json:"TargetNamingSuffixPolicy,omitempty"`
}

// UpdateStrategyType is a string enumeration type that enumerates
// all possible ways we can update a Target when updating application
type UpdateStrategyType string
Expand Down
2 changes: 1 addition & 1 deletion xset/revisionowner/revision_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (r *revisionOwner) GetInUsedRevisions(parent metav1.Object) (sets.String, e
res.Insert(status.UpdatedRevision)
res.Insert(status.CurrentRevision)

targets, err := r.TargetControl.GetFilteredTargets(context.TODO(), spec.Selector, xSetObject)
targets, _, err := r.TargetControl.GetFilteredTargets(context.TODO(), spec.Selector, xSetObject)
if err != nil {
return nil, err
}
Expand Down
133 changes: 79 additions & 54 deletions xset/synccontrols/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,17 @@ func NewRealSyncControl(reconcileMixIn *mixin.ReconcilerMixin,
updateLifecycleAdapter, scaleInOpsLifecycleAdapter := opslifecycle.GetLifecycleAdapters(xsetController, xsetLabelAnnoManager, xsetMeta)

updateConfig := &UpdateConfig{
xsetController: xsetController,
xsetLabelAnnoMgr: xsetLabelAnnoManager,
client: reconcileMixIn.Client,
targetControl: xControl,
resourceContextControl: resourceContexts,
recorder: reconcileMixIn.Recorder,
XsetController: xsetController,
XsetLabelAnnoMgr: xsetLabelAnnoManager,
Client: reconcileMixIn.Client,
TargetControl: xControl,
ResourceContextControl: resourceContexts,
Recorder: reconcileMixIn.Recorder,

scaleInLifecycleAdapter: scaleInOpsLifecycleAdapter,
updateLifecycleAdapter: updateLifecycleAdapter,
cacheExpectations: cacheExpectations,
targetGVK: targetGVK,
CacheExpectations: cacheExpectations,
TargetGVK: targetGVK,
}
return &RealSyncControl{
ReconcilerMixin: *reconcileMixIn,
Expand Down Expand Up @@ -135,12 +135,19 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje
return false, fmt.Errorf("fail to get XSetSpec")
}

var err error
syncContext.FilteredTarget, err = r.xControl.GetFilteredTargets(ctx, xspec.Selector, instance)
filteredTargets, allTargets, err := r.xControl.GetFilteredTargets(ctx, xspec.Selector, instance)
if err != nil {
return false, fmt.Errorf("fail to get filtered Targets: %w", err)
}

if IsTargetNamingSuffixPolicyPersistentSequence(xspec) {
// for naming with persistent sequences suffix, targets with same name should not exist at same time
syncContext.FilteredTarget = allTargets
} else {
// for naming with random suffix, targets with random names can be created at same time
syncContext.FilteredTarget = filteredTargets
}

// sync subresource
// 1. list pvcs using ownerReference
// 2. adopt and retain orphaned pvcs according to PVC retention policy
Expand Down Expand Up @@ -173,7 +180,7 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje
}

// stateless case
var targetWrappers []*targetWrapper
var targetWrappers []*TargetWrapper
syncContext.CurrentIDs = sets.Int{}
idToReclaim := sets.Int{}
toDeleteTargetNames := sets.NewString(xspec.ScaleStrategy.TargetToDelete...)
Expand All @@ -199,7 +206,8 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje
}
}

if target.GetDeletionTimestamp() != nil {
// for naming with persistent sequences suffix, targets with same name should not exist at same time
if target.GetDeletionTimestamp() != nil && !IsTargetNamingSuffixPolicyPersistentSequence(xspec) {
// 1. Reclaim ID from Target which is scaling in and terminating.
if contextDetail, exist := ownedIDs[id]; exist && r.resourceContextControl.Contains(contextDetail, api.EnumScaleInContextDataKey, "true") {
idToReclaim.Insert(id)
Expand All @@ -220,7 +228,27 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje
}
}

targetWrappers = append(targetWrappers, &targetWrapper{
// sync decoration revisions
var decorationInfo DecorationInfo
if decorationAdapter, enabled := r.xsetController.(api.DecorationAdapter); enabled {
if decorationInfo.DecorationCurrentRevisions, err = decorationAdapter.GetTargetCurrentDecorationRevisions(ctx, r.Client, target); err != nil {
return false, err
}
if decorationInfo.DecorationUpdatedRevisions, err = decorationAdapter.GetTargetUpdatedDecorationRevisions(ctx, r.Client, target); err != nil {
return false, err
}
if decorationInfo.DecorationChanged, err = decorationAdapter.IsTargetDecorationChanged(decorationInfo.DecorationCurrentRevisions, decorationInfo.DecorationUpdatedRevisions); err != nil {
return false, err
}
}

// sync target ops priority
var opsPriority *api.OpsPriority
if opsPriority, err = r.xsetController.GetXOpsPriority(ctx, r.Client, target); err != nil {
return false, err
}

targetWrappers = append(targetWrappers, &TargetWrapper{
Object: target,
ID: id,
ContextDetail: ownedIDs[id],
Expand All @@ -229,8 +257,11 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje
ToDelete: toDelete,
ToExclude: toExclude,

IsDuringScaleInOps: opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, target),
IsDuringUpdateOps: opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.updateLifecycleAdapter, target),
IsDuringScaleInOps: opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.scaleInLifecycleAdapter, target),
IsDuringUpdateOps: opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.updateLifecycleAdapter, target),

DecorationInfo: decorationInfo,
OpsPriority: opsPriority,
})

if id >= 0 {
Expand Down Expand Up @@ -395,7 +426,7 @@ func (r *RealSyncControl) Replace(ctx context.Context, xsetObject api.XSetObject
syncContext.replacingMap = classifyTargetReplacingMapping(r.xsetLabelAnnoMgr, syncContext.activeTargets)
}()

needReplaceOriginTargets, needCleanLabelTargets, targetsNeedCleanLabels, needDeleteTargets := r.dealReplaceTargets(ctx, syncContext.FilteredTarget)
needReplaceOriginTargets, needCleanLabelTargets, targetsNeedCleanLabels, needDeleteTargets := r.dealReplaceTargets(ctx, syncContext.TargetWrappers)

// delete origin targets for replace
err = r.BatchDeleteTargetsByLabel(ctx, r.xControl, needDeleteTargets)
Expand Down Expand Up @@ -438,7 +469,7 @@ func (r *RealSyncControl) Replace(ctx context.Context, xsetObject api.XSetObject
if _, inUsed := syncContext.CurrentIDs[id]; inUsed {
continue
}
syncContext.TargetWrappers = append(syncContext.TargetWrappers, &targetWrapper{
syncContext.TargetWrappers = append(syncContext.TargetWrappers, &TargetWrapper{
ID: id,
Object: nil,
ContextDetail: contextDetail,
Expand Down Expand Up @@ -504,14 +535,32 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject,
}
// scale out new Targets with updatedRevision
// TODO use cache
// TODO decoration for target template
target, err := NewTargetFrom(r.xsetController, r.xsetLabelAnnoMgr, xsetObject, revision, availableIDContext.ID,
func(object client.Object) error {
if _, exist := r.resourceContextControl.Get(availableIDContext, api.EnumJustCreateContextDataKey); exist {
r.xsetLabelAnnoMgr.Set(object, api.XCreatingLabel, strconv.FormatInt(time.Now().UnixNano(), 10))
} else {
r.xsetLabelAnnoMgr.Set(object, api.XCompletingLabel, strconv.FormatInt(time.Now().UnixNano(), 10))
}

// decoration for target template
if decorationAdapter, ok := r.xsetController.(api.DecorationAdapter); ok {
revisionsInfo, ok := r.resourceContextControl.Get(availableIDContext, api.EnumTargetDecorationRevisionKey)
if !ok {
// get updated decoration revisions from target and write to resource context
if revisionsInfo, err = decorationAdapter.GetTargetUpdatedDecorationRevisions(ctx, r.Client, object); err != nil {
return err
}
r.resourceContextControl.Put(availableIDContext, api.EnumTargetDecorationRevisionKey, revisionsInfo)
needUpdateContext.Store(true)
}
// get patcher from decoration revisions and patch target
if fn, err := decorationAdapter.GetDecorationPatcherByRevisions(ctx, r.Client, object, revisionsInfo); err != nil {
return err
} else {
return fn(object)
}
}
return nil
},
r.xsetController.GetXSetTemplatePatcher(xsetObject),
Expand Down Expand Up @@ -553,14 +602,10 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject,
}

if diff <= 0 {
// get targets ops priority
if err := r.getTargetsOpsPriority(ctx, r.Client, syncContext.activeTargets); err != nil {
return false, recordedRequeueAfter, err
}
// chose the targets to scale in
targetsToScaleIn := r.getTargetsToDelete(xsetObject, syncContext.activeTargets, syncContext.replacingMap, diff*-1)
// filter out Targets need to trigger TargetOpsLifecycle
wrapperCh := make(chan *targetWrapper, len(targetsToScaleIn))
wrapperCh := make(chan *TargetWrapper, len(targetsToScaleIn))
for i := range targetsToScaleIn {
if targetsToScaleIn[i].IsDuringScaleInOps {
continue
Expand All @@ -575,7 +620,7 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject,

// trigger TargetOpsLifecycle with scaleIn OperationType
logger.V(1).Info("try to begin TargetOpsLifecycle for scaling in Target in XSet", "wrapper", ObjectKeyString(object))
if updated, err := opslifecycle.Begin(r.updateConfig.xsetLabelAnnoMgr, r.Client, r.scaleInLifecycleAdapter, object); err != nil {
if updated, err := opslifecycle.Begin(r.updateConfig.XsetLabelAnnoMgr, r.Client, r.scaleInLifecycleAdapter, object); err != nil {
return fmt.Errorf("fail to begin TargetOpsLifecycle for Scaling in Target %s/%s: %w", object.GetNamespace(), object.GetName(), err)
} else if updated {
wrapper.IsDuringScaleInOps = true
Expand All @@ -599,7 +644,7 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject,

needUpdateContext := false
for i, targetWrapper := range targetsToScaleIn {
requeueAfter, allowed := opslifecycle.AllowOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, ptr.Deref(spec.ScaleStrategy.OperationDelaySeconds, 0), targetWrapper.Object)
requeueAfter, allowed := opslifecycle.AllowOps(r.updateConfig.XsetLabelAnnoMgr, r.scaleInLifecycleAdapter, ptr.Deref(spec.ScaleStrategy.OperationDelaySeconds, 0), targetWrapper.Object)
if !allowed && targetWrapper.Object.GetDeletionTimestamp() == nil {
r.Recorder.Eventf(targetWrapper.Object, corev1.EventTypeNormal, "TargetScaleInLifecycle", "Target is not allowed to scale in")
continue
Expand Down Expand Up @@ -707,28 +752,23 @@ func (r *RealSyncControl) Update(ctx context.Context, xsetObject api.XSetObject,
var err error
var recordedRequeueAfter *time.Duration

// 0. get targets ops priority
if err := r.getTargetsOpsPriority(ctx, r.Client, syncContext.TargetWrappers); err != nil {
return false, recordedRequeueAfter, err
}

// 1. scan and analysis targets update info for active targets and PlaceHolder targets
targetUpdateInfos, err := r.attachTargetUpdateInfo(xsetObject, syncContext)
targetUpdateInfos, err := r.attachTargetUpdateInfo(ctx, xsetObject, syncContext)
if err != nil {
return false, recordedRequeueAfter, err
}

// 2. decide Target update candidates
candidates := r.decideTargetToUpdate(r.xsetController, xsetObject, targetUpdateInfos)
targetToUpdate := filterOutPlaceHolderUpdateInfos(candidates)
targetCh := make(chan *targetUpdateInfo, len(targetToUpdate))
targetCh := make(chan *TargetUpdateInfo, len(targetToUpdate))
updater := r.newTargetUpdater(xsetObject)
updating := false

// 3. filter already updated revision,
for i, targetInfo := range targetToUpdate {
// TODO check decoration and pvc template changed
if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged {
if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged && !targetInfo.DecorationChanged {
continue
}

Expand Down Expand Up @@ -865,7 +905,8 @@ func (r *RealSyncControl) CalculateStatus(_ context.Context, instance api.XSetOb

activeTargets := FilterOutActiveTargetWrappers(syncContext.TargetWrappers)
for _, targetWrapper := range activeTargets {
if targetWrapper.GetDeletionTimestamp() != nil {
// for naming with persistent sequences suffix, terminating targets can be shown in status
if targetWrapper.GetDeletionTimestamp() != nil && !IsTargetNamingSuffixPolicyPersistentSequence(r.xsetController.GetXSetSpec(instance)) {
continue
}

Expand All @@ -876,8 +917,8 @@ func (r *RealSyncControl) CalculateStatus(_ context.Context, instance api.XSetOb
updatedReplicas++
}

if opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, targetWrapper) ||
opslifecycle.IsDuringOps(r.updateConfig.xsetLabelAnnoMgr, r.updateLifecycleAdapter, targetWrapper) {
if opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.scaleInLifecycleAdapter, targetWrapper) ||
opslifecycle.IsDuringOps(r.updateConfig.XsetLabelAnnoMgr, r.updateLifecycleAdapter, targetWrapper) {
operatingReplicas++
}

Expand Down Expand Up @@ -989,25 +1030,9 @@ func (r *RealSyncControl) reclaimOwnedIDs(
return nil
}

// getTargetsOpsPriority try to set targets' ops priority
func (r *RealSyncControl) getTargetsOpsPriority(ctx context.Context, c client.Client, targets []*targetWrapper) error {
_, err := controllerutils.SlowStartBatch(len(targets), controllerutils.SlowStartInitialBatchSize, true, func(i int, _ error) error {
if targets[i].PlaceHolder || targets[i].Object == nil || targets[i].OpsPriority != nil {
return nil
}
var iErr error
targets[i].OpsPriority, iErr = r.xsetController.GetXOpsPriority(ctx, c, targets[i].Object)
if iErr != nil {
return fmt.Errorf("failed to get target %s/%s ops priority: %w", targets[i].Object.GetNamespace(), targets[i].Object.GetName(), iErr)
}
return nil
})
return err
}

// FilterOutActiveTargetWrappers filter out non placeholder targets
func FilterOutActiveTargetWrappers(targets []*targetWrapper) []*targetWrapper {
var filteredTargetWrappers []*targetWrapper
func FilterOutActiveTargetWrappers(targets []*TargetWrapper) []*TargetWrapper {
var filteredTargetWrappers []*TargetWrapper
for i, target := range targets {
if target.PlaceHolder {
continue
Expand Down
Loading
Loading