Skip to content

Commit ef1eae0

Browse files
committed
Allow more control over the arguments to pg_upgrade
This also enables the PGUpgradeCPUConcurrency feature gate by default. Users may set PGUpgrade.spec.jobs to disable that behavior on a case-by-case basis. Issue: PGO-2022. Resolves: #4039.
1 parent 85636a8 commit ef1eae0

File tree

12 files changed

+165
-111
lines changed

12 files changed

+165
-111
lines changed

config/crd/bases/postgres-operator.crunchydata.com_pgupgrades.yaml

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,7 @@ spec:
963963
type: object
964964
fromPostgresVersion:
965965
description: The major version of PostgreSQL before the upgrade.
966+
format: int32
966967
maximum: 17
967968
minimum: 11
968969
type: integer
@@ -984,7 +985,7 @@ spec:
984985
description: |-
985986
The image pull secrets used to pull from a private registry.
986987
Changing this value causes all running PGUpgrade pods to restart.
987-
https://k8s.io/docs/tasks/configure-pod-container/pull-image-private-registry/
988+
https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry
988989
items:
989990
description: |-
990991
LocalObjectReference contains enough information to let you locate the
@@ -1002,6 +1003,13 @@ spec:
10021003
type: object
10031004
x-kubernetes-map-type: atomic
10041005
type: array
1006+
jobs:
1007+
description: |-
1008+
The number of simultaneous processes pg_upgrade should use.
1009+
More info: https://www.postgresql.org/docs/current/pgupgrade.html
1010+
format: int32
1011+
minimum: 0
1012+
type: integer
10051013
metadata:
10061014
description: Metadata contains metadata for custom resources
10071015
properties:
@@ -1015,14 +1023,14 @@ spec:
10151023
type: object
10161024
type: object
10171025
postgresClusterName:
1018-
description: The name of the cluster to be updated
1026+
description: The name of the Postgres cluster to upgrade.
10191027
minLength: 1
10201028
type: string
10211029
priorityClassName:
10221030
description: |-
10231031
Priority class name for the PGUpgrade pod. Changing this
10241032
value causes PGUpgrade pod to restart.
1025-
More info: https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/
1033+
More info: https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption
10261034
type: string
10271035
resources:
10281036
description: Resource requirements for the PGUpgrade container.
@@ -1083,13 +1091,9 @@ spec:
10831091
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
10841092
type: object
10851093
type: object
1086-
toPostgresImage:
1087-
description: |-
1088-
The image name to use for PostgreSQL containers after upgrade.
1089-
When omitted, the value comes from an operator environment variable.
1090-
type: string
10911094
toPostgresVersion:
10921095
description: The major version of PostgreSQL to be upgraded to.
1096+
format: int32
10931097
maximum: 17
10941098
minimum: 11
10951099
type: integer
@@ -1134,11 +1138,29 @@ spec:
11341138
type: string
11351139
type: object
11361140
type: array
1141+
transferMethod:
1142+
description: |-
1143+
The method pg_upgrade should use to transfer files to the new cluster.
1144+
More info: https://www.postgresql.org/docs/current/pgupgrade.html
1145+
enum:
1146+
- Clone
1147+
- Copy
1148+
- CopyFileRange
1149+
- Link
1150+
maxLength: 15
1151+
type: string
11371152
required:
11381153
- fromPostgresVersion
11391154
- postgresClusterName
11401155
- toPostgresVersion
11411156
type: object
1157+
x-kubernetes-validations:
1158+
- message: Only Copy or Link before PostgreSQL 12
1159+
rule: '!has(self.transferMethod) || (self.toPostgresVersion < 12 ? self.transferMethod
1160+
in ["Copy","Link"] : true)'
1161+
- message: Only Clone, Copy, or Link before PostgreSQL 17
1162+
rule: '!has(self.transferMethod) || (self.toPostgresVersion < 17 ? self.transferMethod
1163+
in ["Clone","Copy","Link"] : true)'
11421164
status:
11431165
description: PGUpgradeStatus defines the observed state of PGUpgrade
11441166
properties:

