From 2bdea15aafcacfa3a8949c508deb4d0b9a9c78a5 Mon Sep 17 00:00:00 2001
From: d-kuro
Date: Wed, 3 Jul 2024 11:58:52 +0900
Subject: [PATCH] Add StatefulSet partition controller

Signed-off-by: d-kuro
---
 api/v1beta2/statefulset_webhook.go            |  99 ++++
 api/v1beta2/statefulset_webhook_test.go       | 170 +++++++++
 api/v1beta2/webhook_suite_test.go             |   2 +
 api/v1beta2/zz_generated.deepcopy.go          |  15 +
 .../moco/templates/generated/generated.yaml   |  25 ++
 cmd/moco-controller/cmd/run.go                |  14 +
 config/rbac/role.yaml                         |   6 +
 config/webhook/manifests.yaml                 |  19 +
 controllers/mysqlcluster_controller.go        |  11 +
 controllers/mysqlcluster_controller_test.go   |   2 +
 controllers/partition_controller.go           | 327 ++++++++++++++++++
 controllers/partition_controller_test.go      | 237 +++++++++++++
 docs/SUMMARY.md                               |   1 +
 docs/metrics.md                               |   3 +
 docs/rolling-update-strategy.md               |  85 +++++
 e2e/partition_test.go                         | 281 +++++++++++++++
 e2e/testdata/partition.yaml                   |  28 ++
 e2e/testdata/partition_changed.yaml           |  23 ++
 .../partition_force_rollingupdate.yaml        |  25 ++
 .../partition_image_pull_backoff.yaml         |  23 ++
 pkg/constants/meta.go                         |   1 +
 pkg/metrics/metrics.go                        |  28 ++
 22 files changed, 1425 insertions(+)
 create mode 100644 api/v1beta2/statefulset_webhook.go
 create mode 100644 api/v1beta2/statefulset_webhook_test.go
 create mode 100644 controllers/partition_controller.go
 create mode 100644 controllers/partition_controller_test.go
 create mode 100644 docs/rolling-update-strategy.md
 create mode 100644 e2e/partition_test.go
 create mode 100644 e2e/testdata/partition.yaml
 create mode 100644 e2e/testdata/partition_changed.yaml
 create mode 100644 e2e/testdata/partition_force_rollingupdate.yaml
 create mode 100644 e2e/testdata/partition_image_pull_backoff.yaml

diff --git a/api/v1beta2/statefulset_webhook.go b/api/v1beta2/statefulset_webhook.go
new file mode 100644
index 000000000..ab3e0bf9c
--- /dev/null
+++ b/api/v1beta2/statefulset_webhook.go
@@ -0,0 +1,99 @@
+package v1beta2
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/cybozu-go/moco/pkg/constants"
+	admissionv1 "k8s.io/api/admission/v1"
+	appsv1 "k8s.io/api/apps/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/utils/ptr"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
+)
+
+func SetupStatefulSetWebhookWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewWebhookManagedBy(mgr).
+		For(&appsv1.StatefulSet{}).
+		WithDefaulter(&StatefulSetDefaulter{}).
+		Complete()
+}
+
+//+kubebuilder:webhook:path=/mutate-apps-v1-statefulset,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps,resources=statefulsets,verbs=update,versions=v1,name=statefulset.kb.io,admissionReviewVersions=v1
+
+type StatefulSetDefaulter struct{}
+
+var _ admission.CustomDefaulter = &StatefulSetDefaulter{}
+
+// Default implements admission.CustomDefaulter so that a mutating webhook is registered for StatefulSets.
+func (*StatefulSetDefaulter) Default(ctx context.Context, obj runtime.Object) error {
+	sts, ok := obj.(*appsv1.StatefulSet)
+	if !ok {
+		return fmt.Errorf("unknown obj type %T", obj)
+	}
+
+	req, err := admission.RequestFromContext(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to get admission request from context: %w", err)
+	}
+
+	if req.Operation != admissionv1.Update {
+		return nil
+	}
+
+	if len(sts.OwnerReferences) != 1 {
+		return nil
+	}
+
+	// Process only StatefulSets owned by a MOCO MySQLCluster.
+	if sts.OwnerReferences[0].Kind != "MySQLCluster" || sts.OwnerReferences[0].APIVersion != GroupVersion.String() {
+		return nil
+	}
+
+	// With force rolling update, partition control is disabled entirely.
+	if sts.Annotations[constants.AnnForceRollingUpdate] == "true" {
+		sts.Spec.UpdateStrategy.RollingUpdate = nil
+		return nil
+	}
+
+	if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil {
+		sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
+			Partition: ptr.To[int32](*sts.Spec.Replicas),
+		}
+		return nil
+	}
+
+	oldSts, err := readStatefulSet(req.OldObject.Raw)
+	if err != nil {
+		return fmt.Errorf("failed to read old statefulset: %w", err)
+	}
+
+	partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
+	oldPartition := *oldSts.Spec.UpdateStrategy.RollingUpdate.Partition
+
+	newSts := sts.DeepCopy()
+	newSts.Spec.UpdateStrategy = oldSts.Spec.UpdateStrategy
+
+	// If only the partition value was changed, keep the user-supplied value.
+	if partition != oldPartition && equality.Semantic.DeepEqual(newSts.Spec, oldSts.Spec) {
+		return nil
+	}
+
+	// Some other field changed: reset the partition to the replica count so that
+	// the rollout pauses until the partition controller decrements it.
+	sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
+		Partition: ptr.To[int32](*sts.Spec.Replicas),
+	}
+
+	return nil
+}
+
+func readStatefulSet(raw []byte) (*appsv1.StatefulSet, error) {
+	var sts appsv1.StatefulSet
+
+	if _, _, err := unstructured.UnstructuredJSONScheme.Decode(raw, nil, &sts); err != nil {
+		return nil, err
+	}
+
+	sts.TypeMeta.APIVersion = appsv1.SchemeGroupVersion.Group + "/" + appsv1.SchemeGroupVersion.Version
+
+	return &sts, nil
+}
diff --git a/api/v1beta2/statefulset_webhook_test.go b/api/v1beta2/statefulset_webhook_test.go
new file mode 100644
index 000000000..f94878805
--- /dev/null
+++ b/api/v1beta2/statefulset_webhook_test.go
@@ -0,0 +1,170 @@
+package v1beta2_test
+
+import (
+	"context"
+
+	. "github.com/onsi/ginkgo"
+	. 
"github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/cybozu-go/moco/pkg/constants" +) + +func makeStatefulSet() *appsv1.StatefulSet { + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "moco.cybozu.com/v1beta2", + Kind: "MySQLCluster", + Name: "test", + UID: "uid", + }, + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](3), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "mysql", + Image: "mysql:examle", + }, + }, + }, + }, + }, + } +} + +func deleteStatefulSet() error { + r := &appsv1.StatefulSet{} + err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "default", Name: "test"}, r) + if apierrors.IsNotFound(err) { + return nil + } + + if err != nil { + return err + } + + r.Finalizers = nil + if err := k8sClient.Update(ctx, r); err != nil { + return err + } + + if err := k8sClient.Delete(ctx, r); err != nil { + return err + } + + return nil +} + +var _ = Describe("MySQLCluster Webhook", func() { + ctx := context.TODO() + + BeforeEach(func() { + err := deleteStatefulSet() + Expect(err).NotTo(HaveOccurred()) + }) + + It("should not set partition when creating StatefulSet", func() { + r := makeStatefulSet() + err := k8sClient.Create(ctx, r) + Expect(err).NotTo(HaveOccurred()) + Expect(r.Spec.UpdateStrategy.RollingUpdate).To(BeNil()) + }) + + It("should set partition when updating StatefulSet", func() { + r := makeStatefulSet() + err := k8sClient.Create(ctx, r) + Expect(err).NotTo(HaveOccurred()) + + err = k8sClient.Update(ctx, r) + Expect(err).NotTo(HaveOccurred()) + Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas)) + }) + + It("should not set partition when forcing updating StatefulSet", func() { + r := makeStatefulSet() + err := k8sClient.Create(ctx, r) + Expect(err).NotTo(HaveOccurred()) + + r.Annotations = map[string]string{constants.AnnForceRollingUpdate: "true"} + err = k8sClient.Update(ctx, r) + Expect(err).NotTo(HaveOccurred()) + Expect(r.Spec.UpdateStrategy.RollingUpdate).To(BeNil()) + }) + + It("should set partition when forcing updating StatefulSet with invalid value", func() { + r := makeStatefulSet() + err := k8sClient.Create(ctx, r) + Expect(err).NotTo(HaveOccurred()) + + r.Annotations = map[string]string{constants.AnnForceRollingUpdate: "false"} + err = k8sClient.Update(ctx, r) + Expect(err).NotTo(HaveOccurred()) + Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas)) + }) + + It("should not update partition when updating StatefulSet with only partition changed", func() { + r := makeStatefulSet() + err := k8sClient.Create(ctx, r) + Expect(err).NotTo(HaveOccurred()) + + err = k8sClient.Update(ctx, r) + Expect(err).NotTo(HaveOccurred()) + Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas)) + + r.Spec.UpdateStrategy.RollingUpdate.Partition = ptr.To[int32](2) + err = k8sClient.Update(ctx, r) + Expect(err).NotTo(HaveOccurred()) + 
+		Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](2)))
+	})
+
+	It("should update partition when updating StatefulSet with partition and another field changed", func() {
+		r := makeStatefulSet()
+		err := k8sClient.Create(ctx, r)
+		Expect(err).NotTo(HaveOccurred())
+
+		err = k8sClient.Update(ctx, r)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
+
+		r.Spec.Replicas = ptr.To[int32](5)
+		r.Spec.UpdateStrategy.RollingUpdate.Partition = ptr.To[int32](2)
+		err = k8sClient.Update(ctx, r)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](5)))
+	})
+
+	It("should update partition when updating StatefulSet with partition unchanged", func() {
+		r := makeStatefulSet()
+		err := k8sClient.Create(ctx, r)
+		Expect(err).NotTo(HaveOccurred())
+
+		err = k8sClient.Update(ctx, r)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
+
+		r.Spec.Replicas = ptr.To[int32](5)
+		err = k8sClient.Update(ctx, r)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](5)))
+	})
+})
diff --git a/api/v1beta2/webhook_suite_test.go b/api/v1beta2/webhook_suite_test.go
index 8c9a93656..73317134d 100644
--- a/api/v1beta2/webhook_suite_test.go
+++ b/api/v1beta2/webhook_suite_test.go
@@ -98,6 +98,8 @@ var _ = BeforeSuite(func() {
 	Expect(err).NotTo(HaveOccurred())
 	err = (&mocov1beta2.BackupPolicy{}).SetupWebhookWithManager(mgr)
 	Expect(err).NotTo(HaveOccurred())
+	err = mocov1beta2.SetupStatefulSetWebhookWithManager(mgr)
+	Expect(err).NotTo(HaveOccurred())
 
 	//+kubebuilder:scaffold:webhook
diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go
index a1f5cfc55..b12bdec49 100644
--- a/api/v1beta2/zz_generated.deepcopy.go
+++ b/api/v1beta2/zz_generated.deepcopy.go
@@ -559,6 +559,21 @@ func (in *ServiceTemplate) DeepCopy() *ServiceTemplate {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StatefulSetDefaulter) DeepCopyInto(out *StatefulSetDefaulter) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StatefulSetDefaulter.
+func (in *StatefulSetDefaulter) DeepCopy() *StatefulSetDefaulter {
+	if in == nil {
+		return nil
+	}
+	out := new(StatefulSetDefaulter)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *VolumeApplyConfiguration) DeepCopyInto(out *VolumeApplyConfiguration) { clone := in.DeepCopy() diff --git a/charts/moco/templates/generated/generated.yaml b/charts/moco/templates/generated/generated.yaml index 1332da753..29fd2d85a 100644 --- a/charts/moco/templates/generated/generated.yaml +++ b/charts/moco/templates/generated/generated.yaml @@ -155,6 +155,12 @@ rules: - patch - update - watch + - apiGroups: + - "" + resources: + - pods/status + verbs: + - get - apiGroups: - "" resources: @@ -460,6 +466,25 @@ webhooks: resources: - mysqlclusters sideEffects: None + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: moco-webhook-service + namespace: '{{ .Release.Namespace }}' + path: /mutate-apps-v1-statefulset + failurePolicy: Fail + name: statefulset.kb.io + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - UPDATE + resources: + - statefulsets + sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration diff --git a/cmd/moco-controller/cmd/run.go b/cmd/moco-controller/cmd/run.go index ecbd9a5dc..ad6eff143 100644 --- a/cmd/moco-controller/cmd/run.go +++ b/cmd/moco-controller/cmd/run.go @@ -119,6 +119,15 @@ func subMain(ns, addr string, port int) error { return err } + if err = (&controllers.StatefulSetPartitionReconciler{ + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor("moco-controller"), + MaxConcurrentReconciles: config.maxConcurrentReconciles, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Partition") + return err + } + if err = (&controllers.PodWatcher{ Client: mgr.GetClient(), ClusterManager: clusterMgr, @@ -138,6 +147,11 @@ func subMain(ns, addr string, port int) error { return err } + if err := mocov1beta2.SetupStatefulSetWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup webhook", "webhook", "StatefulSet") + return err + } + if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") return err diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 753de4d32..52b5ee731 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -50,6 +50,12 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - pods/status + verbs: + - get - apiGroups: - "" resources: diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 3d9a7003b..fa39bbbcf 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -24,6 +24,25 @@ webhooks: resources: - mysqlclusters sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-apps-v1-statefulset + failurePolicy: Fail + name: statefulset.kb.io + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - UPDATE + resources: + - statefulsets + sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration diff --git a/controllers/mysqlcluster_controller.go b/controllers/mysqlcluster_controller.go index 590dfc8d8..5407a0d2d 100644 --- a/controllers/mysqlcluster_controller.go +++ b/controllers/mysqlcluster_controller.go @@ -720,6 +720,10 @@ func (r *MySQLClusterReconciler) reconcileV1StatefulSet(ctx context.Context, req WithType(appsv1.RollingUpdateStatefulSetStrategyType)). 
WithServiceName(cluster.HeadlessServiceName())) + if isForceRollingUpdate(cluster) { + sts.WithAnnotations(map[string]string{constants.AnnForceRollingUpdate: "true"}) + } + volumeClaimTemplates := make([]*corev1ac.PersistentVolumeClaimApplyConfiguration, 0, len(cluster.Spec.VolumeClaimTemplates)) for _, v := range cluster.Spec.VolumeClaimTemplates { pvc := v.ToCoreV1() @@ -1678,6 +1682,9 @@ func (r *MySQLClusterReconciler) finalizeV1(ctx context.Context, cluster *mocov1 metrics.ClusteringStoppedVec.DeleteLabelValues(cluster.Name, cluster.Namespace) metrics.ReconciliationStoppedVec.DeleteLabelValues(cluster.Name, cluster.Namespace) + metrics.CurrentReplicasVec.DeleteLabelValues(cluster.Name, cluster.Namespace) + metrics.UpdatedReplicasVec.DeleteLabelValues(cluster.Name, cluster.Namespace) + metrics.LastPartitionUpdatedVec.DeleteLabelValues(cluster.Name, cluster.Namespace) return nil } @@ -1831,6 +1838,10 @@ func isClusteringStopped(cluster *mocov1beta2.MySQLCluster) bool { return cluster.Annotations[constants.AnnClusteringStopped] == "true" } +func isForceRollingUpdate(cluster *mocov1beta2.MySQLCluster) bool { + return cluster.Annotations[constants.AnnForceRollingUpdate] == "true" +} + func setControllerReferenceWithConfigMap(cluster *mocov1beta2.MySQLCluster, cm *corev1ac.ConfigMapApplyConfiguration, scheme *runtime.Scheme) error { gvk, err := apiutil.GVKForObject(cluster, scheme) if err != nil { diff --git a/controllers/mysqlcluster_controller_test.go b/controllers/mysqlcluster_controller_test.go index d23fd4985..ffa592b3a 100644 --- a/controllers/mysqlcluster_controller_test.go +++ b/controllers/mysqlcluster_controller_test.go @@ -675,6 +675,7 @@ var _ = Describe("MySQLCluster reconciler", func() { It("should reconcile statefulset", func() { cluster := testNewMySQLCluster("test") + cluster.Annotations = map[string]string{constants.AnnForceRollingUpdate: "true"} cluster.Spec.ReplicationSourceSecretName = ptr.To[string]("source-secret") cluster.Spec.PodTemplate.Annotations = map[string]string{"foo": "bar"} cluster.Spec.PodTemplate.Labels = map[string]string{"foo": "baz"} @@ -702,6 +703,7 @@ var _ = Describe("MySQLCluster reconciler", func() { By("checking new statefulset") Expect(sts.OwnerReferences).NotTo(BeEmpty()) + Expect(sts.Annotations).To(HaveKeyWithValue(constants.AnnForceRollingUpdate, "true")) Expect(sts.Spec.Template.Annotations).To(HaveKeyWithValue("foo", "bar")) Expect(sts.Spec.Template.Labels).To(HaveKeyWithValue("foo", "baz")) Expect(sts.Spec.Replicas).NotTo(BeNil()) diff --git a/controllers/partition_controller.go b/controllers/partition_controller.go new file mode 100644 index 000000000..79008da26 --- /dev/null +++ b/controllers/partition_controller.go @@ -0,0 +1,327 @@ +package controllers + +import ( + "context" + "fmt" + "sort" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/kubectl/pkg/util/podutils" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + crlog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + mocov1beta2 
"github.com/cybozu-go/moco/api/v1beta2" + "github.com/cybozu-go/moco/pkg/constants" + "github.com/cybozu-go/moco/pkg/metrics" +) + +var _ reconcile.Reconciler = &StatefulSetPartitionReconciler{} + +// StatefulSetPartitionReconciler reconciles a StatefulSet object +type StatefulSetPartitionReconciler struct { + client.Client + Recorder record.EventRecorder + MaxConcurrentReconciles int +} + +//+kubebuilder:rbac:groups=moco.cybozu.com,resources=mysqlclusters,verbs=get;list;watch +//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;update;patch +//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get +//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch +//+kubebuilder:rbac:groups="",resources=pods/status,verbs=get +//+kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch + +// Reconcile implements Reconciler interface. +// See https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile#Reconciler +func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := crlog.FromContext(ctx) + + sts := &appsv1.StatefulSet{} + err := r.Get(ctx, req.NamespacedName, sts) + if err != nil { + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + log.Error(err, "unable to fetch StatefulSet") + return reconcile.Result{}, err + } + + cluster, err := r.getMySQLCluster(ctx, sts) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get MySQLCluster: %w", err) + } + + metrics.CurrentReplicasVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(float64(sts.Status.CurrentReplicas)) + metrics.UpdatedReplicasVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(float64(sts.Status.UpdatedReplicas)) + + if !r.needPartitionUpdate(sts) { + return reconcile.Result{}, nil + } + + ready, err := r.isRolloutReady(ctx, cluster, sts) + if err != nil { + log.Error(err, "failed to check if rollout is ready") + return reconcile.Result{}, err + } + if !ready { + return reconcile.Result{RequeueAfter: 30 * time.Second}, nil + } + + if err := r.patchNewPartition(ctx, sts); err != nil { + log.Error(err, "failed to apply new partition") + return reconcile.Result{}, err + } + + log.Info("partition is updated") + metrics.LastPartitionUpdatedVec.WithLabelValues(cluster.Name, cluster.Namespace).SetToCurrentTime() + + return reconcile.Result{RequeueAfter: 10 * time.Second}, nil +} + +func (r *StatefulSetPartitionReconciler) SetupWithManager(mgr ctrl.Manager) error { + mapFn := handler.EnqueueRequestsFromMapFunc( + func(ctx context.Context, obj client.Object) []ctrl.Request { + return []ctrl.Request{ + { + NamespacedName: client.ObjectKey{ + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }, + }, + } + }) + + p := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + old := e.ObjectOld.(*mocov1beta2.MySQLCluster) + new := e.ObjectNew.(*mocov1beta2.MySQLCluster) + return old.ResourceVersion != new.ResourceVersion + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + } + + return ctrl.NewControllerManagedBy(mgr). + For(&appsv1.StatefulSet{}). + Owns(&corev1.Pod{}). + Watches( + &mocov1beta2.MySQLCluster{}, + mapFn, + builder.WithPredicates(p), + ). + WithOptions( + controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}, + ). + Complete(r) +} + +// isRolloutReady returns true if the StatefulSet is ready for rolling update. 
+func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, cluster *mocov1beta2.MySQLCluster, sts *appsv1.StatefulSet) (bool, error) {
+	log := crlog.FromContext(ctx)
+
+	podList, err := r.getSortedPodList(ctx, sts)
+	if err != nil {
+		return false, fmt.Errorf("failed to get pod list: %w", err)
+	}
+
+	var replicas int32
+	if sts.Spec.Replicas != nil {
+		replicas = *sts.Spec.Replicas
+	} else {
+		replicas = 1
+	}
+
+	if replicas != int32(len(podList.Items)) {
+		log.Info("replicas is different from the expected number of Pods", "expected", replicas, "actual", len(podList.Items))
+		return false, nil
+	}
+
+	nextRolloutTarget := r.nextRolloutTargetIndex(sts)
+	if nextRolloutTarget < 0 {
+		return false, nil
+	}
+
+	// If not all Pods are ready, the MySQLCluster becomes Unhealthy.
+	// Even if the MySQLCluster is not healthy, the rollout continues when the rollout target Pod is not ready.
+	// This is because restarting the not-ready Pod is expected to improve its state.
+	if podutils.IsPodReady(&podList.Items[nextRolloutTarget]) && !r.isMySQLClusterHealthy(cluster) {
+		log.Info("MySQLCluster is not healthy", "name", cluster.Name, "namespace", cluster.Namespace)
+		return false, nil
+	}
+
+	ready, err := r.areAllChildPodsRolloutReady(ctx, sts, podList)
+	if err != nil {
+		return false, fmt.Errorf("failed to check if all child pods are ready: %w", err)
+	}
+
+	return ready, nil
+}
+
+// getSortedPodList returns the child pod list, sorted by pod name in ascending order.
+func (r *StatefulSetPartitionReconciler) getSortedPodList(ctx context.Context, sts *appsv1.StatefulSet) (*corev1.PodList, error) {
+	podList := &corev1.PodList{}
+	listOpts := []client.ListOption{
+		client.InNamespace(sts.Namespace),
+		client.MatchingLabels(sts.Spec.Selector.MatchLabels),
+	}
+
+	err := r.Client.List(ctx, podList, listOpts...)
+	if err != nil {
+		return nil, err
+	}
+
+	sort.Slice(podList.Items, func(i, j int) bool {
+		return podList.Items[i].Name < podList.Items[j].Name
+	})
+
+	return podList, nil
+}
+
+func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context.Context, sts *appsv1.StatefulSet, sortedPodList *corev1.PodList) (bool, error) {
+	log := crlog.FromContext(ctx)
+
+	firstRevision := sortedPodList.Items[0].Labels[appsv1.ControllerRevisionHashLabelKey]
+	lastIndex := len(sortedPodList.Items) - 1
+	lastRevision := sortedPodList.Items[lastIndex].Labels[appsv1.ControllerRevisionHashLabelKey]
+	revisionCounts := make(map[string]int)
+
+	for _, pod := range sortedPodList.Items {
+		revision := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
+		revisionCounts[revision]++
+	}
+
+	nextRolloutTarget := r.nextRolloutTargetIndex(sts)
+
+	// Proceed with the rollout for the next Pod to be rolled out, even if it is not Ready.
+	// We expect that the Pod's state will improve by being updated through the rollout.
+	// All other Pods must be Ready.
+	excludeNextRolloutTargetPodList := make([]corev1.Pod, 0, len(sortedPodList.Items)-1)
+	excludeNextRolloutTargetPodList = append(excludeNextRolloutTargetPodList, sortedPodList.Items[:nextRolloutTarget]...)
+	excludeNextRolloutTargetPodList = append(excludeNextRolloutTargetPodList, sortedPodList.Items[nextRolloutTarget+1:]...)
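+	// Each remaining Pod must not be terminating and must have been Ready for
+	// at least 5 seconds (the minReadySeconds value passed to IsPodAvailable below).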
+	for _, pod := range excludeNextRolloutTargetPodList {
+		if pod.DeletionTimestamp != nil {
+			log.Info("Pod is in the process of being terminated", "name", pod.Name, "namespace", pod.Namespace)
+			return false, nil
+		}
+		if !podutils.IsPodAvailable(&pod, 5, metav1.Now()) {
+			log.Info("Pod is not ready", "name", pod.Name, "namespace", pod.Namespace)
+			return false, nil
+		}
+	}
+
+	partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
+	expectedPodsCount := int(*sts.Spec.Replicas) - int(partition)
+
+	// If the partition equals the number of replicas, the rollout has not started yet,
+	// so decrease the partition to start the rollout.
+	// In all other cases, if only one revision exists, the rollout is complete.
+	if int(*sts.Spec.Replicas) == int(partition) {
+		return true, nil
+	} else if len(revisionCounts) == 1 {
+		return true, nil
+	}
+
+	// If the first and last revisions are the same, it is a rollback.
+	// To handle rollbacks, search from the beginning of the Pod list,
+	// and for every leading Pod with the same revision as the last one, increase the expected number of Pods.
+	if firstRevision == lastRevision {
+		for _, pod := range sortedPodList.Items {
+			revision := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
+			if revision == lastRevision {
+				expectedPodsCount++
+			} else {
+				break
+			}
+		}
+	}
+
+	if revisionCounts[lastRevision] != expectedPodsCount {
+		log.Info("Pod count is different from the expected number", "revision", lastRevision, "expected", expectedPodsCount, "actual", revisionCounts[lastRevision])
+		return false, nil
+	}
+
+	return true, nil
+}
+
+// isMySQLClusterHealthy checks the health status of a given MySQLCluster.
+func (r *StatefulSetPartitionReconciler) isMySQLClusterHealthy(cluster *mocov1beta2.MySQLCluster) bool {
+	return meta.IsStatusConditionTrue(cluster.Status.Conditions, mocov1beta2.ConditionHealthy)
+}
+
+// nextRolloutTargetIndex returns the index of the next rollout target Pod.
+// The index is calculated by subtracting 1 from the current partition.
+// If there is no rollout target, it returns -1.
+func (r *StatefulSetPartitionReconciler) nextRolloutTargetIndex(sts *appsv1.StatefulSet) int {
+	if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil {
+		return -1
+	}
+
+	return int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition) - 1
+}
+
+// getMySQLCluster retrieves the MySQLCluster resource that owns a given StatefulSet.
+func (r *StatefulSetPartitionReconciler) getMySQLCluster(ctx context.Context, sts *appsv1.StatefulSet) (*mocov1beta2.MySQLCluster, error) {
+	for _, ownerRef := range sts.GetOwnerReferences() {
+		if ownerRef.Kind != "MySQLCluster" {
+			continue
+		}
+
+		cluster := &mocov1beta2.MySQLCluster{}
+		if err := r.Get(ctx, types.NamespacedName{Name: ownerRef.Name, Namespace: sts.Namespace}, cluster); err != nil {
+			return nil, err
+		}
+
+		return cluster, nil
+	}
+
+	return nil, fmt.Errorf("StatefulSet %s/%s has no owner reference to MySQLCluster", sts.Namespace, sts.Name)
+}
+
+// needPartitionUpdate returns true if the StatefulSet needs to update its partition.
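+// It returns false when the force-rolling-update annotation is set or when no
+// partition is configured; otherwise it returns true while the partition is
+// still greater than 0.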
+func (r *StatefulSetPartitionReconciler) needPartitionUpdate(sts *appsv1.StatefulSet) bool { + if sts.Annotations[constants.AnnForceRollingUpdate] == "true" { + return false + } + if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil { + return false + } + + return *sts.Spec.UpdateStrategy.RollingUpdate.Partition > 0 +} + +// patchNewPartition patches the new partition of a StatefulSet. +func (r *StatefulSetPartitionReconciler) patchNewPartition(ctx context.Context, sts *appsv1.StatefulSet) error { + oldPartition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition + newPartition := oldPartition - 1 + + if oldPartition == 0 { + return nil + } + + patch := client.MergeFrom(sts.DeepCopy()) + sts.Spec.UpdateStrategy.RollingUpdate.Partition = &newPartition + + if err := r.Client.Patch(ctx, sts, patch); err != nil { + return fmt.Errorf("failed to patch new partition to StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err) + } + + r.Recorder.Eventf(sts, corev1.EventTypeNormal, "PartitionUpdate", "Updated partition from %d to %d", oldPartition, newPartition) + + return nil +} diff --git a/controllers/partition_controller_test.go b/controllers/partition_controller_test.go new file mode 100644 index 000000000..4adb8a397 --- /dev/null +++ b/controllers/partition_controller_test.go @@ -0,0 +1,237 @@ +package controllers + +import ( + "context" + "errors" + "fmt" + "sort" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" +) + +func testNewStatefulSet(cluster *mocov1beta2.MySQLCluster) *appsv1.StatefulSet { + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: cluster.PrefixedName(), + Namespace: cluster.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(cluster, mocov1beta2.GroupVersion.WithKind("MySQLCluster")), + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](cluster.Spec.Replicas), + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: ptr.To[int32](cluster.Spec.Replicas), + }, + }, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "moco-mysql:latest", + }, + }, + }, + }, + }, + } +} + +func testNewPods(sts *appsv1.StatefulSet) []*corev1.Pod { + pods := make([]*corev1.Pod, 0, *sts.Spec.Replicas) + + for i := 0; i < int(*sts.Spec.Replicas); i++ { + pods = append(pods, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%d", sts.Name, i), + Namespace: sts.Namespace, + Labels: map[string]string{ + appsv1.ControllerRevisionHashLabelKey: "rev1", + "foo": "bar", + }, + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(sts, appsv1.SchemeGroupVersion.WithKind("StatefulSet"))}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test", Image: "moco-mysql:latest"}, + }, + }, + }) + } + return 
pods +} + +func rolloutPods(ctx context.Context, rev1 int, rev2 int) { + pods := &corev1.PodList{} + err := k8sClient.List(ctx, pods, client.InNamespace("test"), client.MatchingLabels(map[string]string{"foo": "bar"})) + Expect(err).NotTo(HaveOccurred()) + Expect(len(pods.Items)).To(Equal(rev1 + rev2)) + + sort.Slice(pods.Items, func(i, j int) bool { + return pods.Items[i].Name < pods.Items[j].Name + }) + + for _, pod := range pods.Items { + if rev1 > 0 { + pod.Labels[appsv1.ControllerRevisionHashLabelKey] = "rev1" + rev1-- + } else if rev2 > 0 { + pod.Labels[appsv1.ControllerRevisionHashLabelKey] = "rev2" + rev2-- + } else { + break + } + + err = k8sClient.Update(ctx, &pod) + Expect(err).NotTo(HaveOccurred()) + } +} + +var _ = Describe("StatefulSet reconciler", func() { + ctx := context.Background() + var stopFunc func() + + BeforeEach(func() { + err := k8sClient.DeleteAllOf(ctx, &mocov1beta2.MySQLCluster{}, client.InNamespace("test")) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.DeleteAllOf(ctx, &appsv1.StatefulSet{}, client.InNamespace("test")) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace("test")) + Expect(err).NotTo(HaveOccurred()) + + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme, + LeaderElection: false, + Metrics: metricsserver.Options{ + BindAddress: "0", + }, + }) + Expect(err).ToNot(HaveOccurred()) + + r := &StatefulSetPartitionReconciler{ + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor("moco-controller"), + } + err = r.SetupWithManager(mgr) + Expect(err).ToNot(HaveOccurred()) + + ctx, cancel := context.WithCancel(ctx) + stopFunc = cancel + go func() { + err := mgr.Start(ctx) + if err != nil { + panic(err) + } + }() + time.Sleep(100 * time.Millisecond) + }) + + AfterEach(func() { + stopFunc() + time.Sleep(100 * time.Millisecond) + }) + + It("should partition to 0", func() { + cluster := testNewMySQLCluster("test") + err := k8sClient.Create(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + meta.SetStatusCondition(&cluster.Status.Conditions, + metav1.Condition{ + Type: mocov1beta2.ConditionHealthy, + Status: metav1.ConditionTrue, + Reason: "healthy", + }, + ) + err = k8sClient.Status().Update(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + + sts := testNewStatefulSet(cluster) + err = k8sClient.Create(ctx, sts) + Expect(err).NotTo(HaveOccurred()) + sts.Status = appsv1.StatefulSetStatus{ + ObservedGeneration: 2, + CurrentRevision: "rev1", + UpdateRevision: "rev1", + Replicas: 3, + UpdatedReplicas: 3, + } + err = k8sClient.Status().Update(ctx, sts) + Expect(err).NotTo(HaveOccurred()) + + for _, pod := range testNewPods(sts) { + err = k8sClient.Create(ctx, pod) + Expect(err).NotTo(HaveOccurred()) + pod.Status = corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + Reason: "PodReady", + LastTransitionTime: metav1.Time{ + Time: time.Now().Add(-24 * time.Hour), + }, + }, + }, + } + err = k8sClient.Status().Update(ctx, pod) + Expect(err).NotTo(HaveOccurred()) + } + + Eventually(func() error { + sts := &appsv1.StatefulSet{} + key := client.ObjectKey{Namespace: "test", Name: "moco-test"} + if err := k8sClient.Get(ctx, key, sts); err != nil { + return err + } + if sts.Spec.UpdateStrategy.RollingUpdate == nil { + return errors.New("partition is nil") + } + + switch *sts.Spec.UpdateStrategy.RollingUpdate.Partition { + case 3: + rolloutPods(ctx, 2, 1) + case 2: + rolloutPods(ctx, 1, 2) + case 1: + 
rolloutPods(ctx, 0, 3)
+			case 0:
+				return nil
+			}
+
+			return errors.New("partition is not yet 0")
+		}).Should(Succeed())
+
+		events := &corev1.EventList{}
+		err = k8sClient.List(ctx, events, client.InNamespace("test"))
+		Expect(err).NotTo(HaveOccurred())
+		sort.Slice(events.Items, func(i, j int) bool {
+			return events.Items[i].CreationTimestamp.Before(&events.Items[j].CreationTimestamp)
+		})
+		Expect(events.Items).To(HaveLen(3))
+		Expect(events.Items[0].Message).To(Equal("Updated partition from 3 to 2"))
+		Expect(events.Items[1].Message).To(Equal("Updated partition from 2 to 1"))
+		Expect(events.Items[2].Message).To(Equal("Updated partition from 1 to 0"))
+	})
+})
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index 09608a20b..9511f6235 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -13,6 +13,7 @@
   - [Building your own imge](custom-mysqld.md)
   - [Customize system container](customize-system-container.md)
   - [Change volumeClaimTemplates](change-pvc-template.md)
+  - [Rolling update strategy](rolling-update-strategy.md)
   - [Known issues](known_issues.md)
 
 # References
diff --git a/docs/metrics.md b/docs/metrics.md
index fa56c3896..8da864de1 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -26,6 +26,9 @@ All these metrics are prefixed with `moco_cluster_` and have `name` and `namespa
 | `failover_total` | The number of times MOCO changed the failed primary instance | Counter |
 | `replicas` | The number of mysqld instances in the cluster | Gauge |
 | `ready_replicas` | The number of ready mysqld Pods in the cluster | Gauge |
+| `current_replicas` | The number of Pods at the current StatefulSet revision (`.status.currentReplicas`) | Gauge |
+| `updated_replicas` | The number of Pods at the updated StatefulSet revision (`.status.updatedReplicas`) | Gauge |
+| `last_partition_updated` | The timestamp of the last successful partition update | Gauge |
 | `clustering_stopped` | 1 if the cluster is clustering stopped, 0 otherwise | Gauge |
 | `reconciliation_stopped` | 1 if the cluster is reconciliation stopped, 0 otherwise | Gauge |
 | `errant_replicas` | The number of mysqld instances that have [errant transactions][errant] | Gauge |
diff --git a/docs/rolling-update-strategy.md b/docs/rolling-update-strategy.md
new file mode 100644
index 000000000..7c499d4fb
--- /dev/null
+++ b/docs/rolling-update-strategy.md
@@ -0,0 +1,85 @@
+# RollingUpdate strategy
+
+MOCO manages MySQLCluster pods using StatefulSets.
+
+```text
+MySQLCluster/test
+└─StatefulSet/moco-test
+  ├─ControllerRevision/moco-test-554c56f456
+  ├─ControllerRevision/moco-test-5794c57c7c
+  ├─Pod/moco-test-0
+  ├─Pod/moco-test-1
+  └─Pod/moco-test-2
+```
+
+By default, a StatefulSet's standard rolling update does not consider whether the MySQLCluster is Healthy while Pods are being updated.
+This can cause problems: the rolling update proceeds even if the MySQLCluster becomes Unhealthy partway through.
+
+To address this issue, MOCO controls StatefulSet partitions to perform rolling updates. This behavior is enabled by default.
+
+## Partitions
+
+By setting a number in `.spec.updateStrategy.rollingUpdate.partition` of a StatefulSet, you can divide the rolling update into partitions.
+When a partition is specified, only Pods with an ordinal greater than or equal to the partition value are updated.
+Pods with an ordinal smaller than the partition value are not updated; even if such a Pod is deleted, it is recreated from the previous revision.
+
+## Architecture
+
+### When Creating a StatefulSet
+
+When creating a StatefulSet, MOCO does not assign a partition.
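+
+Because the mutating webhook acts only on UPDATE operations, a freshly created StatefulSet keeps the default strategy. A minimal sketch of the relevant fields (all other fields omitted):
+
+```yaml
+spec:
+  updateStrategy:
+    type: RollingUpdate
+    # no rollingUpdate.partition is assigned at creation time
+```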
+
+### When Updating a StatefulSet
+
+When a StatefulSet is updated, MOCO inspects the contents of the update and controls the partition using a MutatingAdmissionWebhook.
+
+1. If the update changes only the partition number
+   * The MutatingAdmissionWebhook does nothing, so manual partition changes are preserved.
+2. If any field other than the partition is updated
+   * The MutatingAdmissionWebhook sets the partition of the StatefulSet to the same value as `replicas`:
+
+   ```yaml
+   replicas: 3
+   ...
+   updateStrategy:
+     type: RollingUpdate
+     rollingUpdate:
+       partition: 3
+   ...
+   ```
+
+### Updating Partitions
+
+MOCO monitors the rollout status of the StatefulSet and the status of the MySQLCluster.
+If the Pods covered by the current partition value have been updated successfully, their containers are Running, and the MySQLCluster is Healthy, MOCO decrements the partition of the StatefulSet by 1.
+This operation is repeated until the partition value reaches 0.
+
+### Forcefully Rolling Out
+
+By setting the annotation `moco.cybozu.com/force-rolling-update` to `true`, you can update the StatefulSet without partition control.
+
+```yaml
+apiVersion: moco.cybozu.com/v1beta2
+kind: MySQLCluster
+metadata:
+  namespace: default
+  name: test
+  annotations:
+    moco.cybozu.com/force-rolling-update: "true"
+...
+```
+
+When creating or updating a StatefulSet with the annotation `moco.cybozu.com/force-rolling-update` set, MOCO removes the partition setting using the MutatingAdmissionWebhook.
+
+### Metrics
+
+MOCO outputs the following metrics related to rolling updates:
+
+* `moco_cluster_current_replicas`
+  * The same as `.status.currentReplicas` of the StatefulSet.
+* `moco_cluster_updated_replicas`
+  * The same as `.status.updatedReplicas` of the StatefulSet.
+* `moco_cluster_last_partition_updated`
+  * The time the partition was last updated.
+
+By defining an alert that fires when `moco_cluster_updated_replicas` differs from `moco_cluster_replicas` and a certain amount of time has passed since `moco_cluster_last_partition_updated`, you can detect MySQLClusters whose rolling update has stalled.
diff --git a/e2e/partition_test.go b/e2e/partition_test.go
new file mode 100644
index 000000000..0b0ffdff1
--- /dev/null
+++ b/e2e/partition_test.go
@@ -0,0 +1,281 @@
+package e2e
+
+import (
+	"bytes"
+	_ "embed"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"sort"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. 
"github.com/onsi/gomega" + "github.com/prometheus/common/expfmt" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2" +) + +//go:embed testdata/partition.yaml +var partitionTestYAML string + +//go:embed testdata/partition_changed.yaml +var partitionApplyYAML string + +//go:embed testdata/partition_force_rollingupdate.yaml +var forceRollingUpdateApplyYAML string + +//go:embed testdata/partition_image_pull_backoff.yaml +var imagePullBackoffApplyYAML string + +var _ = Context("partition_test", func() { + if doUpgrade { + return + } + + It("should construct a cluster", func() { + kubectlSafe(fillTemplate(partitionTestYAML), "apply", "-f", "-") + Eventually(func() error { + cluster, err := getCluster("partition", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionHealthy { + continue + } + if cond.Status == metav1.ConditionTrue { + return nil + } + return fmt.Errorf("cluster is not healthy: %s", cond.Status) + } + return errors.New("no health condition") + }).Should(Succeed()) + }) + + It("should pod template change succeed", func() { + // cpu request 1m -> 2m + kubectlSafe(fillTemplate(partitionApplyYAML), "apply", "-f", "-") + Eventually(func() error { + out, err := kubectl(nil, "get", "-n", "partition", "pod", "-o", "json") + if err != nil { + return err + } + pods := &corev1.PodList{} + if err := json.Unmarshal(out, pods); err != nil { + return err + } + + for _, pod := range pods.Items { + for _, c := range pod.Spec.Containers { + if c.Name != "mysqld" { + continue + } + if c.Resources.Requests.Cpu().Cmp(resource.MustParse("2m")) != 0 { + return fmt.Errorf("pod %s is not changed", pod.Name) + } + } + } + + cluster, err := getCluster("partition", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionHealthy { + continue + } + if cond.Status == metav1.ConditionTrue { + return nil + } + return fmt.Errorf("cluster is not healthy: %s", cond.Status) + } + + return errors.New("no health condition") + }).WithTimeout(time.Minute * 10).Should(Succeed()) + }) + + It("should partition changes succeed", func() { + out, err := kubectl(nil, "get", "-n", "partition", "event", "-o", "json") + Expect(err).NotTo(HaveOccurred()) + events := &corev1.EventList{} + err = json.Unmarshal(out, events) + Expect(err).NotTo(HaveOccurred()) + + partitionEvents := []corev1.Event{} + for _, event := range events.Items { + if event.Reason == "PartitionUpdate" { + partitionEvents = append(partitionEvents, event) + } + } + + sort.Slice(partitionEvents, func(i, j int) bool { + return partitionEvents[i].CreationTimestamp.Before(&partitionEvents[j].CreationTimestamp) + }) + Expect(partitionEvents).To(HaveLen(3)) + Expect(partitionEvents[0].Message).To(Equal("Updated partition from 3 to 2")) + Expect(partitionEvents[1].Message).To(Equal("Updated partition from 2 to 1")) + Expect(partitionEvents[2].Message).To(Equal("Updated partition from 1 to 0")) + }) + + It("metrics", func() { + out := kubectlSafe(nil, "-n", "moco-system", "get", "pods", "-l", "app.kubernetes.io/component=moco-controller", "-o", "json") + pods := &corev1.PodList{} + err := json.Unmarshal(out, pods) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(1)) + addr := pods.Items[0].Status.PodIP + out, err = runInPod("curl", "-sf", 
fmt.Sprintf("http://%s:8080/metrics", addr)) + Expect(err).NotTo(HaveOccurred()) + + mfs, err := (&expfmt.TextParser{}).TextToMetricFamilies(bytes.NewReader(out)) + Expect(err).NotTo(HaveOccurred()) + + stsOut, err := kubectl(nil, "get", "-n", "partition", "statefulset", "moco-test", "-o", "json") + Expect(err).NotTo(HaveOccurred()) + sts := &appsv1.StatefulSet{} + err = json.Unmarshal(stsOut, sts) + Expect(err).NotTo(HaveOccurred()) + + curReplicasMf := mfs["moco_cluster_current_replicas"] + Expect(curReplicasMf).NotTo(BeNil()) + curReplicasMetric := findMetric(curReplicasMf, map[string]string{"namespace": "partition", "name": "test"}) + Expect(curReplicasMetric).NotTo(BeNil()) + Expect(curReplicasMetric.GetGauge().GetValue()).To(BeNumerically("==", float64(sts.Status.CurrentReplicas))) + + updatedReplicasMf := mfs["moco_cluster_updated_replicas"] + Expect(updatedReplicasMf).NotTo(BeNil()) + updatedReplicasMetric := findMetric(updatedReplicasMf, map[string]string{"namespace": "partition", "name": "test"}) + Expect(updatedReplicasMetric).NotTo(BeNil()) + Expect(updatedReplicasMetric.GetGauge().GetValue()).To(BeNumerically("==", float64(sts.Status.UpdatedReplicas))) + + partitionUpdatedMf := mfs["moco_cluster_last_partition_updated"] + Expect(partitionUpdatedMf).NotTo(BeNil()) + partitionUpdatedMetric := findMetric(partitionUpdatedMf, map[string]string{"namespace": "partition", "name": "test"}) + Expect(partitionUpdatedMetric).NotTo(BeNil()) + Expect(updatedReplicasMetric.GetGauge().GetValue()).To(BeNumerically(">", 0)) + }) + + It("should image pull backoff", func() { + kubectlSafe(fillTemplate(imagePullBackoffApplyYAML), "apply", "-f", "-") + Eventually(func() error { + cluster, err := getCluster("partition", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionHealthy { + continue + } + if cond.Status == metav1.ConditionFalse { + return nil + } + return fmt.Errorf("cluster is healthy: %s", cond.Status) + } + return errors.New("no health condition") + }).Should(Succeed()) + }) + + It("should partition updates have stopped", func() { + out, err := kubectl(nil, "get", "-n", "partition", "statefulset", "moco-test", "-o", "json") + Expect(err).NotTo(HaveOccurred()) + sts := &appsv1.StatefulSet{} + err = json.Unmarshal(out, sts) + Expect(err).NotTo(HaveOccurred()) + Expect(sts.Spec.UpdateStrategy.RollingUpdate).NotTo(BeNil()) + Expect(sts.Spec.UpdateStrategy.RollingUpdate.Partition).NotTo(BeNil()) + Expect(*sts.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(2))) + }) + + It("should rollback succeed", func() { + kubectlSafe(fillTemplate(partitionApplyYAML), "apply", "-f", "-") + Eventually(func() error { + cluster, err := getCluster("partition", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionHealthy { + continue + } + if cond.Status == metav1.ConditionTrue { + return nil + } + return fmt.Errorf("cluster is not healthy: %s", cond.Status) + } + return errors.New("no health condition") + }).Should(Succeed()) + }) + + It("should partition updates succeed", func() { + Eventually(func() error { + out, err := kubectl(nil, "get", "-n", "partition", "statefulset", "moco-test", "-o", "json") + if err != nil { + return err + } + sts := &appsv1.StatefulSet{} + if err := json.Unmarshal(out, sts); err != nil { + return err + } + if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil 
{ + return errors.New("partition is nil") + } + if *sts.Spec.UpdateStrategy.RollingUpdate.Partition == int32(0) { + return nil + } + return errors.New("partition is not 0") + }).Should(Succeed()) + }) + + It("should pod template change succeed with force rolling update", func() { + kubectlSafe(fillTemplate(forceRollingUpdateApplyYAML), "apply", "-f", "-") + Eventually(func() error { + cluster, err := getCluster("partition", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionHealthy { + continue + } + if cond.Status == metav1.ConditionTrue { + return nil + } + return fmt.Errorf("cluster is not healthy: %s", cond.Status) + } + return errors.New("no health condition") + }).Should(Succeed()) + }) + + It("should partition removed", func() { + out, err := kubectl(nil, "get", "-n", "partition", "statefulset", "moco-test", "-o", "json") + Expect(err).NotTo(HaveOccurred()) + sts := &appsv1.StatefulSet{} + err = json.Unmarshal(out, sts) + Expect(err).NotTo(HaveOccurred()) + Expect(sts.Spec.UpdateStrategy.RollingUpdate).To(BeNil()) + }) + + It("should delete clusters", func() { + kubectlSafe(nil, "delete", "-n", "partition", "mysqlclusters", "--all") + + Eventually(func() error { + out, err := kubectl(nil, "get", "-n", "partition", "pod", "-o", "json") + if err != nil { + return err + } + pods := &corev1.PodList{} + if err := json.Unmarshal(out, pods); err != nil { + return err + } + if len(pods.Items) > 0 { + return errors.New("wait until all Pods are deleted") + } + return nil + }).Should(Succeed()) + }) +}) diff --git a/e2e/testdata/partition.yaml b/e2e/testdata/partition.yaml new file mode 100644 index 000000000..cc3e226a0 --- /dev/null +++ b/e2e/testdata/partition.yaml @@ -0,0 +1,28 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: partition +--- +apiVersion: moco.cybozu.com/v1beta2 +kind: MySQLCluster +metadata: + namespace: partition + name: test +spec: + replicas: 3 + podTemplate: + spec: + containers: + - name: mysqld + image: ghcr.io/cybozu-go/moco/mysql:{{ . }} + resources: + requests: + cpu: 1m + volumeClaimTemplates: + - metadata: + name: mysql-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi diff --git a/e2e/testdata/partition_changed.yaml b/e2e/testdata/partition_changed.yaml new file mode 100644 index 000000000..a1d11c407 --- /dev/null +++ b/e2e/testdata/partition_changed.yaml @@ -0,0 +1,23 @@ +apiVersion: moco.cybozu.com/v1beta2 +kind: MySQLCluster +metadata: + namespace: partition + name: test +spec: + replicas: 3 + podTemplate: + spec: + containers: + - name: mysqld + image: ghcr.io/cybozu-go/moco/mysql:{{ . }} + resources: + requests: + cpu: 2m + volumeClaimTemplates: + - metadata: + name: mysql-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi diff --git a/e2e/testdata/partition_force_rollingupdate.yaml b/e2e/testdata/partition_force_rollingupdate.yaml new file mode 100644 index 000000000..29a43613c --- /dev/null +++ b/e2e/testdata/partition_force_rollingupdate.yaml @@ -0,0 +1,25 @@ +apiVersion: moco.cybozu.com/v1beta2 +kind: MySQLCluster +metadata: + namespace: partition + name: test + annotations: + moco.cybozu.com/force-rolling-update: "true" +spec: + replicas: 3 + podTemplate: + spec: + containers: + - name: mysqld + image: ghcr.io/cybozu-go/moco/mysql:{{ . 
}} + resources: + requests: + cpu: 3m + volumeClaimTemplates: + - metadata: + name: mysql-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi diff --git a/e2e/testdata/partition_image_pull_backoff.yaml b/e2e/testdata/partition_image_pull_backoff.yaml new file mode 100644 index 000000000..4f12514d5 --- /dev/null +++ b/e2e/testdata/partition_image_pull_backoff.yaml @@ -0,0 +1,23 @@ +apiVersion: moco.cybozu.com/v1beta2 +kind: MySQLCluster +metadata: + namespace: partition + name: test +spec: + replicas: 3 + podTemplate: + spec: + containers: + - name: mysqld + image: ghcr.io/cybozu-go/moco/mysql:invalid-image + resources: + requests: + cpu: 1m + volumeClaimTemplates: + - metadata: + name: mysql-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi diff --git a/pkg/constants/meta.go b/pkg/constants/meta.go index 061284ee7..f7cabf8e6 100644 --- a/pkg/constants/meta.go +++ b/pkg/constants/meta.go @@ -21,6 +21,7 @@ const ( AnnSecretVersion = "moco.cybozu.com/secret-version" AnnClusteringStopped = "moco.cybozu.com/clustering-stopped" AnnReconciliationStopped = "moco.cybozu.com/reconciliation-stopped" + AnnForceRollingUpdate = "moco.cybozu.com/force-rolling-update" ) // MySQLClusterFinalizer is the finalizer specifier for MySQLCluster. diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 18ddc0df2..bf095bca3 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -30,6 +30,10 @@ var ( ClusteringStoppedVec *prometheus.GaugeVec ReconciliationStoppedVec *prometheus.GaugeVec + + CurrentReplicasVec *prometheus.GaugeVec + UpdatedReplicasVec *prometheus.GaugeVec + LastPartitionUpdatedVec *prometheus.GaugeVec ) // Backup related metrics @@ -220,4 +224,28 @@ func Register(registry prometheus.Registerer) { Help: "Indicates if reconciliation has stopped", }, []string{"name", "namespace"}) registry.MustRegister(ReconciliationStoppedVec) + + CurrentReplicasVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: clusteringSubsystem, + Name: "current_replicas", + Help: "The number of current replicas", + }, []string{"name", "namespace"}) + registry.MustRegister(CurrentReplicasVec) + + UpdatedReplicasVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: clusteringSubsystem, + Name: "updated_replicas", + Help: "The number of updated replicas", + }, []string{"name", "namespace"}) + registry.MustRegister(UpdatedReplicasVec) + + LastPartitionUpdatedVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: clusteringSubsystem, + Name: "last_partition_updated", + Help: "The timestamp of the last successful partition update", + }, []string{"name", "namespace"}) + registry.MustRegister(LastPartitionUpdatedVec) }