|  | 
| 4 | 4 | 	"context" | 
| 5 | 5 | 	"encoding/json" | 
| 6 | 6 | 	"fmt" | 
|  | 7 | +	"reflect" | 
|  | 8 | +	"slices" | 
| 7 | 9 | 	"sort" | 
| 8 | 10 | 	"strings" | 
| 9 | 11 | 	"sync" | 
| @@ -562,24 +564,27 @@ func (sc *syncContext) Sync() { | 
| 562 | 564 | 
 | 
| 563 | 565 | 	// remove any tasks not in this wave | 
| 564 | 566 | 	phase := tasks.phase() | 
| 565 |  | -	wave := tasks.wave() | 
| 566 |  | -	finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave() | 
|  | 567 | +	waves, wavesUseBinaryTreeOrdering := tasks.waves() | 
|  | 568 | +	lastWaves, lastWavesUseBinaryTreeOrdering := tasks.lastWaves() | 
|  | 569 | +	finalWaves := phase == tasks.lastPhase() && reflect.DeepEqual(waves, lastWaves) && wavesUseBinaryTreeOrdering == lastWavesUseBinaryTreeOrdering | 
| 567 | 570 | 
 | 
| 568 | 571 | 	// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful | 
| 569 | 572 | 	// EVEN if those objects subsequently degraded | 
| 570 | 573 | 	// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately. | 
| 571 |  | -	remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() }) | 
|  | 574 | +	remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || !slices.Contains(waves, t.wave()) || t.isHook() }) | 
| 572 | 575 | 
 | 
| 573 |  | -	sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") | 
| 574 |  | -	tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave }) | 
|  | 576 | +	sc.log.WithValues("phase", phase, "wave", waves, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") | 
|  | 577 | +	tasks = tasks.Filter(func(t *syncTask) bool { | 
|  | 578 | +		return t.phase == phase && slices.Contains(waves, t.wave()) && t.waveUseBinaryTreeOrdering() == wavesUseBinaryTreeOrdering | 
|  | 579 | +	}) | 
| 575 | 580 | 
 | 
| 576 | 581 | 	sc.setOperationPhase(common.OperationRunning, "one or more tasks are running") | 
| 577 | 582 | 
 | 
| 578 | 583 | 	sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run") | 
| 579 | 584 | 	runState := sc.runTasks(tasks, false) | 
| 580 | 585 | 
 | 
| 581 | 586 | 	if sc.syncWaveHook != nil && runState != failed { | 
| 582 |  | -		err := sc.syncWaveHook(phase, wave, finalWave) | 
|  | 587 | +		err := sc.syncWaveHook(phase, waves, finalWaves) | 
| 583 | 588 | 		if err != nil { | 
| 584 | 589 | 			sc.deleteHooks(hooksPendingDeletionFailed) | 
| 585 | 590 | 			sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err)) | 
| @@ -899,52 +904,133 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { | 
| 899 | 904 | 		} | 
| 900 | 905 | 	} | 
| 901 | 906 | 
 | 
| 902 |  | -	// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order) | 
| 903 |  | -	pruneTasks := make(map[int][]*syncTask) | 
|  | 907 | +	// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order). | 
|  | 908 | +	// if all prune tasks use normal wave ordering, use the legacy method. Otherwise, use a binary tree wave ordering | 
|  | 909 | +	// on all prune tasks and modify the waves to decreasing power of 2. | 
|  | 910 | +	// For prune tasks which already use binary tree wave ordering, set an identical syncWave to tasks which | 
|  | 911 | +	// have the same level in a complete binary tree rooted at 1 where each node n has 2*n and 2*n+1 as children. | 
|  | 912 | + | 
|  | 913 | +	pruntTasksUsingNormalOrdering := make(map[int][]*syncTask) | 
| 904 | 914 | 	for _, task := range tasks { | 
| 905 |  | -		if task.isPrune() { | 
| 906 |  | -			pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task) | 
|  | 915 | +		if task.isPrune() && task.waveUseBinaryTreeOrdering() == "false" { | 
|  | 916 | +			pruntTasksUsingNormalOrdering[task.wave()] = append(pruntTasksUsingNormalOrdering[task.wave()], task) | 
| 907 | 917 | 		} | 
| 908 | 918 | 	} | 
|  | 919 | +	var uniquePruneWavesUsingNormalOrdering []int | 
|  | 920 | +	for k := range pruntTasksUsingNormalOrdering { | 
|  | 921 | +		uniquePruneWavesUsingNormalOrdering = append(uniquePruneWavesUsingNormalOrdering, k) | 
|  | 922 | +	} | 
| 909 | 923 | 
 | 
