Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended status check for reconciliation #207

Merged
merged 7 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
133 changes: 112 additions & 21 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/aenix-io/etcd-operator/internal/log"
Expand All @@ -47,6 +48,10 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)

const (
etcdDefaultTimeout = 5 * time.Second
)

// EtcdClusterReconciler reconciles a EtcdCluster object
type EtcdClusterReconciler struct {
client.Client
Expand All @@ -56,6 +61,7 @@ type EtcdClusterReconciler struct {
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
// +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=view;list;watch
Expand All @@ -80,13 +86,62 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, nil
}

state := observables{}

// create two services and the pdb
err = r.ensureUnconditionalObjects(ctx, instance)
if err != nil {
return ctrl.Result{}, err
}

// fetch STS if exists
err = r.Get(ctx, req.NamespacedName, &state.statefulSet)
if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err)
}
state.stsExists = state.statefulSet.UID != ""

// fetch endpoints
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
if err != nil {
return ctrl.Result{}, err
}
state.endpointsFound = clusterClient != nil && singleClients != nil

if !state.endpointsFound {
if !state.stsExists {
// TODO: happy path for new cluster creation
log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
}
}

// get status of every endpoint and member list from every endpoint
state.etcdStatuses = make([]etcdStatus, len(singleClients))
{
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
for i := range singleClients {
wg.Add(1)
go func(i int) {
defer wg.Done()
state.etcdStatuses[i].fill(ctx, singleClients[i])
}(i)
}
wg.Wait()
cancel()
}
state.setClusterID()
if state.inSplitbrain() {
lllamnyp marked this conversation as resolved.
Show resolved Hide resolved
log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue")
return ctrl.Result{}, nil
}
// fill conditions
if len(instance.Status.Conditions) == 0 {
factory.FillConditions(instance)
}

// ensure managed resources
if err = r.ensureClusterObjects(ctx, instance); err != nil {
if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
}

Expand Down Expand Up @@ -138,8 +193,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return r.updateStatus(ctx, instance)
}

// ensureClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureClusterObjects(
// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {

if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
Expand All @@ -148,30 +203,12 @@ func (r *EtcdClusterReconciler) ensureClusterObjects(
}
log.Debug(ctx, "cluster state configmap reconciled")

if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile headless service failed")
return err
}
log.Debug(ctx, "headless service reconciled")

if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile statefulset failed")
return err
}
log.Debug(ctx, "statefulset reconciled")

if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile client service failed")
return err
}
log.Debug(ctx, "client service reconciled")

if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile pdb failed")
return err
}
log.Debug(ctx, "pdb reconciled")

return nil
}

Expand Down Expand Up @@ -498,3 +535,57 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie

return nil
}

// ensureUnconditionalObjects creates the two services and the PDB
// which can be created at the start of the reconciliation loop
// without any risk of disrupting the etcd cluster
func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
const concurrentOperations = 3
c := make(chan error)
defer close(c)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(concurrentOperations)
wrapWithMsg := func(err error, msg string) error {
if err != nil {
return fmt.Errorf(msg+": %w", err)
}
return nil
}
go func(chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
"couldn't ensure client service"):
}
}(c)
go func(chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
"couldn't ensure headless service"):
}
}(c)
go func(chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
"couldn't ensure pod disruption budget"):
}
}(c)

for i := 0; i < concurrentOperations; i++ {
if err := <-c; err != nil {
cancel()

// let all goroutines select the ctx.Done() case to avoid races on closed channels
wg.Wait()
return err
}
}
return nil
}
63 changes: 63 additions & 0 deletions internal/controller/factory/etcd_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package factory

import (
"context"
"fmt"

"github.com/aenix-io/etcd-operator/api/v1alpha1"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (*clientv3.Client, []*clientv3.Client, error) {
cfg, err := configFromCluster(ctx, cluster, cli)
if err != nil {
return nil, nil, err
}
if len(cfg.Endpoints) == 0 {
return nil, nil, nil
}
eps := cfg.Endpoints
clusterClient, err := clientv3.New(cfg)
if err != nil {
return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err)
}
singleClients := make([]*clientv3.Client, len(eps))
for i, ep := range eps {
cfg.Endpoints = []string{ep}
singleClients[i], err = clientv3.New(cfg)
if err != nil {
return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err)
}
}
return clusterClient, singleClients, nil
}

func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (clientv3.Config, error) {
ep := v1.Endpoints{}
err := cli.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
if client.IgnoreNotFound(err) != nil {
return clientv3.Config{}, err
}
if err != nil {
return clientv3.Config{Endpoints: []string{}}, nil
}

names := map[string]struct{}{}
urls := make([]string, 0, 8)
for _, v := range ep.Subsets {
for _, addr := range v.Addresses {
names[addr.Hostname] = struct{}{}
}
for _, addr := range v.NotReadyAddresses {
names[addr.Hostname] = struct{}{}
}
}
for name := range names {
urls = append(urls, fmt.Sprintf("%s:%s", name, "2379"))
}

return clientv3.Config{Endpoints: urls}, nil
}
80 changes: 80 additions & 0 deletions internal/controller/observables.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package controller

import (
"context"
"sync"

clientv3 "go.etcd.io/etcd/client/v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

// etcdStatus holds the details of the status that an etcd endpoint
// can return about itself, i.e. its own status and its perceived
// member list
type etcdStatus struct {
endpointStatus *clientv3.StatusResponse
endpointStatusError error
memberList *clientv3.MemberListResponse
memberListError error
}

// observables stores observations that the operator can make about
// states of objects in kubernetes
type observables struct {
statefulSet appsv1.StatefulSet
stsExists bool
endpointsFound bool
etcdStatuses []etcdStatus
clusterID uint64
_ int
_ []corev1.PersistentVolumeClaim
}

// setClusterID populates the clusterID field based on etcdStatuses
func (o *observables) setClusterID() {
for i := range o.etcdStatuses {
if o.etcdStatuses[i].endpointStatus != nil {
o.clusterID = o.etcdStatuses[i].endpointStatus.Header.ClusterId
return
}
}
}

// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses.
// If more than one unique ID is reported, cluster is in splitbrain.
func (o *observables) inSplitbrain() bool {
for i := range o.etcdStatuses {
if o.etcdStatuses[i].endpointStatus != nil {
if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId {
return true
}
}
}
return false
}

// fill takes a single-endpoint client and populates the fields of etcdStatus
// with the endpoint's status and its perceived member list.
func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0])
}()
s.memberList, s.memberListError = c.MemberList(ctx)
wg.Wait()
}

// TODO: make a real function
func (o *observables) _() int {
if o.etcdStatuses != nil {
for i := range o.etcdStatuses {
if o.etcdStatuses[i].memberList != nil {
return len(o.etcdStatuses[i].memberList.Members)
}
}
}
return 0
}
Loading