Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: sync sidecar #53

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions cmd/sync/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package main

import (
"errors"
"flag"
"os"

"github.com/thanos-community/thanos-operator/internal/controller"
controllermetrics "github.com/thanos-community/thanos-operator/internal/pkg/metrics"
"github.com/thanos-community/thanos-operator/internal/sync"

"github.com/prometheus/client_golang/prometheus"
versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}

const name = "configmap-syncer"

var (
metricsAddr string
probeAddr string

configMapName string
configMapKey string
pathToWrite string
)

func main() {
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to")
flag.StringVar(&configMapName, "name", "", "The name of the ConfigMap to watch")
flag.StringVar(&configMapKey, "key", "", "The ConfigMap key to read")
flag.StringVar(&pathToWrite, "path", "/", "The path to write to on disk")

opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

if configMapName == "" || configMapKey == "" {
setupLog.Error(errors.New("name and key of the ConfigMap are required"), "could not create manager")
os.Exit(1)
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
HealthProbeBindAddress: probeAddr,
Metrics: metricsserver.Options{BindAddress: metricsAddr},
Cache: cache.Options{
ByObject: map[client.Object]cache.ByObject{
&corev1.ConfigMap{}: {
Field: fields.OneTermEqualSelector("metadata.name", configMapName),
},
},
},
})
if err != nil {
setupLog.Error(err, "could not create manager")
os.Exit(1)
}

ctrlmetrics.Registry.MustRegister(
versioncollector.NewCollector(name),
)

prometheus.DefaultRegisterer = ctrlmetrics.Registry
baseMetrics := controllermetrics.NewBaseMetrics(ctrlmetrics.Registry)

conf := sync.ConfigMapSyncerOptions{
ConfigMapOptions: sync.ConfigMapOptions{
Name: configMapName,
Key: configMapKey,
Path: pathToWrite,
},
InstrumentationConfig: controller.InstrumentationConfig{
Logger: ctrl.Log.WithName(name),
MetricsRegistry: ctrlmetrics.Registry,
BaseMetrics: baseMetrics,
},
}

if err = sync.NewController(
conf,
mgr.GetClient(),
mgr.GetScheme(),
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", name)
os.Exit(1)
}

if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "could not start manager")
os.Exit(1)
}
}
20 changes: 20 additions & 0 deletions internal/pkg/metrics/controller_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type ThanosCompactMetrics struct {
*BaseMetrics
}

type ConfigMapSyncerMetrics struct {
*BaseMetrics
LastWriteSuccessTime *prometheus.GaugeVec
ConfigMapHash *prometheus.GaugeVec
}

func NewBaseMetrics(reg prometheus.Registerer) *BaseMetrics {
return &BaseMetrics{
ReconciliationsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -122,3 +128,17 @@ func NewThanosCompactMetrics(reg prometheus.Registerer, baseMetrics *BaseMetrics
BaseMetrics: baseMetrics,
}
}

func NewConfigMapSyncerMetrics(reg prometheus.Registerer, baseMetrics *BaseMetrics) ConfigMapSyncerMetrics {
return ConfigMapSyncerMetrics{
BaseMetrics: baseMetrics,
LastWriteSuccessTime: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_configmap_syncer_last_write_success_timestamp_seconds",
Help: "Unix timestamp of the last successful write to disk",
}, []string{"resource", "namespace"}),
ConfigMapHash: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_configmap_syncer_configmap_hash",
Help: "Hash of the last created or updated configmap",
}, []string{"resource", "namespace"}),
}
}
117 changes: 117 additions & 0 deletions internal/sync/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package sync

import (
"context"
"crypto/md5"
"encoding/binary"
"os"

"github.com/go-logr/logr"
"github.com/thanos-community/thanos-operator/internal/controller"
controllermetrics "github.com/thanos-community/thanos-operator/internal/pkg/metrics"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Controller watches for ConfigMaps and syncs their contents to disk.
// This controller is intended to sit as a sidecar to a workload that requires near real-time updates to a file on disk.
type Controller struct {
client.Client
Scheme *runtime.Scheme

logger logr.Logger
metrics controllermetrics.ConfigMapSyncerMetrics
conf ConfigMapOptions
}

type ConfigMapSyncerOptions struct {
ConfigMapOptions ConfigMapOptions
InstrumentationConfig controller.InstrumentationConfig
}

// ConfigMapOptions defines the configuration for a Controller.
type ConfigMapOptions struct {
// Name of the ConfigMap to watch.
Name string
// Key of the ConfigMap to watch.
Key string
// Path to write the ConfigMap contents to.
Path string
}

// NewController returns a Controller for a single ConfigMap and key.
func NewController(conf ConfigMapSyncerOptions, client client.Client, scheme *runtime.Scheme) *Controller {
return &Controller{
Client: client,
Scheme: scheme,
conf: conf.ConfigMapOptions,
logger: conf.InstrumentationConfig.Logger,
metrics: controllermetrics.NewConfigMapSyncerMetrics(conf.InstrumentationConfig.MetricsRegistry, conf.InstrumentationConfig.BaseMetrics),
}
}

// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch

// Reconcile reads that state of the cluster for a ConfigMap object and syncs it to disk.
func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.metrics.ReconciliationsTotal.WithLabelValues("configmap-sync-controller").Inc()

if r.conf.Name != req.NamespacedName.Name {
r.logger.Info("ignoring ConfigMap", "name", req.NamespacedName.Name)
return ctrl.Result{}, nil
}

var cm corev1.ConfigMap
if err := r.Get(ctx, req.NamespacedName, &cm); err != nil {
if apierrors.IsNotFound(err) {
// we'll ignore not-found errors, since they can't be fixed by an immediate requeue
return ctrl.Result{}, nil
}
r.logger.Error(err, "unable to fetch ConfigMap")
r.metrics.ReconciliationsFailedTotal.WithLabelValues("configmap-sync-controller").Inc()
return ctrl.Result{}, err
}

if cm.Data == nil || cm.Data[r.conf.Key] == "" {
r.logger.Info("ConfigMap has no data or missing key, skipping")
return ctrl.Result{}, nil
}

data := []byte(cm.Data[r.conf.Key])
if err := os.WriteFile(r.conf.Path, data, 0644); err != nil {
r.logger.Error(err, "failed to write file")
r.metrics.ReconciliationsFailedTotal.WithLabelValues("configmap-sync-controller").Inc()
return ctrl.Result{}, err
}

r.metrics.LastWriteSuccessTime.WithLabelValues(cm.GetName(), cm.GetNamespace()).SetToCurrentTime()
r.metrics.ConfigMapHash.WithLabelValues(cm.GetName(), cm.GetNamespace()).Set(hashAsMetricValue(data))

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *Controller) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.ConfigMap{}, builder.WithPredicates(
predicate.GenerationChangedPredicate{},
)).
Complete(r)
}

// hashAsMetricValue generates metric value from hash of data.
func hashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
// We only want 48 bits as a float64 only has a 53 bit mantissa.
smallSum := sum[0:6]
bytes := make([]byte, 8)
copy(bytes, smallSum)

return float64(binary.LittleEndian.Uint64(bytes))
}
Loading
Loading