| 910 |  | -	var uniquePruneWaves []int | 
| 911 |  | -	for k := range pruneTasks { | 
| 912 |  | -		uniquePruneWaves = append(uniquePruneWaves, k) | 
|  | 924 | +	sort.Ints(uniquePruneWavesUsingNormalOrdering) | 
|  | 925 | + | 
|  | 926 | +	pruneTasksUsingBinaryTreeOrdering := make(map[int][]*syncTask) | 
|  | 927 | +	for _, task := range tasks { | 
|  | 928 | +		if task.isPrune() && task.waveUseBinaryTreeOrdering() == "true" { | 
|  | 929 | +			pruneTasksUsingBinaryTreeOrdering[task.wave()] = append(pruneTasksUsingBinaryTreeOrdering[task.wave()], task) | 
|  | 930 | +		} | 
| 913 | 931 | 	} | 
| 914 |  | -	sort.Ints(uniquePruneWaves) | 
| 915 | 932 | 
 | 
| 916 |  | -	// reorder waves for pruning tasks using symmetric swap on prune waves | 
| 917 |  | -	n := len(uniquePruneWaves) | 
| 918 |  | -	for i := 0; i < n/2; i++ { | 
| 919 |  | -		// waves to swap | 
| 920 |  | -		startWave := uniquePruneWaves[i] | 
| 921 |  | -		endWave := uniquePruneWaves[n-1-i] | 
|  | 933 | +	if len(pruneTasksUsingBinaryTreeOrdering) > 0 { | 
|  | 934 | +		var uniquePruneWavesUsingBinaryTreeOrdering []int | 
|  | 935 | +		for k := range pruneTasksUsingBinaryTreeOrdering { | 
|  | 936 | +			uniquePruneWavesUsingBinaryTreeOrdering = append(uniquePruneWavesUsingBinaryTreeOrdering, k) | 
|  | 937 | +		} | 
|  | 938 | +		sort.Ints(uniquePruneWavesUsingBinaryTreeOrdering) | 
| 922 | 939 | 
 | 
| 923 |  | -		for _, task := range pruneTasks[startWave] { | 
| 924 |  | -			task.waveOverride = &endWave | 
|  | 940 | +		pruneTasksWavesValues := []int{0} | 
|  | 941 | +		for i := 1; i < len(uniquePruneWavesUsingNormalOrdering); i++ { | 
|  | 942 | +			pruneTasksWavesValues = append(pruneTasksWavesValues, i) | 
|  | 943 | +		} | 
|  | 944 | +		nextPotentialWaveValue := len(uniquePruneWavesUsingNormalOrdering) | 
|  | 945 | +		if len(uniquePruneWavesUsingNormalOrdering) != 0 { | 
|  | 946 | +			pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) | 
|  | 947 | +		} | 
|  | 948 | +		for i := 1; i < len(uniquePruneWavesUsingBinaryTreeOrdering); i++ { | 
|  | 949 | +			currentWaveValue := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i]) | 
|  | 950 | +			previousWaveValue := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i-1]) | 
|  | 951 | +			if currentWaveValue == previousWaveValue { | 
|  | 952 | +				pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) | 
|  | 953 | +			} else { | 
|  | 954 | +				nextPotentialWaveValue++ | 
|  | 955 | +				pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) | 
|  | 956 | +			} | 
| 925 | 957 | 		} | 
| 926 | 958 | 
 | 
