55package pgupgrade
66
77import (
8+ "cmp"
89 "context"
910 "fmt"
11+ "math"
1012 "strings"
1113
1214 appsv1 "k8s.io/api/apps/v1"
@@ -35,9 +37,16 @@ func pgUpgradeJob(upgrade *v1beta1.PGUpgrade) metav1.ObjectMeta {
3537
3638// upgradeCommand returns an entrypoint that prepares the filesystem for
3739// and performs a PostgreSQL major version upgrade using pg_upgrade.
38- func upgradeCommand (oldVersion , newVersion int , fetchKeyCommand string , availableCPUs int ) []string {
39- // Use multiple CPUs when three or more are available.
40- argJobs := fmt .Sprintf (` --jobs=%d` , max (1 , availableCPUs - 1 ))
40+ func upgradeCommand (spec * v1beta1.PGUpgradeSettings , fetchKeyCommand string ) []string {
41+ argJobs := fmt .Sprintf (` --jobs=%d` , max (1 , spec .Jobs ))
42+ argMethod := cmp .Or (map [string ]string {
43+ "Clone" : ` --clone` ,
44+ "Copy" : ` --copy` ,
45+ "CopyFileRange" : ` --copy-file-range` ,
46+ }[spec .TransferMethod ], ` --link` )
47+
48+ oldVersion := spec .FromPostgresVersion
49+ newVersion := spec .ToPostgresVersion
4150
4251 // if the fetch key command is set for TDE, provide the value during initialization
4352 initdb := `/usr/pgsql-"${new_version}"/bin/initdb -k -D /pgdata/pg"${new_version}"`
@@ -99,14 +108,14 @@ func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availabl
99108 `echo -e "Step 5: Running pg_upgrade check...\n"` ,
100109 `time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \` ,
101110 `--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\` ,
102- ` --new-datadir /pgdata/pg"${new_version}" --link -- check` + argJobs ,
111+ ` --new-datadir /pgdata/pg"${new_version}" --check` + argMethod + argJobs ,
103112
104113 // Assuming the check completes successfully, the pg_upgrade command will
105114 // be run that actually prepares the upgraded pgdata directory.
106115 `echo -e "\nStep 6: Running pg_upgrade...\n"` ,
107116 `time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \` ,
108117 `--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \` ,
109- `--new-datadir /pgdata/pg"${new_version}" --link` + argJobs ,
118+ `--new-datadir /pgdata/pg"${new_version}"` + argMethod + argJobs ,
110119
111120 // Since we have cleared the Patroni cluster step by removing the EndPoints, we copy patroni.dynamic.json
112121 // from the old data dir to help retain PostgreSQL parameters you had set before.
@@ -122,12 +131,12 @@ func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availabl
122131
123132// largestWholeCPU returns the maximum CPU request or limit as a non-negative
124133// integer of CPUs. When resources lacks any CPU, the result is zero.
125- func largestWholeCPU (resources corev1.ResourceRequirements ) int {
134+ func largestWholeCPU (resources corev1.ResourceRequirements ) int64 {
126135 // Read CPU quantities as millicores then divide to get the "floor."
127136 // NOTE: [resource.Quantity.Value] looks easier, but it rounds up.
128137 return max (
129- int ( resources .Limits .Cpu ().ScaledValue (resource .Milli )/ 1000 ) ,
130- int ( resources .Requests .Cpu ().ScaledValue (resource .Milli )/ 1000 ) ,
138+ resources .Limits .Cpu ().ScaledValue (resource .Milli )/ 1000 ,
139+ resources .Requests .Cpu ().ScaledValue (resource .Milli )/ 1000 ,
131140 0 )
132141}
133142
@@ -180,10 +189,12 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
180189 job .Spec .BackoffLimit = initialize .Int32 (0 )
181190 job .Spec .Template .Spec .RestartPolicy = corev1 .RestartPolicyNever
182191
183- // When enabled, calculate the number of CPUs for pg_upgrade.
184- wholeCPUs := 0
185- if feature .Enabled (ctx , feature .PGUpgradeCPUConcurrency ) {
186- wholeCPUs = largestWholeCPU (upgrade .Spec .Resources )
192+ settings := upgrade .Spec .PGUpgradeSettings .DeepCopy ()
193+
194+ // When jobs is undefined, use one less than the number of CPUs.
195+ if settings .Jobs == 0 && feature .Enabled (ctx , feature .PGUpgradeCPUConcurrency ) {
196+ wholeCPUs := int32 (min (math .MaxInt32 , largestWholeCPU (upgrade .Spec .Resources )))
197+ settings .Jobs = wholeCPUs - 1
187198 }
188199
189200 // Replace all containers with one that does the upgrade.
@@ -198,11 +209,7 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
198209 VolumeMounts : database .VolumeMounts ,
199210
200211 // Use our upgrade command and the specified image and resources.
201- Command : upgradeCommand (
202- upgrade .Spec .FromPostgresVersion ,
203- upgrade .Spec .ToPostgresVersion ,
204- fetchKeyCommand ,
205- wholeCPUs ),
212+ Command : upgradeCommand (settings , fetchKeyCommand ),
206213 Image : pgUpgradeContainerImage (upgrade ),
207214 ImagePullPolicy : upgrade .Spec .ImagePullPolicy ,
208215 Resources : upgrade .Spec .Resources ,
0 commit comments