issue-620, added debezium feature to cassandra
Bohdan Siryk authored and taaraora committed Dec 7, 2023
1 parent 8dd9a0e commit 3fe781e
Showing 18 changed files with 155 additions and 137 deletions.
107 changes: 62 additions & 45 deletions apis/clusters/v1beta1/cassandra_types.go
@@ -29,10 +29,6 @@ import (
"github.com/instaclustr/operator/pkg/models"
)

type Spark struct {
Version string `json:"version"`
}

type CassandraRestoreFrom struct {
// Original cluster ID. Backup from that cluster will be used for restore
ClusterID string `json:"clusterID"`
@@ -60,7 +56,6 @@ type CassandraSpec struct {
DataCentres []*CassandraDataCentre `json:"dataCentres,omitempty"`
LuceneEnabled bool `json:"luceneEnabled,omitempty"`
PasswordAndUserAuth bool `json:"passwordAndUserAuth,omitempty"`
Spark []*Spark `json:"spark,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
UserRefs References `json:"userRefs,omitempty"`
//+kubebuilder:validate:MaxItems:=1
@@ -79,6 +74,47 @@ type CassandraDataCentre struct {
PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"`
ClientToClusterEncryption bool `json:"clientToClusterEncryption"`
ReplicationFactor int `json:"replicationFactor"`

// Adds the specified version of Debezium Connector Cassandra to the Cassandra cluster
// +kubebuilder:validation:MaxItems=1
Debezium []DebeziumCassandraSpec `json:"debezium,omitempty"`
}

type DebeziumCassandraSpec struct {
// KafkaVPCType is the Kafka VPC type; only VPC_PEERED is supported
KafkaVPCType string `json:"kafkaVpcType"`
KafkaTopicPrefix string `json:"kafkaTopicPrefix"`
KafkaDataCentreID string `json:"kafkaCdcId"`
Version string `json:"version"`
}
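
The json tags above determine how a debezium entry is spelled inside a Cassandra manifest. As a rough illustration, the sketch below marshals a trimmed copy of DebeziumCassandraSpec and prints the shape the new CRD schema (further down in this commit) accepts under spec.dataCentres[].debezium; every concrete value is hypothetical:

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed copy of DebeziumCassandraSpec from this diff, tags included.
type DebeziumCassandraSpec struct {
	KafkaVPCType      string `json:"kafkaVpcType"`
	KafkaTopicPrefix  string `json:"kafkaTopicPrefix"`
	KafkaDataCentreID string `json:"kafkaCdcId"`
	Version           string `json:"version"`
}

func main() {
	debezium := []DebeziumCassandraSpec{{
		KafkaVPCType:      "VPC_PEERED",      // the only supported value per the field comment
		KafkaTopicPrefix:  "cassandra-cdc",   // hypothetical
		KafkaDataCentreID: "kafka-cdc-dc-id", // hypothetical
		Version:           "2.0.1",           // hypothetical
	}}
	out, err := json.MarshalIndent(debezium, "", "  ")
	if err != nil {
		panic(err)
	}
	// Prints:
	// [
	//   {
	//     "kafkaVpcType": "VPC_PEERED",
	//     "kafkaTopicPrefix": "cassandra-cdc",
	//     "kafkaCdcId": "kafka-cdc-dc-id",
	//     "version": "2.0.1"
	//   }
	// ]
	fmt.Println(string(out))
}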

func (d *CassandraDataCentre) DebeziumToInstAPI() []*models.Debezium {
var instDebezium []*models.Debezium
for _, k8sDebezium := range d.Debezium {
instDebezium = append(instDebezium, &models.Debezium{
KafkaVPCType: k8sDebezium.KafkaVPCType,
KafkaTopicPrefix: k8sDebezium.KafkaTopicPrefix,
KafkaDataCentreID: k8sDebezium.KafkaDataCentreID,
Version: k8sDebezium.Version,
})
}
return instDebezium
}

func (d *CassandraDataCentre) DebeziumEquals(other *CassandraDataCentre) bool {
if len(d.Debezium) != len(other.Debezium) {
return false
}

for i := range d.Debezium {
if d.Debezium[i] != other.Debezium[i] {
return false
}
}

return true
}
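
DebeziumEquals backs the immutability rule enforced in validateDataCentresUpdate below: once a data centre carries a Debezium entry, any change to it fails update validation with models.ErrDebeziumImmutable. A minimal runnable sketch of that behaviour, using trimmed stand-ins for the operator types (all values hypothetical):

package main

import "fmt"

// Trimmed copies of the types from this diff.
type DebeziumCassandraSpec struct {
	KafkaVPCType      string
	KafkaTopicPrefix  string
	KafkaDataCentreID string
	Version           string
}

type CassandraDataCentre struct {
	Debezium []DebeziumCassandraSpec
}

// Same logic as DebeziumEquals above: compare lengths first, then each
// entry field-wise (all fields are strings, so != is a full comparison).
func (d *CassandraDataCentre) DebeziumEquals(other *CassandraDataCentre) bool {
	if len(d.Debezium) != len(other.Debezium) {
		return false
	}
	for i := range d.Debezium {
		if d.Debezium[i] != other.Debezium[i] {
			return false
		}
	}
	return true
}

func main() {
	current := &CassandraDataCentre{Debezium: []DebeziumCassandraSpec{
		{KafkaVPCType: "VPC_PEERED", KafkaTopicPrefix: "cdc", KafkaDataCentreID: "dc-1", Version: "2.0.1"},
	}}
	// Identical spec: the update passes the immutability check.
	unchanged := &CassandraDataCentre{Debezium: []DebeziumCassandraSpec{
		{KafkaVPCType: "VPC_PEERED", KafkaTopicPrefix: "cdc", KafkaDataCentreID: "dc-1", Version: "2.0.1"},
	}}
	// Edited topic prefix: the update would be rejected with ErrDebeziumImmutable.
	edited := &CassandraDataCentre{Debezium: []DebeziumCassandraSpec{
		{KafkaVPCType: "VPC_PEERED", KafkaTopicPrefix: "changed", KafkaDataCentreID: "dc-1", Version: "2.0.1"},
	}}

	fmt.Println(current.DebeziumEquals(unchanged)) // true
	fmt.Println(current.DebeziumEquals(edited))    // false
}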

//+kubebuilder:object:root=true
@@ -214,10 +250,6 @@ func (cs *CassandraSpec) validateUpdate(oldSpec CassandraSpec) error {
if err != nil {
return err
}
err = validateSpark(cs.Spark, oldSpec.Spark)
if err != nil {
return err
}

for _, dc := range cs.DataCentres {
err = cs.validateResizeSettings(dc.NodesNumber)
@@ -263,6 +295,10 @@ func (cs *CassandraSpec) validateDataCentresUpdate(oldSpec CassandraSpec) error {
return err
}

if !oldDC.DebeziumEquals(newDC) {
return models.ErrDebeziumImmutable
}

exists = true
break
}
@@ -309,12 +345,24 @@ func (cs *CassandraSpec) FromInstAPI(iCass *models.CassandraCluster) CassandraSpec {
DataCentres: cs.DCsFromInstAPI(iCass.DataCentres),
LuceneEnabled: iCass.LuceneEnabled,
PasswordAndUserAuth: iCass.PasswordAndUserAuth,
Spark: cs.SparkFromInstAPI(iCass.Spark),
BundledUseOnly: iCass.BundledUseOnly,
ResizeSettings: resizeSettingsFromInstAPI(iCass.ResizeSettings),
}
}

func (cs *CassandraSpec) DebeziumFromInstAPI(iDebeziums []*models.Debezium) []DebeziumCassandraSpec {
var debeziums []DebeziumCassandraSpec
for _, iDebezium := range iDebeziums {
debeziums = append(debeziums, DebeziumCassandraSpec{
KafkaVPCType: iDebezium.KafkaVPCType,
KafkaTopicPrefix: iDebezium.KafkaTopicPrefix,
KafkaDataCentreID: iDebezium.KafkaDataCentreID,
Version: iDebezium.Version,
})
}
return debeziums
}

func (cs *CassandraSpec) DCsFromInstAPI(iDCs []*models.CassandraDataCentre) (dcs []*CassandraDataCentre) {
for _, iDC := range iDCs {
dcs = append(dcs, &CassandraDataCentre{
@@ -323,15 +371,7 @@ func (cs *CassandraSpec) DCsFromInstAPI(iDCs []*models.CassandraDataCentre) (dcs []*CassandraDataCentre) {
PrivateIPBroadcastForDiscovery: iDC.PrivateIPBroadcastForDiscovery,
ClientToClusterEncryption: iDC.ClientToClusterEncryption,
ReplicationFactor: iDC.ReplicationFactor,
})
}
return
}

func (cs *CassandraSpec) SparkFromInstAPI(iSparks []*models.Spark) (sparks []*Spark) {
for _, iSpark := range iSparks {
sparks = append(sparks, &Spark{
Version: iSpark.Version,
Debezium: cs.DebeziumFromInstAPI(iDC.Debezium),
})
}
return
@@ -350,7 +390,6 @@ func (cs *CassandraSpec) ToInstAPI() *models.CassandraCluster {
CassandraVersion: cs.Version,
LuceneEnabled: cs.LuceneEnabled,
PasswordAndUserAuth: cs.PasswordAndUserAuth,
Spark: cs.SparkToInstAPI(),
DataCentres: cs.DCsToInstAPI(),
SLATier: cs.SLATier,
PrivateNetworkCluster: cs.PrivateNetworkCluster,
@@ -382,21 +421,11 @@ func (c *Cassandra) RestoreInfoToInstAPI(restoreData *CassandraRestoreFrom) any {
return iRestore
}

func (cs *CassandraSpec) SparkToInstAPI() (iSparks []*models.Spark) {
for _, spark := range cs.Spark {
iSparks = append(iSparks, &models.Spark{
Version: spark.Version,
})
}
return
}

func (cs *CassandraSpec) IsEqual(spec CassandraSpec) bool {
return cs.Cluster.IsEqual(spec.Cluster) &&
cs.AreDCsEqual(spec.DataCentres) &&
cs.LuceneEnabled == spec.LuceneEnabled &&
cs.PasswordAndUserAuth == spec.PasswordAndUserAuth &&
cs.IsSparkEqual(spec.Spark) &&
cs.BundledUseOnly == spec.BundledUseOnly
}

@@ -416,21 +445,8 @@ func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool {
iDC.ClientToClusterEncryption != dataCentre.ClientToClusterEncryption ||
iDC.PrivateIPBroadcastForDiscovery != dataCentre.PrivateIPBroadcastForDiscovery ||
iDC.ContinuousBackup != dataCentre.ContinuousBackup ||
iDC.ReplicationFactor != dataCentre.ReplicationFactor {
return false
}
}

return true
}

func (cs *CassandraSpec) IsSparkEqual(sparks []*Spark) bool {
if len(cs.Spark) != len(sparks) {
return false
}

for i, spark := range sparks {
if cs.Spark[i].Version != spark.Version {
iDC.ReplicationFactor != dataCentre.ReplicationFactor ||
!dataCentre.DebeziumEquals(iDC) {
return false
}
}
@@ -464,6 +480,7 @@ func (cdc *CassandraDataCentre) ToInstAPI() *models.CassandraDataCentre {
ContinuousBackup: cdc.ContinuousBackup,
PrivateIPBroadcastForDiscovery: cdc.PrivateIPBroadcastForDiscovery,
ReplicationFactor: cdc.ReplicationFactor,
Debezium: cdc.DebeziumToInstAPI(),
}
}

11 changes: 0 additions & 11 deletions apis/clusters/v1beta1/cassandra_webhook.go
@@ -87,10 +87,6 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Object) error {
return err
}

if len(c.Spec.Spark) > 1 {
return fmt.Errorf("spark should not have more than 1 item")
}

appVersions, err := cv.API.ListAppVersions(models.CassandraAppKind)
if err != nil {
return fmt.Errorf("cannot list versions for kind: %v, err: %w",
@@ -102,13 +98,6 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Object) error {
return err
}

for _, spark := range c.Spec.Spark {
err = validateAppVersion(appVersions, models.SparkAppType, spark.Version)
if err != nil {
return err
}
}

if len(c.Spec.DataCentres) == 0 {
return fmt.Errorf("data centres field is empty")
}
1 change: 0 additions & 1 deletion apis/clusters/v1beta1/structs.go
@@ -89,7 +89,6 @@ type Cluster struct {

// The PCI compliance standards relate to the security of user data and transactional information.
// Can only be applied to clusters provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch and Redis.
// PCI compliance cannot be enabled if the cluster has Spark.
PCICompliance bool `json:"pciCompliance,omitempty"`

PrivateNetworkCluster bool `json:"privateNetworkCluster,omitempty"`
12 changes: 0 additions & 12 deletions apis/clusters/v1beta1/validation.go
@@ -170,18 +170,6 @@ func validateTwoFactorDelete(new, old []*TwoFactorDelete) error {
return nil
}

func validateSpark(new, old []*Spark) error {
if len(old) != len(new) {
return models.ErrImmutableSpark
}
if len(old) != 0 &&
*old[0] != *new[0] {
return models.ErrImmutableSpark
}

return nil
}

func validateTagsUpdate(new, old map[string]string) error {
if len(old) != len(new) {
return models.ErrImmutableTags
46 changes: 20 additions & 26 deletions apis/clusters/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion config/crd/bases/clusters.instaclustr.com_cadences.yaml
@@ -193,7 +193,7 @@ spec:
description: The PCI compliance standards relate to the security of
user data and transactional information. Can only be applied to clusters
provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch
and Redis. PCI compliance cannot be enabled if the cluster has Spark.
and Redis.
type: boolean
privateNetworkCluster:
type: boolean
33 changes: 23 additions & 10 deletions config/crd/bases/clusters.instaclustr.com_cassandras.yaml
@@ -72,6 +72,28 @@ spec:
type: array
continuousBackup:
type: boolean
debezium:
description: Adds the specified version of Debezium Connector
Cassandra to the Cassandra cluster
items:
properties:
kafkaCdcId:
type: string
kafkaTopicPrefix:
type: string
kafkaVpcType:
description: KafkaVPCType is the Kafka VPC type; only VPC_PEERED is supported
type: string
version:
type: string
required:
- kafkaCdcId
- kafkaTopicPrefix
- kafkaVpcType
- version
type: object
maxItems: 1
type: array
name:
type: string
network:
@@ -115,7 +137,7 @@ spec:
description: The PCI compliance standards relate to the security of
user data and transactional information. Can only be applied to clusters
provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch
and Redis. PCI compliance cannot be enabled if the cluster has Spark.
and Redis.
type: boolean
privateNetworkCluster:
type: boolean
@@ -184,15 +206,6 @@ spec:
class nodes. See SLA Tier for more information. Enum: "PRODUCTION"
"NON_PRODUCTION".'
type: string
spark:
items:
properties:
version:
type: string
required:
- version
type: object
type: array
twoFactorDelete:
items:
properties:
@@ -183,7 +183,7 @@ spec:
description: The PCI compliance standards relate to the security of
user data and transactional information. Can only be applied to clusters
provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch
and Redis. PCI compliance cannot be enabled if the cluster has Spark.
and Redis.
type: boolean
privateNetworkCluster:
type: boolean