diff --git a/README.md b/README.md index afd2040..586ec40 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,6 @@ login-protector targets only StatefulSets. The StatefulSet should be configured 2. Add the sidecar container `ghcr.io/cybozu-go/local-session-tracker` and specify `shareProcessNamespace: true`. 3. Set the `updateStrategy` to `type: OnDelete`. - Example manifest: ```yaml @@ -88,8 +87,6 @@ Annotations can be used to modify the behavior of login-protector for the target - `login-protector.cybozu.io/tracker-name`: Specify the name of the local-session-tracker sidecar container. Default is "local-session-tracker". - `login-protector.cybozu.io/tracker-port`: Specify the port of the local-session-tracker sidecar container. Default is "8080". -- `login-protector.cybozu.io/no-pdb`: Set to "true" to prevent the creation of a PodDisruptionBudget. - ```yaml apiVersion: apps/v1 @@ -101,7 +98,6 @@ metadata: annotations: login-protector.cybozu.io/tracker-name: sidecar login-protector.cybozu.io/tracker-port: "9090" - login-protector.cybozu.io/no-pdb: "true" spec: replicas: 1 selector: @@ -129,6 +125,24 @@ spec: type: OnDelete ``` +The following annotation can be used to disable the creation of a PodDisruptionBudget for the target Pod: + +- `login-protector.cybozu.io/no-pdb`: Set to "true" to prevent the creation of a PodDisruptionBudget. + +If you want to force a Pod reboot, you can add the annotation to the target Pod: + +```console +$ kubectl annotate pod target-sts-0 login-protector.cybozu.io/no-pdb=true +``` + +## Metrics + +login-protector provides the following metrics: + +- `login_protector_pod_pending_updates`: The number of Pods that have pending updates. +- `login_protector_pod_protecting`: The number of Pods that are being protected. +- `login_protector_watcher_errors_total`: The number of errors that occurred in the Pod watcher. + ## Development Install Golang, Docker, Make, and [aqua](https://aquaproj.github.io/docs/install) beforehand. @@ -227,11 +241,6 @@ exit The container image of the test Pod should be updated because it is no longer logged in. - -## Release Process - -T.B.D. - ## License Apache License 2.0 diff --git a/Tiltfile b/Tiltfile index ce2a1b2..5b72266 100644 --- a/Tiltfile +++ b/Tiltfile @@ -15,9 +15,6 @@ CMD ["/local-session-tracker"] # Generate manifests local_resource('make manifests', "make manifests", deps=["api", "controllers", "hooks"], ignore=['*/*/zz_generated.deepcopy.go']) -# Don't watch generated files -watch_settings(ignore=['config/rbac/role.yaml']) - # Deploy login-protector watch_file('./config/') k8s_yaml(kustomize('./config/dev')) diff --git a/cmd/login-protector/main.go b/cmd/login-protector/main.go index 3f276de..46c4c75 100644 --- a/cmd/login-protector/main.go +++ b/cmd/login-protector/main.go @@ -11,7 +11,7 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth" "github.com/cybozu-go/login-protector/internal/controller" - appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" @@ -109,26 +109,26 @@ func main() { os.Exit(1) } - setupLog.Info("creating tty watcher") - ch := make(chan event.TypedGenericEvent[*appsv1.StatefulSet]) - watcher := controller.NewTTYWatcher( + setupLog.Info("creating local session watcher") + ch := make(chan event.TypedGenericEvent[*corev1.Pod]) + watcher := controller.NewLocalSessionWatcher( mgr.GetClient(), - mgr.GetLogger().WithName("TTYWatcher"), + mgr.GetLogger().WithName("LocalSessionWatcher"), ttyCheckInterval, ch, ) err = mgr.Add(watcher) if err != nil { - setupLog.Error(err, "unable to add runnable", "controller", "PDB") + setupLog.Error(err, "unable to add runnable", "runnable", "LocalSessionWatcher") os.Exit(1) } - setupLog.Info("creating pdb controller") - if err = (&controller.PDBReconciler{ + setupLog.Info("creating pod controller") + if err = (&controller.PodReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), }).SetupWithManager(ctx, mgr, ch); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "PDB") + setupLog.Error(err, "unable to create controller", "controller", "Pod") os.Exit(1) } diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 2117541..cbd14ce 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -11,6 +11,7 @@ rules: verbs: - get - list + - update - watch - apiGroups: - "" diff --git a/internal/common/constants.go b/internal/common/constants.go index 81941b8..7d1181a 100644 --- a/internal/common/constants.go +++ b/internal/common/constants.go @@ -4,7 +4,11 @@ const LabelKeyLoginProtectorProtect = "login-protector.cybozu.io/protect" const AnnotationKeyNoPDB = "login-protector.cybozu.io/no-pdb" const AnnotationKeyTrackerName = "login-protector.cybozu.io/tracker-name" const AnnotationKeyTrackerPort = "login-protector.cybozu.io/tracker-port" +const AnnotationLoggedIn = "login-protector.cybozu.io/logged-in" +const DefaultTrackerName = "local-session-tracker" +const DefaultTrackerPort = "8080" const ValueTrue = "true" +const ValueFalse = "false" const KindStatefulSet = "StatefulSet" const KindPod = "Pod" diff --git a/internal/controller/local_session_watcher.go b/internal/controller/local_session_watcher.go new file mode 100644 index 0000000..ded51b0 --- /dev/null +++ b/internal/controller/local_session_watcher.go @@ -0,0 +1,167 @@ +package controller + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/cybozu-go/login-protector/internal/common" + "github.com/go-logr/logr" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type LocalSessionWatcher struct { + client client.Client + logger logr.Logger + interval time.Duration + channel chan<- event.TypedGenericEvent[*corev1.Pod] +} + +func NewLocalSessionWatcher(client client.Client, logger logr.Logger, interval time.Duration, ch chan<- event.TypedGenericEvent[*corev1.Pod]) *LocalSessionWatcher { + return &LocalSessionWatcher{ + client: client, + logger: logger, + interval: interval, + channel: ch, + } +} + +func (w *LocalSessionWatcher) Start(ctx context.Context) error { + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + watcherErrorsCounter.WithLabelValues("local-session-watcher").Add(0) + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + err := w.poll(ctx) + if err != nil { + w.logger.Error(err, "failed to poll") + watcherErrorsCounter.WithLabelValues("local-session-watcher").Inc() + } + } + } +} + +func (w *LocalSessionWatcher) poll(ctx context.Context) error { + var stsList appsv1.StatefulSetList + err := w.client.List(ctx, &stsList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{common.LabelKeyLoginProtectorProtect: common.ValueTrue}), + }) + if err != nil { + w.logger.Error(err, "failed to list StatefulSets") + return err + } + + errList := make([]error, 0) + // Get all pods that belong to the StatefulSets + for _, sts := range stsList.Items { + trackerName := common.DefaultTrackerName + if name, ok := sts.Annotations[common.AnnotationKeyTrackerName]; ok { + trackerName = name + } + trackerPort := common.DefaultTrackerPort + if port, ok := sts.Annotations[common.AnnotationKeyTrackerPort]; ok { + trackerPort = port + } + + var podList corev1.PodList + err = w.client.List(ctx, &podList, client.InNamespace(sts.Namespace), client.MatchingLabels(sts.Spec.Selector.MatchLabels)) + if err != nil { + errList = append(errList, err) + continue + } + + for _, pod := range podList.Items { + err = w.notify(ctx, pod, trackerName, trackerPort) + if err != nil { + errList = append(errList, err) + } + } + } + if len(errList) > 0 { + return errors.Join(errList...) + } + return nil +} + +// notify notifies pod-controller that the login status has changed +func (w *LocalSessionWatcher) notify(ctx context.Context, pod corev1.Pod, trackerName, trackerPort string) error { + podIP := pod.Status.PodIP + + var container *corev1.Container + for _, c := range pod.Spec.Containers { + c := c + if c.Name == trackerName { + container = &c + break + } + } + if container == nil { + err := fmt.Errorf("failed to find sidecar container (Name: %s)", trackerName) + return err + } + + resp, err := http.Get(fmt.Sprintf("http://%s:%s/status", podIP, trackerPort)) + if err != nil { + return err + } + defer resp.Body.Close() // nolint:errcheck + + status := common.TTYStatus{} + statusBytes, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + err = json.Unmarshal(statusBytes, &status) + if err != nil { + return err + } + if status.Total < 0 { + err = errors.New("broken status") + return err + } + + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + currentLoggedIn := pod.Annotations[common.AnnotationLoggedIn] + + if status.Total == 0 { + pod.Annotations[common.AnnotationLoggedIn] = common.ValueFalse + } else { + pod.Annotations[common.AnnotationLoggedIn] = common.ValueTrue + } + + if currentLoggedIn == pod.Annotations[common.AnnotationLoggedIn] { + return nil + } + + w.logger.Info("notify", "namespace", pod.Namespace, "pod", pod.Name, "current", currentLoggedIn, "new", pod.Annotations[common.AnnotationLoggedIn]) + + err = w.client.Update(ctx, &pod) + if err != nil { + return err + } + + ev := event.TypedGenericEvent[*corev1.Pod]{ + Object: pod.DeepCopy(), + } + w.channel <- ev + + return nil +} diff --git a/internal/controller/metrics.go b/internal/controller/metrics.go index 362d245..359a62f 100644 --- a/internal/controller/metrics.go +++ b/internal/controller/metrics.go @@ -27,6 +27,15 @@ var ( metricsNamespace+"_pod_protecting", "Describes whether the pod is being protected.", []string{"pod", "namespace"}, nil) + + watcherErrorsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "watcher_errors_total", + Help: "Number of errors occurred in watchers.", + }, + []string{"watcher"}, + ) ) type metricsCollector struct { @@ -38,6 +47,7 @@ type metricsCollector struct { func (c *metricsCollector) Describe(ch chan<- *prometheus.Desc) { ch <- pendingUpdatesDesc ch <- protectingPodsDesc + watcherErrorsCounter.Describe(ch) } func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) { @@ -89,6 +99,7 @@ func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) { ) } } + watcherErrorsCounter.Collect(ch) } func SetupMetrics(ctx context.Context, c client.Client, logger logr.Logger) error { diff --git a/internal/controller/pdb_controller.go b/internal/controller/pdb_controller.go deleted file mode 100644 index 33baf6d..0000000 --- a/internal/controller/pdb_controller.go +++ /dev/null @@ -1,210 +0,0 @@ -package controller - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - - "github.com/cybozu-go/login-protector/internal/common" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/intstr" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/source" -) - -type PDBReconciler struct { - Client client.Client - Scheme *runtime.Scheme -} - -//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch -//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch -//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=create;get;list;watch;delete - -func (r *PDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) - - sts := &appsv1.StatefulSet{} - if err := r.Client.Get(ctx, req.NamespacedName, sts); err != nil { - logger.Error(err, "failed to get StatefulSet") - return ctrl.Result{}, client.IgnoreNotFound(err) - } - if sts.DeletionTimestamp != nil { - logger.Info("the statefulset is being deleted") - return ctrl.Result{}, nil - } - - var podList corev1.PodList - err := r.Client.List(ctx, &podList, client.InNamespace(sts.Namespace), client.MatchingLabels(sts.Spec.Selector.MatchLabels)) - if err != nil { - return ctrl.Result{}, err - } - - trackerName := "local-session-tracker" - if name, ok := sts.Annotations[common.AnnotationKeyTrackerName]; ok { - trackerName = name - } - trackerPort := "8080" - if port, ok := sts.Annotations[common.AnnotationKeyTrackerPort]; ok { - trackerPort = port - } - noPDB := false - if noPDBStr, ok := sts.Annotations[common.AnnotationKeyNoPDB]; ok { - noPDB = noPDBStr == common.ValueTrue - } - - errorList := make([]error, 0) - for _, pod := range podList.Items { - err = r.reconcilePDB(ctx, &pod, trackerName, trackerPort, noPDB) - if err != nil { - errorList = append(errorList, err) - } - } - - if len(errorList) > 0 { - return ctrl.Result{}, fmt.Errorf("failed to reconcile PDB: %v", errorList) - } - - return ctrl.Result{}, nil -} - -func (r *PDBReconciler) reconcilePDB(ctx context.Context, pod *corev1.Pod, trackerName string, trackerPort string, noPDB bool) error { - logger := log.FromContext(ctx) - - if pod.DeletionTimestamp != nil { - logger.Info("the Pod is about to be deleted. skipping.") - return nil - } - - if val, ok := pod.Annotations[common.AnnotationKeyNoPDB]; ok { - noPDB = noPDB || (val == common.ValueTrue) - } - if noPDB { - pdb := &policyv1.PodDisruptionBudget{} - err := r.Client.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}, pdb) - if err != nil { - if !k8serrors.IsNotFound(err) { - return err - } - } else { - logger.Info("delete PDB, because pod has no PDB annotation", "pdb", pdb, "pod", pod.Name, "namespace", pod.Namespace) - err = r.Client.Delete(ctx, pdb) - if err != nil { - return err - } - } - return nil - } - - podIP := pod.Status.PodIP - - var container *corev1.Container - for _, c := range pod.Spec.Containers { - c := c - if c.Name == trackerName { - container = &c - break - } - } - if container == nil { - err := fmt.Errorf("failed to find sidecar container (Name: %s)", trackerName) - return err - } - - resp, err := http.Get(fmt.Sprintf("http://%s:%s/status", podIP, trackerPort)) - if err != nil { - return err - } - defer resp.Body.Close() // nolint:errcheck - - status := common.TTYStatus{} - statusBytes, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - err = json.Unmarshal(statusBytes, &status) - if err != nil { - return err - } - if status.Total < 0 { - err = errors.New("broken status") - return err - } - - foundPdb := false - pdb := &policyv1.PodDisruptionBudget{} - err = r.Client.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}, pdb) - if err != nil { - if !k8serrors.IsNotFound(err) { - return err - } - } else { - foundPdb = true - } - - if status.Total == 0 { - // no controlling terminals are observed. delete PDB. - if !foundPdb { - return nil - } - - logger.Info("delete PDB", "pdb", pdb, "pod", pod.Name, "namespace", pod.Namespace) - err = r.Client.Delete(ctx, pdb) - if err != nil { - return err - } - } else { - // some controlling terminals are observed. create PDB. - if foundPdb { - return nil - } - - zeroIntstr := intstr.FromInt32(0) - pdb = &policyv1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - }, - Spec: policyv1.PodDisruptionBudgetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: pod.Labels, - }, - MaxUnavailable: &zeroIntstr, - }, - } - err = ctrl.SetControllerReference(pod, pdb, r.Scheme) - if err != nil { - logger.Error(err, "failed to set controller reference") - return err - } - logger.Info("crate a new PDB", "pdb", pdb, "pod", pod.Name, "namespace", pod.Namespace) - err = r.Client.Create(ctx, pdb) - if err != nil { - return err - } - } - return nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *PDBReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, ch chan event.TypedGenericEvent[*appsv1.StatefulSet]) error { - return ctrl.NewControllerManagedBy(mgr). - For(&appsv1.StatefulSet{}, builder.WithPredicates(selectTargetStatefulSetPredicate())). - Owns(&corev1.Pod{}, builder.WithPredicates(selectTargetPodPredicate(ctx, mgr.GetClient()))). - Owns(&policyv1.PodDisruptionBudget{}, builder.WithPredicates(selectTargetPDBPredicate(ctx, mgr.GetClient()))). - WatchesRawSource(source.Channel(ch, &handler.TypedEnqueueRequestForObject[*appsv1.StatefulSet]{})). - Complete(r) -} diff --git a/internal/controller/pod_controller.go b/internal/controller/pod_controller.go new file mode 100644 index 0000000..6a344c6 --- /dev/null +++ b/internal/controller/pod_controller.go @@ -0,0 +1,141 @@ +package controller + +import ( + "context" + + "github.com/cybozu-go/login-protector/internal/common" + corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type PodReconciler struct { + Client client.Client + Scheme *runtime.Scheme +} + +//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;update;watch +//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=create;get;list;watch;delete + +func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + pod := &corev1.Pod{} + if err := r.Client.Get(ctx, req.NamespacedName, pod); err != nil { + logger.Error(err, "failed to get Pod") + return ctrl.Result{}, client.IgnoreNotFound(err) + } + if pod.DeletionTimestamp != nil { + logger.Info("the pod is being deleted") + return ctrl.Result{}, nil + } + + err := r.reconcilePDB(ctx, pod) + if err != nil { + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +func (r *PodReconciler) reconcilePDB(ctx context.Context, pod *corev1.Pod) error { + logger := log.FromContext(ctx) + + noPDB := false + if val, ok := pod.Annotations[common.AnnotationKeyNoPDB]; ok { + noPDB = val == common.ValueTrue + } + if noPDB { + pdb := &policyv1.PodDisruptionBudget{} + err := r.Client.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}, pdb) + if err != nil { + if !k8serrors.IsNotFound(err) { + return err + } + } else { + logger.Info("delete PDB, because pod has no PDB annotation", "pdb", pdb, "pod", pod.Name, "namespace", pod.Namespace) + err = r.Client.Delete(ctx, pdb) + if err != nil { + return err + } + } + return nil + } + + foundPdb := false + pdb := &policyv1.PodDisruptionBudget{} + err := r.Client.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}, pdb) + if err != nil { + if !k8serrors.IsNotFound(err) { + return err + } + } else { + foundPdb = true + } + + loggedIn := pod.Annotations[common.AnnotationLoggedIn] == common.ValueTrue + logger.Info("reconcile PDB", "pod", pod.Name, "namespace", pod.Namespace, "loggedIn", loggedIn, "foundPdb", foundPdb) + + if !loggedIn { + // no controlling terminals are observed. delete PDB. + if !foundPdb { + return nil + } + + logger.Info("delete PDB", "pdb", pdb, "pod", pod.Name, "namespace", pod.Namespace) + err = r.Client.Delete(ctx, pdb) + if err != nil { + return err + } + return nil + } + + // some controlling terminals are observed. create PDB. + if foundPdb { + return nil + } + + zeroIntstr := intstr.FromInt32(0) + pdb = &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: pod.Labels, + }, + MaxUnavailable: &zeroIntstr, + }, + } + err = ctrl.SetControllerReference(pod, pdb, r.Scheme) + if err != nil { + logger.Error(err, "failed to set controller reference") + return err + } + logger.Info("crate a new PDB", "pdb", pdb, "pod", pod.Name, "namespace", pod.Namespace) + err = r.Client.Create(ctx, pdb) + if err != nil { + return err + } + return nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *PodReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, ch chan event.TypedGenericEvent[*corev1.Pod]) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}, builder.WithPredicates(selectTargetPodPredicate(ctx, mgr.GetClient()))). + Owns(&policyv1.PodDisruptionBudget{}, builder.WithPredicates(selectTargetPDBPredicate(ctx, mgr.GetClient()))). + WatchesRawSource(source.Channel(ch, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})). + Complete(r) +} diff --git a/internal/controller/tty_watcher.go b/internal/controller/tty_watcher.go deleted file mode 100644 index 90e0179..0000000 --- a/internal/controller/tty_watcher.go +++ /dev/null @@ -1,61 +0,0 @@ -package controller - -import ( - "context" - "time" - - "github.com/cybozu-go/login-protector/internal/common" - "github.com/go-logr/logr" - appsv1 "k8s.io/api/apps/v1" - "k8s.io/apimachinery/pkg/labels" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" -) - -type TTYWatcher struct { - client client.Client - logger logr.Logger - interval time.Duration - channel chan<- event.TypedGenericEvent[*appsv1.StatefulSet] -} - -func NewTTYWatcher(client client.Client, logger logr.Logger, interval time.Duration, ch chan<- event.TypedGenericEvent[*appsv1.StatefulSet]) *TTYWatcher { - return &TTYWatcher{ - client: client, - logger: logger, - interval: interval, - channel: ch, - } -} - -func (w *TTYWatcher) Start(ctx context.Context) error { - ticker := time.NewTicker(w.interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - w.pollStatefulSets(ctx) - } - } -} - -func (w *TTYWatcher) pollStatefulSets(ctx context.Context) { - var stsList appsv1.StatefulSetList - err := w.client.List(ctx, &stsList, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{common.LabelKeyLoginProtectorProtect: common.ValueTrue}), - }) - if err != nil { - w.logger.Error(err, "failed to list StatefulSets") - return - } - - // Get all pods that belong to the StatefulSets - for _, sts := range stsList.Items { - w.channel <- event.TypedGenericEvent[*appsv1.StatefulSet]{ - Object: sts.DeepCopy(), - } - } -} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 00a5d2d..6189e0c 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -208,6 +208,7 @@ var _ = Describe("controller", Ordered, func() { "login_protector_pod_protecting{namespace=\"default\",pod=\"target-sts-1\"} 0", "login_protector_pod_pending_updates{namespace=\"default\",pod=\"target-sts-0\"} 0", "login_protector_pod_pending_updates{namespace=\"default\",pod=\"target-sts-1\"} 0", + "login_protector_watcher_errors_total{tracker=\"local-session-tracker\"} 0", )) }).WithTimeout(testInterval).Should(Succeed()) @@ -239,6 +240,7 @@ var _ = Describe("controller", Ordered, func() { "login_protector_pod_protecting{namespace=\"default\",pod=\"target-sts-1\"} 0", "login_protector_pod_pending_updates{namespace=\"default\",pod=\"target-sts-0\"} 0", "login_protector_pod_pending_updates{namespace=\"default\",pod=\"target-sts-1\"} 0", + "login_protector_watcher_errors_total{tracker=\"local-session-tracker\"} 0", )) }).WithTimeout(testInterval).Should(Succeed()) @@ -254,8 +256,9 @@ var _ = Describe("controller", Ordered, func() { "login_protector_pod_protecting{namespace=\"default\",pod=\"target-sts-1\"} 0", "login_protector_pod_pending_updates{namespace=\"default\",pod=\"target-sts-0\"} 1", "login_protector_pod_pending_updates{namespace=\"default\",pod=\"target-sts-1\"} 0", + "login_protector_watcher_errors_total{tracker=\"local-session-tracker\"} 0", )) - }).WithTimeout(testInterval * 2).Should(Succeed()) + }).WithTimeout(3 * time.Minute).Should(Succeed()) }) }) })