Skip to content

Commit

Permalink
Fix the unified image setup during bootstrap when a custom environmen…
Browse files Browse the repository at this point in the history
…t variable is used for the zone (#2156)
  • Loading branch information
johscheuer authored Oct 24, 2024
1 parent e426237 commit 025f974
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 21 deletions.
5 changes: 2 additions & 3 deletions controllers/generate_initial_cluster_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (g generateInitialClusterFile) reconcile(ctx context.Context, r *Foundation
}

logger.Info("Generating initial cluster file")
r.Recorder.Event(cluster, corev1.EventTypeNormal, "ChangingCoordinators", "Choosing initial coordinators")
r.Recorder.Event(cluster, corev1.EventTypeNormal, "GenerateInitialCoordinators", "Choosing initial coordinators")

processCounts, err := cluster.GetProcessCountsWithDefaults()
if err != nil {
Expand Down Expand Up @@ -130,8 +130,7 @@ func (g generateInitialClusterFile) reconcile(ctx context.Context, r *Foundation
}

coordinators, err := locality.ChooseDistributedProcesses(cluster, processLocality, count, locality.ProcessSelectionConstraint{
HardLimits: locality.GetHardLimits(cluster),
SelectingCoordinators: true,
HardLimits: locality.GetHardLimits(cluster),
})
if err != nil {
return &requeue{curError: err}
Expand Down
3 changes: 3 additions & 0 deletions e2e/fixtures/cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type ClusterConfig struct {
UseDNS bool
// If enabled the cluster will be setup with the unified image.
UseUnifiedImage *bool
// SimulateCustomFaultDomainEnv will simulate the use case that a user has set a custom environment variable to
// be used as zone ID.
SimulateCustomFaultDomainEnv bool
// CreationTracker if specified will be used to log the time between the creations steps.
CreationTracker CreationTrackerLogger
// Number of machines, this is used for calculating the number of Pods and is not correlated to the actual number
Expand Down
12 changes: 9 additions & 3 deletions e2e/fixtures/fdb_cluster_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ func (factory *Factory) createFDBClusterSpec(
imageType = fdbv1beta2.ImageTypeUnified
}

faultDomain := fdbv1beta2.FoundationDBClusterFaultDomain{
Key: "foundationdb.org/none",
}
if config.SimulateCustomFaultDomainEnv {
faultDomain.ValueFrom = "$" + fdbv1beta2.EnvNameInstanceID
faultDomain.Key = corev1.LabelHostname
}

return &fdbv1beta2.FoundationDBCluster{
ObjectMeta: metav1.ObjectMeta{
Name: config.Name,
Expand All @@ -64,9 +72,7 @@ func (factory *Factory) createFDBClusterSpec(
MainContainer: factory.GetMainContainerOverrides(config.DebugSymbols, useUnifiedImage),
ImageType: &imageType,
SidecarContainer: factory.GetSidecarContainerOverrides(config.DebugSymbols),
FaultDomain: fdbv1beta2.FoundationDBClusterFaultDomain{
Key: "foundationdb.org/none",
},
FaultDomain: faultDomain,
AutomationOptions: fdbv1beta2.FoundationDBClusterAutomationOptions{
// We have to wait long enough to ensure the operator is not recreating too many Pods at the same time.
WaitBetweenRemovalsSeconds: pointer.Int(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"log"
"time"

corev1 "k8s.io/api/core/v1"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
"github.com/FoundationDB/fdb-kubernetes-operator/e2e/fixtures"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -119,7 +121,7 @@ func performUpgrade(config testConfig, preUpgradeFunction func(cluster *fixtures
for _, processGroup := range cluster.Status.ProcessGroups {
missingTime := processGroup.GetConditionTime(fdbv1beta2.MissingProcesses)
// If the Pod is missing check if the fdbserver processes are running and check the logs of the fdb-kubernetes-monitor.
if missingTime != nil && time.Since(time.Unix(*missingTime, 0)) > 60*time.Second {
if missingTime != nil && time.Since(time.Unix(*missingTime, 0)) > 120*time.Second && !processGroup.IsMarkedForRemoval() && !processGroup.IsExcluded() {
log.Println("Missing process for:", processGroup.ProcessGroupID)
stdout, stderr, err := factory.ExecuteCmd(context.Background(), cluster.Namespace, processGroup.GetPodName(cluster), fdbv1beta2.MainContainerName, "ps aufx", true)
log.Println("stdout:", stdout, "stderr", stderr, "err", err)
Expand Down Expand Up @@ -336,4 +338,26 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
EntryDescription("Upgrade from %[1]s to %[2]s"),
fixtures.GenerateUpgradeTableEntries(testOptions),
)

DescribeTable(
"upgrading a cluster with zone ID from a custom environment variable",
func(beforeVersion string, targetVersion string) {
performUpgrade(testConfig{
beforeVersion: beforeVersion,
targetVersion: targetVersion,
clusterConfig: &fixtures.ClusterConfig{
DebugSymbols: false,
SimulateCustomFaultDomainEnv: true,
},
loadData: false,
}, func(cluster *fixtures.FdbCluster) {
spec := cluster.GetCluster().Spec.DeepCopy()

Expect(spec.FaultDomain.Key).To(Equal(corev1.LabelHostname))
Expect(spec.FaultDomain.ValueFrom).To(HaveSuffix(fdbv1beta2.EnvNameInstanceID))
})
},
EntryDescription("Upgrade from %[1]s to %[2]s"),
fixtures.GenerateUpgradeTableEntries(testOptions),
)
})
3 changes: 1 addition & 2 deletions internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ func selectCoordinatorsLocalities(logger logr.Logger, cluster *fdbv1beta2.Founda
}

coordinators, err := locality.ChooseDistributedProcesses(cluster, candidates, coordinatorCount, locality.ProcessSelectionConstraint{
HardLimits: locality.GetHardLimits(cluster),
SelectingCoordinators: true,
HardLimits: locality.GetHardLimits(cluster),
})

logger.Info("Current coordinators", "coordinators", coordinators, "error", err)
Expand Down
26 changes: 22 additions & 4 deletions internal/locality/locality.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/go-logr/logr"
"math"
"slices"
"strings"
)

// Info captures information about a process for the purposes of
Expand Down Expand Up @@ -114,6 +115,7 @@ func InfoForProcess(process fdbv1beta2.FoundationDBStatusProcessInfo, mainContai

// InfoFromSidecar converts the process information from the sidecar's
// context into locality info for selecting processes.
// This method is only used during the initial bootstrapping of the cluster when no fdbserver processes are running.
func InfoFromSidecar(cluster *fdbv1beta2.FoundationDBCluster, client podclient.FdbPodClient) (Info, error) {
substitutions, err := client.GetVariableSubstitutions()
if err != nil {
Expand All @@ -124,14 +126,33 @@ func InfoFromSidecar(cluster *fdbv1beta2.FoundationDBCluster, client podclient.F
return Info{}, nil
}

// Take the zone ID from the FDB_ZONE_ID if present.
zoneID, present := substitutions[fdbv1beta2.EnvNameZoneID]
if !present {
// If the FDB_ZONE_ID is not present, the user specified another environment variable that represents the
// zone ID.
var zoneVariable string
if strings.HasPrefix(cluster.Spec.FaultDomain.ValueFrom, "$") {
zoneVariable = cluster.Spec.FaultDomain.ValueFrom[1:]
} else {
zoneVariable = fdbv1beta2.EnvNameZoneID
}

zoneID = substitutions[zoneVariable]
}

if zoneID == "" {
return Info{}, errors.New("no zone ID found in Sidecar information")
}

// This locality information is only used during the initial cluster file generation.
// So it should be good to only use the first process address here.
// This has the implication that in the initial cluster file only the first processes will be used.
return Info{
ID: substitutions[fdbv1beta2.EnvNameInstanceID],
Address: cluster.GetFullAddress(substitutions[fdbv1beta2.EnvNamePublicIP], 1),
LocalityData: map[string]string{
fdbv1beta2.FDBLocalityZoneIDKey: substitutions[fdbv1beta2.EnvNameZoneID],
fdbv1beta2.FDBLocalityZoneIDKey: zoneID,
fdbv1beta2.FDBLocalityDNSNameKey: substitutions[fdbv1beta2.EnvNameDNSName],
},
}, nil
Expand Down Expand Up @@ -164,9 +185,6 @@ type ProcessSelectionConstraint struct {
// HardLimits defines a maximum number of processes to recruit on any single
// value for a given locality field.
HardLimits map[string]int

// SelectingCoordinators must be true when the ChooseDistributedProcesses is used to select coordinators.
SelectingCoordinators bool
}

// ChooseDistributedProcesses recruits a maximally well-distributed set of processes from a set of potential candidates.
Expand Down
107 changes: 101 additions & 6 deletions internal/locality/locality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,7 @@ var _ = Describe("Localities", func() {
BeforeEach(func() {
candidates = generateCandidates(dcIDs, 5, 5)
result, err = ChooseDistributedProcesses(cluster, candidates, cluster.DesiredCoordinatorCount(), ProcessSelectionConstraint{
HardLimits: GetHardLimits(cluster),
SelectingCoordinators: true,
HardLimits: GetHardLimits(cluster),
})
Expect(err).NotTo(HaveOccurred())
})
Expand Down Expand Up @@ -645,8 +644,7 @@ var _ = Describe("Localities", func() {
// Only measure the actual execution of ChooseDistributedProcesses.
experiment.MeasureDuration("ChooseDistributedProcesses", func() {
_, _ = ChooseDistributedProcesses(cluster, candidates, cluster.DesiredCoordinatorCount(), ProcessSelectionConstraint{
HardLimits: GetHardLimits(cluster),
SelectingCoordinators: true,
HardLimits: GetHardLimits(cluster),
})
})
// We'll sample the function up to 50 times or up to a minute, whichever comes first.
Expand Down Expand Up @@ -754,8 +752,7 @@ var _ = Describe("Localities", func() {
}

result, err = ChooseDistributedProcesses(cluster, candidates, cluster.DesiredCoordinatorCount(), ProcessSelectionConstraint{
HardLimits: GetHardLimits(cluster),
SelectingCoordinators: true,
HardLimits: GetHardLimits(cluster),
})
Expect(err).NotTo(HaveOccurred())
})
Expand Down Expand Up @@ -1055,6 +1052,104 @@ var _ = Describe("Localities", func() {
Info{},
true,
),
Entry("locality information is read from a different environment variables",
&fdbv1beta2.FoundationDBCluster{
Spec: fdbv1beta2.FoundationDBClusterSpec{
FaultDomain: fdbv1beta2.FoundationDBClusterFaultDomain{
Key: corev1.LabelHostname,
ValueFrom: "$CUSTOM_ENV",
},
},
Status: fdbv1beta2.FoundationDBClusterStatus{
RequiredAddresses: fdbv1beta2.RequiredAddressSet{
NonTLS: true,
},
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Labels: map[string]string{
fdbv1beta2.FDBProcessGroupIDLabel: "test",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "foundationdb-kubernetes-sidecar",
Args: []string{
"--public-ip-family",
"4",
},
Env: []corev1.EnvVar{
{
Name: "CUSTOM_ENV",
Value: "custom-zone-id",
},
},
},
},
},
Status: corev1.PodStatus{
PodIPs: []corev1.PodIP{
{IP: "1.1.1.1"},
},
},
},
Info{
ID: "test",
Address: fdbv1beta2.ProcessAddress{
IPAddress: net.ParseIP("1.1.1.1"),
Port: 4501,
},
LocalityData: map[string]string{
fdbv1beta2.FDBLocalityZoneIDKey: "custom-zone-id",
fdbv1beta2.FDBLocalityDNSNameKey: "",
},
},
false,
),
Entry("locality information is read from a different environment variable which is missing",
&fdbv1beta2.FoundationDBCluster{
Spec: fdbv1beta2.FoundationDBClusterSpec{
FaultDomain: fdbv1beta2.FoundationDBClusterFaultDomain{
Key: corev1.LabelHostname,
ValueFrom: "$CUSTOM_ENV",
},
},
Status: fdbv1beta2.FoundationDBClusterStatus{
RequiredAddresses: fdbv1beta2.RequiredAddressSet{
NonTLS: true,
},
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Labels: map[string]string{
fdbv1beta2.FDBProcessGroupIDLabel: "test",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "foundationdb-kubernetes-sidecar",
Args: []string{
"--public-ip-family",
"4",
},
},
},
},
Status: corev1.PodStatus{
PodIPs: []corev1.PodIP{
{IP: "1.1.1.1"},
},
},
},
Info{},
true,
),
)

Describe("checkCoordinatorValidity", func() {
Expand Down
3 changes: 1 addition & 2 deletions internal/pod_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,6 @@ func GetSubstitutionsFromClusterAndPod(logger logr.Logger, cluster *fdbv1beta2.F

if faultDomainSource == "spec.nodeName" {
substitutions[fdbv1beta2.EnvNameZoneID] = pod.Spec.NodeName
} else {
return nil, fmt.Errorf("unsupported fault domain source %s", faultDomainSource)
}
}

Expand All @@ -450,6 +448,7 @@ func GetSubstitutionsFromClusterAndPod(logger logr.Logger, cluster *fdbv1beta2.F
copyableSubstitutions := map[string]fdbv1beta2.None{
fdbv1beta2.EnvNameDNSName: {},
fdbv1beta2.EnvNameInstanceID: {},
"CUSTOM_ENV": {},
}
for _, container := range pod.Spec.Containers {
for _, envVar := range container.Env {
Expand Down

0 comments on commit 025f974

Please sign in to comment.