| 927 |  | -		for _, task := range pruneTasks[endWave] { | 
| 928 |  | -			task.waveOverride = &startWave | 
|  | 959 | +		pruneTasksWavesNewValues := PowInt(2, pruneTasksWavesValues[len(pruneTasksWavesValues)-1]) | 
|  | 960 | +		newPruneWaves := []int{pruneTasksWavesNewValues} | 
|  | 961 | +		for i := 1; i < len(pruneTasksWavesValues); i++ { | 
|  | 962 | +			if pruneTasksWavesValues[i] == pruneTasksWavesValues[i-1] { | 
|  | 963 | +				newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues) | 
|  | 964 | +			} else { | 
|  | 965 | +				pruneTasksWavesNewValues /= 2 | 
|  | 966 | +				newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues) | 
|  | 967 | +			} | 
|  | 968 | +		} | 
|  | 969 | + | 
|  | 970 | +		syncTaskUseBinaryTreeOrdering := "true" | 
|  | 971 | + | 
|  | 972 | +		for i := range uniquePruneWavesUsingNormalOrdering { | 
|  | 973 | +			// tasks using normal wave ordering to reorder | 
|  | 974 | +			iWave := uniquePruneWavesUsingNormalOrdering[i] | 
|  | 975 | + | 
|  | 976 | +			for _, task := range pruntTasksUsingNormalOrdering[iWave] { | 
|  | 977 | +				task.waveOverride = &newPruneWaves[i] | 
|  | 978 | +				task.waveUseBinaryTreeOrderingOverride = &syncTaskUseBinaryTreeOrdering | 
|  | 979 | +			} | 
|  | 980 | +		} | 
|  | 981 | + | 
|  | 982 | +		n := len(uniquePruneWavesUsingNormalOrdering) | 
|  | 983 | +		for i := range uniquePruneWavesUsingBinaryTreeOrdering { | 
|  | 984 | +			// tasks using binary tree wave ordering to reorder | 
|  | 985 | +			iWave := uniquePruneWavesUsingBinaryTreeOrdering[i] | 
|  | 986 | + | 
|  | 987 | +			for _, task := range pruneTasksUsingBinaryTreeOrdering[iWave] { | 
|  | 988 | +				task.waveOverride = &(newPruneWaves[n+i]) | 
|  | 989 | +				task.waveUseBinaryTreeOrderingOverride = &syncTaskUseBinaryTreeOrdering | 
|  | 990 | +			} | 
|  | 991 | +		} | 
|  | 992 | +	} else { | 
|  | 993 | +		// reorder waves for pruning tasks using symmetric swap on prune waves | 
|  | 994 | +		n := len(uniquePruneWavesUsingNormalOrdering) | 
|  | 995 | +		for i := 0; i < n/2; i++ { | 
|  | 996 | +			// waves to swap | 
|  | 997 | +			startWave := uniquePruneWavesUsingNormalOrdering[i] | 
|  | 998 | +			endWave := uniquePruneWavesUsingNormalOrdering[n-1-i] | 
|  | 999 | + | 
|  | 1000 | +			for _, task := range pruntTasksUsingNormalOrdering[startWave] { | 
|  | 1001 | +				task.waveOverride = &endWave | 
|  | 1002 | +			} | 
|  | 1003 | + | 
|  | 1004 | +			for _, task := range pruntTasksUsingNormalOrdering[endWave] { | 
|  | 1005 | +				task.waveOverride = &startWave | 
|  | 1006 | +			} | 
| 929 | 1007 | 		} | 
| 930 | 1008 | 	} | 
| 931 | 1009 | 
 | 
| 932 | 1010 | 	// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1 | 
| 933 | 1011 | 	// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave | 
| 934 | 1012 | 	syncPhaseLastWave := 0 | 
|  | 1013 | +	syncPhaseLastWaveUseBinaryTreeOrdering := "false" | 
| 935 | 1014 | 	for _, task := range tasks { | 
| 936 | 1015 | 		if task.phase == common.SyncPhaseSync { | 
| 937 | 1016 | 			if task.wave() > syncPhaseLastWave { | 
| 938 | 1017 | 				syncPhaseLastWave = task.wave() | 
|  | 1018 | +				syncPhaseLastWaveUseBinaryTreeOrdering = task.waveUseBinaryTreeOrdering() | 
| 939 | 1019 | 			} | 
| 940 | 1020 | 		} | 
| 941 | 1021 | 	} | 
| 942 |  | -	syncPhaseLastWave = syncPhaseLastWave + 1 | 
|  | 1022 | + | 
|  | 1023 | +	if syncPhaseLastWaveUseBinaryTreeOrdering == "false" { | 
|  | 1024 | +		syncPhaseLastWave++ | 
|  | 1025 | +	} else { | 
|  | 1026 | +		syncPhaseLastWave *= 2 | 
|  | 1027 | +	} | 
| 943 | 1028 | 
 | 
| 944 | 1029 | 	for _, task := range tasks { | 
| 945 | 1030 | 		if task.isPrune() && | 
| 946 | 1031 | 			(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) { | 
| 947 | 1032 | 			task.waveOverride = &syncPhaseLastWave | 
|  | 1033 | +			task.waveUseBinaryTreeOrderingOverride = &syncPhaseLastWaveUseBinaryTreeOrdering | 
| 948 | 1034 | 		} | 
| 949 | 1035 | 	} | 
| 950 | 1036 | 
 | 
|  | 
0 commit comments