Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/k8sattributes] Operator resource attributes #37114

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
13 changes: 13 additions & 0 deletions .chloggen/operator-resource-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
change_type: enhancement

component: k8sattributesprocessor

note: Add option to configure resource attributes using the same logic as the OTel operator

issues: [37114]

subtext: |
If you are using the file log receiver, you can now create the same resource attributes as traces (via OTLP) received
from an application instrumented with the OpenTelemetry Operator -
simply by adding the `extract: { operator_rules: { enabled: true } }` configuration to the `k8sattributesprocessor` processor.
See the [documentation](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/k8sattributesprocessor/README.md#config-example) for more details.
17 changes: 13 additions & 4 deletions processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,19 @@ k8sattributes/2:
- k8s.node.name
- k8s.pod.start_time
labels:
# This label extraction rule takes the value 'app.kubernetes.io/component' label and maps it to the 'app.label.component' attribute which will be added to the associated resources
- tag_name: app.label.component
key: app.kubernetes.io/component
from: pod
# This label extraction rule takes the value 'app.kubernetes.io/component' label and maps it to the 'app.label.component' attribute which will be added to the associated resources
- tag_name: app.label.component
key: app.kubernetes.io/component
from: pod
operator_rules:
# Apply the operator rules - see https://github.com/open-telemetry/opentelemetry-operator#configure-resource-attributes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid calling this feature operator_rules. If we decide on supporting this from the Collector at first place then it should have a generic name not coupled to the Operator project.

Also from this section it's not clear if we support both labels and annotations (as described in the Operator's docs). Is sth like resource.opentelemetry.io/service.name: "my-service" supported?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid calling this feature...

good idea - I just didn't find a good name so far

annotations and labels

the linked page describes it:

Annotations like this

 annotations:
    # this is just an example, you can create any resource attributes you need
    resource.opentelemetry.io/service.name: "my-service"
    resource.opentelemetry.io/service.version: "1.0.0"
    resource.opentelemetry.io/deployment.environment.name: "production"

labels like this

apiVersion: v1
kind: Pod
metadata:
  name: example-pod
  labels:
    app.kubernetes.io/name: "my-service"
    app.kubernetes.io/version: "1.0.0"
    app.kubernetes.io/part-of: "shop"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#27261 suggests well_known_labels

enabled: true
# Also translate the following labels to the specified resource attributes:
# app.kubernetes.io/name => service.name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is mentioned before that these should be standardized at some point: open-telemetry/opentelemetry-helm-charts#905

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK - I'll work with semconv before I continue with this PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created open-telemetry/semantic-conventions#1756 for the spec part

# app.kubernetes.io/version => service.version
# app.kubernetes.io/part-of => service.namespace
# This setting is ignored if 'enabled' is set to false
labels: true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use like resource_labels_enabled instead of labels? I think labels are easy to be confused with other field also called labels, but they are of slice type.

pod_association:
- sources:
# This rule associates all resources containing the 'k8s.pod.ip' attribute with the matching pods. If this attribute is not present in the resource, this rule will not be able to find the matching pod.
Expand Down
2 changes: 2 additions & 0 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ type ExtractConfig struct {
// It is a list of FieldExtractConfig type. See FieldExtractConfig
// documentation for more details.
Labels []FieldExtractConfig `mapstructure:"labels"`

OperatorRules kube.OperatorRules `mapstructure:"operator_rules"`
}

// FieldExtractConfig allows specifying an extraction rule to extract a resource attribute from pod (or namespace)
Expand Down
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func createProcessorOpts(cfg component.Config) []option {
opts = append(opts, withExtractMetadata(oCfg.Extract.Metadata...))
opts = append(opts, withExtractLabels(oCfg.Extract.Labels...))
opts = append(opts, withExtractAnnotations(oCfg.Extract.Annotations...))
opts = append(opts, withOperatorExtractRules(oCfg.Extract.OperatorRules))

// filters
opts = append(opts, withFilterNode(oCfg.Filter.Node, oCfg.Filter.NodeFromEnvVar))
Expand Down
77 changes: 61 additions & 16 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,15 @@ func (c *WatchClient) GetNode(nodeName string) (*Node, bool) {
return nil, false
}

func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) (map[string]string, map[string]string) {
tags := map[string]string{}
serviceNames := map[string]string{}
if c.Rules.PodName {
tags[conventions.AttributeK8SPodName] = pod.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SPodName] = pod.Name
}

