Skip to content

Commit

Permalink
NETOBSERV-1358: splitting controllers (#503)
Browse files Browse the repository at this point in the history
* NETOBSERV-1358: splitting controllers

- Start with a new "Monitoring" controllers that deals with
  ServiceMonitors / Dashboards etc
- Create a Status Manager that gathers all statuses from each controller
- Each status becomes eventually converted in a Condition (k8s status
  API), plus there is a global "Ready" status that is a merge of each
component status

* Add filter predicate for reconcile reqs

Also:
- Centralize location of the kubebuilder rbac annotations in manager.go
- Update the displayed column for CLI status with new conditions
- Keep just 1 status per component, not 2 (merged errors & progress into
  a single status)

* Use sync.Map

* rename package flp (because shorter)

* Move FLP reconciler as a new controller

- Use status manager for FLP and sub-components
  (ingest/transfo/monolith)
- Like status, namespace management needs to be per-controller. So I'm
  moving away from having a dedicated field in Status, and use
per-component annotations instead
- Simplify a bit the reconcilers "managed objects" stuff
- Less verbose narrowcache and log context, better use named loggers

* Address review feedback
  • Loading branch information
jotak authored Dec 6, 2023
1 parent f8edd2b commit 79eb0be
Show file tree
Hide file tree
Showing 44 changed files with 1,722 additions and 1,316 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ type FlowCollectorStatus struct {
// +kubebuilder:printcolumn:name="Agent",type="string",JSONPath=`.spec.agent.type`
// +kubebuilder:printcolumn:name="Sampling (EBPF)",type="string",JSONPath=`.spec.agent.ebpf.sampling`
// +kubebuilder:printcolumn:name="Deployment Model",type="string",JSONPath=`.spec.deploymentModel`
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.conditions[*].reason"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=`.status.conditions[?(@.type=="Ready")].reason`

// FlowCollector is the Schema for the flowcollectors API, which pilots and configures netflow collection.
//
Expand Down
2 changes: 1 addition & 1 deletion api/v1beta1/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ type FlowCollectorStatus struct {
// +kubebuilder:printcolumn:name="Agent",type="string",JSONPath=`.spec.agent.type`
// +kubebuilder:printcolumn:name="Sampling (EBPF)",type="string",JSONPath=`.spec.agent.ebpf.sampling`
// +kubebuilder:printcolumn:name="Deployment Model",type="string",JSONPath=`.spec.deploymentModel`
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.conditions[*].reason"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=`.status.conditions[?(@.type=="Ready")].reason`
// +kubebuilder:storageversion
// `FlowCollector` is the schema for the network flows collection API, which pilots and configures the underlying deployments.
type FlowCollector struct {
Expand Down
3 changes: 2 additions & 1 deletion api/v1beta2/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,7 @@ type FlowCollectorStatus struct {
Conditions []metav1.Condition `json:"conditions"`

// Namespace where console plugin and flowlogs-pipeline have been deployed.
// Deprecated: annotations are used instead
Namespace string `json:"namespace,omitempty"`
}

Expand All @@ -945,7 +946,7 @@ type FlowCollectorStatus struct {
// +kubebuilder:printcolumn:name="Agent",type="string",JSONPath=`.spec.agent.type`
// +kubebuilder:printcolumn:name="Sampling (EBPF)",type="string",JSONPath=`.spec.agent.ebpf.sampling`
// +kubebuilder:printcolumn:name="Deployment Model",type="string",JSONPath=`.spec.deploymentModel`
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.conditions[*].reason"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=`.status.conditions[?(@.type=="Ready")].reason`
// `FlowCollector` is the schema for the network flows collection API, which pilots and configures the underlying deployments.
type FlowCollector struct {
metav1.TypeMeta `json:",inline"`
Expand Down
10 changes: 5 additions & 5 deletions bundle/manifests/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spec:
- jsonPath: .spec.deploymentModel
name: Deployment Model
type: string
- jsonPath: .status.conditions[*].reason
- jsonPath: .status.conditions[?(@.type=="Ready")].reason
name: Status
type: string
deprecated: true
Expand Down Expand Up @@ -2460,7 +2460,7 @@ spec:
- jsonPath: .spec.deploymentModel
name: Deployment Model
type: string
- jsonPath: .status.conditions[*].reason
- jsonPath: .status.conditions[?(@.type=="Ready")].reason
name: Status
type: string
name: v1beta1
Expand Down Expand Up @@ -5145,7 +5145,7 @@ spec:
- jsonPath: .spec.deploymentModel
name: Deployment Model
type: string
- jsonPath: .status.conditions[*].reason
- jsonPath: .status.conditions[?(@.type=="Ready")].reason
name: Status
type: string
name: v1beta2
Expand Down Expand Up @@ -8033,8 +8033,8 @@ spec:
type: object
type: array
namespace:
description: Namespace where console plugin and flowlogs-pipeline
have been deployed.
description: 'Namespace where console plugin and flowlogs-pipeline
have been deployed. Deprecated: annotations are used instead'
type: string
required:
- conditions
Expand Down
10 changes: 5 additions & 5 deletions config/crd/bases/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ spec:
- jsonPath: .spec.deploymentModel
name: Deployment Model
type: string
- jsonPath: .status.conditions[*].reason
- jsonPath: .status.conditions[?(@.type=="Ready")].reason
name: Status
type: string
deprecated: true
Expand Down Expand Up @@ -2446,7 +2446,7 @@ spec:
- jsonPath: .spec.deploymentModel
name: Deployment Model
type: string
- jsonPath: .status.conditions[*].reason
- jsonPath: .status.conditions[?(@.type=="Ready")].reason
name: Status
type: string
name: v1beta1
Expand Down Expand Up @@ -5131,7 +5131,7 @@ spec:
- jsonPath: .spec.deploymentModel
name: Deployment Model
type: string
- jsonPath: .status.conditions[*].reason
- jsonPath: .status.conditions[?(@.type=="Ready")].reason
name: Status
type: string
name: v1beta2
Expand Down Expand Up @@ -8019,8 +8019,8 @@ spec:
type: object
type: array
namespace:
description: Namespace where console plugin and flowlogs-pipeline
have been deployed.
description: 'Namespace where console plugin and flowlogs-pipeline
have been deployed. Deprecated: annotations are used instead'
type: string
required:
- conditions
Expand Down
100 changes: 40 additions & 60 deletions controllers/consoleplugin/consoleplugin_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ type pluginSpec = flowslatest.FlowCollectorConsolePlugin
// CPReconciler reconciles the current console plugin state with the desired configuration
type CPReconciler struct {
*reconcilers.Instance
owned ownedObjects
}

type ownedObjects struct {
deployment *appsv1.Deployment
service *corev1.Service
metricsService *corev1.Service
Expand All @@ -39,28 +35,20 @@ type ownedObjects struct {
serviceMonitor *monitoringv1.ServiceMonitor
}

func NewReconciler(common *reconcilers.Common, imageName string) CPReconciler {
owned := ownedObjects{
deployment: &appsv1.Deployment{},
service: &corev1.Service{},
metricsService: &corev1.Service{},
hpa: &ascv2.HorizontalPodAutoscaler{},
serviceAccount: &corev1.ServiceAccount{},
configMap: &corev1.ConfigMap{},
serviceMonitor: &monitoringv1.ServiceMonitor{},
func NewReconciler(cmn *reconcilers.Instance) CPReconciler {
rec := CPReconciler{
Instance: cmn,
deployment: cmn.Managed.NewDeployment(constants.PluginName),
service: cmn.Managed.NewService(constants.PluginName),
metricsService: cmn.Managed.NewService(metricsSvcName),
hpa: cmn.Managed.NewHPA(constants.PluginName),
serviceAccount: cmn.Managed.NewServiceAccount(constants.PluginName),
configMap: cmn.Managed.NewConfigMap(configMapName),
}
cmnInstance := common.NewInstance(imageName)
cmnInstance.Managed.AddManagedObject(constants.PluginName, owned.deployment)
cmnInstance.Managed.AddManagedObject(constants.PluginName, owned.service)
cmnInstance.Managed.AddManagedObject(metricsSvcName, owned.metricsService)
cmnInstance.Managed.AddManagedObject(constants.PluginName, owned.hpa)
cmnInstance.Managed.AddManagedObject(constants.PluginName, owned.serviceAccount)
cmnInstance.Managed.AddManagedObject(configMapName, owned.configMap)
if common.AvailableAPIs.HasSvcMonitor() {
cmnInstance.Managed.AddManagedObject(constants.PluginName, owned.serviceMonitor)
if cmn.AvailableAPIs.HasSvcMonitor() {
rec.serviceMonitor = cmn.Managed.NewServiceMonitor(constants.PluginName)
}

return CPReconciler{Instance: cmnInstance, owned: owned}
return rec
}

// CleanupNamespace cleans up old namespace
Expand All @@ -70,6 +58,9 @@ func (r *CPReconciler) CleanupNamespace(ctx context.Context) {

// Reconcile is the reconciler entry point to reconcile the current plugin state with the desired configuration
func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCollector) error {
l := log.FromContext(ctx).WithName("console-plugin")
ctx = log.IntoContext(ctx, l)

ns := r.Managed.Namespace
// Retrieve current owned objects
err := r.Managed.FetchAll(ctx)
Expand Down Expand Up @@ -148,7 +139,7 @@ func (r *CPReconciler) checkAutoPatch(ctx context.Context, desired *flowslatest.
}

func (r *CPReconciler) reconcilePermissions(ctx context.Context, builder *builder) error {
if !r.Managed.Exists(r.owned.serviceAccount) {
if !r.Managed.Exists(r.serviceAccount) {
return r.CreateOwned(ctx, builder.serviceAccount())
} // update not needed for now

Expand Down Expand Up @@ -193,12 +184,12 @@ func (r *CPReconciler) reconcileConfigMap(ctx context.Context, builder *builder)
if err != nil {
return "", err
}
if !r.Managed.Exists(r.owned.configMap) {
if !r.Managed.Exists(r.configMap) {
if err := r.CreateOwned(ctx, newCM); err != nil {
return "", err
}
} else if !reflect.DeepEqual(newCM.Data, r.owned.configMap.Data) {
if err := r.UpdateIfOwned(ctx, r.owned.configMap, newCM); err != nil {
} else if !reflect.DeepEqual(newCM.Data, r.configMap.Data) {
if err := r.UpdateIfOwned(ctx, r.configMap, newCM); err != nil {
return "", err
}
}
Expand All @@ -209,34 +200,31 @@ func (r *CPReconciler) reconcileDeployment(ctx context.Context, builder *builder
report := helper.NewChangeReport("Console deployment")
defer report.LogIfNeeded(ctx)

newDepl := builder.deployment(cmDigest)
if !r.Managed.Exists(r.owned.deployment) {
if err := r.CreateOwned(ctx, newDepl); err != nil {
return err
}
} else if helper.DeploymentChanged(r.owned.deployment, newDepl, constants.PluginName, helper.HPADisabled(&desired.ConsolePlugin.Autoscaler), helper.PtrInt32(desired.ConsolePlugin.Replicas), &report) {
if err := r.UpdateIfOwned(ctx, r.owned.deployment, newDepl); err != nil {
return err
}
} else {
r.CheckDeploymentInProgress(r.owned.deployment)
}
return nil
return reconcilers.ReconcileDeployment(
ctx,
r.Instance,
r.deployment,
builder.deployment(cmDigest),
constants.PluginName,
helper.PtrInt32(desired.ConsolePlugin.Replicas),
&desired.ConsolePlugin.Autoscaler,
&report,
)
}

func (r *CPReconciler) reconcileServices(ctx context.Context, builder *builder) error {
report := helper.NewChangeReport("Console services")
defer report.LogIfNeeded(ctx)

if err := r.ReconcileService(ctx, r.owned.service, builder.mainService(), &report); err != nil {
if err := r.ReconcileService(ctx, r.service, builder.mainService(), &report); err != nil {
return err
}
if err := r.ReconcileService(ctx, r.owned.metricsService, builder.metricsService(), &report); err != nil {
if err := r.ReconcileService(ctx, r.metricsService, builder.metricsService(), &report); err != nil {
return err
}
if r.AvailableAPIs.HasSvcMonitor() {
serviceMonitor := builder.serviceMonitor()
if err := reconcilers.GenericReconcile(ctx, r.Managed, &r.Client, r.owned.serviceMonitor, serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
if err := reconcilers.GenericReconcile(ctx, r.Managed, &r.Client, r.serviceMonitor, serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
return err
}
}
Expand All @@ -247,22 +235,14 @@ func (r *CPReconciler) reconcileHPA(ctx context.Context, builder *builder, desir
report := helper.NewChangeReport("Console autoscaler")
defer report.LogIfNeeded(ctx)

// Delete or Create / Update Autoscaler according to HPA option
if helper.HPADisabled(&desired.ConsolePlugin.Autoscaler) {
r.Managed.TryDelete(ctx, r.owned.hpa)
} else {
newASC := builder.autoScaler()
if !r.Managed.Exists(r.owned.hpa) {
if err := r.CreateOwned(ctx, newASC); err != nil {
return err
}
} else if helper.AutoScalerChanged(r.owned.hpa, desired.ConsolePlugin.Autoscaler, &report) {
if err := r.UpdateIfOwned(ctx, r.owned.hpa, newASC); err != nil {
return err
}
}
}
return nil
return reconcilers.ReconcileHPA(
ctx,
r.Instance,
r.hpa,
builder.autoScaler(),
&desired.ConsolePlugin.Autoscaler,
&report,
)
}

func pluginNeedsUpdate(plg *osv1alpha1.ConsolePlugin, desired *pluginSpec) bool {
Expand Down
9 changes: 9 additions & 0 deletions controllers/controllers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package controllers

import (
"github.com/netobserv/network-observability-operator/controllers/flp"
"github.com/netobserv/network-observability-operator/controllers/monitoring"
"github.com/netobserv/network-observability-operator/pkg/manager"
)

var Registerers = []manager.Registerer{Start, flp.Start, monitoring.Start}
19 changes: 8 additions & 11 deletions controllers/ebpf/agent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/ebpf/internal/permissions"
"github.com/netobserv/network-observability-operator/controllers/operator"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/helper"
"github.com/netobserv/network-observability-operator/pkg/volumes"
Expand Down Expand Up @@ -83,22 +82,19 @@ const (
// associated objects that are required to bind the proper permissions: namespace, service
// accounts, SecurityContextConstraints...
type AgentController struct {
reconcilers.Common
*reconcilers.Instance
permissions permissions.Reconciler
config *operator.Config
volumes volumes.Builder
}

func NewAgentController(common *reconcilers.Common, config *operator.Config) *AgentController {
func NewAgentController(common *reconcilers.Instance) *AgentController {
return &AgentController{
Common: *common,
Instance: common,
permissions: permissions.NewReconciler(common),
config: config,
}
}

func (c *AgentController) Reconcile(
ctx context.Context, target *flowslatest.FlowCollector) error {
func (c *AgentController) Reconcile(ctx context.Context, target *flowslatest.FlowCollector) error {
rlog := log.FromContext(ctx).WithName("ebpf")
ctx = log.IntoContext(ctx, rlog)
current, err := c.current(ctx)
Expand Down Expand Up @@ -136,13 +132,14 @@ func (c *AgentController) Reconcile(
switch requiredAction(current, desired) {
case actionCreate:
rlog.Info("action: create agent")
c.Status.SetCreatingDaemonSet(desired)
return c.CreateOwned(ctx, desired)
case actionUpdate:
rlog.Info("action: update agent")
return c.UpdateIfOwned(ctx, current, desired)
default:
rlog.Info("action: nothing to do")
c.CheckDaemonSetInProgress(current)
c.Status.CheckDaemonSetProgress(current)
return nil
}
}
Expand Down Expand Up @@ -178,7 +175,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol
if coll == nil || !helper.UseEBPF(&coll.Spec) {
return nil, nil
}
version := helper.ExtractVersion(c.config.EBPFAgentImage)
version := helper.ExtractVersion(c.Image)
annotations := make(map[string]string)
env, err := c.envConfig(ctx, coll, annotations)
if err != nil {
Expand Down Expand Up @@ -256,7 +253,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol
Volumes: volumes,
Containers: []corev1.Container{{
Name: constants.EBPFAgentName,
Image: c.config.EBPFAgentImage,
Image: c.Image,
ImagePullPolicy: corev1.PullPolicy(coll.Spec.Agent.EBPF.ImagePullPolicy),
Resources: coll.Spec.Agent.EBPF.Resources,
SecurityContext: c.securityContext(coll),
Expand Down
6 changes: 3 additions & 3 deletions controllers/ebpf/internal/permissions/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ var AllowedCapabilities = []v1.Capability{"BPF", "PERFMON", "NET_ADMIN", "SYS_RE
// - Create netobserv-ebpf-agent service account in the privileged namespace
// - For Openshift, apply the required SecurityContextConstraints for privileged Pod operation
type Reconciler struct {
reconcilers.Common
*reconcilers.Instance
}

func NewReconciler(cmn *reconcilers.Common) Reconciler {
return Reconciler{Common: *cmn}
func NewReconciler(cmn *reconcilers.Instance) Reconciler {
return Reconciler{Instance: cmn}
}

func (c *Reconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCollectorEBPF) error {
Expand Down
Loading

0 comments on commit 79eb0be

Please sign in to comment.