Skip to content

Commit

Permalink
Send notifications events to Icinga Notifications daemon
Browse files Browse the repository at this point in the history
Co-Authored-By: Eric Lippmann <eric.lippmann@icinga.com>
  • Loading branch information
yhabteab and lippserd committed Aug 16, 2024
1 parent 1838f6d commit 62c18a1
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 15 deletions.
83 changes: 70 additions & 13 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-kubernetes/internal"
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/daemon"
"github.com/icinga/icinga-kubernetes/pkg/database"
"github.com/icinga/icinga-kubernetes/pkg/metrics"
"github.com/icinga/icinga-kubernetes/pkg/notifications"
Expand Down Expand Up @@ -61,7 +62,7 @@ func main() {
factory := informers.NewSharedInformerFactory(clientset, 0)
log := klog.NewKlogr()

var cfg internal.Config
var cfg daemon.Config
err = config.FromYAMLFile(configLocation, &cfg)
if err != nil {
klog.Fatal(errors.Wrap(err, "can't create configuration"))
Expand Down Expand Up @@ -93,9 +94,13 @@ func main() {
}
}

var nclient *notifications.Client
if err := notifications.SyncSourceConfig(context.Background(), db, &cfg.Notifications); err != nil {
klog.Fatal(err)
}
if cfg.Notifications.Url != "" {
nclient = notifications.NewClient(db, cfg.Notifications)
}

g, ctx := errgroup.WithContext(context.Background())

Expand Down Expand Up @@ -135,42 +140,94 @@ func main() {
return s.Run(ctx)
})
g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode)
nodes := internal.NewMultiplex()
if cfg.Notifications.Url != "" {
nodesOut := nodes.Out()
g.Go(func() error { return nclient.Stream(ctx, nodesOut) })
}

return s.Run(ctx)
nodesIn := nodes.In()
g.Go(func() error { return nodes.Do(ctx) })

s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode)
return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(nodesIn)))
})
g.Go(func() error {
pods := make(chan any)
deletePodIds := make(chan interface{})
defer close(pods)
defer close(deletePodIds)
pods := internal.NewMultiplex()
deletedPodUuids := internal.NewMultiplex()

schemav1.SyncContainers(ctx, db, g, pods, deletePodIds)
if cfg.Notifications.Url != "" {
podsOut := pods.Out()
g.Go(func() error { return nclient.Stream(ctx, podsOut) })
}

schemav1.SyncContainers(ctx, db, g, pods.Out(), deletedPodUuids.Out())

f := schemav1.NewPodFactory(clientset)
s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New)

return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(pods)), sync.WithOnDelete(com.ForwardBulk(deletePodIds)))
podsIn := pods.In()
deletedIn := deletedPodUuids.In()

g.Go(func() error { return pods.Do(ctx) })
g.Go(func() error { return deletedPodUuids.Do(ctx) })

return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(podsIn)), sync.WithOnDelete(com.ForwardBulk(deletedIn)))
})
g.Go(func() error {
deployments := internal.NewMultiplex()
if cfg.Notifications.Url != "" {
deploymentsOut := deployments.Out()
g.Go(func() error { return nclient.Stream(ctx, deploymentsOut) })
}
s := syncv1.NewSync(db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment)

return s.Run(ctx)
deploymentsIn := deployments.In()
g.Go(func() error { return deployments.Do(ctx) })

return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(deploymentsIn)))
})
g.Go(func() error {
daemonSet := internal.NewMultiplex()
if cfg.Notifications.Url != "" {
daemonSetOut := daemonSet.Out()
g.Go(func() error { return nclient.Stream(ctx, daemonSetOut) })
}

daemonSetIn := daemonSet.In()
g.Go(func() error { return daemonSet.Do(ctx) })

s := syncv1.NewSync(db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet)

return s.Run(ctx)
return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(daemonSetIn)))
})
g.Go(func() error {
replicaSet := internal.NewMultiplex()
if cfg.Notifications.Url != "" {
replicaSetOut := replicaSet.Out()
g.Go(func() error { return nclient.Stream(ctx, replicaSetOut) })
}

replicaSetIn := replicaSet.In()
g.Go(func() error { return replicaSet.Do(ctx) })

s := syncv1.NewSync(db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet)

return s.Run(ctx)
return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(replicaSetIn)))
})
g.Go(func() error {
statefulSet := internal.NewMultiplex()
if cfg.Notifications.Url != "" {
statefulSetOut := statefulSet.Out()
g.Go(func() error { return nclient.Stream(ctx, statefulSetOut) })
}

statefulSetIn := statefulSet.In()
g.Go(func() error { return statefulSet.Do(ctx) })

s := syncv1.NewSync(db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet)

return s.Run(ctx)
return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(statefulSetIn)))
})
g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService)
Expand Down
2 changes: 1 addition & 1 deletion internal/config.go → pkg/daemon/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package internal
package daemon

