Skip to content

Commit

Permalink
[extension/observer/k8sobserver] add k8s.ingress endpoint (#33005)
Browse files Browse the repository at this point in the history
**Description:** Add support for k8s.ingress endpoint

As described in the issue, this will allow users to dynamically obtain
Kubernetes ingress ressource, facilitating the monitoring of certificate
expiration with the Blackbox Exporter for example.
I didn't create a global ingress resource with sub-endpoints for each
path because I don't see a relevant 'target'. I'm uncertain about the
impact of my decision regarding the 'UID' as it's structured as
'{namespace}/{ingress UID}/{host}{path}'. Since a path automatically
starts with a '/', and can contain other '/', should I escape them ?

**Link to tracking Issue:** <Issue number if applicable>
#32971

---------

Signed-off-by: Ludovic Ortega <ludovic.ortega@adminafk.fr>
Co-authored-by: Chris Mark <chrismarkou92@gmail.com>
  • Loading branch information
M0NsTeRRR and ChrsMark authored Jun 27, 2024
1 parent 968d4b8 commit 7c573a9
Show file tree
Hide file tree
Showing 17 changed files with 457 additions and 13 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feat_k8sobserver_ingress.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobserver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for k8s.ingress endpoint.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32971]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
39 changes: 39 additions & 0 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
PodType EndpointType = "pod"
// K8sServiceType is a service endpoint.
K8sServiceType EndpointType = "k8s.service"
// K8sIngressType is a ingress endpoint.
K8sIngressType EndpointType = "k8s.ingress"
// K8sNodeType is a Kubernetes Node endpoint.
K8sNodeType EndpointType = "k8s.node"
// HostPortType is a hostport endpoint.
Expand Down Expand Up @@ -151,6 +153,43 @@ func (s *K8sService) Type() EndpointType {
return K8sServiceType
}

// K8sIngress is a discovered k8s ingress.
type K8sIngress struct {
// Name of the ingress.
Name string
// UID is the unique ID in the cluster for the ingress.
UID string
// Labels is a map of user-specified metadata.
Labels map[string]string
// Annotations is a map of user-specified metadata.
Annotations map[string]string
// Namespace must be unique for ingress with same name.
Namespace string
// Scheme represents whether the ingress path is accessible via HTTPS or HTTP.
Scheme string
// Host is the fully qualified domain name of a network host
Host string
// Path that map requests to backends
Path string
}

func (s *K8sIngress) Env() EndpointEnv {
return map[string]any{
"uid": s.UID,
"name": s.Name,
"labels": s.Labels,
"annotations": s.Annotations,
"namespace": s.Namespace,
"scheme": s.Scheme,
"host": s.Host,
"path": s.Path,
}
}

func (s *K8sIngress) Type() EndpointType {
return K8sIngressType
}