internal/controller/pgupgrade/jobs.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
package pgupgrade
66

77
import (
8+
"cmp"
89
"context"
910
"fmt"
11+
"math"
1012
"strings"
1113

1214
appsv1 "k8s.io/api/apps/v1"
@@ -35,9 +37,16 @@ func pgUpgradeJob(upgrade *v1beta1.PGUpgrade) metav1.ObjectMeta {
3537

3638
// upgradeCommand returns an entrypoint that prepares the filesystem for
3739
// and performs a PostgreSQL major version upgrade using pg_upgrade.
38-
func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availableCPUs int) []string {
39-
// Use multiple CPUs when three or more are available.
40-
argJobs := fmt.Sprintf(` --jobs=%d`, max(1, availableCPUs-1))
40+
func upgradeCommand(spec *v1beta1.PGUpgradeSettings, fetchKeyCommand string) []string {
41+
argJobs := fmt.Sprintf(` --jobs=%d`, max(1, spec.Jobs))
42+
argMethod := cmp.Or(map[string]string{
43+
"Clone": ` --clone`,
44+
"Copy": ` --copy`,
45+
"CopyFileRange": ` --copy-file-range`,
46+
}[spec.TransferMethod], ` --link`)
47+
48+
oldVersion := spec.FromPostgresVersion
49+
newVersion := spec.ToPostgresVersion
4150

4251
// if the fetch key command is set for TDE, provide the value during initialization
4352
initdb := `/usr/pgsql-"${new_version}"/bin/initdb -k -D /pgdata/pg"${new_version}"`
@@ -99,14 +108,14 @@ func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availabl
99108
`echo -e "Step 5: Running pg_upgrade check...\n"`,
100109
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \`,
101110
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\`,
102-
` --new-datadir /pgdata/pg"${new_version}" --link --check` + argJobs,
111+
` --new-datadir /pgdata/pg"${new_version}" --check` + argMethod + argJobs,
103112

104113
// Assuming the check completes successfully, the pg_upgrade command will
105114
// be run that actually prepares the upgraded pgdata directory.
106115
`echo -e "\nStep 6: Running pg_upgrade...\n"`,
107116
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \`,
108117
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \`,
109-
`--new-datadir /pgdata/pg"${new_version}" --link` + argJobs,
118+
`--new-datadir /pgdata/pg"${new_version}"` + argMethod + argJobs,
110119

111120
// Since we have cleared the Patroni cluster step by removing the EndPoints, we copy patroni.dynamic.json
112121
// from the old data dir to help retain PostgreSQL parameters you had set before.
@@ -122,12 +131,12 @@ func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availabl
122131

123132
// largestWholeCPU returns the maximum CPU request or limit as a non-negative
124133
// integer of CPUs. When resources lacks any CPU, the result is zero.
125-
func largestWholeCPU(resources corev1.ResourceRequirements) int {
134+
func largestWholeCPU(resources corev1.ResourceRequirements) int64 {
126135
// Read CPU quantities as millicores then divide to get the "floor."
127136
// NOTE: [resource.Quantity.Value] looks easier, but it rounds up.
128137
return max(
129-
int(resources.Limits.Cpu().ScaledValue(resource.Milli)/1000),
130-
int(resources.Requests.Cpu().ScaledValue(resource.Milli)/1000),
138+
resources.Limits.Cpu().ScaledValue(resource.Milli)/1000,
139+
resources.Requests.Cpu().ScaledValue(resource.Milli)/1000,
131140
0)
132141
}
133142

@@ -180,10 +189,12 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
180189
job.Spec.BackoffLimit = initialize.Int32(0)
181190
job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
182191

183-
// When enabled, calculate the number of CPUs for pg_upgrade.
184-
wholeCPUs := 0
185-
if feature.Enabled(ctx, feature.PGUpgradeCPUConcurrency) {
186-
wholeCPUs = largestWholeCPU(upgrade.Spec.Resources)
192+
settings := upgrade.Spec.PGUpgradeSettings.DeepCopy()
193+
194+
// When jobs is undefined, use one less than the number of CPUs.
195+
if settings.Jobs == 0 && feature.Enabled(ctx, feature.PGUpgradeCPUConcurrency) {
196+
wholeCPUs := int32(min(math.MaxInt32, largestWholeCPU(upgrade.Spec.Resources)))
197+
settings.Jobs = wholeCPUs - 1
187198
}
188199

189200
// Replace all containers with one that does the upgrade.
@@ -198,11 +209,7 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
198209
VolumeMounts: database.VolumeMounts,
199210

200211
// Use our upgrade command and the specified image and resources.
201-
Command: upgradeCommand(
202-
upgrade.Spec.FromPostgresVersion,
203-
upgrade.Spec.ToPostgresVersion,
204-
fetchKeyCommand,
205-
wholeCPUs),
212+
Command: upgradeCommand(settings, fetchKeyCommand),
206213
Image: pgUpgradeContainerImage(upgrade),
207214
ImagePullPolicy: upgrade.Spec.ImagePullPolicy,
208215
Resources: upgrade.Spec.Resources,

internal/controller/pgupgrade/jobs_test.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ import (
2323
)
2424

2525
func TestLargestWholeCPU(t *testing.T) {
26-
assert.Equal(t, 0,
26+
assert.Equal(t, int64(0),
2727
largestWholeCPU(corev1.ResourceRequirements{}),
2828
"expected the zero value to be zero")
2929

3030
for _, tt := range []struct {
3131
Name, ResourcesYAML string
32-
Result int
32+
Result int64
3333
}{
3434
{
3535
Name: "Negatives", ResourcesYAML: `{requests: {cpu: -3}, limits: {cpu: -5}}`,
@@ -72,27 +72,53 @@ func TestUpgradeCommand(t *testing.T) {
7272
})
7373
}
7474

75-
t.Run("CPUs", func(t *testing.T) {
75+
t.Run("Jobs", func(t *testing.T) {
7676
for _, tt := range []struct {
77-
CPUs int
78-
Jobs string
77+
Spec int32
78+
Args string
7979
}{
80-
{CPUs: 0, Jobs: "--jobs=1"},
81-
{CPUs: 1, Jobs: "--jobs=1"},
82-
{CPUs: 2, Jobs: "--jobs=1"},
83-
{CPUs: 3, Jobs: "--jobs=2"},
84-
{CPUs: 10, Jobs: "--jobs=9"},
80+
{Spec: -1, Args: "--jobs=1"},
81+
{Spec: 0, Args: "--jobs=1"},
82+
{Spec: 1, Args: "--jobs=1"},
83+
{Spec: 2, Args: "--jobs=2"},
84+
{Spec: 10, Args: "--jobs=10"},
8585
} {
86-
command := upgradeCommand(10, 11, "", tt.CPUs)
86+
spec := &v1beta1.PGUpgradeSettings{Jobs: tt.Spec}
87+
command := upgradeCommand(spec, "")
8788
assert.Assert(t, len(command) > 3)
8889
assert.DeepEqual(t, []string{"bash", "-ceu", "--"}, command[:3])
8990

9091
script := command[3]
91-
assert.Assert(t, cmp.Contains(script, tt.Jobs))
92+
assert.Assert(t, cmp.Contains(script, tt.Args))
9293

9394
expectScript(t, script)
9495
}
9596
})
97+
98+
t.Run("Method", func(t *testing.T) {
99+
for _, tt := range []struct {
100+
Spec string
101+
Args string
102+
}{
103+
{Spec: "", Args: "--link"},
104+
{Spec: "mystery!", Args: "--link"},
105+
{Spec: "Link", Args: "--link"},
106+
{Spec: "Clone", Args: "--clone"},
107+
{Spec: "Copy", Args: "--copy"},
108+
{Spec: "CopyFileRange", Args: "--copy-file-range"},
109+
} {
110+
spec := &v1beta1.PGUpgradeSettings{TransferMethod: tt.Spec}
111+
command := upgradeCommand(spec, "")
112+
assert.Assert(t, len(command) > 3)
113+
assert.DeepEqual(t, []string{"bash", "-ceu", "--"}, command[:3])
114+
115+
script := command[3]
116+
assert.Assert(t, cmp.Contains(script, tt.Args))
117+
118+
expectScript(t, script)
119+
}
120+
121+
})
96122
}
97123

98124
func TestGenerateUpgradeJob(t *testing.T) {
@@ -194,7 +220,7 @@ spec:
194220
echo -e "Step 5: Running pg_upgrade check...\n"
195221
time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \
196222
--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\
197-
--new-datadir /pgdata/pg"${new_version}" --link --check --jobs=1
223+
--new-datadir /pgdata/pg"${new_version}" --check --link --jobs=1
198224
echo -e "\nStep 6: Running pg_upgrade...\n"
199225
time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \
200226
--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \

internal/controller/pgupgrade/pgupgrade_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ func (r *PGUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
418418
// Set the cluster status when we know the upgrade has completed successfully.
419419
// This will serve to help the user see that the upgrade has completed if they
420420
// are only watching the PostgresCluster
421-
patch.Status.PostgresVersion = upgrade.Spec.ToPostgresVersion
421+
patch.Status.PostgresVersion = int(upgrade.Spec.ToPostgresVersion)
422422

423423
// Set the pgBackRest status for bootstrapping
424424
patch.Status.PGBackRest.Repos = []v1beta1.RepoStatus{}

internal/feature/features.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func NewGate() MutableGate {
113113
OpenTelemetryLogs: {Default: false, PreRelease: featuregate.Alpha},
114114
OpenTelemetryMetrics: {Default: false, PreRelease: featuregate.Alpha},
115115
PGBouncerSidecars: {Default: false, PreRelease: featuregate.Alpha},
116-
PGUpgradeCPUConcurrency: {Default: false, PreRelease: featuregate.Alpha},
116+
PGUpgradeCPUConcurrency: {Default: true, PreRelease: featuregate.Beta},
117117
TablespaceVolumes: {Default: false, PreRelease: featuregate.Alpha},
118118
VolumeSnapshots: {Default: false, PreRelease: featuregate.Alpha},
119119
}); err != nil {

internal/feature/features_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestDefaults(t *testing.T) {
2424
assert.Assert(t, false == gate.Enabled(OpenTelemetryLogs))
2525
assert.Assert(t, false == gate.Enabled(OpenTelemetryMetrics))
2626
assert.Assert(t, false == gate.Enabled(PGBouncerSidecars))
27-
assert.Assert(t, false == gate.Enabled(PGUpgradeCPUConcurrency))
27+
assert.Assert(t, true == gate.Enabled(PGUpgradeCPUConcurrency))
2828
assert.Assert(t, false == gate.Enabled(TablespaceVolumes))
2929
assert.Assert(t, false == gate.Enabled(VolumeSnapshots))
3030
}

internal/upgradecheck/http_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestCheckForUpgrades(t *testing.T) {
6767
assert.Equal(t, data.RegistrationToken, "speakFriend")
6868
assert.Equal(t, data.BridgeClustersTotal, 2)
6969
assert.Equal(t, data.PGOClustersTotal, 2)
70-
assert.Equal(t, data.FeatureGatesEnabled, "AutoCreateUserSchema=true,TablespaceVolumes=true")
70+
assert.Equal(t, data.FeatureGatesEnabled, "AutoCreateUserSchema=true,PGUpgradeCPUConcurrency=true,TablespaceVolumes=true")
7171
}
7272

7373
t.Run("success", func(t *testing.T) {

0 commit comments

Comments
 (0)