From 93276cba0886a442ed84a861ec1ac1a7d2f68744 Mon Sep 17 00:00:00 2001 From: Nick Hudson Date: Fri, 19 Apr 2024 09:39:01 -0500 Subject: [PATCH] Support node/pod affinity, tolerations and topologySpreadConstraints that CNPG supports (#717) --- charts/tembo-operator/Chart.yaml | 2 +- charts/tembo-operator/templates/crd.yaml | 650 ++++++++++++++++++ tembo-operator/Cargo.lock | 2 +- tembo-operator/Cargo.toml | 2 +- tembo-operator/src/apis/coredb_types.rs | 32 + tembo-operator/src/app_service/manager.rs | 22 +- tembo-operator/src/cloudnativepg/clusters.rs | 1 + tembo-operator/src/cloudnativepg/cnpg.rs | 31 +- tembo-operator/src/cloudnativepg/mod.rs | 1 + .../placement/cnpg_node_affinity.rs | 376 ++++++++++ .../cloudnativepg/placement/cnpg_placement.rs | 329 +++++++++ .../placement/cnpg_pod_affinity.rs | 510 ++++++++++++++ .../placement/cnpg_pod_anti_affinity.rs | 511 ++++++++++++++ .../placement/cnpg_toleration.rs | 90 +++ .../cloudnativepg/placement/cnpg_topology.rs | 168 +++++ .../src/cloudnativepg/placement/mod.rs | 6 + tembo-operator/src/controller.rs | 8 +- tembo-operator/src/defaults.rs | 9 + 18 files changed, 2734 insertions(+), 16 deletions(-) create mode 100644 tembo-operator/src/cloudnativepg/placement/cnpg_node_affinity.rs create mode 100644 tembo-operator/src/cloudnativepg/placement/cnpg_placement.rs create mode 100644 tembo-operator/src/cloudnativepg/placement/cnpg_pod_affinity.rs create mode 100644 tembo-operator/src/cloudnativepg/placement/cnpg_pod_anti_affinity.rs create mode 100644 tembo-operator/src/cloudnativepg/placement/cnpg_toleration.rs create mode 100644 tembo-operator/src/cloudnativepg/placement/cnpg_topology.rs create mode 100644 tembo-operator/src/cloudnativepg/placement/mod.rs diff --git a/charts/tembo-operator/Chart.yaml b/charts/tembo-operator/Chart.yaml index 19da6d15c..67e277b4c 100644 --- a/charts/tembo-operator/Chart.yaml +++ b/charts/tembo-operator/Chart.yaml @@ -3,7 +3,7 @@ name: tembo-operator description: 'Helm chart to deploy the tembo-operator' type: application icon: https://cloud.tembo.io/images/TemboElephant.png -version: 0.5.0 +version: 0.5.1 home: https://tembo.io sources: - https://github.com/tembo-io/tembo diff --git a/charts/tembo-operator/templates/crd.yaml b/charts/tembo-operator/templates/crd.yaml index 236fef546..0ed3664a2 100644 --- a/charts/tembo-operator/templates/crd.yaml +++ b/charts/tembo-operator/templates/crd.yaml @@ -33,6 +33,576 @@ spec: ```yaml apiVersion: coredb.io/v1alpha1 kind: CoreDB metadata: name: test-db spec: {} ```` properties: + affinityConfiguration: + default: + podAntiAffinityType: preferred + topologyKey: topology.kubernetes.io/zone + description: |- + A AffinityConfiguration provides a way to configure the CoreDB instance to run on specific nodes in the cluster based off of nodeSelector, nodeAffinity and tolerations + + For more informaton on AffinityConfiguration please see the [Cloudnative-PG documentation](https://cloudnative-pg.io/documentation/1.22/cloudnative-pg.v1/#postgresql-cnpg-io-v1-AffinityConfiguration) + + **Default**: ```yaml apiVersion: coredb.io/v1alpha1 kind: CoreDB metadata: name: test-db-restore spec: affinityConfiguration: podAntiAffinityType: preferred topologyKey: topology.kubernetes.io/zone ``` + nullable: true + properties: + additionalPodAffinity: + description: AdditionalPodAffinity allows to specify pod affinity terms to be passed to all the cluster's pods. 
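For example, the new field can be set directly on a `CoreDB` resource. A minimal sketch (the `app: my-cache` label is a hypothetical selector for pods to co-locate with):

```yaml
apiVersion: coredb.io/v1alpha1
kind: CoreDB
metadata:
  name: test-db
spec:
  affinityConfiguration:
    podAntiAffinityType: preferred
    topologyKey: topology.kubernetes.io/zone
    additionalPodAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 50
          podAffinityTerm:
            labelSelector:
              matchLabels:
                app: my-cache          # hypothetical label on pods to co-locate with
            topologyKey: kubernetes.io/hostname
```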
+ nullable: true + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes that satisfy the affinity expressions specified by this field, but it may choose a node that violates one or more of the expressions. The node that is most preferred is the one with the greatest sum of weights, i.e. for each node that meets all of the scheduling requirements (resource request, requiredDuringScheduling affinity expressions, etc.), compute a sum by iterating through the elements of this field and adding "weight" to the sum if the node has pods which matches the corresponding podAffinityTerm; the node(s) with the highest sum are the most preferred. + items: + description: The weights of all of the matched WeightedPodAffinityTerm fields are added per-node to find the most preferred node(s) + properties: + podAffinityTerm: + description: Required. A pod affinity term, associated with the corresponding weight. + properties: + labelSelector: + description: A label query over a set of resources, in this case pods. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + namespaceSelector: + description: A label query over the set of namespaces that the term applies to. The term is applied to the union of the namespaces selected by this field and the ones listed in the namespaces field. null selector and null or empty namespaces list means "this pod's namespace". An empty selector ({}) matches all namespaces. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. 
This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + namespaces: + description: namespaces specifies a static list of namespace names that the term applies to. The term is applied to the union of the namespaces listed in this field and the ones selected by namespaceSelector. null or empty namespaces list and null namespaceSelector means "this pod's namespace". + items: + type: string + nullable: true + type: array + topologyKey: + description: This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching the labelSelector in the specified namespaces, where co-located is defined as running on a node whose value of the label with key topologyKey matches that of any node on which any of the selected pods is running. Empty topologyKey is not allowed. + type: string + required: + - topologyKey + type: object + weight: + description: weight associated with matching the corresponding podAffinityTerm, in the range 1-100. + format: int32 + type: integer + required: + - podAffinityTerm + - weight + type: object + nullable: true + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field are not met at scheduling time, the pod will not be scheduled onto the node. If the affinity requirements specified by this field cease to be met at some point during pod execution (e.g. due to a pod label update), the system may or may not try to eventually evict the pod from its node. When there are multiple elements, the lists of nodes corresponding to each podAffinityTerm are intersected, i.e. all terms must be satisfied. + items: + description: Defines a set of pods (namely those matching the labelSelector relative to the given namespace(s)) that this pod should be co-located (affinity) or not co-located (anti-affinity) with, where co-located is defined as running on a node whose value of the label with key matches that of any node on which a pod of the set of pods is running + properties: + labelSelector: + description: A label query over a set of resources, in this case pods. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. 
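As a reference point, a `matchExpressions` entry couples a label key with an operator: `In`/`NotIn` require a non-empty `values` list, while `Exists`/`DoesNotExist` require it to be empty. An illustrative selector (the label keys are hypothetical):

```yaml
labelSelector:
  matchExpressions:
    - key: environment       # hypothetical label key
      operator: In
      values: ["production", "staging"]
    - key: canary            # hypothetical label key
      operator: DoesNotExist # values must be omitted or empty
```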
+ items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + namespaceSelector: + description: A label query over the set of namespaces that the term applies to. The term is applied to the union of the namespaces selected by this field and the ones listed in the namespaces field. null selector and null or empty namespaces list means "this pod's namespace". An empty selector ({}) matches all namespaces. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + namespaces: + description: namespaces specifies a static list of namespace names that the term applies to. The term is applied to the union of the namespaces listed in this field and the ones selected by namespaceSelector. null or empty namespaces list and null namespaceSelector means "this pod's namespace". + items: + type: string + nullable: true + type: array + topologyKey: + description: This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching the labelSelector in the specified namespaces, where co-located is defined as running on a node whose value of the label with key topologyKey matches that of any node on which any of the selected pods is running. Empty topologyKey is not allowed. + type: string + required: + - topologyKey + type: object + nullable: true + type: array + type: object + additionalPodAntiAffinity: + description: AdditionalPodAntiAffinity allows to specify pod anti-affinity terms to be added to the ones generated by the operator if EnablePodAntiAffinity is set to true (default) or to be used exclusively if set to false. 
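A sketch of layering a hard anti-affinity rule on top of the operator-generated ones (the `workload: batch` selector is hypothetical):

```yaml
affinityConfiguration:
  enablePodAntiAffinity: true    # default: operator-generated rules are kept
  additionalPodAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchLabels:
            workload: batch      # hypothetical: never share a node with these pods
        topologyKey: kubernetes.io/hostname
```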
+ nullable: true + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes that satisfy the anti-affinity expressions specified by this field, but it may choose a node that violates one or more of the expressions. The node that is most preferred is the one with the greatest sum of weights, i.e. for each node that meets all of the scheduling requirements (resource request, requiredDuringScheduling anti-affinity expressions, etc.), compute a sum by iterating through the elements of this field and adding "weight" to the sum if the node has pods which matches the corresponding podAffinityTerm; the node(s) with the highest sum are the most preferred. + items: + description: The weights of all of the matched WeightedPodAffinityTerm fields are added per-node to find the most preferred node(s) + properties: + podAffinityTerm: + description: Required. A pod affinity term, associated with the corresponding weight. + properties: + labelSelector: + description: A label query over a set of resources, in this case pods. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + namespaceSelector: + description: A label query over the set of namespaces that the term applies to. The term is applied to the union of the namespaces selected by this field and the ones listed in the namespaces field. null selector and null or empty namespaces list means "this pod's namespace". An empty selector ({}) matches all namespaces. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. 
This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + namespaces: + description: namespaces specifies a static list of namespace names that the term applies to. The term is applied to the union of the namespaces listed in this field and the ones selected by namespaceSelector. null or empty namespaces list and null namespaceSelector means "this pod's namespace". + items: + type: string + nullable: true + type: array + topologyKey: + description: This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching the labelSelector in the specified namespaces, where co-located is defined as running on a node whose value of the label with key topologyKey matches that of any node on which any of the selected pods is running. Empty topologyKey is not allowed. + type: string + required: + - topologyKey + type: object + weight: + description: weight associated with matching the corresponding podAffinityTerm, in the range 1-100. + format: int32 + type: integer + required: + - podAffinityTerm + - weight + type: object + nullable: true + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the anti-affinity requirements specified by this field are not met at scheduling time, the pod will not be scheduled onto the node. If the anti-affinity requirements specified by this field cease to be met at some point during pod execution (e.g. due to a pod label update), the system may or may not try to eventually evict the pod from its node. When there are multiple elements, the lists of nodes corresponding to each podAffinityTerm are intersected, i.e. all terms must be satisfied. + items: + description: Defines a set of pods (namely those matching the labelSelector relative to the given namespace(s)) that this pod should be co-located (affinity) or not co-located (anti-affinity) with, where co-located is defined as running on a node whose value of the label with key matches that of any node on which a pod of the set of pods is running + properties: + labelSelector: + description: A label query over a set of resources, in this case pods. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. 
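Note the namespace scoping described above: each term applies to the union of the static `namespaces` list and the namespaces matched by `namespaceSelector`, and leaving both null restricts it to the pod's own namespace. An illustrative term (all labels and names are hypothetical):

```yaml
podAffinityTerm:
  labelSelector:
    matchLabels:
      app: noisy-neighbor   # hypothetical pod label
  namespaceSelector:
    matchLabels:
      team: data            # hypothetical namespace label
  namespaces: ["analytics"] # unioned with the selector above
  topologyKey: topology.kubernetes.io/zone
```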
+ items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + namespaceSelector: + description: A label query over the set of namespaces that the term applies to. The term is applied to the union of the namespaces selected by this field and the ones listed in the namespaces field. null selector and null or empty namespaces list means "this pod's namespace". An empty selector ({}) matches all namespaces. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + namespaces: + description: namespaces specifies a static list of namespace names that the term applies to. The term is applied to the union of the namespaces listed in this field and the ones selected by namespaceSelector. null or empty namespaces list and null namespaceSelector means "this pod's namespace". + items: + type: string + nullable: true + type: array + topologyKey: + description: This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching the labelSelector in the specified namespaces, where co-located is defined as running on a node whose value of the label with key topologyKey matches that of any node on which any of the selected pods is running. Empty topologyKey is not allowed. + type: string + required: + - topologyKey + type: object + nullable: true + type: array + type: object + enablePodAntiAffinity: + description: Activates anti-affinity for the pods. The operator will define pods anti-affinity unless this field is explicitly set to false + nullable: true + type: boolean + nodeAffinity: + description: 'NodeAffinity describes node affinity scheduling rules for the pod. 
More info: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity' + nullable: true + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes that satisfy the affinity expressions specified by this field, but it may choose a node that violates one or more of the expressions. The node that is most preferred is the one with the greatest sum of weights, i.e. for each node that meets all of the scheduling requirements (resource request, requiredDuringScheduling affinity expressions, etc.), compute a sum by iterating through the elements of this field and adding "weight" to the sum if the node matches the corresponding matchExpressions; the node(s) with the highest sum are the most preferred. + items: + description: An empty preferred scheduling term matches all objects with implicit weight 0 (i.e. it's a no-op). A null preferred scheduling term matches no objects (i.e. is also a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding weight. + properties: + matchExpressions: + description: A list of node selector requirements by node's labels. + items: + description: A node selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: The label key that the selector applies to. + type: string + operator: + description: Represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. If the operator is Gt or Lt, the values array must have a single element, which will be interpreted as an integer. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchFields: + description: A list of node selector requirements by node's fields. + items: + description: A node selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: The label key that the selector applies to. + type: string + operator: + description: Represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. If the operator is Gt or Lt, the values array must have a single element, which will be interpreted as an integer. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + type: object + weight: + description: Weight associated with matching the corresponding nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + nullable: true + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field are not met at scheduling time, the pod will not be scheduled onto the node. 
If the affinity requirements specified by this field cease to be met at some point during pod execution (e.g. due to an update), the system may or may not try to eventually evict the pod from its node. + nullable: true + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The terms are ORed. + items: + description: A null or empty node selector term matches no objects. The requirements of them are ANDed. The TopologySelectorTerm type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by node's labels. + items: + description: A node selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: The label key that the selector applies to. + type: string + operator: + description: Represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. If the operator is Gt or Lt, the values array must have a single element, which will be interpreted as an integer. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchFields: + description: A list of node selector requirements by node's fields. + items: + description: A node selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: The label key that the selector applies to. + type: string + operator: + description: Represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. If the operator is Gt or Lt, the values array must have a single element, which will be interpreted as an integer. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object + nodeSelector: + additionalProperties: + type: string + description: 'NodeSelector is map of key-value pairs used to define the nodes on which the pods can run. More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/' + nullable: true + type: object + podAntiAffinityType: + description: 'PodAntiAffinityType allows the user to decide whether pod anti-affinity between cluster instance has to be considered a strong requirement during scheduling or not. Allowed values are: "preferred" (default if empty) or "required". Setting it to "required", could lead to instances remaining pending until new kubernetes nodes are added if all the existing nodes don''t match the required pod anti-affinity rule. 
More info: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#inter-pod-affinity-and-anti-affinity' + nullable: true + type: string + tolerations: + description: 'Tolerations is a list of Tolerations that should be set for all the pods, in order to allow them to run on tainted nodes. More info: https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/' + items: + description: The pod this Toleration is attached to tolerates any taint that matches the triple using the matching operator . + properties: + effect: + description: Effect indicates the taint effect to match. Empty means match all taint effects. When specified, allowed values are NoSchedule, PreferNoSchedule and NoExecute. + nullable: true + type: string + key: + description: Key is the taint key that the toleration applies to. Empty means match all taint keys. If the key is empty, operator must be Exists; this combination means to match all values and all keys. + nullable: true + type: string + operator: + description: Operator represents a key's relationship to the value. Valid operators are Exists and Equal. Defaults to Equal. Exists is equivalent to wildcard for value, so that a pod can tolerate all taints of a particular category. + nullable: true + type: string + tolerationSeconds: + description: TolerationSeconds represents the period of time the toleration (which must be of effect NoExecute, otherwise this field is ignored) tolerates the taint. By default, it is not set, which means tolerate the taint forever (do not evict). Zero and negative values will be treated as 0 (evict immediately) by the system. + format: int64 + nullable: true + type: integer + value: + description: Value is the taint value the toleration matches to. If the operator is Exists, the value should be empty, otherwise just a regular string. + nullable: true + type: string + type: object + nullable: true + type: array + topologyKey: + description: TopologyKey to use for anti-affinity configuration. See k8s documentation for more info on that + nullable: true + type: string + type: object appServices: description: |- app_service is a way to define a service that is deployed alongside the Postgres instance. This is typically used to deploy a service that is used to connect to the Postgres instance in some manner. @@ -2057,6 +2627,86 @@ spec: **Default**: `None` (uses the `default` StorageClass in your cluster) nullable: true type: string + topologySpreadConstraints: + description: |- + The topologySpreadConstraints provides a way to spread matching pods among the given topology + + For more information see the Kubernetes documentation on [Topology Spread Constraints](https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/) Tembo is compatable with the `v1` version of the TopologySpreadConstraints up to [Kubernetes 1.25](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#topologyspreadconstraint-v1-core) + + **Default**: `None` + items: + description: TopologySpreadConstraint specifies how to spread matching pods among the given topology. + properties: + labelSelector: + description: LabelSelector is used to find matching pods. Pods that match this label selector are counted to determine the number of pods in their corresponding topology domain. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. 
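Taken together, the `tolerations` and `topologySpreadConstraints` fields introduced above can be combined on a single `CoreDB` spec. A sketch (the taint key and selector label are hypothetical):

```yaml
spec:
  affinityConfiguration:
    tolerations:
      - key: dedicated               # hypothetical taint on Postgres nodes
        operator: Equal
        value: postgres
        effect: NoSchedule
  topologySpreadConstraints:
    - maxSkew: 1
      topologyKey: topology.kubernetes.io/zone
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          cnpg.io/cluster: test-db   # hypothetical selector for this cluster's pods
```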
+ items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. + items: + type: string + nullable: true + type: array + required: + - key + - operator + type: object + nullable: true + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. + nullable: true + type: object + type: object + matchLabelKeys: + description: MatchLabelKeys is a set of pod label keys to select the pods over which spreading will be calculated. The keys are used to lookup values from the incoming pod labels, those key-value labels are ANDed with labelSelector to select the group of existing pods over which spreading will be calculated for the incoming pod. The same key is forbidden to exist in both MatchLabelKeys and LabelSelector. MatchLabelKeys cannot be set when LabelSelector isn't set. Keys that don't exist in the incoming pod labels will be ignored. A null or empty list means only match against labelSelector. This is a beta field and requires the MatchLabelKeysInPodTopologySpread feature gate to be enabled (enabled by default). + items: + type: string + nullable: true + type: array + maxSkew: + description: 'MaxSkew describes the degree to which pods may be unevenly distributed. When `whenUnsatisfiable=DoNotSchedule`, it is the maximum permitted difference between the number of matching pods in the target topology and the global minimum. The global minimum is the minimum number of matching pods in an eligible domain or zero if the number of eligible domains is less than MinDomains. For example, in a 3-zone cluster, MaxSkew is set to 1, and pods with the same labelSelector spread as 2/2/1: In this case, the global minimum is 1. | zone1 | zone2 | zone3 | | P P | P P | P | - if MaxSkew is 1, incoming pod can only be scheduled to zone3 to become 2/2/2; scheduling it onto zone1(zone2) would make the ActualSkew(3-1) on zone1(zone2) violate MaxSkew(1). - if MaxSkew is 2, incoming pod can be scheduled onto any zone. When `whenUnsatisfiable=ScheduleAnyway`, it is used to give higher precedence to topologies that satisfy it. It''s a required field. Default value is 1 and 0 is not allowed.' + format: int32 + type: integer + minDomains: + description: 'MinDomains indicates a minimum number of eligible domains. When the number of eligible domains with matching topology keys is less than minDomains, Pod Topology Spread treats "global minimum" as 0, and then the calculation of Skew is performed. And when the number of eligible domains with matching topology keys equals or greater than minDomains, this value has no effect on scheduling. 
As a result, when the number of eligible domains is less than minDomains, scheduler won''t schedule more than maxSkew Pods to those domains. If value is nil, the constraint behaves as if MinDomains is equal to 1. Valid values are integers greater than 0. When value is not nil, WhenUnsatisfiable must be DoNotSchedule. For example, in a 3-zone cluster, MaxSkew is set to 2, MinDomains is set to 5 and pods with the same labelSelector spread as 2/2/2: | zone1 | zone2 | zone3 | | P P | P P | P P | The number of domains is less than 5(MinDomains), so "global minimum" is treated as 0. In this situation, new pod with the same labelSelector cannot be scheduled, because computed skew will be 3(3 - 0) if new Pod is scheduled to any of the three zones, it will violate MaxSkew. This is a beta field and requires the MinDomainsInPodTopologySpread feature gate to be enabled (enabled by default).' + format: int32 + nullable: true + type: integer + nodeAffinityPolicy: + description: 'NodeAffinityPolicy indicates how we will treat Pod''s nodeAffinity/nodeSelector when calculating pod topology spread skew. Options are: - Honor: only nodes matching nodeAffinity/nodeSelector are included in the calculations. - Ignore: nodeAffinity/nodeSelector are ignored. All nodes are included in the calculations. If this value is nil, the behavior is equivalent to the Honor policy. This is a beta-level feature default enabled by the NodeInclusionPolicyInPodTopologySpread feature flag.' + nullable: true + type: string + nodeTaintsPolicy: + description: 'NodeTaintsPolicy indicates how we will treat node taints when calculating pod topology spread skew. Options are: - Honor: nodes without taints, along with tainted nodes for which the incoming pod has a toleration, are included. - Ignore: node taints are ignored. All nodes are included. If this value is nil, the behavior is equivalent to the Ignore policy. This is a beta-level feature default enabled by the NodeInclusionPolicyInPodTopologySpread feature flag.' + nullable: true + type: string + topologyKey: + description: TopologyKey is the key of node labels. Nodes that have a label with this key and identical values are considered to be in the same topology. We consider each as a "bucket", and try to put balanced number of pods into each bucket. We define a domain as a particular instance of a topology. Also, we define an eligible domain as a domain whose nodes meet the requirements of nodeAffinityPolicy and nodeTaintsPolicy. e.g. If TopologyKey is "kubernetes.io/hostname", each Node is a domain of that topology. And, if TopologyKey is "topology.kubernetes.io/zone", each zone is a domain of that topology. It's a required field. + type: string + whenUnsatisfiable: + description: 'WhenUnsatisfiable indicates how to deal with a pod if it doesn''t satisfy the spread constraint. - DoNotSchedule (default) tells the scheduler not to schedule it. - ScheduleAnyway tells the scheduler to schedule the pod in any location, but giving higher precedence to topologies that would help reduce the skew. A constraint is considered "Unsatisfiable" for an incoming pod if and only if every possible node assignment for that pod would violate "MaxSkew" on some topology. For example, in a 3-zone cluster, MaxSkew is set to 1, and pods with the same labelSelector spread as 3/1/1: | zone1 | zone2 | zone3 | | P P P | P | P | If WhenUnsatisfiable is set to DoNotSchedule, incoming pod can only be scheduled to zone2(zone3) to become 3/2/1(3/1/2) as ActualSkew(2-1) on zone2(zone3) satisfies MaxSkew(1). 
In other words, the cluster can still be imbalanced, but scheduler won''t make it *more* imbalanced. It''s a required field.' + type: string + required: + - maxSkew + - topologyKey + - whenUnsatisfiable + type: object + nullable: true + type: array trunk_installs: default: [] description: |- diff --git a/tembo-operator/Cargo.lock b/tembo-operator/Cargo.lock index c53895b70..58d3d113d 100644 --- a/tembo-operator/Cargo.lock +++ b/tembo-operator/Cargo.lock @@ -494,7 +494,7 @@ dependencies = [ [[package]] name = "controller" -version = "0.44.1" +version = "0.45.0" dependencies = [ "actix-web", "anyhow", diff --git a/tembo-operator/Cargo.toml b/tembo-operator/Cargo.toml index ac4014da6..e69db1122 100644 --- a/tembo-operator/Cargo.toml +++ b/tembo-operator/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "controller" description = "Tembo Operator for Postgres" -version = "0.44.1" +version = "0.45.0" edition = "2021" default-run = "controller" license = "Apache-2.0" diff --git a/tembo-operator/src/apis/coredb_types.rs b/tembo-operator/src/apis/coredb_types.rs index 95ff3715c..877d4f068 100644 --- a/tembo-operator/src/apis/coredb_types.rs +++ b/tembo-operator/src/apis/coredb_types.rs @@ -13,6 +13,7 @@ use k8s_openapi::{ apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::ObjectMeta}, }; +use crate::cloudnativepg::clusters::{ClusterAffinity, ClusterTopologySpreadConstraints}; use crate::cloudnativepg::poolers::{ PoolerPgbouncerPoolMode, PoolerTemplateSpecContainersResources, }; @@ -554,6 +555,37 @@ pub struct CoreDBSpec { /// **Default**: `None` (uses the `default` StorageClass in your cluster) #[serde(rename = "storageClass")] pub storage_class: Option, + + /// A AffinityConfiguration provides a way to configure the CoreDB instance to run + /// on specific nodes in the cluster based off of nodeSelector, nodeAffinity and tolerations + /// + /// For more informaton on AffinityConfiguration please see the [Cloudnative-PG documentation](https://cloudnative-pg.io/documentation/1.22/cloudnative-pg.v1/#postgresql-cnpg-io-v1-AffinityConfiguration) + /// + /// **Default**: + /// ```yaml + /// apiVersion: coredb.io/v1alpha1 + /// kind: CoreDB + /// metadata: + /// name: test-db-restore + /// spec: + /// affinityConfiguration: + /// podAntiAffinityType: preferred + /// topologyKey: topology.kubernetes.io/zone + /// ``` + #[serde( + rename = "affinityConfiguration", + default = "defaults::default_affinity_configuration" + )] + pub affinity_configuration: Option, + + /// The topologySpreadConstraints provides a way to spread matching pods among the given topology + /// + /// For more information see the Kubernetes documentation on [Topology Spread Constraints](https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/) + /// Tembo is compatable with the `v1` version of the TopologySpreadConstraints up to [Kubernetes 1.25](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#topologyspreadconstraint-v1-core) + /// + /// **Default**: `None` + #[serde(rename = "topologySpreadConstraints")] + pub topology_spread_constraints: Option>, } impl CoreDBSpec { diff --git a/tembo-operator/src/app_service/manager.rs b/tembo-operator/src/app_service/manager.rs index 1922cffca..c390966c7 100644 --- a/tembo-operator/src/app_service/manager.rs +++ b/tembo-operator/src/app_service/manager.rs @@ -1,5 +1,6 @@ use crate::{ - apis::coredb_types::CoreDB, ingress_route_crd::IngressRouteRoutes, Context, Error, Result, + apis::coredb_types::CoreDB, 
cloudnativepg::placement::cnpg_placement::PlacementConfig, + ingress_route_crd::IngressRouteRoutes, Context, Error, Result, }; use k8s_openapi::{ api::{ @@ -57,6 +58,7 @@ fn generate_resource( oref: OwnerReference, domain: Option, annotations: &BTreeMap, + placement: Option, ) -> AppServiceResources { let resource_name = format!("{}-{}", coredb_name, appsvc.name.clone()); let service = appsvc.routing.as_ref().map(|_| { @@ -76,6 +78,7 @@ fn generate_resource( namespace, oref, annotations, + placement, ); // If DATA_PLANE_BASEDOMAIN is not set, don't generate IngressRoutes, IngressRouteTCPs, or EntryPoints @@ -226,6 +229,7 @@ fn generate_deployment( namespace: &str, oref: OwnerReference, annotations: &BTreeMap, + placement: Option, ) -> Deployment { let mut labels: BTreeMap = BTreeMap::new(); labels.insert("app".to_owned(), resource_name.to_string()); @@ -451,7 +455,14 @@ fn generate_deployment( } } + let affinity = placement.as_ref().and_then(|p| p.combine_affinity_items()); + let tolerations = placement.as_ref().map(|p| p.tolerations.clone()); + let topology_spread_constraints = placement + .as_ref() + .and_then(|p| p.topology_spread_constraints.clone()); + let pod_spec = PodSpec { + affinity, containers: vec![Container { args: appsvc.args.clone(), command: appsvc.command.clone(), @@ -466,6 +477,8 @@ fn generate_deployment( volume_mounts: Some(volume_mounts), ..Container::default() }], + tolerations, + topology_spread_constraints, volumes: Some(volumes), security_context: pod_security_context, ..PodSpec::default() @@ -625,7 +638,11 @@ fn generate_appsvc_annotations(cdb: &CoreDB) -> BTreeMap { ) } -pub async fn reconcile_app_services(cdb: &CoreDB, ctx: Arc) -> Result<(), Action> { +pub async fn reconcile_app_services( + cdb: &CoreDB, + ctx: Arc, + placement: Option, +) -> Result<(), Action> { let client = ctx.client.clone(); let ns = cdb.namespace().unwrap(); let coredb_name = cdb.name_any(); @@ -761,6 +778,7 @@ pub async fn reconcile_app_services(cdb: &CoreDB, ctx: Arc) -> Result<( oref.clone(), domain.to_owned(), &annotations, + placement.clone(), ) }) .collect(); diff --git a/tembo-operator/src/cloudnativepg/clusters.rs b/tembo-operator/src/cloudnativepg/clusters.rs index a4240c6b1..15961413f 100644 --- a/tembo-operator/src/cloudnativepg/clusters.rs +++ b/tembo-operator/src/cloudnativepg/clusters.rs @@ -1,4 +1,5 @@ // WARNING: generated by kopium - manual changes will be overwritten + // kopium command: kopium -D Default clusters.postgresql.cnpg.io -A // kopium version: 0.16.5 diff --git a/tembo-operator/src/cloudnativepg/cnpg.rs b/tembo-operator/src/cloudnativepg/cnpg.rs index d9f3b6549..f33ce92b3 100644 --- a/tembo-operator/src/cloudnativepg/cnpg.rs +++ b/tembo-operator/src/cloudnativepg/cnpg.rs @@ -10,7 +10,7 @@ use crate::{ cloudnativepg::{ backups::Backup, clusters::{ - Cluster, ClusterAffinity, ClusterBackup, ClusterBackupBarmanObjectStore, + Cluster, ClusterBackup, ClusterBackupBarmanObjectStore, ClusterBackupBarmanObjectStoreData, ClusterBackupBarmanObjectStoreDataCompression, ClusterBackupBarmanObjectStoreDataEncryption, ClusterBackupBarmanObjectStoreS3Credentials, @@ -43,6 +43,7 @@ use crate::{ ClusterStorage, ClusterSuperuserSecret, }, cnpg_utils::{is_image_updated, patch_cluster, restart_and_wait_for_restart}, + placement::cnpg_placement::PlacementConfig, poolers::{ Pooler, PoolerCluster, PoolerPgbouncer, PoolerSpec, PoolerTemplate, PoolerTemplateSpec, PoolerTemplateSpecContainers, PoolerType, @@ -647,6 +648,8 @@ pub fn cnpg_cluster_from_cdb( let (backup, 
service_account_template) = cnpg_backup_configuration(cdb, &cfg); let storage = cnpg_cluster_storage(cdb); let replication = cnpg_high_availability(cdb); + let affinity = cdb.spec.affinity_configuration.clone(); + let topology_spread_constraints = cdb.spec.topology_spread_constraints.clone(); let PostgresConfig { postgres_parameters, @@ -709,11 +712,8 @@ pub fn cnpg_cluster_from_cdb( ..ObjectMeta::default() }, spec: ClusterSpec { - affinity: Some(ClusterAffinity { - pod_anti_affinity_type: Some("preferred".to_string()), - topology_key: Some("topology.kubernetes.io/zone".to_string()), - ..ClusterAffinity::default() - }), + affinity, + topology_spread_constraints, backup, service_account_template, bootstrap, @@ -1260,13 +1260,23 @@ pub async fn reconcile_metrics_service(cdb: &CoreDB, ctx: Arc) -> Resul } // Reconcile a Pooler #[instrument(skip(cdb, ctx) fields(trace_id, instance_name = %cdb.name_any()))] -pub async fn reconcile_pooler(cdb: &CoreDB, ctx: Arc) -> Result<(), Action> { +pub async fn reconcile_pooler( + cdb: &CoreDB, + ctx: Arc, + placement: Option, +) -> Result<(), Action> { let client = ctx.client.clone(); let name = cdb.name_any() + "-pooler"; let namespace = cdb.namespace().unwrap(); - let pooler_api: Api = Api::namespaced(client.clone(), namespace.as_str()); - let owner_reference = cdb.controller_owner_ref(&()).unwrap(); + let pooler_api: Api = Api::namespaced(client.clone(), namespace.as_str()); + let pooler_tolerations = placement + .as_ref() + .and_then(|config| config.convert_pooler_tolerations()); + let topology_spread_constraints = placement + .as_ref() + .and_then(|p| p.convert_pooler_topology_spread_constraints()); + let affinity = placement.as_ref().and_then(|p| p.convert_pooler_affinity()); // If pooler is enabled, create or update if cdb.spec.connectionPooler.enabled { @@ -1301,6 +1311,9 @@ pub async fn reconcile_pooler(cdb: &CoreDB, ctx: Arc) -> Result<(), Act resources: cdb.spec.connectionPooler.pooler.resources.clone(), ..Default::default() }], + affinity, + tolerations: pooler_tolerations, + topology_spread_constraints, ..Default::default() }), }), diff --git a/tembo-operator/src/cloudnativepg/mod.rs b/tembo-operator/src/cloudnativepg/mod.rs index 9ea49bd49..529082379 100644 --- a/tembo-operator/src/cloudnativepg/mod.rs +++ b/tembo-operator/src/cloudnativepg/mod.rs @@ -3,6 +3,7 @@ pub mod clusters; pub(crate) mod cnpg; // pub(crate) mod cnpg_backups; mod cnpg_utils; +pub(crate) mod placement; pub mod poolers; mod scheduledbackups; diff --git a/tembo-operator/src/cloudnativepg/placement/cnpg_node_affinity.rs b/tembo-operator/src/cloudnativepg/placement/cnpg_node_affinity.rs new file mode 100644 index 000000000..d48d4b606 --- /dev/null +++ b/tembo-operator/src/cloudnativepg/placement/cnpg_node_affinity.rs @@ -0,0 +1,376 @@ +use crate::cloudnativepg::clusters::{ + ClusterAffinityNodeAffinity, + ClusterAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreferenceMatchExpressions, + ClusterAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreferenceMatchFields, + ClusterAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTermsMatchExpressions, + ClusterAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTermsMatchFields, +}; +use crate::cloudnativepg::poolers::{ + PoolerTemplateSpecAffinityNodeAffinity, + PoolerTemplateSpecAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution, + 
PoolerTemplateSpecAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreference, + PoolerTemplateSpecAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreferenceMatchExpressions, + PoolerTemplateSpecAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreferenceMatchFields, + PoolerTemplateSpecAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecution, + PoolerTemplateSpecAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTerms, + PoolerTemplateSpecAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTermsMatchExpressions, + PoolerTemplateSpecAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTermsMatchFields, +}; +use k8s_openapi::api::core::v1::{ + NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, PreferredSchedulingTerm, +}; + +// Start of functions needed to convert a ClusterAffinityNodeAffinity to a k8s_openapi::api::core::v1::NodeAffinity +// +// convert_node_affinity converts a ClusterAffinityNodeAffinity to a k8s_openapi::api::core::v1::NodeAffinity +// this is the meta function that calls the other conversion functions +pub fn convert_node_affinity(ca: &ClusterAffinityNodeAffinity) -> NodeAffinity { + NodeAffinity { + preferred_during_scheduling_ignored_during_execution: Some(convert_preferred(ca)), + required_during_scheduling_ignored_during_execution: convert_required(ca), + } +} + +// convert_preferred converts a ClusterAffinityNodeAffinity to a Vec +fn convert_preferred(ca: &ClusterAffinityNodeAffinity) -> Vec { + match &ca.preferred_during_scheduling_ignored_during_execution { + Some(prefs) => prefs + .iter() + .map(|pref| PreferredSchedulingTerm { + weight: pref.weight, + preference: NodeSelectorTerm { + match_expressions: Some(convert_match_expressions( + &pref.preference.match_expressions, + )), + match_fields: Some(convert_match_fields(&pref.preference.match_fields)), + }, + }) + .collect(), + None => Vec::new(), + } +} +// convert_required converts a ClusterAffinityNodeAffinity to an Option +fn convert_required(ca: &ClusterAffinityNodeAffinity) -> Option { + ca.required_during_scheduling_ignored_during_execution + .as_ref() + .map(|req| NodeSelector { + node_selector_terms: req + .node_selector_terms + .iter() + .map(|term| NodeSelectorTerm { + match_expressions: convert_required_match_expressions(&term.match_expressions), + match_fields: convert_required_match_fields(&term.match_fields), + }) + .collect(), + }) +} + +// Convert match expressions safely without assuming defaults +fn convert_required_match_expressions( + expressions: &Option>, +) -> Option> { + expressions.as_ref().map(|exprs| { + exprs + .iter() + .map(|expr| { + NodeSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.as_ref().cloned(), // Use cloned to safely handle the Option + } + }) + .collect() + }) +} + +// Convert match fields safely without assuming defaults +fn convert_required_match_fields( + fields: &Option>, +) -> Option> { + fields.as_ref().map(|flds| { + flds.iter() + .map(|field| { + NodeSelectorRequirement { + key: field.key.clone(), + operator: field.operator.clone(), + values: field.values.as_ref().cloned(), // Use cloned to safely handle the Option + } + }) + .collect() + }) +} + +// convert_match_expressions converts a ClusterAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreferenceMatchExpressions to a Vec +fn convert_match_expressions( + expressions: &Option>, 
+) -> Vec { + expressions.as_ref().map_or(Vec::new(), |exprs| { + exprs + .iter() + .map(|expr| NodeSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }) +} + +// convert_match_fields converts a ClusterAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreferenceMatchFields to a Vec +fn convert_match_fields( + fields: &Option>, +) -> Vec { + fields.as_ref().map_or(Vec::new(), |flds| { + flds.iter() + .map(|field| NodeSelectorRequirement { + key: field.key.clone(), + operator: field.operator.clone(), + values: Some( + field + .values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }) +} + +// convert_node_affinity_to_pooler converts a NodeAffinity to a PoolerTemplateSpecAffinityNodeAffinity +// to be used in the PoolerTemplateSpec struct when creating a new pooler. +pub fn convert_node_affinity_to_pooler( + node_affinity: &NodeAffinity, +) -> Option { + if node_affinity + .preferred_during_scheduling_ignored_during_execution + .is_none() + && node_affinity + .required_during_scheduling_ignored_during_execution + .is_none() + { + None + } else { + Some(PoolerTemplateSpecAffinityNodeAffinity { + required_during_scheduling_ignored_during_execution: node_affinity.required_during_scheduling_ignored_during_execution.as_ref().map(|req| { + PoolerTemplateSpecAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecution { + node_selector_terms: convert_node_selector_terms_to_pooler(&req.node_selector_terms), + } + }), + preferred_during_scheduling_ignored_during_execution: node_affinity.preferred_during_scheduling_ignored_during_execution.as_ref().map(|prefs| { + convert_preferred_scheduling_terms_to_pooler(prefs) + }), + }) + } +} + +// convert_node_selector_terms_to_pooler converts a Vec to a Vec +// to be used in the PoolerTemplateSpecAffinityNodeAffinity struct when creating a new pooler. +fn convert_node_selector_terms_to_pooler( + terms: &[NodeSelectorTerm], +) -> Vec{ + terms.iter().map(|term| { + PoolerTemplateSpecAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTerms { + match_expressions: term.match_expressions.as_ref().map(|expressions| { + expressions.iter().map(|expr| { + PoolerTemplateSpecAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTermsMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect() + }), + match_fields: term.match_fields.as_ref().map(|fields| { + fields.iter().map(|field| { + PoolerTemplateSpecAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTermsMatchFields { + key: field.key.clone(), + operator: field.operator.clone(), + values: field.values.clone(), + } + }).collect() + }), + } + }).collect() +} + +// convert_preferred_scheduling_terms_to_pooler converts a Vec to a Vec +// to be used in the PoolerTemplateSpecAffinityNodeAffinity struct when creating a new pooler. 
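+// Illustrative flow (hypothetical caller) tying this module's conversions together:
+// a cluster-level node affinity is normalized to the k8s_openapi type and then
+// mapped onto the pooler template. Note that convert_node_affinity_to_pooler
+// yields None only when both the preferred and required fields are absent:
+//
+//     let k8s_affinity = convert_node_affinity(&cluster_node_affinity);
+//     let pooler_affinity = convert_node_affinity_to_pooler(&k8s_affinity);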
+fn convert_preferred_scheduling_terms_to_pooler( + terms: &[PreferredSchedulingTerm], +) -> Vec { + terms.iter().map(|term| { + PoolerTemplateSpecAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution { + preference: PoolerTemplateSpecAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreference { + match_expressions: term.preference.match_expressions.as_ref().map(|expressions| { + expressions.iter().map(|expr| { + PoolerTemplateSpecAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreferenceMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect() + }), + match_fields: term.preference.match_fields.as_ref().map(|fields| { + fields.iter().map(|field| { + PoolerTemplateSpecAffinityNodeAffinityPreferredDuringSchedulingIgnoredDuringExecutionPreferenceMatchFields { + key: field.key.clone(), + operator: field.operator.clone(), + values: field.values.clone(), + } + }).collect() + }), + }, + weight: term.weight, + } + }).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_sample_node_affinity() -> NodeAffinity { + NodeAffinity { + required_during_scheduling_ignored_during_execution: Some(NodeSelector { + node_selector_terms: vec![NodeSelectorTerm { + match_expressions: Some(vec![NodeSelectorRequirement { + key: "region".to_string(), + operator: "In".to_string(), + values: Some(vec!["us-west-1".to_string()]), + }]), + ..Default::default() + }], + }), + preferred_during_scheduling_ignored_during_execution: Some(vec![ + PreferredSchedulingTerm { + weight: 100, + preference: NodeSelectorTerm { + match_expressions: Some(vec![NodeSelectorRequirement { + key: "zone".to_string(), + operator: "In".to_string(), + values: Some(vec!["us-west-1a".to_string()]), + }]), + ..Default::default() + }, + }, + ]), + } + } + + #[test] + fn test_convert_node_affinity_empty() { + let ca = ClusterAffinityNodeAffinity { + preferred_during_scheduling_ignored_during_execution: None, + required_during_scheduling_ignored_during_execution: None, + }; + + let result = convert_node_affinity(&ca); + // assert!(result + // .preferred_during_scheduling_ignored_during_execution + // .expect("preferred_during_scheduling_ignored_during_execution should be Some")); + assert!(result + .required_during_scheduling_ignored_during_execution + .is_none()); + } + + #[test] + fn test_convert_required_match_expressions_empty() { + let expressions = None; + let result = convert_required_match_expressions(&expressions); + assert_eq!(result, None); + } + + #[test] + fn test_convert_required_match_fields_empty() { + let fields = None; + let result = convert_required_match_fields(&fields); + assert_eq!(result, None); + } + + #[test] + fn test_convert_required_match_expressions() { + let expressions = Some(vec![ + ClusterAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTermsMatchExpressions { + key: "key1".to_string(), + operator: "In".to_string(), + values: Some(vec!["value1".to_string(), "value2".to_string()]), + }, + ]); + + let result = convert_required_match_expressions(&expressions).unwrap(); + assert_eq!(result.len(), 1); + assert_eq!(result[0].key, "key1"); + assert_eq!(result[0].operator, "In"); + assert_eq!( + result[0].values, + Some(vec!["value1".to_string(), "value2".to_string()]) + ); + } + + #[test] + fn test_convert_required_match_fields() { + let fields = Some(vec![ + ClusterAffinityNodeAffinityRequiredDuringSchedulingIgnoredDuringExecutionNodeSelectorTermsMatchFields { 
+ key: "field1".to_string(), + operator: "Exists".to_string(), + values: None, + }, + ]); + + let result = convert_required_match_fields(&fields).unwrap(); + assert_eq!(result.len(), 1); + assert_eq!(result[0].key, "field1"); + assert_eq!(result[0].operator, "Exists"); + assert!(result[0].values.is_none()); + } + + #[test] + fn test_convert_node_affinity_to_pooler_non_empty() { + let node_affinity = create_sample_node_affinity(); + let result = convert_node_affinity_to_pooler(&node_affinity); + + assert!(result.is_some()); + let pooler_node_affinity = result.unwrap(); + assert!(pooler_node_affinity + .required_during_scheduling_ignored_during_execution + .is_some()); + assert!(pooler_node_affinity + .preferred_during_scheduling_ignored_during_execution + .is_some()); + + let required = &pooler_node_affinity + .required_during_scheduling_ignored_during_execution + .unwrap(); + assert_eq!(required.node_selector_terms.len(), 1); + assert_eq!( + required.node_selector_terms[0] + .match_expressions + .as_ref() + .unwrap()[0] + .key, + "region" + ); + + let preferred = &pooler_node_affinity + .preferred_during_scheduling_ignored_during_execution + .unwrap()[0]; + assert_eq!(preferred.weight, 100); + assert_eq!( + preferred.preference.match_expressions.as_ref().unwrap()[0].key, + "zone" + ); + } + + #[test] + fn test_convert_node_affinity_to_pooler_empty() { + let node_affinity = NodeAffinity { + required_during_scheduling_ignored_during_execution: None, + preferred_during_scheduling_ignored_during_execution: None, + }; + let result = convert_node_affinity_to_pooler(&node_affinity); + + assert!(result.is_none()); + } +}
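Reviewer note, not part of the diff: a minimal sketch of how the conversion above is meant to be exercised from the k8s_openapi side. It assumes the module path added by this PR (`crate::cloudnativepg::placement::cnpg_node_affinity`); the arch requirement is invented for illustration.

```rust
// Hedged usage sketch; the "kubernetes.io/arch" key and value are made up.
use k8s_openapi::api::core::v1::{
    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm,
};

use crate::cloudnativepg::placement::cnpg_node_affinity::convert_node_affinity_to_pooler;

fn node_affinity_example() {
    // Hard requirement: only schedule on amd64 nodes.
    let node_affinity = NodeAffinity {
        required_during_scheduling_ignored_during_execution: Some(NodeSelector {
            node_selector_terms: vec![NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "kubernetes.io/arch".to_string(),
                    operator: "In".to_string(),
                    values: Some(vec!["amd64".to_string()]),
                }]),
                ..Default::default()
            }],
        }),
        ..Default::default()
    };

    // The NodeAffinity handed to the CNPG Cluster is re-expressed in the
    // generated Pooler* types; an all-None input would yield None instead.
    assert!(convert_node_affinity_to_pooler(&node_affinity).is_some());
}
```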
diff --git a/tembo-operator/src/cloudnativepg/placement/cnpg_placement.rs b/tembo-operator/src/cloudnativepg/placement/cnpg_placement.rs new file mode 100644 index 000000000..a9de28c9d --- /dev/null +++ b/tembo-operator/src/cloudnativepg/placement/cnpg_placement.rs @@ -0,0 +1,329 @@ +use crate::apis::coredb_types::CoreDB; +use crate::cloudnativepg::placement::{ + cnpg_node_affinity::{convert_node_affinity, convert_node_affinity_to_pooler}, + cnpg_pod_affinity::{convert_pod_affinity, convert_pod_affinity_to_pooler}, + cnpg_pod_anti_affinity::{convert_pod_anti_affinity, convert_pod_anti_affinity_to_pooler}, + cnpg_toleration::{convert_toleration, convert_toleration_to_pooler}, + cnpg_topology::{convert_cluster_topology_spread_constraints, convert_topo_to_pooler}, +}; +use crate::cloudnativepg::poolers::{ + PoolerTemplateSpecAffinity, PoolerTemplateSpecTolerations, + PoolerTemplateSpecTopologySpreadConstraints, +}; +use k8s_openapi::api::core::v1::{ + Affinity, NodeAffinity, PodAffinity, PodAntiAffinity, Toleration, TopologySpreadConstraint, +}; +use std::collections::BTreeMap; + +// PlacementConfig holds the affinity and topology configuration for the CNPG Cluster, Pooler +// and any Tembo specific Deployments like for AppServices. +#[derive(Debug, Clone)] +pub struct PlacementConfig { + pub node_selector: Option<BTreeMap<String, String>>, + pub tolerations: Vec<Toleration>, + pub node_affinity: Option<NodeAffinity>, + pub pod_affinity: Option<PodAffinity>, + pub pod_anti_affinity: Option<PodAntiAffinity>, + pub topology_spread_constraints: Option<Vec<TopologySpreadConstraint>>, +} + +impl PlacementConfig { + pub fn new(core_db: &CoreDB) -> Option<PlacementConfig> { + core_db + .spec + .affinity_configuration + .as_ref() + .map(|config| PlacementConfig { + node_selector: config.node_selector.clone(), + tolerations: config + .tolerations + .as_ref() + .map_or_else(Vec::new, |tolerations| { + tolerations.iter().map(convert_toleration).collect() + }), + node_affinity: config.node_affinity.as_ref().map(convert_node_affinity), + pod_affinity: config + .additional_pod_affinity + .as_ref() + .map(convert_pod_affinity), + pod_anti_affinity: config + .additional_pod_anti_affinity + .as_ref() + .map(convert_pod_anti_affinity), + topology_spread_constraints: convert_cluster_topology_spread_constraints( + &core_db.spec.topology_spread_constraints, + ), + }) + } + + // combine_affinity_items combines self.node_affinity, self.pod_affinity, and self.pod_anti_affinity into a single Affinity object. + // This is used to simplify the process of creating an affinity object for a pod or Deployment. + pub fn combine_affinity_items(&self) -> Option<Affinity> { + let mut affinity = Affinity::default(); + if let Some(node_affinity) = &self.node_affinity { + affinity.node_affinity = Some(node_affinity.clone()); + } + if let Some(pod_affinity) = &self.pod_affinity { + affinity.pod_affinity = Some(pod_affinity.clone()); + } + if let Some(pod_anti_affinity) = &self.pod_anti_affinity { + affinity.pod_anti_affinity = Some(pod_anti_affinity.clone()); + } + if affinity.node_affinity.is_none() + && affinity.pod_affinity.is_none() + && affinity.pod_anti_affinity.is_none() + { + None + } else { + Some(affinity) + } + } + + // convert_pooler_tolerations converts `Toleration` to `PoolerTemplateSpecTolerations`, + // to be used in the PoolerTemplateSpec struct when building out a pooler. + pub fn convert_pooler_tolerations(&self) -> Option<Vec<PoolerTemplateSpecTolerations>> { + if self.tolerations.is_empty() { + None + } else { + Some( + self.tolerations + .iter() + .filter_map(convert_toleration_to_pooler) + .collect(), + ) + } + } + + // convert_pooler_topology_spread_constraints converts `TopologySpreadConstraint` to `PoolerTemplateSpecTopologySpreadConstraints`, + // to be used in the PoolerTemplateSpec struct when building out a pooler. + pub fn convert_pooler_topology_spread_constraints( + &self, + ) -> Option<Vec<PoolerTemplateSpecTopologySpreadConstraints>> { + self.topology_spread_constraints + .as_ref() + .and_then(|topologies| { + if topologies.is_empty() { + None + } else { + convert_topo_to_pooler(topologies) + } + }) + } + + // convert_pooler_affinity converts `Affinity` to `PoolerTemplateSpecAffinity`, + // to be used in the PoolerTemplateSpec struct when building out a pooler. + pub fn convert_pooler_affinity(&self) -> Option<PoolerTemplateSpecAffinity> { + if self.node_affinity.is_none() + && self.pod_affinity.is_none() + && self.pod_anti_affinity.is_none() + { + None + } else { + Some(PoolerTemplateSpecAffinity { + node_affinity: self + .node_affinity + .as_ref() + .and_then(convert_node_affinity_to_pooler), + pod_affinity: self + .pod_affinity + .as_ref() + .and_then(convert_pod_affinity_to_pooler), + pod_anti_affinity: self + .pod_anti_affinity + .as_ref() + .and_then(convert_pod_anti_affinity_to_pooler), + }) + } + } +}
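Reviewer note, not part of the diff: a hedged sketch of how these pieces are intended to compose at the call sites. The function name plan_placement is hypothetical; PlacementConfig and its methods are the ones defined above.

```rust
// Hypothetical call site, for illustration only.
use crate::apis::coredb_types::CoreDB;

fn plan_placement(coredb: &CoreDB) {
    if let Some(placement) = PlacementConfig::new(coredb) {
        // One merged k8s Affinity object for Tembo-managed Deployments.
        let deployment_affinity = placement.combine_affinity_items();

        // The same data re-expressed in the generated Pooler* types.
        // Each helper returns None when nothing was configured.
        let pooler_affinity = placement.convert_pooler_affinity();
        let pooler_tolerations = placement.convert_pooler_tolerations();
        let pooler_topology = placement.convert_pooler_topology_spread_constraints();

        // These values would then be copied into the respective pod templates.
        let _ = (deployment_affinity, pooler_affinity, pooler_tolerations, pooler_topology);
    }
}
```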
+ +#[cfg(test)] +mod tests { + use super::*; + use k8s_openapi::api::core::v1::{ + NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, PodAffinityTerm, Toleration, + WeightedPodAffinityTerm, + }; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; + + fn create_node_affinity() -> NodeAffinity { + NodeAffinity { + required_during_scheduling_ignored_during_execution: Some(NodeSelector { + node_selector_terms: vec![NodeSelectorTerm { + match_expressions: Some(vec![NodeSelectorRequirement { + key: "key1".to_string(), + operator: "In".to_string(), + values: Some(vec!["value1".to_string()]), + }]), + ..Default::default() + }], + }), + ..Default::default() + } + } + + fn create_pod_affinity() -> PodAffinity { + PodAffinity { + required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some( + [("key", "value")] + .iter() + .cloned() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }), + namespaces: Some(vec!["default".to_string()]), + topology_key: "kubernetes.io/hostname".to_string(), + ..Default::default() + }]), + preferred_during_scheduling_ignored_during_execution: Some(vec![ + WeightedPodAffinityTerm { + weight: 100, + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some( + [("key", "value")] + .iter() + .cloned() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }), + namespaces: Some(vec!["default".to_string()]), + topology_key: "kubernetes.io/hostname".to_string(), + ..Default::default() + }, + }, + ]), + } + } + + fn create_pod_anti_affinity() -> PodAntiAffinity { + PodAntiAffinity { + required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some( + [("anti-key", "anti-value")] + .iter() + .cloned() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }), + namespaces: Some(vec!["default-namespace".to_string()]), + topology_key: "kubernetes.io/hostname".to_string(), + ..Default::default() + }]), + preferred_during_scheduling_ignored_during_execution: Some(vec![ + WeightedPodAffinityTerm { + weight: 80, + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some( + [("pref-anti-key", "pref-anti-value")] + .iter() + .cloned() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }), + namespaces: Some(vec!["preferred-namespace".to_string()]), + topology_key: "kubernetes.io/zone".to_string(), + ..Default::default() + }, + }, + ]), + } + } + + fn create_tolerations() -> Vec<Toleration> { + vec![Toleration { + key: Some("key".to_string()), + operator: Some("Equal".to_string()), + value: Some("value".to_string()), + effect: Some("NoSchedule".to_string()), + toleration_seconds: Some(3600), + }] + } + + #[test] + fn test_convert_pooler_tolerations_non_empty() { + let tolerations = create_tolerations(); + + let placement = PlacementConfig { + tolerations, + node_affinity: None, + pod_affinity: None, + pod_anti_affinity: None, + topology_spread_constraints: None, + node_selector: None, + }; + + let result = placement.convert_pooler_tolerations(); + assert!( + result.is_some(), + "Tolerations conversion should not be None" + ); + let tolerations_result = result.unwrap(); + assert_eq!( + tolerations_result.len(), + 1, + "Expected exactly one toleration to be converted" + ); + + // Detailed assertions on toleration contents + let toleration = &tolerations_result[0]; + assert_eq!(toleration.key.as_deref(), Some("key")); + assert_eq!(toleration.operator.as_deref(), Some("Equal")); + assert_eq!(toleration.value.as_deref(), Some("value")); + assert_eq!(toleration.effect.as_deref(), Some("NoSchedule")); + assert_eq!(toleration.toleration_seconds, Some(3600)); + } + + #[test] + fn test_combine_affinity_items() { + let node_affinity = create_node_affinity(); + let pod_affinity = create_pod_affinity(); + let pod_anti_affinity = create_pod_anti_affinity(); + let tolerations = create_tolerations(); + + let placement_config = PlacementConfig { + node_selector: None, + tolerations, + node_affinity: Some(node_affinity), + pod_affinity: Some(pod_affinity), + pod_anti_affinity: Some(pod_anti_affinity), + topology_spread_constraints: None, // Add sample topology constraints if needed + }; + + let combined_affinity = placement_config.combine_affinity_items().unwrap(); + + assert!( + combined_affinity.node_affinity.is_some(), + "Node affinity should be combined." + ); + assert!( + combined_affinity.pod_affinity.is_some(), + "Pod affinity should be combined." + ); + assert!( + combined_affinity.pod_anti_affinity.is_some(), + "Pod anti-affinity should be combined." + ); + + // Ensure that tolerations are converted without loss + let converted_tolerations = placement_config.convert_pooler_tolerations().unwrap(); + assert_eq!( + converted_tolerations.len(), + placement_config.tolerations.len(), + "Tolerations should be converted correctly."
+ ); + } +} diff --git a/tembo-operator/src/cloudnativepg/placement/cnpg_pod_affinity.rs b/tembo-operator/src/cloudnativepg/placement/cnpg_pod_affinity.rs new file mode 100644 index 000000000..2b98515ba --- /dev/null +++ b/tembo-operator/src/cloudnativepg/placement/cnpg_pod_affinity.rs @@ -0,0 +1,510 @@ +use crate::cloudnativepg::clusters::{ + ClusterAffinityAdditionalPodAffinity, + ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm, + ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector, + ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector, + ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecution, + ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector, + ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector, +}; +use crate::cloudnativepg::poolers::{ + PoolerTemplateSpecAffinityPodAffinity, + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecution, + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm, + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector, + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelectorMatchExpressions, + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector, + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelectorMatchExpressions, + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecution, + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector, + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelectorMatchExpressions, + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector, + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelectorMatchExpressions, +}; +use k8s_openapi::api::core::v1::{PodAffinity, PodAffinityTerm, WeightedPodAffinityTerm}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, LabelSelectorRequirement}; + +// Start of helper functions to convert a ClusterAffinityPodAffinity to a k8s_openapi::api::core::v1::PodAffinity +// +// convert_pod_affinity_term converts a ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm to a k8s_openapi::api::core::v1::PodAffinityTerm +fn convert_exec_pod_affinity_term( + term: &ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm, +) -> PodAffinityTerm { + PodAffinityTerm { + label_selector: term + .label_selector + .as_ref() + .map(convert_affinity_term_label_selector), + namespace_selector: term + .namespace_selector + .as_ref() + .map(convert_affinity_term_namespace_selector), + namespaces: term.namespaces.clone(), + topology_key: term.topology_key.clone(), + } +} + +// convert_required_pod_affinity_term converts a ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecution to a PodAffinityTerm +fn convert_required_pod_affinity_term( + term: 
&ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecution, +) -> PodAffinityTerm { + PodAffinityTerm { + label_selector: term + .label_selector + .as_ref() + .map(convert_affinity_exec_label_selector), + namespace_selector: term + .namespace_selector + .as_ref() + .map(convert_affinity_exec_namespace_selector), + namespaces: term.namespaces.clone(), + topology_key: term.topology_key.clone(), + } +} + +// convert_exec_label_selector converts a ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector to a LabelSelector +fn convert_affinity_exec_label_selector( + selector: &ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector, +) -> LabelSelector { + LabelSelector { + match_labels: selector.match_labels.clone(), + match_expressions: selector.match_expressions.as_ref().map(|expressions| { + expressions + .iter() + .map(|expr| LabelSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }), + } +} + +// convert_term_label_selector converts a ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector to a LabelSelector +fn convert_affinity_term_label_selector( + selector: &ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector, +) -> LabelSelector { + LabelSelector { + match_labels: selector.match_labels.clone(), + match_expressions: selector.match_expressions.as_ref().map(|expressions| { + expressions + .iter() + .map(|expr| LabelSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }), + } +} + +// convert_exec_namespace_selector converts a ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector to a LabelSelector +fn convert_affinity_exec_namespace_selector( + ns_selector: &ClusterAffinityAdditionalPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector, +) -> LabelSelector { + LabelSelector { + match_labels: ns_selector.match_labels.clone(), + match_expressions: ns_selector.match_expressions.as_ref().map(|exprs| { + exprs + .iter() + .map(|expr| LabelSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }), + } +} + +// convert_term_namespace_selector converts a ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector to a LabelSelector +fn convert_affinity_term_namespace_selector( + ns_selector: &ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector, +) -> LabelSelector { + // Conversion logic here, assuming it's the same as convert_label_selector logic + LabelSelector { + match_labels: ns_selector.match_labels.clone(), + match_expressions: ns_selector.match_expressions.as_ref().map(|exprs| { + exprs + .iter() + .map(|expr| LabelSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }), + } +} + +// convert_pod_affinity converts a 
ClusterAffinityAdditionalPodAffinity to a PodAffinity +pub fn convert_pod_affinity( + cluster_affinity: &ClusterAffinityAdditionalPodAffinity, +) -> PodAffinity { + PodAffinity { + required_during_scheduling_ignored_during_execution: cluster_affinity + .required_during_scheduling_ignored_during_execution + .as_ref() + .map(|req| req.iter().map(convert_required_pod_affinity_term).collect()), + preferred_during_scheduling_ignored_during_execution: cluster_affinity + .preferred_during_scheduling_ignored_during_execution + .as_ref() + .map(|prefs| { + prefs + .iter() + .map(|pref| WeightedPodAffinityTerm { + weight: pref.weight, + pod_affinity_term: convert_exec_pod_affinity_term(&pref.pod_affinity_term), + }) + .collect() + }), + } +} + +// Start functions that do conversions to the PoolerTemplateSpec structs +// +// convert_pod_affinity_to_pooler converts a PodAffinity to a PoolerTemplateSpecAffinityPodAffinity struct +// to be used in the PoolerTemplateSpec struct when building out a pooler. +pub fn convert_pod_affinity_to_pooler( + pod_affinity: &PodAffinity, +) -> Option<PoolerTemplateSpecAffinityPodAffinity> { + Some(PoolerTemplateSpecAffinityPodAffinity { + required_during_scheduling_ignored_during_execution: pod_affinity + .required_during_scheduling_ignored_during_execution + .as_ref() + .map(|req| { + req.iter() + .map(convert_required_pod_affinity_term_to_pooler) + .collect() + }), + preferred_during_scheduling_ignored_during_execution: pod_affinity + .preferred_during_scheduling_ignored_during_execution + .as_ref() + .map(|prefs| { + prefs + .iter() + .map(|pref| PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecution { + pod_affinity_term: convert_preferred_pod_affinity_term_to_pooler(&pref.pod_affinity_term), + weight: pref.weight, + }) + .collect() + }), + }) +} + +// convert_required_pod_affinity_term_to_pooler converts a PodAffinityTerm to a PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecution +// to be used in the PoolerTemplateSpecAffinityPodAffinity struct when building out a pooler. +fn convert_required_pod_affinity_term_to_pooler( + term: &PodAffinityTerm, +) -> PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecution { + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecution { + label_selector: term + .label_selector + .as_ref() + .map(convert_required_label_selector_to_pooler), + namespace_selector: term + .namespace_selector + .as_ref() + .map(convert_required_namespace_selector_to_pooler), + namespaces: term.namespaces.clone(), + topology_key: term.topology_key.clone(), + } +}
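Reviewer note, not part of the diff: one behavioral detail worth flagging. Unlike convert_node_affinity_to_pooler, this helper returns Some even for an all-None input, which the tests below also pin down; a minimal sketch, assuming the surrounding module is in scope:

```rust
// Illustrative only: empty input still yields Some, with both fields None.
use k8s_openapi::api::core::v1::PodAffinity;

fn pod_affinity_example() {
    let empty = PodAffinity {
        required_during_scheduling_ignored_during_execution: None,
        preferred_during_scheduling_ignored_during_execution: None,
    };
    let pooler = convert_pod_affinity_to_pooler(&empty)
        .expect("always Some; callers gate on the parent Affinity instead");
    assert!(pooler.required_during_scheduling_ignored_during_execution.is_none());
    assert!(pooler.preferred_during_scheduling_ignored_during_execution.is_none());
}
```

+ +// convert_preferred_pod_affinity_term_to_pooler converts a PodAffinityTerm to a PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm +// to be used in the PoolerTemplateSpecAffinityPodAffinity struct when building out a pooler.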
+fn convert_preferred_pod_affinity_term_to_pooler( + term: &PodAffinityTerm, +) -> PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm{ + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm { + label_selector: term + .label_selector + .as_ref() + .map(convert_preferred_label_selector_to_pooler), + namespace_selector: term + .namespace_selector + .as_ref() + .map(convert_preferred_namespace_selector_to_pooler), + namespaces: term.namespaces.clone(), + topology_key: term.topology_key.clone(), + } +} + +// convert_preferred_namespace_selector_to_pooler converts a LabelSelector to a PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector +// to be used in the PoolerTemplateSpecAffinityPodAffinity struct when building out a pooler. +fn convert_preferred_namespace_selector_to_pooler( + ns_selector: &LabelSelector, +) -> PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector{ + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector { + match_labels: ns_selector.match_labels.clone(), + match_expressions: ns_selector.match_expressions.as_ref().map(|exprs| { + exprs.iter().map(|expr| { + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelectorMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect() + }), + } +} + +// convert_preferred_label_selector_to_pooler converts a LabelSelector to a PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector +// to be used in the PoolerTemplateSpecAffinityPodAffinity struct when building out a pooler. +fn convert_preferred_label_selector_to_pooler( + selector: &LabelSelector, +) -> PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector{ + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector { + match_labels: selector.match_labels.clone(), + match_expressions: selector.match_expressions.as_ref().map(|exprs| exprs.iter().map(|expr| { + PoolerTemplateSpecAffinityPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelectorMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect()), + } +} + +// convert_required_namespace_selector_to_pooler converts a LabelSelector to a PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector +// to be used in the PoolerTemplateSpecAffinityPodAffinity struct when building out a pooler. 
+fn convert_required_namespace_selector_to_pooler( + ns_selector: &LabelSelector, +) -> PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector{ + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector { + match_labels: ns_selector.match_labels.clone(), + match_expressions: ns_selector.match_expressions.as_ref().map(|exprs| { + exprs.iter().map(|expr| { + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelectorMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect() + }), + } +} + +// convert_required_label_selector_to_pooler converts a LabelSelector to a PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector +// to be used in the PoolerTemplateSpecAffinityPodAffinity struct when building out a pooler. +fn convert_required_label_selector_to_pooler( + selector: &LabelSelector, +) -> PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector +{ + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector { + match_labels: selector.match_labels.clone(), + match_expressions: selector.match_expressions.as_ref().map(|exprs| { + exprs.iter().map(|expr| { + PoolerTemplateSpecAffinityPodAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelectorMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect() + }), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cloudnativepg::clusters::ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelectorMatchExpressions; + + fn create_sample_pod_affinity() -> PodAffinity { + PodAffinity { + required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some( + [("key", "value")] + .iter() + .cloned() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }), + namespaces: Some(vec!["default".to_string()]), + topology_key: "kubernetes.io/hostname".to_string(), + ..Default::default() + }]), + preferred_during_scheduling_ignored_during_execution: Some(vec![ + WeightedPodAffinityTerm { + weight: 100, + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some( + [("key2", "value2")] + .iter() + .cloned() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }), + namespaces: Some(vec!["default".to_string()]), + topology_key: "kubernetes.io/zone".to_string(), + ..Default::default() + }, + }, + ]), + } + } + + #[test] + fn test_convert_pod_affinity_term_full() { + let term = ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm { + label_selector: Some(ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector { + match_labels: Some(std::collections::BTreeMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ])), + match_expressions: Some(vec![ + ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelectorMatchExpressions { + key: "exp_key".to_string(), + operator: "In".to_string(), + values: Some(vec!["val1".to_string(), 
"val2".to_string()]), + }, + ]), + }), + namespace_selector: Some(ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector { + match_labels: Some(std::collections::BTreeMap::from([("ns_key".to_string(), "ns_value".to_string())])), + match_expressions: None, + }), + namespaces: Some(vec!["namespace1".to_string(), "namespace2".to_string()]), + topology_key: "topology_key".to_string(), + }; + + let result = convert_exec_pod_affinity_term(&term); + assert_eq!(result.topology_key, "topology_key"); + assert_eq!( + result.namespaces, + Some(vec!["namespace1".to_string(), "namespace2".to_string()]) + ); + assert!(result.label_selector.is_some()); + assert!(result.namespace_selector.is_some()); + + let label_selector = result.label_selector.unwrap(); + assert_eq!( + label_selector.match_labels.unwrap().get("key1"), + Some(&"value1".to_string()) + ); + assert_eq!(label_selector.match_expressions.unwrap().len(), 1); + + let namespace_selector = result.namespace_selector.unwrap(); + assert_eq!( + namespace_selector.match_labels.unwrap().get("ns_key"), + Some(&"ns_value".to_string()) + ); + } + + // Test converting an empty PodAffinityTerm + #[test] + fn test_convert_pod_affinity_term_empty() { + let term = ClusterAffinityAdditionalPodAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm { + label_selector: None, + namespace_selector: None, + namespaces: None, + topology_key: "".to_string(), + }; + + let result = convert_exec_pod_affinity_term(&term); + assert_eq!(result.topology_key, ""); + assert!(result.label_selector.is_none()); + assert!(result.namespace_selector.is_none()); + assert!(result.namespaces.is_none()); + } + + #[test] + fn test_convert_pod_affinity_to_pooler_non_empty() { + let pod_affinity = create_sample_pod_affinity(); + let result = convert_pod_affinity_to_pooler(&pod_affinity); + + assert!(result.is_some()); + let pooler_pod_affinity = result.unwrap(); + assert_eq!( + pooler_pod_affinity + .required_during_scheduling_ignored_during_execution + .as_ref() + .unwrap() + .len(), + 1 + ); + assert_eq!( + pooler_pod_affinity + .preferred_during_scheduling_ignored_during_execution + .as_ref() + .unwrap() + .len(), + 1 + ); + + let required = &pooler_pod_affinity + .required_during_scheduling_ignored_during_execution + .as_ref() + .unwrap()[0]; + assert_eq!( + required + .label_selector + .as_ref() + .unwrap() + .match_labels + .as_ref() + .unwrap() + .get("key"), + Some(&"value".to_string()) + ); + + let preferred = &pooler_pod_affinity + .preferred_during_scheduling_ignored_during_execution + .as_ref() + .unwrap()[0]; + assert_eq!( + preferred + .pod_affinity_term + .label_selector + .as_ref() + .unwrap() + .match_labels + .as_ref() + .unwrap() + .get("key2"), + Some(&"value2".to_string()) + ); + } + + #[test] + fn test_convert_pod_affinity_to_pooler_empty() { + let pod_affinity = PodAffinity { + required_during_scheduling_ignored_during_execution: None, + preferred_during_scheduling_ignored_during_execution: None, + }; + let result = convert_pod_affinity_to_pooler(&pod_affinity); + + assert!(result.is_some()); + let pooler_pod_affinity = result.unwrap(); + assert!(pooler_pod_affinity + .required_during_scheduling_ignored_during_execution + .is_none()); + assert!(pooler_pod_affinity + .preferred_during_scheduling_ignored_during_execution + .is_none()); + } +} diff --git a/tembo-operator/src/cloudnativepg/placement/cnpg_pod_anti_affinity.rs b/tembo-operator/src/cloudnativepg/placement/cnpg_pod_anti_affinity.rs 
new file mode 100644 index 000000000..a1154e983 --- /dev/null +++ b/tembo-operator/src/cloudnativepg/placement/cnpg_pod_anti_affinity.rs @@ -0,0 +1,511 @@ +use crate::cloudnativepg::clusters::{ + ClusterAffinityAdditionalPodAntiAffinity, + ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm, + ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector, + ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector, + ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecution, + ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector, + ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector, +}; +use crate::cloudnativepg::poolers::{ + PoolerTemplateSpecAffinityPodAntiAffinity, + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecution, + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm, + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector, + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelectorMatchExpressions, + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector, + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelectorMatchExpressions, + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecution, + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector, + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelectorMatchExpressions, + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector, + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelectorMatchExpressions, +}; +use k8s_openapi::api::core::v1::{PodAffinityTerm, PodAntiAffinity, WeightedPodAffinityTerm}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, LabelSelectorRequirement}; + +// Start of helper functions to convert a ClusterAffinityPodAntiAffinity to a k8s_openapi::api::core::v1::PodAntiAffinity +// +// convert_pod_anti_affinity converts a ClusterAffinityAdditionalPodAntiAffinity to a PodAntiAffinity +pub fn convert_pod_anti_affinity( + cluster_affinity: &ClusterAffinityAdditionalPodAntiAffinity, +) -> PodAntiAffinity { + PodAntiAffinity { + required_during_scheduling_ignored_during_execution: cluster_affinity + .required_during_scheduling_ignored_during_execution + .as_ref() + .map(|req| { + req.iter() + .map(convert_required_pod_anti_affinity_term) + .collect() + }), + preferred_during_scheduling_ignored_during_execution: cluster_affinity + .preferred_during_scheduling_ignored_during_execution + .as_ref() + .map(|prefs| { + prefs + .iter() + .map(|pref| WeightedPodAffinityTerm { + weight: pref.weight, + pod_affinity_term: convert_exec_pod_anti_affinity_term( + &pref.pod_affinity_term, + ), + }) + .collect() + }), + } +} + +// convert_required_pod_anti_affinity_term converts a ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecution to a 
PodAffinityTerm +fn convert_required_pod_anti_affinity_term( + term: &ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecution, +) -> PodAffinityTerm { + PodAffinityTerm { + label_selector: term + .label_selector + .as_ref() + .map(convert_anti_affinity_exec_label_selector), + namespace_selector: term + .namespace_selector + .as_ref() + .map(convert_anti_affinity_exec_namespace_selector), + namespaces: term.namespaces.clone(), + topology_key: term.topology_key.clone(), + } +} + +// convert_anti_affinity_exec_label_selector converts a ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector to a LabelSelector +fn convert_anti_affinity_exec_label_selector( + selector: &ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector, +) -> LabelSelector { + LabelSelector { + match_labels: selector.match_labels.clone(), + match_expressions: selector.match_expressions.as_ref().map(|expressions| { + expressions + .iter() + .map(|expr| LabelSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }), + } +} + +// convert_anti_affinity_exec_namespace_selector converts a ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector to a LabelSelector +fn convert_anti_affinity_exec_namespace_selector( + ns_selector: &ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector, +) -> LabelSelector { + LabelSelector { + match_labels: ns_selector.match_labels.clone(), + match_expressions: ns_selector.match_expressions.as_ref().map(|exprs| { + exprs + .iter() + .map(|expr| LabelSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }), + } +} + +// convert_exec_pod_anti_affinity_term converts a ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm to a PodAffinityTerm +fn convert_exec_pod_anti_affinity_term( + term: &ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm, +) -> PodAffinityTerm { + PodAffinityTerm { + label_selector: term + .label_selector + .as_ref() + .map(convert_anti_affinity_term_label_selector), + namespace_selector: term + .namespace_selector + .as_ref() + .map(convert_anti_affinity_term_namespace_selector), + namespaces: term.namespaces.clone(), + topology_key: term.topology_key.clone(), + } +} + +// convert_anti_affinity_term_namespace_selector converts a ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector to a LabelSelector +fn convert_anti_affinity_term_namespace_selector( + ns_selector: &ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector, +) -> LabelSelector { + // Conversion logic here, assuming it's the same as convert_label_selector logic + LabelSelector { + match_labels: ns_selector.match_labels.clone(), + match_expressions: ns_selector.match_expressions.as_ref().map(|exprs| { + exprs + .iter() + .map(|expr| LabelSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| 
vals.clone()), + ), + }) + .collect() + }), + } +} + +// convert_anti_affinity_term_label_selector converts a ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector to a LabelSelector +fn convert_anti_affinity_term_label_selector( + selector: &ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector, +) -> LabelSelector { + LabelSelector { + match_labels: selector.match_labels.clone(), + match_expressions: selector.match_expressions.as_ref().map(|expressions| { + expressions + .iter() + .map(|expr| LabelSelectorRequirement { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: Some( + expr.values + .as_ref() + .map_or_else(Vec::new, |vals| vals.clone()), + ), + }) + .collect() + }), + } +} + +// Start functions that do conversions to the PoolerTemplateSpec structs +// +// convert_pod_anti_affinity_to_pooler converts a PodAntiAffinity to a PoolerTemplateSpecAffinityPodAntiAffinity struct +// to be used in the PoolerTemplateSpec struct when building out a pooler. +pub fn convert_pod_anti_affinity_to_pooler( + pod_affinity: &PodAntiAffinity, +) -> Option<PoolerTemplateSpecAffinityPodAntiAffinity> { + Some(PoolerTemplateSpecAffinityPodAntiAffinity { + required_during_scheduling_ignored_during_execution: pod_affinity + .required_during_scheduling_ignored_during_execution + .as_ref() + .map(|req| { + req.iter() + .map(convert_required_pod_anti_affinity_term_to_pooler) + .collect() + }), + preferred_during_scheduling_ignored_during_execution: pod_affinity + .preferred_during_scheduling_ignored_during_execution + .as_ref() + .map(|prefs| { + prefs + .iter() + .map(|pref| PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecution { + pod_affinity_term: convert_preferred_pod_anti_affinity_term_to_pooler(&pref.pod_affinity_term), + weight: pref.weight, + }) + .collect() + }), + }) +}
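Reviewer note, not part of the diff: a hedged sketch of the common case this enables, spreading pooler pods across hosts via a preferred anti-affinity term. The app=postgres label and hostname topology key are illustrative choices, not defaults taken from this PR.

```rust
// Illustrative only: a soft "one replica per host" preference.
use std::collections::BTreeMap;

use k8s_openapi::api::core::v1::{PodAffinityTerm, PodAntiAffinity, WeightedPodAffinityTerm};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;

fn pod_anti_affinity_example() {
    let anti = PodAntiAffinity {
        preferred_during_scheduling_ignored_during_execution: Some(vec![WeightedPodAffinityTerm {
            weight: 100,
            pod_affinity_term: PodAffinityTerm {
                label_selector: Some(LabelSelector {
                    match_labels: Some(BTreeMap::from([(
                        "app".to_string(),
                        "postgres".to_string(),
                    )])),
                    ..Default::default()
                }),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            },
        }]),
        required_during_scheduling_ignored_during_execution: None,
    };

    // Mirrored into the pooler template; like the pod-affinity helper,
    // this returns Some even for an empty input.
    assert!(convert_pod_anti_affinity_to_pooler(&anti).is_some());
}
```

+ +// convert_required_pod_anti_affinity_term_to_pooler converts a PodAffinityTerm to a PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecution +// to be used in the PoolerTemplateSpecAffinityPodAntiAffinity struct when building out a pooler.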
+fn convert_required_pod_anti_affinity_term_to_pooler( + term: &PodAffinityTerm, +) -> PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecution { + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecution { + label_selector: term + .label_selector + .as_ref() + .map(convert_required_label_selector_to_pooler), + namespace_selector: term + .namespace_selector + .as_ref() + .map(convert_required_namespace_selector_to_pooler), + namespaces: term.namespaces.clone(), + topology_key: term.topology_key.clone(), + } +} + +// convert_preferred_pod_anti_affinity_term_to_pooler converts a PodAffinityTerm to a PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm +fn convert_preferred_pod_anti_affinity_term_to_pooler( + term: &PodAffinityTerm, +) -> PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm{ + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm { + label_selector: term + .label_selector + .as_ref() + .map(convert_preferred_label_selector_to_pooler), + namespace_selector: term + .namespace_selector + .as_ref() + .map(convert_preferred_namespace_selector_to_pooler), + namespaces: term.namespaces.clone(), + topology_key: term.topology_key.clone(), + } +} + +// convert_preferred_namespace_selector_to_pooler converts a LabelSelector to a PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector +fn convert_preferred_namespace_selector_to_pooler( + ns_selector: &LabelSelector, +) -> PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector{ + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector { + match_labels: ns_selector.match_labels.clone(), + match_expressions: ns_selector.match_expressions.as_ref().map(|exprs| { + exprs.iter().map(|expr| { + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelectorMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect() + }), + } +} + +// convert_preferred_label_selector_to_pooler converts a LabelSelector to a PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector +fn convert_preferred_label_selector_to_pooler( + selector: &LabelSelector, +) -> PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector{ + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector { + match_labels: selector.match_labels.clone(), + match_expressions: selector.match_expressions.as_ref().map(|exprs| exprs.iter().map(|expr| { + PoolerTemplateSpecAffinityPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelectorMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect()), + } +} + +// convert_required_namespace_selector_to_pooler converts a LabelSelector to a PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector +fn convert_required_namespace_selector_to_pooler( + ns_selector: &LabelSelector, +) -> 
PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector{ + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelector { + match_labels: ns_selector.match_labels.clone(), + match_expressions: ns_selector.match_expressions.as_ref().map(|exprs| { + exprs.iter().map(|expr| { + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionNamespaceSelectorMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect() + }), + } +} + +// convert_required_label_selector_to_pooler converts a LabelSelector to a PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector +fn convert_required_label_selector_to_pooler( + selector: &LabelSelector, +) -> PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector +{ + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector { + match_labels: selector.match_labels.clone(), + match_expressions: selector.match_expressions.as_ref().map(|exprs| { + exprs.iter().map(|expr| { + PoolerTemplateSpecAffinityPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelectorMatchExpressions { + key: expr.key.clone(), + operator: expr.operator.clone(), + values: expr.values.clone(), + } + }).collect() + }), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cloudnativepg::clusters::ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelectorMatchExpressions; + + fn create_sample_pod_anti_affinity() -> PodAntiAffinity { + PodAntiAffinity { + required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some( + [("key1", "value1")] + .iter() + .cloned() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }), + namespaces: Some(vec!["default".to_string()]), + topology_key: "kubernetes.io/hostname".to_string(), + ..Default::default() + }]), + preferred_during_scheduling_ignored_during_execution: Some(vec![ + WeightedPodAffinityTerm { + weight: 100, + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some( + [("key2", "value2")] + .iter() + .cloned() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }), + namespaces: Some(vec!["default".to_string()]), + topology_key: "kubernetes.io/zone".to_string(), + ..Default::default() + }, + }, + ]), + } + } + + #[test] + fn test_convert_pod_anti_affinity_term_full() { + let term = ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTerm { + label_selector: Some(ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelector { + match_labels: Some(std::collections::BTreeMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ])), + match_expressions: Some(vec![ + ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermLabelSelectorMatchExpressions { + key: "exp_key".to_string(), + operator: "In".to_string(), + values: Some(vec!["val1".to_string(), "val2".to_string()]), + }, + ]), + }), + namespace_selector: 
Some(ClusterAffinityAdditionalPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecutionPodAffinityTermNamespaceSelector { + match_labels: Some(std::collections::BTreeMap::from([("ns_key".to_string(), "ns_value".to_string())])), + match_expressions: None, + }), + namespaces: Some(vec!["namespace1".to_string(), "namespace2".to_string()]), + topology_key: "topology_key".to_string(), + }; + + let result = convert_exec_pod_anti_affinity_term(&term); + assert_eq!(result.topology_key, "topology_key"); + assert_eq!( + result.namespaces, + Some(vec!["namespace1".to_string(), "namespace2".to_string()]) + ); + assert!(result.label_selector.is_some()); + assert!(result.namespace_selector.is_some()); + + let label_selector = result.label_selector.unwrap(); + assert_eq!( + label_selector.match_labels.unwrap().get("key1"), + Some(&"value1".to_string()) + ); + assert_eq!(label_selector.match_expressions.unwrap().len(), 1); + + let namespace_selector = result.namespace_selector.unwrap(); + assert_eq!( + namespace_selector.match_labels.unwrap().get("ns_key"), + Some(&"ns_value".to_string()) + ); + } + #[test] + + fn test_convert_required_pod_anti_affinity_term() { + let term = ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecution { + label_selector: Some(ClusterAffinityAdditionalPodAntiAffinityRequiredDuringSchedulingIgnoredDuringExecutionLabelSelector { + match_expressions: None, + match_labels: Some(std::collections::BTreeMap::from([("key".to_string(), "value".to_string())])), + }), + namespace_selector: None, + namespaces: Some(vec!["default".to_string()]), + topology_key: "kubernetes.io/hostname".to_string(), + }; + + let pod_affinity_term = convert_required_pod_anti_affinity_term(&term); + assert_eq!(pod_affinity_term.topology_key, "kubernetes.io/hostname"); + assert!(pod_affinity_term.label_selector.is_some()); + } + + #[test] + fn test_convert_pod_anti_affinity_to_pooler_non_empty() { + let pod_anti_affinity = create_sample_pod_anti_affinity(); + let result = convert_pod_anti_affinity_to_pooler(&pod_anti_affinity); + + assert!(result.is_some()); + let pooler_anti_affinity = result.unwrap(); + assert_eq!( + pooler_anti_affinity + .required_during_scheduling_ignored_during_execution + .as_ref() + .unwrap() + .len(), + 1 + ); + assert_eq!( + pooler_anti_affinity + .preferred_during_scheduling_ignored_during_execution + .as_ref() + .unwrap() + .len(), + 1 + ); + + let required = &pooler_anti_affinity + .required_during_scheduling_ignored_during_execution + .as_ref() + .unwrap()[0]; + assert_eq!( + required + .label_selector + .as_ref() + .unwrap() + .match_labels + .as_ref() + .unwrap() + .get("key1"), + Some(&"value1".to_string()) + ); + + let preferred = &pooler_anti_affinity + .preferred_during_scheduling_ignored_during_execution + .as_ref() + .unwrap()[0]; + assert_eq!( + preferred + .pod_affinity_term + .label_selector + .as_ref() + .unwrap() + .match_labels + .as_ref() + .unwrap() + .get("key2"), + Some(&"value2".to_string()) + ); + } + + #[test] + fn test_convert_pod_anti_affinity_to_pooler_empty() { + let pod_anti_affinity = PodAntiAffinity { + required_during_scheduling_ignored_during_execution: None, + preferred_during_scheduling_ignored_during_execution: None, + }; + let result = convert_pod_anti_affinity_to_pooler(&pod_anti_affinity); + + assert!(result.is_some()); + let pooler_anti_affinity = result.unwrap(); + assert!(pooler_anti_affinity + .required_during_scheduling_ignored_during_execution + .is_none()); + assert!(pooler_anti_affinity + 
.preferred_during_scheduling_ignored_during_execution + .is_none()); + } +} diff --git a/tembo-operator/src/cloudnativepg/placement/cnpg_toleration.rs b/tembo-operator/src/cloudnativepg/placement/cnpg_toleration.rs new file mode 100644 index 000000000..185c02995 --- /dev/null +++ b/tembo-operator/src/cloudnativepg/placement/cnpg_toleration.rs @@ -0,0 +1,90 @@ +use crate::cloudnativepg::clusters::ClusterAffinityTolerations; +use crate::cloudnativepg::poolers::PoolerTemplateSpecTolerations; +use k8s_openapi::api::core::v1::Toleration; + +// convert_toleration converts a ClusterAffinityTolerations to a k8s_openapi::api::core::v1::Toleration +pub fn convert_toleration(cat: &ClusterAffinityTolerations) -> Toleration { + Toleration { + effect: cat.effect.clone(), + key: cat.key.clone(), + operator: cat.operator.clone(), + toleration_seconds: cat.toleration_seconds, + value: cat.value.clone(), + } +} + +// convert_toleration_to_pooler converts a k8s_openapi::api::core::v1::Toleration to a PoolerTemplateSpecTolerations to +// be used in the PoolerTemplateSpec struct when building out a pooler. +pub fn convert_toleration_to_pooler( + toleration: &Toleration, +) -> Option<PoolerTemplateSpecTolerations> { + if toleration.key.is_none() && toleration.effect.is_none() { + None + } else { + Some(PoolerTemplateSpecTolerations { + effect: toleration.effect.clone(), + key: toleration.key.clone(), + operator: toleration.operator.clone().or(Some("Equal".to_string())), + toleration_seconds: toleration.toleration_seconds, + value: toleration.value.clone(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use k8s_openapi::api::core::v1::Toleration; + + #[test] + fn test_convert_toleration() { + let cat = ClusterAffinityTolerations { + effect: Some("NoExecute".to_string()), + key: Some("key1".to_string()), + operator: Some("Exists".to_string()), + toleration_seconds: Some(3600), + value: Some("value1".to_string()), + }; + + let result = convert_toleration(&cat); + assert_eq!(result.effect, Some("NoExecute".to_string())); + assert_eq!(result.key, Some("key1".to_string())); + assert_eq!(result.operator, Some("Exists".to_string())); + assert_eq!(result.toleration_seconds, Some(3600)); + assert_eq!(result.value, Some("value1".to_string())); + } + + #[test] + fn test_convert_toleration_to_pooler_non_empty_key_and_effect() { + let toleration = Toleration { + effect: Some("NoExecute".to_string()), + key: Some("key1".to_string()), + operator: None, + toleration_seconds: Some(3600), + value: Some("value1".to_string()), + }; + + let result = convert_toleration_to_pooler(&toleration); + assert!(result.is_some()); + let pooler_toleration = result.unwrap(); + assert_eq!(pooler_toleration.effect, Some("NoExecute".to_string())); + assert_eq!(pooler_toleration.key, Some("key1".to_string())); + assert_eq!(pooler_toleration.operator, Some("Equal".to_string())); // Default operator + assert_eq!(pooler_toleration.toleration_seconds, Some(3600)); + assert_eq!(pooler_toleration.value, Some("value1".to_string())); + } + + #[test] + fn test_convert_toleration_to_pooler_empty_key_and_effect() { + let toleration = Toleration { + effect: None, + key: None, + operator: None, + toleration_seconds: None, + value: None, + }; + + let result = convert_toleration_to_pooler(&toleration); + assert!(result.is_none()); + } +}
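Reviewer note, not part of the diff: a hedged sketch of the operator defaulting implemented above, grounded in the file's own test; the taint key and value are invented for illustration.

```rust
// Illustrative only: an unset operator is filled in with Kubernetes'
// default ("Equal") when converted for the pooler template.
use k8s_openapi::api::core::v1::Toleration;

fn toleration_example() {
    let toleration = Toleration {
        key: Some("tembo.io/dedicated".to_string()), // hypothetical taint key
        value: Some("postgres".to_string()),
        effect: Some("NoSchedule".to_string()),
        operator: None,
        toleration_seconds: None,
    };

    let pooler = convert_toleration_to_pooler(&toleration)
        .expect("key or effect is present, so the conversion yields Some");
    assert_eq!(pooler.operator.as_deref(), Some("Equal"));
}
```

diff --git a/tembo-operator/src/cloudnativepg/placement/cnpg_topology.rs b/tembo-operator/src/cloudnativepg/placement/cnpg_topology.rs new file mode 100644 index 000000000..e77a3a42d --- /dev/null +++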
diff --git a/tembo-operator/src/cloudnativepg/placement/cnpg_topology.rs b/tembo-operator/src/cloudnativepg/placement/cnpg_topology.rs
new file mode 100644
index 000000000..e77a3a42d
--- /dev/null
+++ b/tembo-operator/src/cloudnativepg/placement/cnpg_topology.rs
@@ -0,0 +1,168 @@
+use crate::cloudnativepg::clusters::{
+    ClusterTopologySpreadConstraints, ClusterTopologySpreadConstraintsLabelSelector,
+};
+use crate::cloudnativepg::poolers::{
+    PoolerTemplateSpecTopologySpreadConstraints,
+    PoolerTemplateSpecTopologySpreadConstraintsLabelSelector,
+    PoolerTemplateSpecTopologySpreadConstraintsLabelSelectorMatchExpressions,
+};
+use k8s_openapi::api::core::v1::TopologySpreadConstraint;
+use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, LabelSelectorRequirement};
+
+pub fn convert_cluster_topology_spread_constraints(
+    cluster_constraints: &Option<Vec<ClusterTopologySpreadConstraints>>,
+) -> Option<Vec<TopologySpreadConstraint>> {
+    cluster_constraints.as_ref().map(|constraints| {
+        constraints
+            .iter()
+            .map(|constraint| TopologySpreadConstraint {
+                label_selector: constraint
+                    .label_selector
+                    .as_ref()
+                    .map(convert_topo_label_selector),
+                max_skew: constraint.max_skew,
+                min_domains: constraint.min_domains,
+                node_affinity_policy: constraint.node_affinity_policy.clone(),
+                node_taints_policy: constraint.node_taints_policy.clone(),
+                topology_key: constraint.topology_key.clone(),
+                when_unsatisfiable: constraint.when_unsatisfiable.clone(),
+                match_label_keys: constraint.match_label_keys.clone(),
+            })
+            .collect()
+    })
+}
+
+fn convert_topo_label_selector(
+    selector: &ClusterTopologySpreadConstraintsLabelSelector,
+) -> LabelSelector {
+    LabelSelector {
+        match_labels: selector.match_labels.clone(),
+        match_expressions: selector.match_expressions.as_ref().map(|expressions| {
+            expressions
+                .iter()
+                .map(|expr| LabelSelectorRequirement {
+                    key: expr.key.clone(),
+                    operator: expr.operator.clone(),
+                    values: expr.values.clone(),
+                })
+                .collect()
+        }),
+    }
+}
+
+pub fn convert_topo_to_pooler(
+    topologies: &[TopologySpreadConstraint],
+) -> Option<Vec<PoolerTemplateSpecTopologySpreadConstraints>> {
+    if topologies.is_empty() {
+        None
+    } else {
+        Some(
+            topologies
+                .iter()
+                .map(|topo| PoolerTemplateSpecTopologySpreadConstraints {
+                    label_selector: topo
+                        .label_selector
+                        .as_ref()
+                        .map(convert_topo_label_selector_to_pooler),
+                    max_skew: topo.max_skew,
+                    min_domains: topo.min_domains,
+                    node_affinity_policy: topo.node_affinity_policy.clone(),
+                    node_taints_policy: topo.node_taints_policy.clone(),
+                    topology_key: topo.topology_key.clone(),
+                    when_unsatisfiable: topo.when_unsatisfiable.clone(),
+                    match_label_keys: topo.match_label_keys.clone(),
+                })
+                .collect(),
+        )
+    }
+}
+
+// Function to convert k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector
+// to PoolerTemplateSpecTopologySpreadConstraintsLabelSelector
+fn convert_topo_label_selector_to_pooler(
+    selector: &LabelSelector,
+) -> PoolerTemplateSpecTopologySpreadConstraintsLabelSelector {
+    PoolerTemplateSpecTopologySpreadConstraintsLabelSelector {
+        match_labels: selector.match_labels.clone(),
+        match_expressions: selector.match_expressions.as_ref().map(|expressions| {
+            expressions
+                .iter()
+                .map(|expr| {
+                    PoolerTemplateSpecTopologySpreadConstraintsLabelSelectorMatchExpressions {
+                        key: expr.key.clone(),
+                        operator: expr.operator.clone(),
+                        values: expr.values.clone(),
+                    }
+                })
+                .collect()
+        }),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::cloudnativepg::clusters::ClusterTopologySpreadConstraintsLabelSelectorMatchExpressions;
+    use std::collections::BTreeMap;
+
+    #[test]
+    fn test_convert_full() {
+        let cluster_constraints = Some(vec![ClusterTopologySpreadConstraints {
+            label_selector: Some(ClusterTopologySpreadConstraintsLabelSelector {
+                match_labels: Some(BTreeMap::from([("key1".to_string(), "value1".to_string())])),
+                match_expressions: Some(vec![
+                    ClusterTopologySpreadConstraintsLabelSelectorMatchExpressions {
+                        key: "exp_key".to_string(),
+                        operator: "In".to_string(),
+                        values: Some(vec!["val1".to_string(), "val2".to_string()]),
+                    },
+                ]),
+            }),
+            match_label_keys: Some(vec!["label_key1".to_string()]),
+            max_skew: 1,
+            min_domains: Some(2),
+            node_affinity_policy: Some("Honor".to_string()),
+            node_taints_policy: Some("Ignore".to_string()),
+            topology_key: "topology_key".to_string(),
+            when_unsatisfiable: "DoNotSchedule".to_string(),
+        }]);
+
+        let result = convert_cluster_topology_spread_constraints(&cluster_constraints).unwrap();
+        assert_eq!(result.len(), 1);
+        let constraint = &result[0];
+        assert_eq!(constraint.max_skew, 1);
+        assert_eq!(constraint.topology_key, "topology_key");
+        assert_eq!(constraint.when_unsatisfiable, "DoNotSchedule");
+        assert_eq!(constraint.min_domains, Some(2));
+        assert_eq!(constraint.node_affinity_policy, Some("Honor".to_string()));
+        assert_eq!(constraint.node_taints_policy, Some("Ignore".to_string()));
+
+        let label_selector = constraint.label_selector.as_ref().unwrap();
+        assert_eq!(
+            label_selector.match_labels.as_ref().unwrap().get("key1"),
+            Some(&"value1".to_string())
+        );
+        assert_eq!(label_selector.match_expressions.as_ref().unwrap().len(), 1);
+    }
+
+    #[test]
+    fn test_convert_topology_spread_constraints_none() {
+        let constraints = None;
+        let result = convert_cluster_topology_spread_constraints(&constraints);
+        assert!(
+            result.is_none(),
+            "Expected no topology constraints to be converted."
+        );
+    }
+
+    #[test]
+    fn test_convert_topology_spread_constraints_empty_vector() {
+        let constraints = Some(Vec::new());
+        let result = convert_cluster_topology_spread_constraints(&constraints);
+        assert_eq!(
+            result,
+            Some(Vec::new()),
+            "Expected an empty vector of topology constraints."
+        );
+    }
+}
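Note the asymmetry between the two public helpers: on the cluster side, `Some(vec![])` survives as `Some(vec![])` (as the empty-vector test above asserts), while `convert_topo_to_pooler` maps an empty slice to `None`, so a pooler template never carries an empty constraints array. A hypothetical composition showing the effect; this glue function is not part of the patch and assumes both helpers are in scope in `cnpg_topology.rs`:

```rust
// Hypothetical glue (not in this patch): go straight from the Cluster's
// constraints to the pooler's, returning None when there is nothing to apply.
fn pooler_constraints_from_cluster(
    cluster: &Option<Vec<ClusterTopologySpreadConstraints>>,
) -> Option<Vec<PoolerTemplateSpecTopologySpreadConstraints>> {
    // Some(vec![]) from the cluster side becomes None here, because
    // convert_topo_to_pooler treats an empty slice as "no constraints".
    let converted = convert_cluster_topology_spread_constraints(cluster)?;
    convert_topo_to_pooler(&converted)
}
```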
diff --git a/tembo-operator/src/cloudnativepg/placement/mod.rs b/tembo-operator/src/cloudnativepg/placement/mod.rs
new file mode 100644
index 000000000..2f8acbc34
--- /dev/null
+++ b/tembo-operator/src/cloudnativepg/placement/mod.rs
@@ -0,0 +1,6 @@
+mod cnpg_node_affinity;
+pub(crate) mod cnpg_placement;
+mod cnpg_pod_affinity;
+mod cnpg_pod_anti_affinity;
+mod cnpg_toleration;
+mod cnpg_topology;
diff --git a/tembo-operator/src/controller.rs b/tembo-operator/src/controller.rs
index 8311b02a8..c59a8e740 100644
--- a/tembo-operator/src/controller.rs
+++ b/tembo-operator/src/controller.rs
@@ -10,6 +10,7 @@ use crate::{
             cnpg_cluster_from_cdb, reconcile_cnpg, reconcile_cnpg_scheduled_backup,
             reconcile_pooler,
         },
+        placement::cnpg_placement::PlacementConfig,
         VOLUME_SNAPSHOT_CLASS_NAME,
     },
     config::Config,
@@ -174,6 +175,9 @@ impl CoreDB {
         let name = self.name_any();
         let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), &ns);
 
+        // Setup Node/Pod Placement Configuration for the Pooler and App Service deployments
+        let placement_config = PlacementConfig::new(self);
+
         reconcile_network_policies(ctx.client.clone(), &ns).await?;
 
         // Fetch any metadata we need from Trunk
@@ -292,7 +296,7 @@ impl CoreDB {
         debug!("Reconciling secret");
         // Superuser connection info
         reconcile_secret(self, ctx.clone()).await?;
-        reconcile_app_services(self, ctx.clone()).await?;
+        reconcile_app_services(self, ctx.clone(), placement_config.clone()).await?;
 
         if self
             .spec
@@ -343,7 +347,7 @@ impl CoreDB {
         })?;
 
         // Reconcile Pooler resource
-        reconcile_pooler(self, ctx.clone()).await?;
+        reconcile_pooler(self, ctx.clone(), placement_config.clone()).await?;
 
         // Check if Postgres is already running
         let pg_postmaster_start_time = is_not_restarting(self, ctx.clone(), "postgres").await?;
diff --git a/tembo-operator/src/defaults.rs b/tembo-operator/src/defaults.rs
index bfd5ee32d..c41919051 100644
--- a/tembo-operator/src/defaults.rs
+++ b/tembo-operator/src/defaults.rs
@@ -9,6 +9,7 @@ use crate::{
     apis::coredb_types::{
         Backup, ConnectionPooler, PgBouncer, S3Credentials, ServiceAccountTemplate, VolumeSnapshot,
     },
+    cloudnativepg::clusters::ClusterAffinity,
     cloudnativepg::poolers::{PoolerPgbouncerPoolMode, PoolerTemplateSpecContainersResources},
     extensions::types::{Extension, TrunkInstall},
     stacks::types::ImagePerPgVersion,
@@ -227,6 +228,14 @@ pub fn default_volume_snapshot() -> Option<VolumeSnapshot> {
     })
 }
 
+pub fn default_affinity_configuration() -> Option<ClusterAffinity> {
+    Some(ClusterAffinity {
+        pod_anti_affinity_type: Some("preferred".to_string()),
+        topology_key: Some("topology.kubernetes.io/zone".to_string()),
+        ..ClusterAffinity::default()
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
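Finally, the new `default_affinity_configuration` keeps the Rust-side default in lockstep with the CRD default (`podAntiAffinityType: preferred`, `topologyKey: topology.kubernetes.io/zone`). A sanity check along these lines could guard that invariant; this test is a sketch only and is not included in the patch:

```rust
#[test]
fn default_affinity_matches_crd_default() {
    // Sketch (not in this patch): the Rust-side default should mirror the
    // CRD's documented default for affinityConfiguration.
    let affinity = default_affinity_configuration().expect("default is always Some");
    assert_eq!(affinity.pod_anti_affinity_type.as_deref(), Some("preferred"));
    assert_eq!(
        affinity.topology_key.as_deref(),
        Some("topology.kubernetes.io/zone")
    );
}
```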