if c.Rules.PodHostName {
tags[tagHostName] = pod.Spec.Hostname
Expand Down Expand Up @@ -494,7 +498,7 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
c.Rules.JobUID || c.Rules.JobName ||
c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
c.Rules.DeploymentName || c.Rules.DeploymentUID ||
c.Rules.CronJobName {
c.Rules.CronJobName || c.Rules.OperatorRules.Enabled {
for _, ref := range pod.OwnerReferences {
switch ref.Kind {
case "ReplicaSet":
Expand All @@ -504,11 +508,19 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.ReplicaSetName {
tags[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.DeploymentName {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentName] = replicaset.Deployment.Name
}
name := c.deploymentName(ref)
if name != "" {
tags[conventions.AttributeK8SDeploymentName] = name
}
}
if c.Rules.OperatorRules.Enabled {
name := c.deploymentName(ref)
if name != "" {
serviceNames[conventions.AttributeK8SDeploymentName] = name
}
}
if c.Rules.DeploymentUID {
Expand All @@ -525,18 +537,30 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.DaemonSetName {
tags[conventions.AttributeK8SDaemonSetName] = ref.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SDaemonSetName] = ref.Name
}
case "StatefulSet":
if c.Rules.StatefulSetUID {
tags[conventions.AttributeK8SStatefulSetUID] = string(ref.UID)
}
if c.Rules.StatefulSetName {
tags[conventions.AttributeK8SStatefulSetName] = ref.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SStatefulSetName] = ref.Name
}
case "Job":
if c.Rules.CronJobName {
if c.Rules.CronJobName || c.Rules.OperatorRules.Enabled {
parts := c.cronJobRegex.FindStringSubmatch(ref.Name)
if len(parts) == 2 {
tags[conventions.AttributeK8SCronJobName] = parts[1]
name := parts[1]
if c.Rules.CronJobName {
tags[conventions.AttributeK8SCronJobName] = name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SCronJobName] = name
}
}
}
if c.Rules.JobUID {
Expand All @@ -545,6 +569,9 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.JobName {
tags[conventions.AttributeK8SJobName] = ref.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SJobName] = ref.Name
}
}
}
}
Expand All @@ -568,7 +595,16 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
for _, r := range c.Rules.Annotations {
r.extractFromPodMetadata(pod.Annotations, tags, "k8s.pod.annotations.%s")
}
return tags
return tags, serviceNames
}

func (c *WatchClient) deploymentName(ref meta_v1.OwnerReference) string {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
return replicaset.Deployment.Name
}
}
return ""
}

// This function removes all data from the Pod except what is required by extraction rules and pod association
Expand Down Expand Up @@ -633,7 +669,7 @@ func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Po
removeUnnecessaryContainerData := func(c api_v1.Container) api_v1.Container {
transformedContainer := api_v1.Container{}
transformedContainer.Name = c.Name // we always need the name, it's used for identification
if rules.ContainerImageName || rules.ContainerImageTag {
if rules.ContainerImageName || rules.ContainerImageTag || rules.OperatorRules.Enabled {
transformedContainer.Image = c.Image
}
return transformedContainer
Expand Down Expand Up @@ -698,7 +734,7 @@ func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContain
if !needContainerAttributes(c.Rules) {
return containers
}
if c.Rules.ContainerImageName || c.Rules.ContainerImageTag {
if c.Rules.ContainerImageName || c.Rules.ContainerImageTag || c.Rules.OperatorRules.Enabled {
for _, spec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
container := &Container{}
name, tag, err := parseNameAndTagFromImage(spec.Image)
Expand All @@ -709,18 +745,26 @@ func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContain
if c.Rules.ContainerImageTag {
container.ImageTag = tag
}
if c.Rules.OperatorRules.Enabled {
container.ServiceVersion = tag
}
}
containers.ByName[spec.Name] = container
}
}
for _, apiStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
container, ok := containers.ByName[apiStatus.Name]
containerName := apiStatus.Name
container, ok := containers.ByName[containerName]
if !ok {
container = &Container{}
containers.ByName[apiStatus.Name] = container
containers.ByName[containerName] = container
}
if c.Rules.ContainerName {
container.Name = apiStatus.Name
container.Name = containerName
}
if c.Rules.OperatorRules.Enabled {
container.ServiceInstanceID = operatorServiceInstanceID(pod, containerName)
container.ServiceName = containerName
}
containerID := apiStatus.ContainerID
// Remove container runtime prefix
Expand Down Expand Up @@ -796,7 +840,7 @@ func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod {
if c.shouldIgnorePod(pod) {
newPod.Ignore = true
} else {
newPod.Attributes = c.extractPodAttributes(pod)
newPod.Attributes, newPod.ServiceNames = c.extractPodAttributes(pod)
if needContainerAttributes(c.Rules) {
newPod.Containers = c.extractPodContainersAttributes(pod)
}
Expand Down Expand Up @@ -1040,7 +1084,8 @@ func needContainerAttributes(rules ExtractionRules) bool {
rules.ContainerName ||
rules.ContainerImageTag ||
rules.ContainerImageRepoDigests ||
rules.ContainerID
rules.ContainerID ||
rules.OperatorRules.Enabled
}

func (c *WatchClient) handleReplicaSetAdd(obj any) {
Expand Down
Loading
Loading