diff --git a/.secrets.baseline b/.secrets.baseline index 0559fadd6..771fa4ba4 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -374,14 +374,14 @@ "filename": "apis/clusters/v1beta1/zookeeper_types.go", "hashed_secret": "5ffe533b830f08a0326348a9160afafc8ada44db", "is_verified": false, - "line_number": 235 + "line_number": 214 }, { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/zookeeper_types.go", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 240 + "line_number": 219 } ], "apis/clusters/v1beta1/zz_generated.deepcopy.go": [ @@ -574,7 +574,7 @@ "filename": "controllers/clusters/postgresql_controller.go", "hashed_secret": "5ffe533b830f08a0326348a9160afafc8ada44db", "is_verified": false, - "line_number": 1272 + "line_number": 1265 } ], "controllers/clusters/zookeeper_controller_test.go": [ @@ -739,7 +739,7 @@ "filename": "pkg/instaclustr/client.go", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 2072 + "line_number": 2078 } ], "pkg/instaclustr/mock/client.go": [ @@ -1146,5 +1146,5 @@ } ] }, - "generated_at": "2024-02-26T10:23:28Z" + "generated_at": "2024-02-27T14:03:16Z" } diff --git a/apis/clusters/v1beta1/zookeeper_types.go b/apis/clusters/v1beta1/zookeeper_types.go index 2dece32a9..4b7bd4878 100644 --- a/apis/clusters/v1beta1/zookeeper_types.go +++ b/apis/clusters/v1beta1/zookeeper_types.go @@ -17,7 +17,6 @@ limitations under the License. package v1beta1 import ( - "encoding/json" "fmt" "k8s.io/api/core/v1" @@ -25,25 +24,35 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/utils/slices" ) type ZookeeperDataCentre struct { - DataCentre `json:",inline"` + GenericDataCentreSpec `json:",inline"` + + NumberOfNodes int `json:"numberOfNodes"` + NodeSize string `json:"nodeSize"` ClientToServerEncryption bool `json:"clientToServerEncryption"` - EnforceAuthSchemes []string `json:"enforceAuthSchemes,omitempty"` EnforceAuthEnabled bool `json:"enforceAuthEnabled,omitempty"` + EnforceAuthSchemes []string `json:"enforceAuthSchemes,omitempty"` } // ZookeeperSpec defines the desired state of Zookeeper type ZookeeperSpec struct { - Cluster `json:",inline"` - DataCentres []*ZookeeperDataCentre `json:"dataCentres"` + GenericClusterSpec `json:",inline"` + DataCentres []*ZookeeperDataCentre `json:"dataCentres"` } // ZookeeperStatus defines the observed state of Zookeeper type ZookeeperStatus struct { - ClusterStatus `json:",inline"` - DefaultUserSecretRef *Reference `json:"defaultUserSecretRef,omitempty"` + GenericStatus `json:",inline"` + DataCentres []*ZookeeperDataCentreStatus `json:"dataCentres,omitempty"` + DefaultUserSecretRef *Reference `json:"defaultUserSecretRef,omitempty"` +} + +type ZookeeperDataCentreStatus struct { + GenericDataCentreStatus `json:",inline"` + Nodes []*Node `json:"nodes"` } //+kubebuilder:object:root=true @@ -85,19 +94,9 @@ func (z *Zookeeper) NewPatch() client.Patch { return client.MergeFrom(old) } -func (z *Zookeeper) FromInstAPI(iData []byte) (*Zookeeper, error) { - iZook := &models.ZookeeperCluster{} - err := json.Unmarshal(iData, iZook) - if err != nil { - return nil, err - } - - return &Zookeeper{ - TypeMeta: z.TypeMeta, - ObjectMeta: z.ObjectMeta, - Spec: z.Spec.FromInstAPI(iZook), - Status: z.Status.FromInstAPI(iZook), - }, nil +func (z *Zookeeper) FromInstAPI(instaModel *models.ZookeeperCluster) { + z.Spec.FromInstAPI(instaModel) + 
z.Status.FromInstAPI(instaModel) } func (z *Zookeeper) GetDataCentreID(cdcName string) string { @@ -116,60 +115,41 @@ func (z *Zookeeper) GetClusterID() string { return z.Status.ID } -func (zs *ZookeeperSpec) FromInstAPI(iZook *models.ZookeeperCluster) ZookeeperSpec { - return ZookeeperSpec{ - Cluster: Cluster{ - Name: iZook.Name, - Version: iZook.ZookeeperVersion, - Description: iZook.Description, - PrivateNetworkCluster: iZook.PrivateNetworkCluster, - SLATier: iZook.SLATier, - TwoFactorDelete: zs.Cluster.TwoFactorDeleteFromInstAPI(iZook.TwoFactorDelete), - }, - DataCentres: zs.DCsFromInstAPI(iZook.DataCentres), - } +func (zs *ZookeeperSpec) FromInstAPI(instaModel *models.ZookeeperCluster) { + zs.GenericClusterSpec.FromInstAPI(&instaModel.GenericClusterFields, instaModel.ZookeeperVersion) + zs.DCsFromInstAPI(instaModel.DataCentres) } -func (zs *ZookeeperStatus) FromInstAPI(iZook *models.ZookeeperCluster) ZookeeperStatus { - return ZookeeperStatus{ - ClusterStatus: ClusterStatus{ - ID: iZook.ID, - State: iZook.Status, - DataCentres: zs.DCsFromInstAPI(iZook.DataCentres), - CurrentClusterOperationStatus: iZook.CurrentClusterOperationStatus, - MaintenanceEvents: zs.MaintenanceEvents, - }, - } +func (zs *ZookeeperStatus) FromInstAPI(instaModel *models.ZookeeperCluster) { + zs.GenericStatus.FromInstAPI(&instaModel.GenericClusterFields) + zs.DCsFromInstAPI(instaModel.DataCentres) } -func (zs *ZookeeperSpec) DCsFromInstAPI(iDCs []*models.ZookeeperDataCentre) (dcs []*ZookeeperDataCentre) { - for _, iDC := range iDCs { - dcs = append(dcs, &ZookeeperDataCentre{ - DataCentre: zs.Cluster.DCFromInstAPI(iDC.DataCentre), - ClientToServerEncryption: iDC.ClientToServerEncryption, - EnforceAuthSchemes: iDC.EnforceAuthSchemes, - EnforceAuthEnabled: iDC.EnforceAuthEnabled, - }) +func (zs *ZookeeperSpec) DCsFromInstAPI(instaModels []*models.ZookeeperDataCentre) { + dcs := make([]*ZookeeperDataCentre, len(instaModels)) + for i, instaModel := range instaModels { + dc := &ZookeeperDataCentre{} + dc.FromInstAPI(instaModel) + dcs[i] = dc } - return + zs.DataCentres = dcs } -func (zs *ZookeeperStatus) DCsFromInstAPI(iDCs []*models.ZookeeperDataCentre) (dcs []*DataCentreStatus) { - for _, iDC := range iDCs { - dcs = append(dcs, zs.ClusterStatus.DCFromInstAPI(iDC.DataCentre)) +func (zs *ZookeeperStatus) DCsFromInstAPI(instaModels []*models.ZookeeperDataCentre) { + dcs := make([]*ZookeeperDataCentreStatus, len(instaModels)) + for i, instaModel := range instaModels { + dc := &ZookeeperDataCentreStatus{} + dc.FromInstAPI(instaModel) + dcs[i] = dc } - return + zs.DataCentres = dcs } func (zs *ZookeeperSpec) ToInstAPI() *models.ZookeeperCluster { return &models.ZookeeperCluster{ - Name: zs.Name, - ZookeeperVersion: zs.Version, - PrivateNetworkCluster: zs.PrivateNetworkCluster, - SLATier: zs.SLATier, - TwoFactorDelete: zs.Cluster.TwoFactorDeletesToInstAPI(), - DataCentres: zs.DCsToInstAPI(), - Description: zs.Description, + GenericClusterFields: zs.GenericClusterSpec.ToInstAPI(), + ZookeeperVersion: zs.Version, + DataCentres: zs.DCsToInstAPI(), } } @@ -182,10 +162,12 @@ func (zs *ZookeeperSpec) DCsToInstAPI() (dcs []*models.ZookeeperDataCentre) { func (zdc *ZookeeperDataCentre) ToInstAPI() *models.ZookeeperDataCentre { return &models.ZookeeperDataCentre{ - DataCentre: zdc.DataCentre.ToInstAPI(), + GenericDataCentreFields: zdc.GenericDataCentreSpec.ToInstAPI(), ClientToServerEncryption: zdc.ClientToServerEncryption, EnforceAuthSchemes: zdc.EnforceAuthSchemes, EnforceAuthEnabled: zdc.EnforceAuthEnabled, + NumberOfNodes: 
zdc.NumberOfNodes, + NodeSize: zdc.NodeSize, } } @@ -194,23 +176,23 @@ func (z *Zookeeper) GetSpec() ZookeeperSpec { return z.Spec } func (z *Zookeeper) IsSpecEqual(spec ZookeeperSpec) bool { return z.Spec.IsEqual(spec) } func (a *ZookeeperSpec) IsEqual(b ZookeeperSpec) bool { - return a.Cluster.IsEqual(b.Cluster) && - a.areDCsEqual(b.DataCentres) + return a.GenericClusterSpec.Equals(&b.GenericClusterSpec) && + a.DCsEqual(b.DataCentres) } -func (rs *ZookeeperSpec) areDCsEqual(b []*ZookeeperDataCentre) bool { - a := rs.DataCentres - if len(a) != len(b) { +func (rs *ZookeeperSpec) DCsEqual(o []*ZookeeperDataCentre) bool { + if len(rs.DataCentres) != len(o) { return false } - for i := range b { - if a[i].Name != b[i].Name { - continue - } + m := map[string]*ZookeeperDataCentre{} + for _, dc := range rs.DataCentres { + m[dc.Name] = dc + } - if !a[i].DataCentre.IsEqual(b[i].DataCentre) || - a[i].ClientToServerEncryption != b[i].ClientToServerEncryption { + for _, iDC := range o { + dc, ok := m[iDC.Name] + if !ok || !dc.Equals(iDC) { return false } } @@ -238,3 +220,56 @@ func (a *Zookeeper) NewDefaultUserSecret(username, password string) *v1.Secret { }, } } + +func (zdc *ZookeeperDataCentre) FromInstAPI(instaModel *models.ZookeeperDataCentre) { + zdc.GenericDataCentreSpec.FromInstAPI(&instaModel.GenericDataCentreFields) + zdc.NodeSize = instaModel.NodeSize + zdc.NumberOfNodes = instaModel.NumberOfNodes + zdc.ClientToServerEncryption = instaModel.ClientToServerEncryption + zdc.EnforceAuthEnabled = instaModel.EnforceAuthEnabled + zdc.EnforceAuthSchemes = instaModel.EnforceAuthSchemes +} + +func (s *ZookeeperDataCentreStatus) FromInstAPI(instaModel *models.ZookeeperDataCentre) { + s.GenericDataCentreStatus.FromInstAPI(&instaModel.GenericDataCentreFields) + s.Nodes = nodesFromInstAPI(instaModel.Nodes) +} + +func (zdc *ZookeeperDataCentre) Equals(o *ZookeeperDataCentre) bool { + return zdc.GenericDataCentreSpec.Equals(&o.GenericDataCentreSpec) && + zdc.NumberOfNodes == o.NumberOfNodes && + zdc.NodeSize == o.NodeSize && + zdc.ClientToServerEncryption == o.ClientToServerEncryption && + zdc.EnforceAuthEnabled == o.EnforceAuthEnabled && + slices.Equals(zdc.EnforceAuthSchemes, o.EnforceAuthSchemes) +} + +func (zs *ZookeeperStatus) Equals(o *ZookeeperStatus) bool { + return zs.GenericStatus.Equals(&o.GenericStatus) && + zs.DCsEquals(o.DataCentres) +} + +func (zs *ZookeeperStatus) DCsEquals(o []*ZookeeperDataCentreStatus) bool { + if len(zs.DataCentres) != len(o) { + return false + } + + m := map[string]*ZookeeperDataCentreStatus{} + for _, dc := range zs.DataCentres { + m[dc.Name] = dc + } + + for _, iDC := range o { + dc, ok := m[iDC.Name] + if !ok || !dc.Equals(iDC) { + return false + } + } + + return true +} + +func (s *ZookeeperDataCentreStatus) Equals(o *ZookeeperDataCentreStatus) bool { + return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) && + nodesEqual(s.Nodes, o.Nodes) +} diff --git a/apis/clusters/v1beta1/zookeeper_webhook.go b/apis/clusters/v1beta1/zookeeper_webhook.go index c12fb67db..a4c67fe18 100644 --- a/apis/clusters/v1beta1/zookeeper_webhook.go +++ b/apis/clusters/v1beta1/zookeeper_webhook.go @@ -61,10 +61,6 @@ func (z *Zookeeper) Default() { models.ResourceStateAnnotation: "", }) } - - for _, dataCentre := range z.Spec.DataCentres { - dataCentre.SetDefaultValues() - } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. 
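A minimal sketch, assuming the repository's standard module layout, of how the reworked in-place conversion methods in zookeeper_types.go above are meant to be chained; it mirrors the createCluster flow added to zookeeper_controller.go further down in this diff. zookeeperFromRaw and respBody are illustrative names only, not code from the repository:

    package clusters

    import (
        "encoding/json"
        "fmt"

        "github.com/instaclustr/operator/apis/clusters/v1beta1"
        "github.com/instaclustr/operator/pkg/models"
    )

    // zookeeperFromRaw decodes a raw Instaclustr API response into the typed
    // models.ZookeeperCluster and then populates a Zookeeper CR in place via
    // the new mutating FromInstAPI method (the removed code instead
    // unmarshalled inside FromInstAPI and returned a fresh object).
    func zookeeperFromRaw(respBody []byte) (*v1beta1.Zookeeper, error) {
        var instaModel models.ZookeeperCluster
        if err := json.Unmarshal(respBody, &instaModel); err != nil {
            return nil, fmt.Errorf("failed to unmarshal body to models.ZookeeperCluster, err: %w", err)
        }

        zook := &v1beta1.Zookeeper{}
        zook.FromInstAPI(&instaModel) // fills Spec and Status on the receiver
        return zook, nil
    }
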
@@ -86,7 +82,7 @@ func (zv *zookeeperValidator) ValidateCreate(ctx context.Context, obj runtime.Ob return err } - err = z.Spec.Cluster.ValidateCreation() + err = z.Spec.GenericClusterSpec.ValidateCreation() if err != nil { return err } @@ -107,7 +103,7 @@ func (zv *zookeeperValidator) ValidateCreate(ctx context.Context, obj runtime.Ob } for _, dc := range z.Spec.DataCentres { - err = dc.DataCentre.ValidateCreation() + err = dc.GenericDataCentreSpec.validateCreation() if err != nil { return err } @@ -127,17 +123,21 @@ func (zv *zookeeperValidator) ValidateUpdate(ctx context.Context, old runtime.Ob return fmt.Errorf("cannot assert object %v to zookeeper", new.GetObjectKind()) } - zookeeperlog.Info("validate update", "name", newZookeeper.Name) - - if newZookeeper.Status.ID == "" { - return zv.ValidateCreate(ctx, newZookeeper) + if newZookeeper.Annotations[models.ResourceStateAnnotation] == models.SyncingEvent { + return nil } if newZookeeper.Annotations[models.ExternalChangesAnnotation] == models.True { return nil } - if newZookeeper.Generation != oldZookeeper.Generation && !oldZookeeper.Spec.ClusterSettingsNeedUpdate(newZookeeper.Spec.Cluster) { + if newZookeeper.Status.ID == "" { + return zv.ValidateCreate(ctx, newZookeeper) + } + + zookeeperlog.Info("validate update", "name", newZookeeper.Name) + + if newZookeeper.Generation != oldZookeeper.Generation && !oldZookeeper.Spec.ClusterSettingsNeedUpdate(&newZookeeper.Spec.GenericClusterSpec) { return fmt.Errorf("update is not allowed") } diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index d46e85410..fc7fc4e07 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -3066,7 +3066,7 @@ func (in *Zookeeper) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ZookeeperDataCentre) DeepCopyInto(out *ZookeeperDataCentre) { *out = *in - in.DataCentre.DeepCopyInto(&out.DataCentre) + in.GenericDataCentreSpec.DeepCopyInto(&out.GenericDataCentreSpec) if in.EnforceAuthSchemes != nil { in, out := &in.EnforceAuthSchemes, &out.EnforceAuthSchemes *out = make([]string, len(*in)) @@ -3084,6 +3084,33 @@ func (in *ZookeeperDataCentre) DeepCopy() *ZookeeperDataCentre { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ZookeeperDataCentreStatus) DeepCopyInto(out *ZookeeperDataCentreStatus) { + *out = *in + in.GenericDataCentreStatus.DeepCopyInto(&out.GenericDataCentreStatus) + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Node) + (*in).DeepCopyInto(*out) + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ZookeeperDataCentreStatus. +func (in *ZookeeperDataCentreStatus) DeepCopy() *ZookeeperDataCentreStatus { + if in == nil { + return nil + } + out := new(ZookeeperDataCentreStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ZookeeperList) DeepCopyInto(out *ZookeeperList) { *out = *in @@ -3119,7 +3146,7 @@ func (in *ZookeeperList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ZookeeperSpec) DeepCopyInto(out *ZookeeperSpec) { *out = *in - in.Cluster.DeepCopyInto(&out.Cluster) + in.GenericClusterSpec.DeepCopyInto(&out.GenericClusterSpec) if in.DataCentres != nil { in, out := &in.DataCentres, &out.DataCentres *out = make([]*ZookeeperDataCentre, len(*in)) @@ -3146,7 +3173,18 @@ func (in *ZookeeperSpec) DeepCopy() *ZookeeperSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ZookeeperStatus) DeepCopyInto(out *ZookeeperStatus) { *out = *in - in.ClusterStatus.DeepCopyInto(&out.ClusterStatus) + in.GenericStatus.DeepCopyInto(&out.GenericStatus) + if in.DataCentres != nil { + in, out := &in.DataCentres, &out.DataCentres + *out = make([]*ZookeeperDataCentreStatus, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(ZookeeperDataCentreStatus) + (*in).DeepCopyInto(*out) + } + } + } if in.DefaultUserSecretRef != nil { in, out := &in.DefaultUserSecretRef, &out.DefaultUserSecretRef *out = new(apiextensions.ObjectReference) diff --git a/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml b/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml index 5118677e6..89d0279c7 100644 --- a/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml +++ b/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml @@ -52,52 +52,138 @@ spec: items: properties: accountName: + default: INSTACLUSTR + description: For customers running in their own account. Your + provider account can be found on the Create Cluster page on + the Instaclustr Console, or the "Provider Account" property + on any existing cluster. For customers provisioning on Instaclustr's + cloud provider accounts, this property may be omitted. type: string - clientToServerEncryption: - type: boolean - cloudProvider: - type: string - cloudProviderSettings: + awsSettings: + description: AWS specific settings for the Data Centre. Cannot + be provided with GCP or Azure settings. items: properties: backupBucket: + description: Specify the S3 bucket to use for storing + backup data for the cluster data centre. Only available + for customers running in their own cloud provider accounts. + Currently supported for OpenSearch clusters only. type: string customVirtualNetworkId: + description: VPC ID into which the Data Centre will be + provisioned. The Data Centre's network allocation must + match the IPv4 CIDR block of the specified VPC. type: string - disableSnapshotAutoExpiry: - type: boolean - diskEncryptionKey: + encryptionKey: + description: ID of a KMS encryption key to encrypt data + on nodes. KMS encryption key must be set in Cluster + Resources through the Instaclustr Console before provisioning + an encrypted Data Centre. + type: string + type: object + maxItems: 1 + type: array + azureSettings: + description: Azure specific settings for the Data Centre. Cannot + be provided with AWS or GCP settings. + items: + properties: + customVirtualNetworkId: + description: VNet ID into which the Data Centre will be + provisioned. The VNet must have an available address + space for the Data Centre's network allocation to be + appended to the VNet. Currently supported for PostgreSQL + clusters only. 
type: string resourceGroup: + description: The name of the Azure Resource Group into + which the Data Centre will be provisioned. + type: string + storageNetwork: + description: 'The private network address block to be + used for the storage network. This is only used for + certain node sizes, currently limited to those which + use Azure NetApp Files: for all other node sizes, this + field should not be provided. The network must have + a prefix length between /16 and /28, and must be part + of a private address range.' type: string type: object + maxItems: 1 type: array + clientToServerEncryption: + type: boolean + cloudProvider: + description: Name of a cloud provider service. + type: string enforceAuthEnabled: type: boolean enforceAuthSchemes: items: type: string type: array + gcpSettings: + description: GCP specific settings for the Data Centre. Cannot + be provided with AWS or Azure settings. + items: + properties: + customVirtualNetworkId: + description: "Network name or a relative Network or Subnetwork + URI. The Data Centre's network allocation must match + the IPv4 CIDR block of the specified subnet. \n Examples: + Network URI: projects/{riyoa-gcp-project-name}/global/networks/{network-name}. + Network name: {network-name}, equivalent to projects/{riyoa-gcp-project-name}/global/networks/{network-name}. + Same-project subnetwork URI: projects/{riyoa-gcp-project-name}/regions/{region-id}/subnetworks/{subnetwork-name}. + Shared VPC subnetwork URI: projects/{riyoa-gcp-host-project-name}/regions/{region-id}/subnetworks/{subnetwork-name}." + type: string + disableSnapshotAutoExpiry: + description: Specify whether the GCS backup bucket should + automatically expire data after 7 days or not. Setting + this to true will disable automatic expiry and will + allow for creation of custom snapshot repositories with + customisable retention using the Index Management Plugin. + The storage will have to be manually cleared after the + cluster is deleted. Only available for customers running + in their own cloud provider accounts. Currently supported + for OpenSearch clusters only. + type: boolean + type: object + maxItems: 1 + type: array name: + description: A logical name for the data centre within a cluster. + These names must be unique in the cluster. type: string network: + description: The private network address block for the Data + Centre specified using CIDR address notation. The network + must have a prefix length between /12 and /22 and must be + part of a private address space. type: string nodeSize: type: string - nodesNumber: + numberOfNodes: type: integer region: + description: Region of the Data Centre. type: string tags: additionalProperties: type: string + description: List of tags to apply to the Data Centre. Tags + are metadata labels which allow you to identify, categorize + and filter clusters. This can be useful for grouping together + clusters into applications, environments, or any category + that you require. type: object required: - clientToServerEncryption - cloudProvider + - name - network - nodeSize - - nodesNumber + - numberOfNodes - region type: object type: array @@ -106,13 +192,7 @@ spec: name: description: Name [ 3 .. 32 ] characters. type: string - pciCompliance: - description: The PCI compliance standards relate to the security of - user data and transactional information. Can only be applied clusters - provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch - and Redis. 
- type: boolean - privateNetworkCluster: + privateNetwork: type: boolean slaTier: description: 'Non-production clusters may receive lower priority support @@ -141,15 +221,11 @@ spec: status: description: ZookeeperStatus defines the observed state of Zookeeper properties: - cdcid: - type: string currentClusterOperationStatus: type: string dataCentres: items: properties: - encryptionKeyId: - type: string id: type: string name: @@ -175,21 +251,6 @@ spec: type: string type: object type: array - nodesNumber: - type: integer - privateLink: - items: - properties: - advertisedHostname: - type: string - endPointServiceId: - type: string - endPointServiceName: - type: string - required: - - advertisedHostname - type: object - type: array resizeOperations: items: properties: @@ -246,6 +307,8 @@ spec: type: array status: type: string + required: + - nodes type: object type: array defaultUserSecretRef: @@ -347,19 +410,8 @@ spec: type: array type: object type: array - options: - properties: - dataNodeSize: - type: string - masterNodeSize: - type: string - openSearchDashboardsNodeSize: - type: string - type: object state: type: string - twoFactorDeleteEnabled: - type: boolean type: object type: object served: true diff --git a/config/samples/clusters_v1beta1_zookeeper.yaml b/config/samples/clusters_v1beta1_zookeeper.yaml index 570d4582e..7f6132325 100644 --- a/config/samples/clusters_v1beta1_zookeeper.yaml +++ b/config/samples/clusters_v1beta1_zookeeper.yaml @@ -3,7 +3,7 @@ kind: Zookeeper metadata: name: zookeeper-sample spec: - name: "username-zookeeper" + name: "bohdan-zookeeper" # description: "some description" dataCentres: - clientToServerEncryption: false @@ -12,8 +12,8 @@ spec: network: "10.0.0.0/16" nodeSize: "zookeeper-developer-t3.small-20" # nodeSize: "zookeeper-production-m5.large-60" - nodesNumber: 3 + numberOfNodes: 3 region: "US_EAST_1" - privateNetworkCluster: false + privateNetwork: false slaTier: "NON_PRODUCTION" version: "3.8.2" diff --git a/controllers/clusterresources/awsencryptionkey_controller.go b/controllers/clusterresources/awsencryptionkey_controller.go index f19f2cd19..631963afd 100644 --- a/controllers/clusterresources/awsencryptionkey_controller.go +++ b/controllers/clusterresources/awsencryptionkey_controller.go @@ -228,7 +228,7 @@ func (r *AWSEncryptionKeyReconciler) handleDelete( ) } - r.Scheduler.RemoveJob(encryptionKey.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(encryptionKey.GetJobID(scheduler.SyncJob)) patch := encryptionKey.NewPatch() controllerutil.RemoveFinalizer(encryptionKey, models.DeletionFinalizer) encryptionKey.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent @@ -265,7 +265,7 @@ func (r *AWSEncryptionKeyReconciler) handleDelete( func (r *AWSEncryptionKeyReconciler) startEncryptionKeyStatusJob(encryptionKey *v1beta1.AWSEncryptionKey) error { job := r.newWatchStatusJob(encryptionKey) - err := r.Scheduler.ScheduleJob(encryptionKey.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(encryptionKey.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -286,7 +286,7 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW "namespaced name", key, ) - r.Scheduler.RemoveJob(encryptionKey.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(encryptionKey.GetJobID(scheduler.SyncJob)) return nil } @@ -333,7 +333,7 @@ func (r *AWSEncryptionKeyReconciler) handleExternalDelete(ctx context.Context, k 
l.Info(instaclustr.MsgInstaclustrResourceNotFound) r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) - r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(key.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusterresources/awsendpointserviceprincipal_controller.go b/controllers/clusterresources/awsendpointserviceprincipal_controller.go index 3f5c9b783..e4fc26b8d 100644 --- a/controllers/clusterresources/awsendpointserviceprincipal_controller.go +++ b/controllers/clusterresources/awsendpointserviceprincipal_controller.go @@ -167,7 +167,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context return ctrl.Result{}, err } r.EventRecorder.Eventf(principal, models.Normal, models.Created, - "Status check job %s has been started", principal.GetJobID(scheduler.StatusChecker), + "Status check job %s has been started", principal.GetJobID(scheduler.SyncJob), ) return ctrl.Result{}, nil @@ -203,7 +203,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context func (r *AWSEndpointServicePrincipalReconciler) startWatchStatusJob(ctx context.Context, resource *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error { job := r.newWatchStatusJob(ctx, resource) - return r.Scheduler.ScheduleJob(resource.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + return r.Scheduler.ScheduleJob(resource.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) } func (r *AWSEndpointServicePrincipalReconciler) newWatchStatusJob(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) scheduler.Job { @@ -218,7 +218,7 @@ func (r *AWSEndpointServicePrincipalReconciler) newWatchStatusJob(ctx context.Co "namespaced name", key, ) - r.Scheduler.RemoveJob(principal.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(principal.GetJobID(scheduler.SyncJob)) return nil } @@ -252,7 +252,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleExternalDelete(ctx context l.Info(instaclustr.MsgInstaclustrResourceNotFound) r.EventRecorder.Eventf(principal, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) - r.Scheduler.RemoveJob(principal.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(principal.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go index 0818463f1..f035ca80c 100644 --- a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go +++ b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go @@ -256,7 +256,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( ) } - r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.SyncJob)) controllerutil.RemoveFinalizer(firewallRule, models.DeletionFinalizer) firewallRule.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent err = r.Patch(ctx, firewallRule, patch) @@ -292,7 +292,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( func (r *AWSSecurityGroupFirewallRuleReconciler) startFirewallRuleStatusJob(firewallRule *v1beta1.AWSSecurityGroupFirewallRule) error { job := r.newWatchStatusJob(firewallRule) - err := r.Scheduler.ScheduleJob(firewallRule.GetJobID(scheduler.StatusChecker), 
scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(firewallRule.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -313,7 +313,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule "namespaced name", key, ) - r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.SyncJob)) return nil } @@ -360,7 +360,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleExternalDelete(ctx contex l.Info(instaclustr.MsgInstaclustrResourceNotFound) r.EventRecorder.Eventf(rule, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) - r.Scheduler.RemoveJob(rule.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(rule.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusterresources/awsvpcpeering_controller.go b/controllers/clusterresources/awsvpcpeering_controller.go index 9d6d20d5d..e1a7d7466 100644 --- a/controllers/clusterresources/awsvpcpeering_controller.go +++ b/controllers/clusterresources/awsvpcpeering_controller.go @@ -362,7 +362,7 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( return ctrl.Result{}, err } - r.Scheduler.RemoveJob(aws.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(aws.GetJobID(scheduler.SyncJob)) patch := aws.NewPatch() controllerutil.RemoveFinalizer(aws, models.DeletionFinalizer) @@ -398,7 +398,7 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( func (r *AWSVPCPeeringReconciler) startAWSVPCPeeringStatusJob(awsPeering *v1beta1.AWSVPCPeering) error { job := r.newWatchStatusJob(awsPeering) - err := r.Scheduler.ScheduleJob(awsPeering.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(awsPeering.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -418,7 +418,7 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe "namespaced name", namespacedName, ) - r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.SyncJob)) return nil } @@ -459,7 +459,7 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe "The AWSPeering was deleted on AWS, stopping job...", ) - r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.SyncJob)) return nil } @@ -546,7 +546,7 @@ func (r *AWSVPCPeeringReconciler) handleExternalDelete(ctx context.Context, key l.Info(instaclustr.MsgInstaclustrResourceNotFound) r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) - r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(key.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusterresources/azurevnetpeering_controller.go b/controllers/clusterresources/azurevnetpeering_controller.go index 233a22675..c92144b19 100644 --- a/controllers/clusterresources/azurevnetpeering_controller.go +++ b/controllers/clusterresources/azurevnetpeering_controller.go @@ -247,7 +247,7 @@ func (r *AzureVNetPeeringReconciler) handleDeletePeering( } if status != nil { - r.Scheduler.RemoveJob(azure.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(azure.GetJobID(scheduler.SyncJob)) err = r.API.DeletePeering(azure.Status.ID, instaclustr.AzurePeeringEndpoint) if err != nil { l.Error(err, "cannot 
update Azure VNet Peering resource status", @@ -312,7 +312,7 @@ func (r *AzureVNetPeeringReconciler) startAzureVNetPeeringStatusJob(azurePeering ) error { job := r.newWatchStatusJob(azurePeering) - err := r.Scheduler.ScheduleJob(azurePeering.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(azurePeering.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -334,7 +334,7 @@ func (r *AzureVNetPeeringReconciler) newWatchStatusJob(azureVNetPeering *v1beta1 "namespaced name", key, ) - r.Scheduler.RemoveJob(azureVNetPeering.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(azureVNetPeering.GetJobID(scheduler.SyncJob)) return nil } @@ -382,7 +382,7 @@ func (r *AzureVNetPeeringReconciler) handleExternalDelete(ctx context.Context, k l.Info(instaclustr.MsgInstaclustrResourceNotFound) r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) - r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(key.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusterresources/clusternetworkfirewallrule_controller.go b/controllers/clusterresources/clusternetworkfirewallrule_controller.go index 287cec570..2baca470d 100644 --- a/controllers/clusterresources/clusternetworkfirewallrule_controller.go +++ b/controllers/clusterresources/clusternetworkfirewallrule_controller.go @@ -276,7 +276,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( ) } - r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.SyncJob)) controllerutil.RemoveFinalizer(firewallRule, models.DeletionFinalizer) firewallRule.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent err = r.Patch(ctx, firewallRule, patch) @@ -311,7 +311,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( func (r *ClusterNetworkFirewallRuleReconciler) startFirewallRuleStatusJob(firewallRule *v1beta1.ClusterNetworkFirewallRule) error { job := r.newWatchStatusJob(firewallRule) - err := r.Scheduler.ScheduleJob(firewallRule.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(firewallRule.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } diff --git a/controllers/clusterresources/gcpvpcpeering_controller.go b/controllers/clusterresources/gcpvpcpeering_controller.go index 61b5b7eab..0c269a0e1 100644 --- a/controllers/clusterresources/gcpvpcpeering_controller.go +++ b/controllers/clusterresources/gcpvpcpeering_controller.go @@ -257,7 +257,7 @@ func (r *GCPVPCPeeringReconciler) handleDeleteCluster( } patch := gcp.NewPatch() - r.Scheduler.RemoveJob(gcp.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(gcp.GetJobID(scheduler.SyncJob)) controllerutil.RemoveFinalizer(gcp, models.DeletionFinalizer) gcp.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent err = r.Patch(ctx, gcp, patch) @@ -295,7 +295,7 @@ func (r *GCPVPCPeeringReconciler) handleDeleteCluster( func (r *GCPVPCPeeringReconciler) startGCPVPCPeeringStatusJob(gcpPeering *v1beta1.GCPVPCPeering) error { job := r.newWatchStatusJob(gcpPeering) - err := r.Scheduler.ScheduleJob(gcpPeering.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(gcpPeering.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) 
if err != nil { return err } @@ -316,7 +316,7 @@ func (r *GCPVPCPeeringReconciler) newWatchStatusJob(gcpPeering *v1beta1.GCPVPCPe "namespaced name", key, ) - r.Scheduler.RemoveJob(gcpPeering.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(gcpPeering.GetJobID(scheduler.SyncJob)) return nil } @@ -364,7 +364,7 @@ func (r *GCPVPCPeeringReconciler) handleExternalDelete(ctx context.Context, key l.Info(instaclustr.MsgInstaclustrResourceNotFound) r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) - r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(key.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusters/cadence_controller.go b/controllers/clusters/cadence_controller.go index 872513845..15cd446e5 100644 --- a/controllers/clusters/cadence_controller.go +++ b/controllers/clusters/cadence_controller.go @@ -247,7 +247,7 @@ func (r *CadenceReconciler) handleCreateCluster( } r.EventRecorder.Event(c, models.Normal, models.Created, - "Cluster status check job is started") + "Cluster sync job is started") } return ctrl.Result{}, nil @@ -424,7 +424,7 @@ func (r *CadenceReconciler) handleDeleteCluster( } } - r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.SyncJob)) patch := c.NewPatch() controllerutil.RemoveFinalizer(c, models.DeletionFinalizer) c.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent @@ -689,7 +689,7 @@ func (r *CadenceReconciler) newCassandraSpec(c *v1beta1.Cadence, latestCassandra func (r *CadenceReconciler) startSyncJob(c *v1beta1.Cadence) error { job := r.newSyncJob(c) - err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -698,14 +698,14 @@ func (r *CadenceReconciler) startSyncJob(c *v1beta1.Cadence) error { } func (r *CadenceReconciler) newSyncJob(c *v1beta1.Cadence) scheduler.Job { - l := log.Log.WithValues("syncJob", c.GetJobID(scheduler.StatusChecker), "clusterID", c.Status.ID) + l := log.Log.WithValues("syncJob", c.GetJobID(scheduler.SyncJob), "clusterID", c.Status.ID) return func() error { namespacedName := client.ObjectKeyFromObject(c) err := r.Get(context.Background(), namespacedName, c) if k8serrors.IsNotFound(err) { l.Info("Resource is not found in the k8s cluster. 
Closing Instaclustr status sync.", "namespaced name", namespacedName) - r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.SyncJob)) return nil } if err != nil { @@ -1150,7 +1150,7 @@ func (r *CadenceReconciler) handleExternalDelete(ctx context.Context, c *v1beta1 r.EventRecorder.Eventf(c, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker)) - r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index cef870773..d97d788cb 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -264,7 +264,7 @@ func (r *CassandraReconciler) startClusterJobs(c *v1beta1.Cassandra, l logr.Logg r.EventRecorder.Eventf( c, models.Normal, models.Created, - "Cluster status check job is started", + "Cluster sync job is started", ) err = r.startClusterBackupsJob(c) @@ -517,7 +517,7 @@ func (r *CassandraReconciler) handleDeleteCluster( r.Scheduler.RemoveJob(c.GetJobID(scheduler.UserCreator)) r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker)) - r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.SyncJob)) l.Info("Deleting cluster backup resources", "cluster ID", c.Status.ID) @@ -600,7 +600,7 @@ func (r *CassandraReconciler) handleDeleteCluster( func (r *CassandraReconciler) startSyncJob(c *v1beta1.Cassandra) error { job := r.newSyncJob(c) - err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -643,7 +643,7 @@ func (r *CassandraReconciler) startClusterOnPremisesIPsJob(c *v1beta1.Cassandra, } func (r *CassandraReconciler) newSyncJob(c *v1beta1.Cassandra) scheduler.Job { - l := log.Log.WithValues("syncJob", c.GetJobID(scheduler.StatusChecker), "clusterID", c.Status.ID) + l := log.Log.WithValues("syncJob", c.GetJobID(scheduler.SyncJob), "clusterID", c.Status.ID) return func() error { namespacedName := client.ObjectKeyFromObject(c) @@ -653,7 +653,7 @@ func (r *CassandraReconciler) newSyncJob(c *v1beta1.Cassandra) scheduler.Job { "namespaced name", namespacedName) r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker)) r.Scheduler.RemoveJob(c.GetJobID(scheduler.UserCreator)) - r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.SyncJob)) return nil } @@ -1015,7 +1015,7 @@ func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, c *v1bet r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker)) r.Scheduler.RemoveJob(c.GetJobID(scheduler.UserCreator)) - r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusters/kafka_controller.go b/controllers/clusters/kafka_controller.go index b82b98a0b..037a38108 100644 --- a/controllers/clusters/kafka_controller.go +++ b/controllers/clusters/kafka_controller.go @@ -156,7 +156,7 @@ func (r *KafkaReconciler) startJobs(k *v1beta1.Kafka) error { r.EventRecorder.Eventf( k, models.Normal, models.Created, - "Cluster status check job is started", + "Cluster sync job is started", ) if k.Spec.UserRefs != nil && 
k.Status.AvailableUsers == nil { @@ -376,7 +376,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, k *v1beta1.Ka return reconcile.Result{}, err } - r.Scheduler.RemoveJob(k.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(k.GetJobID(scheduler.SyncJob)) r.Scheduler.RemoveJob(k.GetJobID(scheduler.UserCreator)) controllerutil.RemoveFinalizer(k, models.DeletionFinalizer) k.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent @@ -429,7 +429,7 @@ func (r *KafkaReconciler) startClusterOnPremisesIPsJob(k *v1beta1.Kafka, b *onPr func (r *KafkaReconciler) startSyncJob(kafka *v1beta1.Kafka) error { job := r.newSyncJob(kafka) - err := r.Scheduler.ScheduleJob(kafka.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(kafka.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -438,7 +438,7 @@ func (r *KafkaReconciler) startSyncJob(kafka *v1beta1.Kafka) error { } func (r *KafkaReconciler) newSyncJob(k *v1beta1.Kafka) scheduler.Job { - l := log.Log.WithValues("syncJob", k.GetJobID(scheduler.StatusChecker), "clusterID", k.Status.ID) + l := log.Log.WithValues("syncJob", k.GetJobID(scheduler.SyncJob), "clusterID", k.Status.ID) return func() error { namespacedName := client.ObjectKeyFromObject(k) @@ -446,7 +446,7 @@ func (r *KafkaReconciler) newSyncJob(k *v1beta1.Kafka) scheduler.Job { if k8serrors.IsNotFound(err) { l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.", "namespaced name", namespacedName) - r.Scheduler.RemoveJob(k.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(k.GetJobID(scheduler.SyncJob)) r.Scheduler.RemoveJob(k.GetJobID(scheduler.UserCreator)) r.Scheduler.RemoveJob(k.GetJobID(scheduler.BackupsChecker)) return nil @@ -655,7 +655,7 @@ func (r *KafkaReconciler) handleExternalDelete(ctx context.Context, k *v1beta1.K r.Scheduler.RemoveJob(k.GetJobID(scheduler.BackupsChecker)) r.Scheduler.RemoveJob(k.GetJobID(scheduler.UserCreator)) - r.Scheduler.RemoveJob(k.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(k.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusters/kafkaconnect_controller.go b/controllers/clusters/kafkaconnect_controller.go index 87985dd09..43f6c790c 100644 --- a/controllers/clusters/kafkaconnect_controller.go +++ b/controllers/clusters/kafkaconnect_controller.go @@ -362,7 +362,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 return reconcile.Result{}, err } - r.Scheduler.RemoveJob(kc.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(kc.GetJobID(scheduler.SyncJob)) controllerutil.RemoveFinalizer(kc, models.DeletionFinalizer) kc.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent err = r.Patch(ctx, kc, patch) @@ -467,7 +467,7 @@ func (r *KafkaConnectReconciler) startClusterOnPremisesIPsJob(k *v1beta1.KafkaCo func (r *KafkaConnectReconciler) startSyncJob(kc *v1beta1.KafkaConnect) error { job := r.newSyncJob(kc) - err := r.Scheduler.ScheduleJob(kc.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(kc.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -484,7 +484,7 @@ func (r *KafkaConnectReconciler) newSyncJob(kc *v1beta1.KafkaConnect) scheduler. if k8serrors.IsNotFound(err) { l.Info("Resource is not found in the k8s cluster. 
Closing Instaclustr status sync.", "namespaced name", namespacedName) - r.Scheduler.RemoveJob(kc.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(kc.GetJobID(scheduler.SyncJob)) return nil } if err != nil { @@ -673,7 +673,7 @@ func (r *KafkaConnectReconciler) handleExternalDelete(ctx context.Context, kc *v r.EventRecorder.Eventf(kc, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) r.Scheduler.RemoveJob(kc.GetJobID(scheduler.BackupsChecker)) - r.Scheduler.RemoveJob(kc.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(kc.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusters/opensearch_controller.go b/controllers/clusters/opensearch_controller.go index 696294cb9..c7fa5a41e 100644 --- a/controllers/clusters/opensearch_controller.go +++ b/controllers/clusters/opensearch_controller.go @@ -448,7 +448,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( r.Scheduler.RemoveJob(o.GetJobID(scheduler.UserCreator)) r.Scheduler.RemoveJob(o.GetJobID(scheduler.BackupsChecker)) - r.Scheduler.RemoveJob(o.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(o.GetJobID(scheduler.SyncJob)) logger.Info("Deleting cluster backup resources", "cluster ID", o.Status.ID, @@ -518,7 +518,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( func (r *OpenSearchReconciler) startClusterSyncJob(cluster *v1beta1.OpenSearch) error { job := r.newSyncJob(cluster) - err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -549,7 +549,7 @@ func (r *OpenSearchReconciler) startUsersCreationJob(cluster *v1beta1.OpenSearch } func (r *OpenSearchReconciler) newSyncJob(o *v1beta1.OpenSearch) scheduler.Job { - l := log.Log.WithValues("syncJob", o.GetJobID(scheduler.StatusChecker), "clusterID", o.Status.ID) + l := log.Log.WithValues("syncJob", o.GetJobID(scheduler.SyncJob), "clusterID", o.Status.ID) return func() error { namespacedName := client.ObjectKeyFromObject(o) @@ -559,7 +559,7 @@ func (r *OpenSearchReconciler) newSyncJob(o *v1beta1.OpenSearch) scheduler.Job { "namespaced name", namespacedName) r.Scheduler.RemoveJob(o.GetJobID(scheduler.UserCreator)) r.Scheduler.RemoveJob(o.GetJobID(scheduler.BackupsChecker)) - r.Scheduler.RemoveJob(o.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(o.GetJobID(scheduler.SyncJob)) return nil } if err != nil { @@ -993,7 +993,7 @@ func (r *OpenSearchReconciler) handleExternalDelete(ctx context.Context, o *v1be r.Scheduler.RemoveJob(o.GetJobID(scheduler.BackupsChecker)) r.Scheduler.RemoveJob(o.GetJobID(scheduler.UserCreator)) - r.Scheduler.RemoveJob(o.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(o.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusters/postgresql_controller.go b/controllers/clusters/postgresql_controller.go index 6d2c50b73..f7f9c9ba7 100644 --- a/controllers/clusters/postgresql_controller.go +++ b/controllers/clusters/postgresql_controller.go @@ -255,7 +255,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( r.EventRecorder.Eventf( pg, models.Normal, models.Created, - "Cluster status check job is started", + "Cluster sync job is started", ) if pg.Spec.DataCentres[0].CloudProvider == models.ONPREMISES { @@ -549,7 +549,7 @@ func (r *PostgreSQLReconciler) handleDeleteCluster( ) r.Scheduler.RemoveJob(pg.GetJobID(scheduler.BackupsChecker)) - 
r.Scheduler.RemoveJob(pg.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(pg.GetJobID(scheduler.SyncJob)) controllerutil.RemoveFinalizer(pg, models.DeletionFinalizer) pg.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent @@ -716,7 +716,7 @@ func (r *PostgreSQLReconciler) startClusterOnPremisesIPsJob(pg *v1beta1.PostgreS func (r *PostgreSQLReconciler) startClusterStatusJob(pg *v1beta1.PostgreSQL) error { job := r.newWatchStatusJob(pg) - err := r.Scheduler.ScheduleJob(pg.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(pg.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -745,7 +745,7 @@ func (r *PostgreSQLReconciler) newWatchStatusJob(pg *v1beta1.PostgreSQL) schedul l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.", "namespaced name", namespacedName) r.Scheduler.RemoveJob(pg.GetJobID(scheduler.BackupsChecker)) - r.Scheduler.RemoveJob(pg.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(pg.GetJobID(scheduler.SyncJob)) r.Scheduler.RemoveJob(pg.GetJobID(scheduler.UserCreator)) return nil } @@ -1388,7 +1388,7 @@ func (r *PostgreSQLReconciler) handleExternalDelete(ctx context.Context, pg *v1b r.Scheduler.RemoveJob(pg.GetJobID(scheduler.BackupsChecker)) r.Scheduler.RemoveJob(pg.GetJobID(scheduler.UserCreator)) - r.Scheduler.RemoveJob(pg.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(pg.GetJobID(scheduler.SyncJob)) return nil } diff --git a/controllers/clusters/redis_controller.go b/controllers/clusters/redis_controller.go index 89eda25db..68b451d8c 100644 --- a/controllers/clusters/redis_controller.go +++ b/controllers/clusters/redis_controller.go @@ -473,7 +473,7 @@ func (r *RedisReconciler) handleDeleteCluster( } } - r.Scheduler.RemoveJob(redis.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(redis.GetJobID(scheduler.SyncJob)) r.Scheduler.RemoveJob(redis.GetJobID(scheduler.BackupsChecker)) l.Info("Deleting cluster backup resources", @@ -567,7 +567,7 @@ func (r *RedisReconciler) startClusterOnPremisesIPsJob(redis *v1beta1.Redis, b * func (r *RedisReconciler) startSyncJob(cluster *v1beta1.Redis) error { job := r.newSyncJob(cluster) - err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) + err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job) if err != nil { return err } @@ -632,7 +632,7 @@ func (r *RedisReconciler) newUsersCreationJob(redis *v1beta1.Redis) scheduler.Jo } func (r *RedisReconciler) newSyncJob(redis *v1beta1.Redis) scheduler.Job { - l := log.Log.WithValues("syncJob", redis.GetJobID(scheduler.StatusChecker), "clusterID", redis.Status.ID) + l := log.Log.WithValues("syncJob", redis.GetJobID(scheduler.SyncJob), "clusterID", redis.Status.ID) return func() error { namespacedName := client.ObjectKeyFromObject(redis) @@ -642,7 +642,7 @@ func (r *RedisReconciler) newSyncJob(redis *v1beta1.Redis) scheduler.Job { "namespaced name", namespacedName) r.Scheduler.RemoveJob(redis.GetJobID(scheduler.UserCreator)) r.Scheduler.RemoveJob(redis.GetJobID(scheduler.BackupsChecker)) - r.Scheduler.RemoveJob(redis.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(redis.GetJobID(scheduler.SyncJob)) return nil } @@ -1022,7 +1022,7 @@ func (r *RedisReconciler) handleExternalDelete(ctx context.Context, redis *v1bet r.Scheduler.RemoveJob(redis.GetJobID(scheduler.BackupsChecker)) 
 	r.Scheduler.RemoveJob(redis.GetJobID(scheduler.UserCreator))
-	r.Scheduler.RemoveJob(redis.GetJobID(scheduler.StatusChecker))
+	r.Scheduler.RemoveJob(redis.GetJobID(scheduler.SyncJob))
 
 	return nil
 }
diff --git a/controllers/clusters/zookeeper_controller.go b/controllers/clusters/zookeeper_controller.go
index 258edc559..825739a42 100644
--- a/controllers/clusters/zookeeper_controller.go
+++ b/controllers/clusters/zookeeper_controller.go
@@ -18,7 +18,9 @@ package clusters
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
+	"fmt"
 
 	"github.com/go-logr/logr"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -101,6 +103,52 @@ func (r *ZookeeperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 	}
 }
 
+func (r *ZookeeperReconciler) createCluster(ctx context.Context, zook *v1beta1.Zookeeper, l logr.Logger) error {
+	l.Info("Creating zookeeper cluster",
+		"cluster name", zook.Spec.Name,
+		"data centres", zook.Spec.DataCentres)
+
+	b, err := r.API.CreateClusterRaw(instaclustr.ZookeeperEndpoint, zook.Spec.ToInstAPI())
+	if err != nil {
+		return fmt.Errorf("failed to create zookeeper cluster, err: %w", err)
+	}
+
+	var instaModel models.ZookeeperCluster
+	err = json.Unmarshal(b, &instaModel)
+	if err != nil {
+		return fmt.Errorf("failed to unmarshal body to models.ZookeeperCluster, err: %w", err)
+	}
+
+	patch := zook.NewPatch()
+
+	zook.Spec.FromInstAPI(&instaModel)
+	zook.Annotations[models.ResourceStateAnnotation] = models.SyncingEvent
+	err = r.Patch(ctx, zook, patch)
+	if err != nil {
+		return fmt.Errorf("failed to patch cluster spec, err: %w", err)
+	}
+
+	zook.Status.FromInstAPI(&instaModel)
+	err = r.Status().Patch(ctx, zook, patch)
+	if err != nil {
+		return fmt.Errorf("failed to patch cluster status, err: %w", err)
+	}
+
+	l.Info("Zookeeper cluster has been created", "cluster ID", zook.Status.ID)
+	r.EventRecorder.Eventf(
+		zook, models.Normal, models.Created,
+		"Cluster creation request is sent. Cluster ID: %s",
+		zook.Status.ID,
+	)
+
+	err = r.createDefaultSecret(ctx, zook, l)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (r *ZookeeperReconciler) handleCreateCluster(
 	ctx context.Context,
 	zook *v1beta1.Zookeeper,
@@ -110,41 +158,17 @@ func (r *ZookeeperReconciler) handleCreateCluster(
 	l = l.WithName("Creation Event")
 
 	if zook.Status.ID == "" {
-		l.Info("Creating zookeeper cluster",
-			"cluster name", zook.Spec.Name,
-			"data centres", zook.Spec.DataCentres)
-
-		patch := zook.NewPatch()
-
-		zook.Status.ID, err = r.API.CreateCluster(instaclustr.ZookeeperEndpoint, zook.Spec.ToInstAPI())
-		if err != nil {
-			l.Error(err, "Cannot create zookeeper cluster", "spec", zook.Spec)
-			r.EventRecorder.Eventf(
-				zook, models.Warning, models.CreationFailed,
-				"Cluster creation on the Instaclustr is failed. Reason: %v",
-				err,
-			)
-			return reconcile.Result{}, err
-		}
-
-		r.EventRecorder.Eventf(
-			zook, models.Normal, models.Created,
-			"Cluster creation request is sent. Cluster ID: %s",
-			zook.Status.ID,
-		)
-
-		err = r.Status().Patch(ctx, zook, patch)
+		err := r.createCluster(ctx, zook, l)
 		if err != nil {
-			l.Error(err, "Cannot patch zookeeper cluster status from the Instaclustr API",
-				"spec", zook.Spec)
-			r.EventRecorder.Eventf(
-				zook, models.Warning, models.PatchFailed,
-				"Cluster resource status patch is failed. Reason: %v",
-				err,
+			r.EventRecorder.Eventf(zook, models.Warning, models.CreationFailed,
+				"Failed to create Zookeeper cluster. Reason: %v", err,
 			)
 			return reconcile.Result{}, err
 		}
+	}
 
+	if zook.Status.State != models.DeletedStatus {
+		patch := zook.NewPatch()
 		zook.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent
 		controllerutil.AddFinalizer(zook, models.DeletionFinalizer)
 		err = r.Patch(ctx, zook, patch)
@@ -158,26 +182,7 @@ func (r *ZookeeperReconciler) handleCreateCluster(
 			return reconcile.Result{}, err
 		}
 
-		l.Info("Zookeeper cluster has been created", "cluster ID", zook.Status.ID)
-
-		err = r.createDefaultSecret(ctx, zook, l)
-		if err != nil {
-			l.Error(err, "Cannot create default secret for Zookeeper cluster",
-				"cluster name", zook.Spec.Name,
-				"clusterID", zook.Status.ID,
-			)
-			r.EventRecorder.Eventf(
-				zook, models.Warning, models.CreationFailed,
-				"Default user secret creation on the Instaclustr is failed. Reason: %v",
-				err,
-			)
-
-			return reconcile.Result{}, err
-		}
-	}
-
-	if zook.Status.State != models.DeletedStatus {
-		err = r.startClusterStatusJob(zook)
+		err = r.startClusterSyncJob(zook)
 		if err != nil {
 			l.Error(err, "Cannot start cluster status job",
 				"zookeeper cluster ID", zook.Status.ID)
@@ -191,7 +196,7 @@ func (r *ZookeeperReconciler) handleCreateCluster(
 
 		r.EventRecorder.Eventf(
 			zook, models.Normal, models.Created,
-			"Cluster status check job is started",
+			"Cluster sync job is started",
 		)
 	}
@@ -253,24 +258,21 @@ func (r *ZookeeperReconciler) handleUpdateCluster(
 ) (reconcile.Result, error) {
 	l = l.WithName("Update Event")
 
-	iData, err := r.API.GetZookeeper(zook.Status.ID)
+	instaModel, err := r.API.GetZookeeper(zook.Status.ID)
 	if err != nil {
 		l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", zook.Status.ID)
 		return reconcile.Result{}, err
 	}
 
-	iZook, err := zook.FromInstAPI(iData)
-	if err != nil {
-		l.Error(err, "Cannot convert cluster from the Instaclustr API", "cluster ID", zook.Status.ID)
-		return reconcile.Result{}, err
-	}
+	iZook := &v1beta1.Zookeeper{}
+	iZook.FromInstAPI(instaModel)
 
 	if zook.Annotations[models.ExternalChangesAnnotation] == models.True ||
 		r.RateLimiter.NumRequeues(req) == rlimiter.DefaultMaxTries {
 		return handleExternalChanges[v1beta1.ZookeeperSpec](r.EventRecorder, r.Client, zook, iZook, l)
 	}
 
-	if zook.Spec.ClusterSettingsNeedUpdate(iZook.Spec.Cluster) {
+	if zook.Spec.ClusterSettingsNeedUpdate(&iZook.Spec.GenericClusterSpec) {
 		l.Info("Updating cluster settings",
 			"instaclustr description", iZook.Spec.Description,
 			"instaclustr two factor delete", iZook.Spec.TwoFactorDelete)
@@ -300,7 +302,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster(
 	if err != nil && !errors.Is(err, instaclustr.NotFound) {
 		l.Error(err, "Cannot get zookeeper cluster",
 			"cluster name", zook.Spec.Name,
-			"status", zook.Status.ClusterStatus.State)
+			"status", zook.Status.GenericStatus.State)
 		r.EventRecorder.Eventf(
 			zook, models.Warning, models.FetchFailed,
 			"Cluster resource fetch from the Instaclustr API is failed. Reason: %v",
@@ -366,7 +368,7 @@ func (r *ZookeeperReconciler) handleDeleteCluster(
 		return reconcile.Result{}, err
 	}
 
-	r.Scheduler.RemoveJob(zook.GetJobID(scheduler.StatusChecker))
+	r.Scheduler.RemoveJob(zook.GetJobID(scheduler.SyncJob))
 	controllerutil.RemoveFinalizer(zook, models.DeletionFinalizer)
 	zook.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent
 	err = r.Patch(ctx, zook, patch)
@@ -403,10 +405,10 @@ func (r *ZookeeperReconciler) handleDeleteCluster(
 	return models.ExitReconcile, nil
 }
 
-func (r *ZookeeperReconciler) startClusterStatusJob(Zookeeper *v1beta1.Zookeeper) error {
-	job := r.newWatchStatusJob(Zookeeper)
+func (r *ZookeeperReconciler) startClusterSyncJob(Zookeeper *v1beta1.Zookeeper) error {
+	job := r.newSyncJob(Zookeeper)
 
-	err := r.Scheduler.ScheduleJob(Zookeeper.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job)
+	err := r.Scheduler.ScheduleJob(Zookeeper.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job)
 	if err != nil {
 		return err
 	}
@@ -414,15 +416,16 @@ func (r *ZookeeperReconciler) startClusterStatusJob(Zookeeper *v1beta1.Zookeeper
 	return nil
 }
 
-func (r *ZookeeperReconciler) newWatchStatusJob(zook *v1beta1.Zookeeper) scheduler.Job {
-	l := log.Log.WithValues("component", "ZookeeperStatusClusterJob")
+func (r *ZookeeperReconciler) newSyncJob(zook *v1beta1.Zookeeper) scheduler.Job {
+	l := log.Log.WithValues("syncJob", zook.GetJobID(scheduler.SyncJob), "clusterID", zook.Status.ID)
+
 	return func() error {
 		namespacedName := client.ObjectKeyFromObject(zook)
 		err := r.Get(context.Background(), namespacedName, zook)
 		if k8serrors.IsNotFound(err) {
 			l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.",
 				"namespaced name", namespacedName)
-			r.Scheduler.RemoveJob(zook.GetJobID(scheduler.StatusChecker))
+			r.Scheduler.RemoveJob(zook.GetJobID(scheduler.SyncJob))
 			return nil
 		}
 		if err != nil {
@@ -431,7 +434,7 @@ func (r *ZookeeperReconciler) newWatchStatusJob(zook *v1beta1.Zookeeper) schedul
 			return err
 		}
 
-		iData, err := r.API.GetZookeeper(zook.Status.ID)
+		instaModel, err := r.API.GetZookeeper(zook.Status.ID)
 		if err != nil {
 			if errors.Is(err, instaclustr.NotFound) {
 				if zook.DeletionTimestamp != nil {
@@ -447,21 +450,16 @@ func (r *ZookeeperReconciler) newWatchStatusJob(zook *v1beta1.Zookeeper) schedul
 			return err
 		}
 
-		iZook, err := zook.FromInstAPI(iData)
-		if err != nil {
-			l.Error(err, "Cannot convert cluster from the Instaclustr API", "cluster ID", zook.Status.ID)
-			return err
-		}
+		iZook := &v1beta1.Zookeeper{}
+		iZook.FromInstAPI(instaModel)
 
-		if !areStatusesEqual(&zook.Status.ClusterStatus, &iZook.Status.ClusterStatus) {
-			l.Info("Updating Zookeeper status",
-				"instaclustr status", iZook.Status,
-				"status", zook.Status)
+		if !zook.Status.Equals(&iZook.Status) {
+			l.Info("Updating Zookeeper status")
 
-			areDCsEqual := areDataCentresEqual(iZook.Status.ClusterStatus.DataCentres, zook.Status.ClusterStatus.DataCentres)
+			areDCsEqual := zook.Status.DCsEquals(iZook.Status.DataCentres)
 
 			patch := zook.NewPatch()
-			zook.Status.ClusterStatus = iZook.Status.ClusterStatus
+			zook.Status.FromInstAPI(instaModel)
 			err = r.Status().Patch(context.Background(), zook, patch)
 			if err != nil {
 				l.Error(err, "Cannot patch Zookeeper cluster",
@@ -473,14 +471,14 @@ func (r *ZookeeperReconciler) newWatchStatusJob(zook *v1beta1.Zookeeper) schedul
 		if !areDCsEqual {
 			var nodes []*v1beta1.Node
 
-			for _, dc := range iZook.Status.ClusterStatus.DataCentres {
+			for _, dc := range iZook.Status.DataCentres {
 				nodes = append(nodes, dc.Nodes...)
 			}
 
 			err = exposeservice.Create(r.Client,
 				zook.Name,
 				zook.Namespace,
-				zook.Spec.PrivateNetworkCluster,
+				zook.Spec.PrivateNetwork,
 				nodes,
 				models.ZookeeperConnectionPort)
 			if err != nil {
@@ -552,7 +550,7 @@ func (r *ZookeeperReconciler) handleExternalDelete(ctx context.Context, zook *v1
 	r.EventRecorder.Eventf(zook, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)
 
 	r.Scheduler.RemoveJob(zook.GetJobID(scheduler.BackupsChecker))
-	r.Scheduler.RemoveJob(zook.GetJobID(scheduler.StatusChecker))
+	r.Scheduler.RemoveJob(zook.GetJobID(scheduler.SyncJob))
 
 	return nil
 }
@@ -577,6 +575,10 @@ func (r *ZookeeperReconciler) SetupWithManager(mgr ctrl.Manager) error {
 				return true
 			}
 
+			if newObj.Status.ID == "" && newObj.Annotations[models.ResourceStateAnnotation] == models.SyncingEvent {
+				return false
+			}
+
 			if newObj.Status.ID == "" {
 				newObj.Annotations[models.ResourceStateAnnotation] = models.CreatingEvent
 				return true
@@ -607,7 +609,7 @@ func (r *ZookeeperReconciler) reconcileMaintenanceEvents(ctx context.Context, z
 		return err
 	}
 
-	if !z.Status.AreMaintenanceEventStatusesEqual(iMEStatuses) {
+	if !z.Status.MaintenanceEventsEqual(iMEStatuses) {
 		patch := z.NewPatch()
 		z.Status.MaintenanceEvents = iMEStatuses
 		err = r.Status().Patch(ctx, z, patch)
diff --git a/controllers/kafkamanagement/mirror_controller.go b/controllers/kafkamanagement/mirror_controller.go
index 4b5677de0..752d8124b 100644
--- a/controllers/kafkamanagement/mirror_controller.go
+++ b/controllers/kafkamanagement/mirror_controller.go
@@ -231,7 +231,7 @@ func (r *MirrorReconciler) handleDeleteMirror(
 	}
 
 	patch := mirror.NewPatch()
-	r.Scheduler.RemoveJob(mirror.GetJobID(scheduler.StatusChecker))
+	r.Scheduler.RemoveJob(mirror.GetJobID(scheduler.SyncJob))
 	mirror.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent
 	controllerutil.RemoveFinalizer(mirror, models.DeletionFinalizer)
 	err = r.Patch(ctx, mirror, patch)
@@ -256,7 +256,7 @@ func (r *MirrorReconciler) handleDeleteMirror(
 func (r *MirrorReconciler) startClusterStatusJob(mirror *v1beta1.Mirror) error {
 	job := r.newWatchStatusJob(mirror)
 
-	err := r.Scheduler.ScheduleJob(mirror.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job)
+	err := r.Scheduler.ScheduleJob(mirror.GetJobID(scheduler.SyncJob), scheduler.ClusterStatusInterval, job)
 	if err != nil {
 		return err
 	}
@@ -275,7 +275,7 @@ func (r *MirrorReconciler) newWatchStatusJob(mirror *v1beta1.Mirror) scheduler.J
 			r.EventRecorder.Eventf(mirror, models.Normal, models.Deleted,
 				"Mirror is not found in the k8s cluster. Closing Instaclustr status sync.")
-			r.Scheduler.RemoveJob(mirror.GetJobID(scheduler.StatusChecker))
+			r.Scheduler.RemoveJob(mirror.GetJobID(scheduler.SyncJob))
 			return nil
 		}
 		if err != nil {
diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go
index 34a6896cd..870c9c91f 100644
--- a/pkg/instaclustr/client.go
+++ b/pkg/instaclustr/client.go
@@ -504,7 +504,7 @@ func (c *Client) UpdateKafkaConnect(id string, kc models.KafkaConnectAPIUpdate)
 	return nil
 }
 
-func (c *Client) GetZookeeper(id string) ([]byte, error) {
+func (c *Client) GetZookeeper(id string) (*models.ZookeeperCluster, error) {
 	url := c.serverHostname + ZookeeperEndpoint + id
 
 	resp, err := c.DoRequest(url, http.MethodGet, nil)
@@ -526,7 +526,13 @@ func (c *Client) GetZookeeper(id string) ([]byte, error) {
 		return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body)
 	}
 
-	return body, nil
+	var cluster models.ZookeeperCluster
+	err = json.Unmarshal(body, &cluster)
+	if err != nil {
+		return nil, err
+	}
+
+	return &cluster, nil
 }
 
 func (c *Client) UpdateDescriptionAndTwoFactorDelete(clusterEndpoint, clusterID, description string, twoFactorDelete *v1beta1.TwoFactorDelete) error {
diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go
index 70f751bab..bea3d30d4 100644
--- a/pkg/instaclustr/interfaces.go
+++ b/pkg/instaclustr/interfaces.go
@@ -84,7 +84,7 @@ type API interface {
 	GetKafka(id string) (*models.KafkaCluster, error)
 	GetKafkaConnect(id string) (*models.KafkaConnectCluster, error)
 	UpdateKafkaConnect(id string, kc models.KafkaConnectAPIUpdate) error
-	GetZookeeper(id string) ([]byte, error)
+	GetZookeeper(id string) (*models.ZookeeperCluster, error)
 	RestoreCluster(restoreData any, clusterKind string) (string, error)
 	GetPostgreSQL(id string) ([]byte, error)
 	UpdatePostgreSQL(id string, r *models.PGClusterUpdate) error
diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go
index 3860e9f2b..1a68431b4 100644
--- a/pkg/instaclustr/mock/client.go
+++ b/pkg/instaclustr/mock/client.go
@@ -314,7 +314,7 @@ func (c *mockClient) GetKafkaConnect(id string) (*models.KafkaConnectCluster, er
 	panic("GetKafkaConnect: is not implemented")
 }
 
-func (c *mockClient) GetZookeeper(id string) ([]byte, error) {
+func (c *mockClient) GetZookeeper(id string) (*models.ZookeeperCluster, error) {
 	panic("GetZookeeper: is not implemented")
 }
diff --git a/pkg/models/zookeeper_apiv2.go b/pkg/models/zookeeper_apiv2.go
index 1645b6184..17d86293d 100644
--- a/pkg/models/zookeeper_apiv2.go
+++ b/pkg/models/zookeeper_apiv2.go
@@ -17,21 +17,19 @@ limitations under the License.
 package models
 
 type ZookeeperCluster struct {
-	ID                            string                 `json:"id,omitempty"`
-	Name                          string                 `json:"name"`
-	ZookeeperVersion              string                 `json:"zookeeperVersion,omitempty"`
-	CurrentClusterOperationStatus string                 `json:"currentClusterOperationStatus,omitempty"`
-	Status                        string                 `json:"status,omitempty"`
-	PrivateNetworkCluster         bool                   `json:"privateNetworkCluster"`
-	SLATier                       string                 `json:"slaTier"`
-	TwoFactorDelete               []*TwoFactorDelete     `json:"twoFactorDelete,omitempty"`
-	DataCentres                   []*ZookeeperDataCentre `json:"dataCentres"`
-	Description                   string                 `json:"description,omitempty"`
+	GenericClusterFields `json:",inline"`
+
+	ZookeeperVersion string                 `json:"zookeeperVersion,omitempty"`
+	DataCentres      []*ZookeeperDataCentre `json:"dataCentres"`
 }
 
 type ZookeeperDataCentre struct {
-	DataCentre `json:",inline"`
+	GenericDataCentreFields `json:",inline"`
+
+	NumberOfNodes            int      `json:"numberOfNodes"`
+	NodeSize                 string   `json:"nodeSize"`
 	ClientToServerEncryption bool     `json:"clientToServerEncryption"`
-	EnforceAuthSchemes       []string `json:"enforceAuthSchemes,omitempty"`
 	EnforceAuthEnabled       bool     `json:"enforceAuthEnabled"`
+	EnforceAuthSchemes       []string `json:"enforceAuthSchemes,omitempty"`
+	Nodes                    []*Node  `json:"nodes,omitempty"`
 }
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 2c0d5cbd2..0f1523667 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -29,7 +29,7 @@ var ClusterBackupsInterval time.Duration
 var UserCreationInterval time.Duration
 
 const (
-	StatusChecker        = "statusChecker"
+	SyncJob              = "sync"
 	BackupsChecker       = "backupsChecker"
 	UserCreator          = "userCreator"
 	OnPremisesIPsChecker = "onPremisesIPsChecker"