Skip to content

Commit

Permalink
Refactor local-session-watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
zoetrope committed Sep 17, 2024
1 parent 754375c commit 8ba835e
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 317 deletions.
27 changes: 18 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ login-protector targets only StatefulSets. The StatefulSet should be configured
2. Add the sidecar container `ghcr.io/cybozu-go/local-session-tracker` and specify `shareProcessNamespace: true`.
3. Set the `updateStrategy` to `type: OnDelete`.


Example manifest:

```yaml
Expand Down Expand Up @@ -88,8 +87,6 @@ Annotations can be used to modify the behavior of login-protector for the target
- `login-protector.cybozu.io/tracker-name`: Specify the name of the local-session-tracker sidecar container. Default is "local-session-tracker".
- `login-protector.cybozu.io/tracker-port`: Specify the port of the local-session-tracker sidecar container. Default is "8080".
- `login-protector.cybozu.io/no-pdb`: Set to "true" to prevent the creation of a PodDisruptionBudget.


```yaml
apiVersion: apps/v1
Expand All @@ -101,7 +98,6 @@ metadata:
annotations:
login-protector.cybozu.io/tracker-name: sidecar
login-protector.cybozu.io/tracker-port: "9090"
login-protector.cybozu.io/no-pdb: "true"
spec:
replicas: 1
selector:
Expand Down Expand Up @@ -129,6 +125,24 @@ spec:
type: OnDelete
```

The following annotation can be used to disable the creation of a PodDisruptionBudget for the target Pod:

- `login-protector.cybozu.io/no-pdb`: Set to "true" to prevent the creation of a PodDisruptionBudget.

If you want to force a Pod reboot, you can add the annotation to the target Pod:

```console
$ kubectl annotate pod target-sts-0 login-protector.cybozu.io/no-pdb=true
```

## Metrics

login-protector provides the following metrics:

- `login_protector_pod_pending_updates`: The number of Pods that have pending updates.
- `login_protector_pod_protecting`: The number of Pods that are being protected.
- `login_protector_watcher_errors_total`: The number of errors that occurred in the Pod watcher.

## Development

Install Golang, Docker, Make, and [aqua](https://aquaproj.github.io/docs/install) beforehand.
Expand Down Expand Up @@ -227,11 +241,6 @@ exit

The container image of the test Pod should be updated because it is no longer logged in.


## Release Process

T.B.D.

## License

Apache License 2.0
Expand Down
3 changes: 0 additions & 3 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ CMD ["/local-session-tracker"]
# Generate manifests
local_resource('make manifests', "make manifests", deps=["api", "controllers", "hooks"], ignore=['*/*/zz_generated.deepcopy.go'])

# Don't watch generated files
watch_settings(ignore=['config/rbac/role.yaml'])

# Deploy login-protector
watch_file('./config/')
k8s_yaml(kustomize('./config/dev'))
Expand Down
18 changes: 9 additions & 9 deletions cmd/login-protector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"

"github.com/cybozu-go/login-protector/internal/controller"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -109,26 +109,26 @@ func main() {
os.Exit(1)
}

setupLog.Info("creating tty watcher")
ch := make(chan event.TypedGenericEvent[*appsv1.StatefulSet])
watcher := controller.NewTTYWatcher(
setupLog.Info("creating local session watcher")
ch := make(chan event.TypedGenericEvent[*corev1.Pod])
watcher := controller.NewLocalSessionWatcher(
mgr.GetClient(),
mgr.GetLogger().WithName("TTYWatcher"),
mgr.GetLogger().WithName("LocalSessionWatcher"),
ttyCheckInterval,
ch,
)
err = mgr.Add(watcher)
if err != nil {
setupLog.Error(err, "unable to add runnable", "controller", "PDB")
setupLog.Error(err, "unable to add runnable", "runnable", "LocalSessionWatcher")
os.Exit(1)
}

setupLog.Info("creating pdb controller")
if err = (&controller.PDBReconciler{
setupLog.Info("creating pod controller")
if err = (&controller.PodReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(ctx, mgr, ch); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PDB")
setupLog.Error(err, "unable to create controller", "controller", "Pod")
os.Exit(1)
}

Expand Down
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rules:
verbs:
- get
- list
- update
- watch
- apiGroups:
- ""
Expand Down
4 changes: 4 additions & 0 deletions internal/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ const LabelKeyLoginProtectorProtect = "login-protector.cybozu.io/protect"
const AnnotationKeyNoPDB = "login-protector.cybozu.io/no-pdb"
const AnnotationKeyTrackerName = "login-protector.cybozu.io/tracker-name"
const AnnotationKeyTrackerPort = "login-protector.cybozu.io/tracker-port"
const AnnotationLoggedIn = "login-protector.cybozu.io/logged-in"

const DefaultTrackerName = "local-session-tracker"
const DefaultTrackerPort = "8080"
const ValueTrue = "true"
const ValueFalse = "false"
const KindStatefulSet = "StatefulSet"
const KindPod = "Pod"
167 changes: 167 additions & 0 deletions internal/controller/local_session_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package controller

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/cybozu-go/login-protector/internal/common"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)

type LocalSessionWatcher struct {
client client.Client
logger logr.Logger
interval time.Duration
channel chan<- event.TypedGenericEvent[*corev1.Pod]
}

func NewLocalSessionWatcher(client client.Client, logger logr.Logger, interval time.Duration, ch chan<- event.TypedGenericEvent[*corev1.Pod]) *LocalSessionWatcher {
return &LocalSessionWatcher{
client: client,
logger: logger,
interval: interval,
channel: ch,
}
}

func (w *LocalSessionWatcher) Start(ctx context.Context) error {
ticker := time.NewTicker(w.interval)
defer ticker.Stop()
watcherErrorsCounter.WithLabelValues("local-session-watcher").Add(0)

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
err := w.poll(ctx)
if err != nil {
w.logger.Error(err, "failed to poll")
watcherErrorsCounter.WithLabelValues("local-session-watcher").Inc()
}
}
}
}

func (w *LocalSessionWatcher) poll(ctx context.Context) error {
var stsList appsv1.StatefulSetList
err := w.client.List(ctx, &stsList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{common.LabelKeyLoginProtectorProtect: common.ValueTrue}),
})
if err != nil {
w.logger.Error(err, "failed to list StatefulSets")
return err
}

errList := make([]error, 0)
// Get all pods that belong to the StatefulSets
for _, sts := range stsList.Items {
trackerName := common.DefaultTrackerName
if name, ok := sts.Annotations[common.AnnotationKeyTrackerName]; ok {
trackerName = name
}
trackerPort := common.DefaultTrackerPort
if port, ok := sts.Annotations[common.AnnotationKeyTrackerPort]; ok {
trackerPort = port
}

var podList corev1.PodList
err = w.client.List(ctx, &podList, client.InNamespace(sts.Namespace), client.MatchingLabels(sts.Spec.Selector.MatchLabels))
if err != nil {
errList = append(errList, err)
continue
}

for _, pod := range podList.Items {
err = w.notify(ctx, pod, trackerName, trackerPort)
if err != nil {
errList = append(errList, err)
}
}
}
if len(errList) > 0 {
return errors.Join(errList...)
}
return nil
}

// notify notifies pod-controller that the login status has changed
func (w *LocalSessionWatcher) notify(ctx context.Context, pod corev1.Pod, trackerName, trackerPort string) error {
podIP := pod.Status.PodIP

var container *corev1.Container
for _, c := range pod.Spec.Containers {
c := c
if c.Name == trackerName {
container = &c
break
}
}
if container == nil {
err := fmt.Errorf("failed to find sidecar container (Name: %s)", trackerName)
return err
}

resp, err := http.Get(fmt.Sprintf("http://%s:%s/status", podIP, trackerPort))
if err != nil {
return err
}
defer resp.Body.Close() // nolint:errcheck

status := common.TTYStatus{}
statusBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
err = json.Unmarshal(statusBytes, &status)
if err != nil {
return err
}
if status.Total < 0 {
err = errors.New("broken status")
return err
}

if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}

if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
currentLoggedIn := pod.Annotations[common.AnnotationLoggedIn]

if status.Total == 0 {
pod.Annotations[common.AnnotationLoggedIn] = common.ValueFalse
} else {
pod.Annotations[common.AnnotationLoggedIn] = common.ValueTrue
}

if currentLoggedIn == pod.Annotations[common.AnnotationLoggedIn] {
return nil
}

w.logger.Info("notify", "namespace", pod.Namespace, "pod", pod.Name, "current", currentLoggedIn, "new", pod.Annotations[common.AnnotationLoggedIn])

err = w.client.Update(ctx, &pod)
if err != nil {
return err
}

ev := event.TypedGenericEvent[*corev1.Pod]{
Object: pod.DeepCopy(),
}
w.channel <- ev

return nil
}
11 changes: 11 additions & 0 deletions internal/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ var (
metricsNamespace+"_pod_protecting",
"Describes whether the pod is being protected.",
[]string{"pod", "namespace"}, nil)

watcherErrorsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "watcher_errors_total",
Help: "Number of errors occurred in watchers.",
},
[]string{"watcher"},
)
)

type metricsCollector struct {
Expand All @@ -38,6 +47,7 @@ type metricsCollector struct {
func (c *metricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- pendingUpdatesDesc
ch <- protectingPodsDesc
watcherErrorsCounter.Describe(ch)
}

func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) {
Expand Down Expand Up @@ -89,6 +99,7 @@ func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) {
)
}
}
watcherErrorsCounter.Collect(ch)
}

func SetupMetrics(ctx context.Context, c client.Client, logger logr.Logger) error {
Expand Down
Loading

0 comments on commit 8ba835e

Please sign in to comment.