Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/rbac/cluster/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ rules:
- get
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- patch
- apiGroups:
- pgv2.percona.com
resources:
Expand Down
7 changes: 7 additions & 0 deletions config/rbac/namespace/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ rules:
- get
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- patch
- apiGroups:
- pgv2.percona.com
resources:
Expand Down
7 changes: 7 additions & 0 deletions deploy/bundle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53738,6 +53738,13 @@ rules:
- get
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- patch
- apiGroups:
- pgv2.percona.com
resources:
Expand Down
7 changes: 7 additions & 0 deletions deploy/cw-bundle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53738,6 +53738,13 @@ rules:
- get
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- patch
- apiGroups:
- pgv2.percona.com
resources:
Expand Down
7 changes: 7 additions & 0 deletions deploy/cw-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ rules:
- get
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- patch
- apiGroups:
- pgv2.percona.com
resources:
Expand Down
7 changes: 7 additions & 0 deletions deploy/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ rules:
- get
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- patch
- apiGroups:
- pgv2.percona.com
resources:
Expand Down
60 changes: 31 additions & 29 deletions internal/controller/postgrescluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -89,18 +90,18 @@ func (r *Reconciler) reconcileClusterPodService(
return clusterPodService, err
}

// generateClusterPrimaryService returns a v1.Service and v1.Endpoints that
// generateClusterPrimaryService returns a v1.Service and discoveryv1.EndpointSlice that
// resolve to the PostgreSQL primary instance.
func (r *Reconciler) generateClusterPrimaryService(
cluster *v1beta1.PostgresCluster, leader *corev1.Service,
) (*corev1.Service, *corev1.Endpoints, error) {
) (*corev1.Service, *discoveryv1.EndpointSlice, error) {
// We want to name and label our primary Service consistently. When Patroni is
// using Endpoints for its DCS, however, they and any Service that uses them
// must use the same name as the Patroni "scope" which has its own constraints.
//
// To stay free from those constraints, our primary Service resolves to the
// ClusterIP of the Service created in Reconciler.reconcilePatroniLeaderLease
// when Patroni is using Endpoints.
// when Patroni is using EndpointSlices.

service := &corev1.Service{ObjectMeta: naming.ClusterPrimaryService(cluster)}
service.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Service"))
Expand All @@ -117,18 +118,22 @@ func (r *Reconciler) generateClusterPrimaryService(

err := errors.WithStack(r.setControllerReference(cluster, service))

// Endpoints for a Service have the same name as the Service. Copy labels,
// annotations, and ownership, too.
endpoints := &corev1.Endpoints{}
service.ObjectMeta.DeepCopyInto(&endpoints.ObjectMeta)
endpoints.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Endpoints"))
// EndpointSlice for a Service. Copy labels, annotations, and ownership.
endpointSlice := &discoveryv1.EndpointSlice{}
service.ObjectMeta.DeepCopyInto(&endpointSlice.ObjectMeta)
endpointSlice.SetGroupVersionKind(discoveryv1.SchemeGroupVersion.WithKind("EndpointSlice"))

if endpointSlice.Labels == nil {
endpointSlice.Labels = make(map[string]string)
}
endpointSlice.Labels[discoveryv1.LabelServiceName] = service.Name

if leader == nil {
// TODO(cbandy): We need to build a different kind of Service here.
return nil, nil, errors.New("Patroni DCS other than Kubernetes Endpoints is not implemented")
return nil, nil, errors.New("Patroni DCS other than Kubernetes EndpointSlices is not implemented")
}

// Allocate no IP address (headless) and manage the Endpoints ourselves.
// Allocate no IP address (headless) and manage the EndpointSlice ourselves.
// - https://docs.k8s.io/concepts/services-networking/service/#headless-services
// - https://docs.k8s.io/concepts/services-networking/service/#services-without-selectors
service.Spec.ClusterIP = corev1.ClusterIPNone
Expand All @@ -141,44 +146,41 @@ func (r *Reconciler) generateClusterPrimaryService(
TargetPort: intstr.FromString(naming.PortPostgreSQL),
}}

// Resolve to the ClusterIP for which Patroni has configured the Endpoints.
endpoints.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: leader.Spec.ClusterIP}},
// Set the address type for the EndpointSlice
endpointSlice.AddressType = discoveryv1.AddressTypeIPv4

endpointSlice.Endpoints = []discoveryv1.Endpoint{{
Addresses: []string{leader.Spec.ClusterIP},
}}