import (
"github.com/icinga/icinga-go-library/database"
Expand Down
95 changes: 95 additions & 0 deletions pkg/notifications/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package notifications

import (
"bytes"
"context"
"encoding/json"
"github.com/icinga/icinga-kubernetes/internal"
"github.com/icinga/icinga-kubernetes/pkg/database"
"github.com/pkg/errors"
"io"
"k8s.io/klog/v2"
"net/http"
"net/url"
)

// Notifiable can be implemented by all k8s types that want to submit notification events to Icinga Notifications.
type Notifiable interface {
// GetNotificationsEvent returns the event data of this type that will be transmitted to Icinga Notifications.
GetNotificationsEvent(baseUrl *url.URL) map[string]any
}

type Client struct {
db *database.Database
client http.Client
Config
}

func NewClient(db *database.Database, c Config) *Client {
return &Client{db: db, client: http.Client{}, Config: c}
}

func (c *Client) ProcessEvent(ctx context.Context, notifiable Notifiable) error {
var username, password string
if !IsAutoCreationEnabled(&c.Config) {
username = c.Config.Username
password = c.Config.Password
} else {
var err error
if username, password, err = retrieveCredentials(ctx, c.db); err != nil {
return err
}
}

baseUrl, err := url.Parse(c.Config.KubernetesWebUrl)
if err != nil {
return errors.Wrapf(err, "cannot parse Icinga for Kubernetes Web URL: %q", c.Config.KubernetesWebUrl)
}

body, err := json.Marshal(notifiable.GetNotificationsEvent(baseUrl))
if err != nil {
return errors.Wrapf(err, "cannot marshal notifications event data of type: %T", notifiable)
}

r, err := http.NewRequest(http.MethodPost, c.Config.Url+"/process-event", bytes.NewBuffer(body))
if err != nil {
return errors.Wrap(err, "cannot create new notifications http request")
}

r.SetBasicAuth(username, password)
r.Header.Set("User-Agent", "icinga-kubernetes/"+internal.Version.Version)
r.Header.Add("Content-Type", "application/json")

res, err := c.client.Do(r)
if err != nil {
return errors.Wrap(err, "cannot send notifications event")
}
defer func() {
_, _ = io.Copy(io.Discard, res.Body)
_ = res.Body.Close()
}()

if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAlreadyReported {
return errors.Errorf("received unexpected http status code from Icinga Notifications: %d", res.StatusCode)
}

return nil
}

// Stream consumes the items from the given `entities` chan and triggers a notifications event for each of them.
func (c *Client) Stream(ctx context.Context, entities <-chan any) error {
for {
select {
case entity, more := <-entities:
if !more {
return nil
}

if err := c.ProcessEvent(ctx, entity.(Notifiable)); err != nil {
klog.Error(err)
}
case <-ctx.Done():
return ctx.Err()
}
}
}
2 changes: 1 addition & 1 deletion pkg/schema/v1/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func GetContainerState(container kcorev1.Container, status kcorev1.ContainerStat
// When pods are deleted, their IDs are streamed through the `deletePods` chan, and this fetches all the container
// IDs matching the respective pod ID from the database and initiates a container deletion stream that cleans up all
// container-related resources.
func SyncContainers(ctx context.Context, db *database.Database, g *errgroup.Group, upsertPods <-chan interface{}, deletePods <-chan interface{}) {
func SyncContainers(ctx context.Context, db *database.Database, g *errgroup.Group, upsertPods, deletePods <-chan interface{}) {
type containerFingerprint struct {
Uuid types.UUID
PodUuid types.UUID
Expand Down
20 changes: 20 additions & 0 deletions pkg/schema/v1/daemon_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
kserializer "k8s.io/apimachinery/pkg/runtime/serializer"
kjson "k8s.io/apimachinery/pkg/runtime/serializer/json"
ktypes "k8s.io/apimachinery/pkg/types"
"net/url"
"strings"
)

Expand Down Expand Up @@ -154,6 +155,25 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) {
d.Yaml = string(output)
}

// GetNotificationsEvent implements the notifications.Notifiable interface.
func (d *DaemonSet) GetNotificationsEvent(baseUrl *url.URL) map[string]any {
daemonSetUrl := baseUrl.JoinPath("/daemonset")
daemonSetUrl.RawQuery = fmt.Sprintf("id=%s", d.Uuid)

return map[string]any{
"name": d.Namespace + "/" + d.Name,
"severity": d.IcingaState.ToSeverity(),
"message": d.IcingaStateReason,
"url": daemonSetUrl.String(),
"tags": map[string]any{
"name": d.Name,
"namespace": d.Namespace,
"uuid": d.Uuid.String(),
"resource": "daemon_set",
},
}
}

func (d *DaemonSet) getIcingaState() (IcingaState, string) {
if d.DesiredNumberScheduled < 1 {
reason := fmt.Sprintf("DaemonSet %s/%s has an invalid desired node count: %d.", d.Namespace, d.Name, d.DesiredNumberScheduled)
Expand Down
20 changes: 20 additions & 0 deletions pkg/schema/v1/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
kserializer "k8s.io/apimachinery/pkg/runtime/serializer"
kjson "k8s.io/apimachinery/pkg/runtime/serializer/json"
ktypes "k8s.io/apimachinery/pkg/types"
"net/url"
"strings"
)

Expand Down Expand Up @@ -166,6 +167,25 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) {
d.Yaml = string(output)
}

// GetNotificationsEvent implements the notifications.Notifiable interface.
func (d *Deployment) GetNotificationsEvent(baseUrl *url.URL) map[string]any {
deploymentUrl := baseUrl.JoinPath("/deployment")
deploymentUrl.RawQuery = fmt.Sprintf("id=%s", d.Uuid)

return map[string]any{
"name": d.Namespace + "/" + d.Name,
"severity": d.IcingaState.ToSeverity(),
"message": d.IcingaStateReason,
"url": deploymentUrl.String(),
"tags": map[string]any{
"name": d.Name,
"namespace": d.Namespace,
"uuid": d.Uuid.String(),
"resource": "deployment",
},
}
}

func (d *Deployment) getIcingaState() (IcingaState, string) {
if gracePeriodReason := IsWithinGracePeriod(d); gracePeriodReason != nil {
return Ok, *gracePeriodReason
Expand Down
17 changes: 17 additions & 0 deletions pkg/schema/v1/icinga_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ func (s IcingaState) Value() (driver.Value, error) {
return s.String(), nil
}

func (s IcingaState) ToSeverity() string {
switch s {
case Ok:
return "ok"
case Pending:
return "info"
case Unknown:
return "err"
case Warning:
return "warning"
case Critical:
return "crit"
default:
panic(fmt.Sprintf("invalid Icinga state %d", s))
}
}

// Assert interface compliance.
var (
_ fmt.Stringer = (*IcingaState)(nil)
Expand Down
20 changes: 20 additions & 0 deletions pkg/schema/v1/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
kjson "k8s.io/apimachinery/pkg/runtime/serializer/json"
knet "k8s.io/utils/net"
"net"
"net/url"
"strings"
)

Expand Down Expand Up @@ -190,6 +191,25 @@ func (n *Node) Obtain(k8s kmetav1.Object) {
}
}

// GetNotificationsEvent implements the notifications.Notifiable interface.
func (n *Node) GetNotificationsEvent(baseUrl *url.URL) map[string]any {
nodeUrl := baseUrl.JoinPath("/node")
nodeUrl.RawQuery = fmt.Sprintf("id=%s", n.Uuid)

return map[string]any{
"name": n.Name,
"severity": n.IcingaState.ToSeverity(),
"message": n.IcingaStateReason,
"url": nodeUrl.String(),
"tags": map[string]any{
"name": n.Name,
"namespace": n.Namespace,
"uuid": n.Uuid.String(),
"resource": "node",
},
}
}

func (n *Node) getIcingaState(node *kcorev1.Node) (IcingaState, string) {
//if node.Status.Phase == kcorev1.NodePending {
// return Pending, fmt.Sprintf("Node %s is pending.", node.Name)
Expand Down
Loading

0 comments on commit 62c18a1

Please sign in to comment.