Skip to content

Commit

Permalink
[receiver/receiver_creator] Add support for enabling logs' collecting…
Browse files Browse the repository at this point in the history
… from K8s hints (#36581)

#### Description

This PR adds the logs part for
#34427
based on the design decided at
#34427 (comment).

See the README docs for the description of this feature:
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35617/files#diff-4127365c4062a7510fb7fede0fa239e9232549732898303d94c12fef0433d39d

#### Link to tracking issue
Fixes
#34427

#### Testing
Added unit-tests

#### Documentation
Added README section

#### How to test this manually

1. Deploy the Collector helm chart:
```yaml
mode: daemonset

image:
  repository: otelcontribcol-dev
  tag: "latest"
  pullPolicy: IfNotPresent

command:
  name: otelcontribcol

clusterRole:
  create: true
  rules:
   - apiGroups:
     - ''
     resources:
     - 'pods'
     - 'nodes'
     verbs:
     - 'get'
     - 'list'
     - 'watch'
   - apiGroups: [ "" ]
     resources: [ "nodes/proxy"]
     verbs: [ "get" ]
   - apiGroups:
       - ""
     resources:
       - nodes/stats
     verbs:
       - get
   - nonResourceURLs:
       - "/metrics"
     verbs:
       - get

extraVolumeMounts:
 - name: varlogpods
   mountPath: /var/log/pods
   readOnly: true

extraVolumes:
  - name: varlogpods
    hostPath:
      path: /var/log/pods

config:
  extensions:
    k8s_observer:
      auth_type: serviceAccount
      node: ${env:K8S_NODE_NAME}
      observe_nodes: true
  exporters:
    debug:
      verbosity: detailed

  receivers:
    receiver_creator/metrics:
      watch_observers: [ k8s_observer ]
      discovery:
        enabled: true
        ignore_receivers:
          - nginx2
      receivers:

    receiver_creator/logs:
      watch_observers: [ k8s_observer ]
      discovery:
        enabled: true
        default_logs_discovery: false
      receivers:


  service:
    extensions: [health_check, k8s_observer]
    telemetry:
      logs:
        level: INFO
    pipelines:
      metrics:
        receivers: [ receiver_creator/metrics ]
        processors: [ batch ]
        exporters: [ debug ]
      logs/discovery:
        receivers: [ receiver_creator/logs ]
        #processors: [ batch ]
        exporters: [ debug ]
```
2. Then deploy a target Pod with 2 containers:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis-deployment
  labels:
    app: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
      annotations:
        io.opentelemetry.discovery.metrics.6379/enabled: "true"
        io.opentelemetry.discovery.metrics.6379/scraper: redis
        io.opentelemetry.discovery.metrics.6379/signals: metrics
        io.opentelemetry.discovery.metrics.6379/config: |
          collection_interval: "20s"
          timeout: "10s"

        io.opentelemetry.discovery.logs.busybox/enabled: "true"
        io.opentelemetry.discovery.logs.busybox/config: |
          operators:
            - id: some
              type: add
              field: attributes.tag
              value: hints
    spec:
      containers:
        - image: redis
          imagePullPolicy: IfNotPresent
          name: redis
          ports:
            - name: redis
              containerPort: 6379
              protocol: TCP
        - name: busybox
          image: busybox
          args:
            - /bin/sh
            - -c
            - while true; do echo "otel logs at $(date +%H:%M:%S)" && sleep 15s; done
```
3. Esnure that logs are collected from both containers and that Redis
metrics are collected from the Redis container:
```console
2024-11-28T11:04:14.921Z	info	receivercreator@v0.114.0/observerhandler.go:201	starting receiver	{"kind": "receiver", "name": "receiver_creator/metrics", "data_type": "metrics", "name": "redis/91ec7d5c-c6fb-4977-9dbb-c24a85101326_6379", "endpoint": "10.244.0.6:6379", "endpoint_id": "k8s_observer/91ec7d5c-c6fb-4977-9dbb-c24a85101326/redis(6379)", "config": {"collection_interval":"20s","endpoint":"10.244.0.6:6379","timeout":"10s"}}
2024-11-28T11:04:14.921Z	info	receivercreator@v0.114.0/observerhandler.go:201	starting receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/91ec7d5c-c6fb-4977-9dbb-c24a85101326_busybox", "endpoint": "10.244.0.6", "endpoint_id": "k8s_observer/91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox", "config": {"include":["/var/log/pods/default_redis-deployment-7777bf7db4-5rm6d_91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox/*.log"],"include_file_name":false,"include_file_path":true,"operators":[{"id":"container-parser","type":"container"},{"field":"attributes.tag","id":"some","type":"add","value":"hints"}]}}
2024-11-28T11:04:14.922Z	info	adapter/receiver.go:41	Starting stanza receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/91ec7d5c-c6fb-4977-9dbb-c24a85101326_busybox/receiver_creator/logs{endpoint=\"10.244.0.6\"}/k8s_observer/91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox"}
2024-11-28T11:04:15.122Z	info	fileconsumer/file.go:265	Started watching file	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/91ec7d5c-c6fb-4977-9dbb-c24a85101326_busybox/receiver_creator/logs{endpoint=\"10.244.0.6\"}/k8s_observer/91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox", "component": "fileconsumer", "path": "/var/log/pods/default_redis-deployment-7777bf7db4-5rm6d_91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox/0.log"}
2024-11-28T11:04:15.979Z	info	Metrics	{"kind": "exporter", "data_type": "metrics", "name": "debug/2", "resource metrics": 1, "metrics": 26, "data points": 31}
```

### Follow-ups

1. File an issue for enhancing default behaviors:
#36581 (comment)

---------

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark authored Jan 7, 2025
1 parent 04d7a69 commit 363c837
Show file tree
Hide file tree
Showing 8 changed files with 714 additions and 15 deletions.
27 changes: 27 additions & 0 deletions .chloggen/f_hints_logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receivercreator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for starting logs' collection based on provided k8s annotations' hints

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34427]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
99 changes: 96 additions & 3 deletions receiver/receivercreator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ receiver_creator/metrics:
# ignore_receivers: []
```

Find bellow the supported annotations that user can define to automatically enable receivers to start collecting metrics signals from the target Pods/containers.
Find bellow the supported annotations that user can define to automatically enable receivers to start
collecting metrics and logs signals from the target Pods/containers.

### Supported metrics annotations

Expand Down Expand Up @@ -511,11 +512,76 @@ The current implementation relies on the implementation of `k8sobserver` extensi
the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go).
The hints are evaluated per container by extracting the annotations from each [`Port` endpoint](#Port) that is emitted.

### Supported logs annotations

This feature enables `filelog` receiver in order to collect logs from the discovered Pods.

#### Enable/disable discovery

`io.opentelemetry.discovery.logs/enabled` (Required. Example: `"true"`)

By default `"false"`.

#### Define configuration

The default configuration for the `filelog` receiver is the following:

```yaml
include:
- /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
include_file_name: false
include_file_path: true
operators:
- id: container-parser
type: container
```
This default can be extended or overridden using the respective annotation:
`io.opentelemetry.discovery.logs/config`

**Example:**

```yaml
io.opentelemetry.discovery.logs/config: |
include_file_name: true
max_log_size: "2MiB"
operators:
- type: container
id: container-parser
- type: regex_parser
regex: "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$"
```

`include` cannot be overridden and is fixed to discovered container's log file path.

#### Support multiple target containers

Users can target the annotation to a specific container by suffixing it with the name of that container:
`io.opentelemetry.discovery.logs.<container_name>/endpoint`.
For example:
```yaml
io.opentelemetry.discovery.logs.busybox/config: |
max_log_size: "3MiB"
operators:
- type: container
id: container-parser
- id: some
type: add
field: attributes.tag
value: hints
```
where `busybox` is the name of the target container.

If a Pod is annotated with both container level hints and pod level hints the container level hints have priority and
the Pod level hints are used as a fallback (see detailed example bellow).

The current implementation relies on the implementation of `k8sobserver` extension and specifically
the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go).
The hints are evaluated per container by extracting the annotations from each [`Pod Container` endpoint](#Pod Container) that is emitted.


### Examples

#### Metrics example
#### Metrics and Logs example

Collector's configuration:
```yaml
Expand All @@ -525,12 +591,22 @@ receivers:
discovery:
enabled: true
receivers:
receiver_creator/logs:
watch_observers: [ k8s_observer ]
discovery:
enabled: true
receivers:
service:
extensions: [ k8s_observer]
pipelines:
metrics:
receivers: [ receiver_creator ]
receivers: [ receiver_creator/metrics ]
processors: []
exporters: [ debug ]
logs:
receivers: [ receiver_creator/logs ]
processors: []
exporters: [ debug ]
```
Expand Down Expand Up @@ -600,6 +676,23 @@ spec:
endpoint: "http://`endpoint`/nginx_status"
collection_interval: "30s"
timeout: "20s"

# redis pod container logs hints
io.opentelemetry.discovery.logs.redis/enabled: "true"
io.opentelemetry.discovery.logs.redis/config: |
max_log_size: "4MiB"
operators:
- type: container
id: container-parser
- id: some
type: add
field: attributes.tag
value: logs_hints
# nginx pod container logs hints
io.opentelemetry.discovery.logs.webserver/enabled: "true"
io.opentelemetry.discovery.logs.webserver/config: |
max_log_size: "3MiB"
spec:
volumes:
- name: nginx-conf
Expand Down
37 changes: 37 additions & 0 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,40 @@ func (*nopWithoutEndpointFactory) CreateTraces(
cfg: cfg,
}, nil
}

type nopWithFilelogConfig struct {
Include []string `mapstructure:"include"`
IncludeFileName bool `mapstructure:"include_file_name"`
IncludeFilePath bool `mapstructure:"include_file_path"`
Operators []any `mapstructure:"operators"`
}

type nopWithFilelogFactory struct {
rcvr.Factory
}

type nopWithFilelogReceiver struct {
mockComponent
consumer.Logs
consumer.Metrics
consumer.Traces
rcvr.Settings
cfg component.Config
}

func (*nopWithFilelogFactory) CreateDefaultConfig() component.Config {
return &nopWithFilelogConfig{}
}

func (*nopWithFilelogFactory) CreateLogs(
_ context.Context,
rcs rcvr.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (rcvr.Logs, error) {
return &nopWithEndpointReceiver{
Logs: nextConsumer,
Settings: rcs,
cfg: cfg,
}, nil
}
98 changes: 94 additions & 4 deletions receiver/receivercreator/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ const (

// hint suffix for metrics
otelMetricsHints = otelHints + ".metrics"
otelLogsHints = otelHints + ".logs"

// hints definitions
discoveryEnabledHint = "enabled"
scraperHint = "scraper"
configHint = "config"

logsReceiver = "filelog"
defaultLogPathPattern = "/var/log/pods/%s_%s_%s/%s/*.log"
)

// k8sHintsBuilder creates configurations from hints provided as Pod's annotations.
Expand Down Expand Up @@ -57,7 +61,7 @@ func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.End
return nil, fmt.Errorf("could not get endpoint type: %v", zap.Any("env", env))
}

if endpointType != string(observer.PortType) {
if endpointType != string(observer.PortType) && endpointType != string(observer.PodContainerType) {
return nil, nil
}

Expand All @@ -72,7 +76,14 @@ func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.End
return nil, nil
}

return builder.createScraper(pod.Annotations, env)
switch endpointType {
case string(observer.PortType):
return builder.createScraper(pod.Annotations, env)
case string(observer.PodContainerType):
return builder.createLogsReceiver(pod.Annotations, env)
default:
return nil, nil
}
}

func (builder *k8sHintsBuilder) createScraper(
Expand All @@ -91,7 +102,7 @@ func (builder *k8sHintsBuilder) createScraper(
port = p.Port
pod := p.Pod

if !discoveryMetricsEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) {
if !discoveryEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) {
return nil, nil
}

Expand All @@ -118,6 +129,48 @@ func (builder *k8sHintsBuilder) createScraper(
return &recTemplate, err
}

func (builder *k8sHintsBuilder) createLogsReceiver(
annotations map[string]string,
env observer.EndpointEnv,
) (*receiverTemplate, error) {
if _, ok := builder.ignoreReceivers[logsReceiver]; ok {
// receiver is ignored
return nil, nil
}

var containerName string
var c observer.PodContainer
err := mapstructure.Decode(env, &c)
if err != nil {
return nil, fmt.Errorf("could not extract pod's container: %v", zap.Any("env", env))
}
if c.Name == "" {
return nil, fmt.Errorf("could not extract container name: %v", zap.Any("container", c))
}
containerName = c.Name
pod := c.Pod

if !discoveryEnabled(annotations, otelLogsHints, containerName) {
return nil, nil
}

subreceiverKey := logsReceiver
builder.logger.Debug("handling added hinted receiver", zap.Any("subreceiverKey", subreceiverKey))

userConfMap := createLogsConfig(
annotations,
containerName,
pod.UID,
pod.Name,
pod.Namespace,
builder.logger)

recTemplate, err := newReceiverTemplate(fmt.Sprintf("%v/%v_%v", subreceiverKey, pod.UID, containerName), userConfMap)
recTemplate.signals = receiverSignals{metrics: false, logs: true, traces: false}

return &recTemplate, err
}

func getScraperConfFromAnnotations(
annotations map[string]string,
defaultEndpoint, scopeSuffix string,
Expand Down Expand Up @@ -149,6 +202,43 @@ func getScraperConfFromAnnotations(
return conf, nil
}

func createLogsConfig(
annotations map[string]string,
containerName, podUID, podName, namespace string,
logger *zap.Logger,
) userConfigMap {
scopeSuffix := containerName
logPath := fmt.Sprintf(defaultLogPathPattern, namespace, podName, podUID, containerName)
cont := []any{map[string]any{"id": "container-parser", "type": "container"}}
defaultConfMap := userConfigMap{
"include": []string{logPath},
"include_file_path": true,
"include_file_name": false,
"operators": cont,
}

configStr, found := getHintAnnotation(annotations, otelLogsHints, configHint, scopeSuffix)
if !found || configStr == "" {
return defaultConfMap
}

userConf := make(map[string]any)
if err := yaml.Unmarshal([]byte(configStr), &userConf); err != nil {
logger.Debug("could not unmarshal configuration from hint", zap.Error(err))
}

for k, v := range userConf {
if k == "include" {
// path cannot be other than the one of the target container
logger.Warn("include setting cannot be set through annotation's hints")
continue
}
defaultConfMap[k] = v
}

return defaultConfMap
}

func getHintAnnotation(annotations map[string]string, hintBase string, hintKey string, suffix string) (string, bool) {
// try to scope the hint more on container level by suffixing
// with .<port> in case of Port event or # TODO: .<container_name> in case of Pod Container event
Expand All @@ -162,7 +252,7 @@ func getHintAnnotation(annotations map[string]string, hintBase string, hintKey s
return podLevelHint, ok
}

func discoveryMetricsEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool {
func discoveryEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool {
enabledHint, found := getHintAnnotation(annotations, hintBase, discoveryEnabledHint, scopeSuffix)
if !found {
return false
Expand Down
Loading

0 comments on commit 363c837

Please sign in to comment.