// Copy the EndpointPorts from the ServicePorts.
for _, sp := range service.Spec.Ports {
endpoints.Subsets[0].Ports = append(endpoints.Subsets[0].Ports,
corev1.EndpointPort{
Name: sp.Name,
Port: sp.Port,
Protocol: sp.Protocol,
port := sp.Port
endpointSlice.Ports = append(endpointSlice.Ports,
discoveryv1.EndpointPort{
Name: &sp.Name,
Port: &port,
Protocol: &sp.Protocol,
})
}

return service, endpoints, err
return service, endpointSlice, err
}

// +kubebuilder:rbac:groups="",resources="endpoints",verbs={create,patch}
// +kubebuilder:rbac:groups="discovery.k8s.io",resources="endpointslices",verbs={create,patch}
// +kubebuilder:rbac:groups="",resources="services",verbs={create,patch}

// The OpenShift RestrictedEndpointsAdmission plugin requires special
// authorization to create Endpoints that contain ClusterIPs.
// - https://github.com/openshift/origin/pull/9383
// +kubebuilder:rbac:groups="",resources="endpoints/restricted",verbs={create}

// reconcileClusterPrimaryService writes the Service and Endpoints that resolve
// reconcileClusterPrimaryService writes the Service and EndpointSlice that resolve
// to the PostgreSQL primary instance.
func (r *Reconciler) reconcileClusterPrimaryService(
ctx context.Context, cluster *v1beta1.PostgresCluster, leader *corev1.Service,
) (*corev1.Service, error) {
service, endpoints, err := r.generateClusterPrimaryService(cluster, leader)
service, endpointSlice, err := r.generateClusterPrimaryService(cluster, leader)

if err == nil {
err = errors.WithStack(r.apply(ctx, service))
}
if err == nil {
err = errors.WithStack(r.apply(ctx, endpoints))
err = errors.WithStack(r.apply(ctx, endpointSlice))
}
return service, err
}
Expand Down
31 changes: 17 additions & 14 deletions internal/controller/postgrescluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -597,7 +598,7 @@ func TestGenerateClusterPrimaryService(t *testing.T) {
_, _, err := reconciler.generateClusterPrimaryService(cluster, nil)
assert.ErrorContains(t, err, "not implemented")

alwaysExpect := func(t testing.TB, service *corev1.Service, endpoints *corev1.Endpoints) {
alwaysExpect := func(t testing.TB, service *corev1.Service, endpointSlice *discoveryv1.EndpointSlice) {
assert.Assert(t, cmp.MarshalMatches(service.TypeMeta, `
apiVersion: v1
kind: Service
Expand Down Expand Up @@ -632,16 +633,17 @@ ownerReferences:
assert.Assert(t, service.Spec.Selector == nil,
"got %v", service.Spec.Selector)

assert.Assert(t, cmp.MarshalMatches(endpoints, `
apiVersion: v1
kind: Endpoints
assert.Assert(t, cmp.MarshalMatches(endpointSlice, `
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
labels:
app.kubernetes.io/component: pg
app.kubernetes.io/instance: pg5
app.kubernetes.io/managed-by: percona-postgresql-operator
app.kubernetes.io/name: percona-postgresql
app.kubernetes.io/part-of: percona-postgresql
kubernetes.io/service-name: pg5-primary
postgres-operator.crunchydata.com/cluster: pg5
postgres-operator.crunchydata.com/role: primary
name: pg5-primary
Expand All @@ -653,19 +655,20 @@ metadata:
kind: PostgresCluster
name: pg5
uid: ""
subsets:
addressType: IPv4
endpoints:
- addresses:
- ip: 1.9.8.3
ports:
- name: postgres
port: 2600
protocol: TCP
- 1.9.8.3
ports:
- name: postgres
port: 2600
protocol: TCP
`))
}

service, endpoints, err := reconciler.generateClusterPrimaryService(cluster, leader)
service, endpointSlice, err := reconciler.generateClusterPrimaryService(cluster, leader)
assert.NilError(t, err)
alwaysExpect(t, service, endpoints)
alwaysExpect(t, service, endpointSlice)

t.Run("LeaderLoadBalancer", func(t *testing.T) {
leader := leader.DeepCopy()
Expand All @@ -676,9 +679,9 @@ subsets:
{IP: "1.2.3.4", Hostname: "only.the.first"},
}

service, endpoints, err := reconciler.generateClusterPrimaryService(cluster, leader)
service, endpointSlice, err := reconciler.generateClusterPrimaryService(cluster, leader)
assert.NilError(t, err)
alwaysExpect(t, service, endpoints)
alwaysExpect(t, service, endpointSlice)

// generateClusterPrimaryService no longer sets ExternalIPs or ExternalName from
// LoadBalancer-type leader service
Expand Down
Loading