// Pod is a discovered k8s pod.
type Pod struct {
// Name of the pod.
Expand Down
6 changes: 4 additions & 2 deletions extension/observer/k8sobserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<!-- end autogenerated section -->

The `k8s_observer` is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
Kubernetes pod, port, service and node endpoints via the Kubernetes API.
Kubernetes pod, port, service, ingress and node endpoints via the Kubernetes API.

## Example Config

Expand All @@ -25,6 +25,7 @@ extensions:
observe_pods: true
observe_nodes: true
observe_services: true
observe_ingresses: true

receivers:
receiver_creator:
Expand Down Expand Up @@ -70,4 +71,5 @@ All fields are optional.
| node | string | <no value> | The node name to limit the discovery of pod, port, and node endpoints. Providing no value (the default) results in discovering endpoints for all available nodes. |
| observe_pods | bool | `true` | Whether to report observer pod and port endpoints. If `true` and `node` is specified it will only discover pod and port endpoints whose `spec.nodeName` matches the provided node name. If `true` and `node` isn't specified, it will discover all available pod and port endpoints. Please note that Collector connectivity to pods from other nodes is dependent on your cluster configuration and isn't guaranteed. |
| observe_nodes | bool | `false` | Whether to report observer k8s.node endpoints. If `true` and `node` is specified it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and `node` isn't specified, it will discover all available node endpoints. Please note that Collector connectivity to nodes is dependent on your cluster configuration and isn't guaranteed.|
| observe_services | bool | `false` | Whether to report observer k8s.service endpoints.|
| observe_services | bool | `false` | Whether to report observer k8s.service endpoints.|
| observe_ingresses | bool | `false` | Whether to report observer k8s.ingress endpoints.|
6 changes: 4 additions & 2 deletions extension/observer/k8sobserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ type Config struct {
ObserveNodes bool `mapstructure:"observe_nodes"`
// ObserveServices determines whether to report observer service and port endpoints. `false` by default.
ObserveServices bool `mapstructure:"observe_services"`
// ObserveIngresses determines whether to report observer ingress. `false` by default.
ObserveIngresses bool `mapstructure:"observe_ingresses"`
}

// Validate checks if the extension configuration is valid
func (cfg *Config) Validate() error {
if !cfg.ObservePods && !cfg.ObserveNodes && !cfg.ObserveServices {
return fmt.Errorf("one of observe_pods, observe_nodes and observe_services must be true")
if !cfg.ObservePods && !cfg.ObserveNodes && !cfg.ObserveServices && !cfg.ObserveIngresses {
return fmt.Errorf("one of observe_pods, observe_nodes, observe_services and observe_ingresses must be true")
}
return nil
}
13 changes: 7 additions & 6 deletions extension/observer/k8sobserver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "observe-all"),
expected: &Config{
Node: "",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
ObservePods: true,
ObserveNodes: true,
ObserveServices: true,
Node: "",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
ObservePods: true,
ObserveNodes: true,
ObserveServices: true,
ObserveIngresses: true,
},
},
{
Expand All @@ -52,7 +53,7 @@ func TestLoadConfig(t *testing.T) {
},
{
id: component.NewIDWithName(metadata.Type, "invalid_no_observing"),
expectedErr: "one of observe_pods, observe_nodes and observe_services must be true",
expectedErr: "one of observe_pods, observe_nodes, observe_services and observe_ingresses must be true",
},
}
for _, tt := range tests {
Expand Down
20 changes: 20 additions & 0 deletions extension/observer/k8sobserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
Expand Down Expand Up @@ -49,6 +50,8 @@ func (h *handler) OnAdd(objectInterface any, _ bool) {
endpoints = convertPodToEndpoints(h.idNamespace, object)
case *v1.Service:
endpoints = convertServiceToEndpoints(h.idNamespace, object)
case *networkingv1.Ingress:
endpoints = convertIngressToEndpoints(h.idNamespace, object)
case *v1.Node:
endpoints = append(endpoints, convertNodeToEndpoint(h.idNamespace, object))
default: // unsupported
Expand Down Expand Up @@ -92,6 +95,19 @@ func (h *handler) OnUpdate(oldObjectInterface, newObjectInterface any) {
newEndpoints[e.ID] = e
}

case *networkingv1.Ingress:
newIngress, ok := newObjectInterface.(*networkingv1.Ingress)
if !ok {
h.logger.Warn("skip updating endpoint for ingress as the update is of different type", zap.Any("oldIngress", oldObjectInterface), zap.Any("newObject", newObjectInterface))
return
}
for _, e := range convertIngressToEndpoints(h.idNamespace, oldObject) {
oldEndpoints[e.ID] = e
}
for _, e := range convertIngressToEndpoints(h.idNamespace, newIngress) {
newEndpoints[e.ID] = e
}

case *v1.Node:
newNode, ok := newObjectInterface.(*v1.Node)
if !ok {
Expand Down Expand Up @@ -165,6 +181,10 @@ func (h *handler) OnDelete(objectInterface any) {
if object != nil {
endpoints = convertServiceToEndpoints(h.idNamespace, object)
}
case *networkingv1.Ingress:
if object != nil {
endpoints = convertIngressToEndpoints(h.idNamespace, object)
}
case *v1.Node:
if object != nil {
endpoints = append(endpoints, convertNodeToEndpoint(h.idNamespace, object))
Expand Down
64 changes: 64 additions & 0 deletions extension/observer/k8sobserver/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,70 @@ func TestServiceEndpointsChanged(t *testing.T) {
}, th.ListEndpoints())
}

func TestIngressEndpointsAdded(t *testing.T) {
th := newTestHandler()
th.OnAdd(ingress, true)
assert.ElementsMatch(t, []observer.Endpoint{
{
ID: "test-1/ingress-1-UID/host-1/",
Target: "https://host-1/",
Details: &observer.K8sIngress{
Name: "application-ingress",
Namespace: "default",
UID: "test-1/ingress-1-UID/host-1/",
Labels: map[string]string{"env": "prod"},
Scheme: "https",
Host: "host-1",
Path: "/",
},
}}, th.ListEndpoints())
}

func TestIngressEndpointsRemoved(t *testing.T) {
th := newTestHandler()
th.OnAdd(ingress, true)
th.OnDelete(ingress)
assert.Empty(t, th.ListEndpoints())
}

func TestIngressEndpointsChanged(t *testing.T) {
th := newTestHandler()
// Nothing changed.
th.OnUpdate(ingress, ingress)
require.Empty(t, th.ListEndpoints())

// Labels changed.
changedLabels := ingress.DeepCopy()
changedLabels.Labels["new-label"] = "value"
th.OnUpdate(ingress, changedLabels)

endpoints := th.ListEndpoints()
require.ElementsMatch(t,
[]observer.EndpointID{"test-1/ingress-1-UID/host-1/"},
[]observer.EndpointID{endpoints[0].ID},
)

// Running state changed, one added and one removed.
updatedIngress := ingress.DeepCopy()
updatedIngress.Labels["updated-label"] = "true"
th.OnUpdate(ingress, updatedIngress)
require.ElementsMatch(t, []observer.Endpoint{
{
ID: "test-1/ingress-1-UID/host-1/",
Target: "https://host-1/",
Details: &observer.K8sIngress{
Name: "application-ingress",
Namespace: "default",
UID: "test-1/ingress-1-UID/host-1/",
Labels: map[string]string{"env": "prod", "updated-label": "true"},
Scheme: "https",
Host: "host-1",
Path: "/",
},
},
}, th.ListEndpoints())
}

func TestNodeEndpointsAdded(t *testing.T) {
th := newTestHandler()
th.OnAdd(node1V1, true)
Expand Down
103 changes: 103 additions & 0 deletions extension/observer/k8sobserver/ingress_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"

import (
"fmt"
"net/url"
"strings"

v1 "k8s.io/api/networking/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

// convertIngressToEndpoints converts a ingress instance into a slice of endpoints. The endpoints
// include an endpoint for each path that is mapped to an ingress.
func convertIngressToEndpoints(idNamespace string, ingress *v1.Ingress) []observer.Endpoint {
endpoints := []observer.Endpoint{}

// Loop through every ingress rule to get every defined path.
for _, rule := range ingress.Spec.Rules {
scheme := getScheme(rule.Host, getTLSHosts(ingress))

if rule.HTTP != nil {
// Create endpoint for each ingress rule.
for _, path := range rule.HTTP.Paths {
endpointID := observer.EndpointID(fmt.Sprintf("%s/%s/%s%s", idNamespace, ingress.UID, rule.Host, path.Path))
endpoints = append(endpoints, observer.Endpoint{
ID: endpointID,
Target: (&url.URL{
Scheme: scheme,
Host: rule.Host,
Path: path.Path,
}).String(),
Details: &observer.K8sIngress{
Name: ingress.Name,
UID: string(endpointID),
Labels: ingress.Labels,
Annotations: ingress.Annotations,
Namespace: ingress.Namespace,
Scheme: scheme,
Host: rule.Host,
Path: path.Path,
},
})
}
}

}

return endpoints
}

// getTLSHosts return a list of tls hosts for an ingress ressource.
func getTLSHosts(i *v1.Ingress) []string {
var hosts []string

for _, tls := range i.Spec.TLS {
hosts = append(hosts, tls.Hosts...)
}

return hosts
}

// matchesHostPattern returns true if the host matches the host pattern or wildcard pattern.
func matchesHostPattern(pattern string, host string) bool {
// if host match the pattern (host pattern).
if pattern == host {
return true
}

// if string does not contains any dot, don't do the next part as it's for wildcard pattern.
if !strings.Contains(host, ".") {
return false
}

patternParts := strings.Split(pattern, ".")
hostParts := strings.Split(host, ".")

// If the first part of the pattern is not a wildcard pattern.
if patternParts[0] != "*" {
return false
}

// If host and pattern without wildcard part does not match.
if strings.Join(patternParts[1:], ".") != strings.Join(hostParts[1:], ".") {
return false
}

return true
}

// getScheme return the scheme of an ingress host based on tls configuration.
func getScheme(host string, tlsHosts []string) string {
for _, pattern := range tlsHosts {
if matchesHostPattern(pattern, host) {
return "https"
}
}

return "http"
}
Loading

0 comments on commit 7c573a9

Please sign in to comment.