Skip to content

Commit

Permalink
Extract ReplicaSet name from pod name
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac committed Sep 25, 2024
1 parent fb456f1 commit 1b44502
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 20 deletions.
14 changes: 7 additions & 7 deletions pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,13 @@ func (wk *watcherKubeEnricher) getReplicaSetPods(namespace, name string) []*kube

func (wk *watcherKubeEnricher) updateNewPodsByOwnerIndex(pod *kube.PodInfo) {
if pod.Owner != nil {
wk.podsByOwner.Put(nsName{namespace: pod.Namespace, name: pod.Owner.Name}, pod.Name, pod)
wk.podsByOwner.Put(nsName{namespace: pod.Namespace, name: pod.Owner.OwnerName()}, pod.Name, pod)
}
}

func (wk *watcherKubeEnricher) updateDeletedPodsByOwnerIndex(pod *kube.PodInfo) {
if pod.Owner != nil {
wk.podsByOwner.Delete(nsName{namespace: pod.Namespace, name: pod.Owner.Name}, pod.Name)
wk.podsByOwner.Delete(nsName{namespace: pod.Namespace, name: pod.Owner.OwnerName()}, pod.Name)
}
}

Expand All @@ -325,16 +325,16 @@ func withMetadata(pp processAttrs, info *kube.PodInfo) processAttrs {
ret.podLabels = info.Labels
owner := info.Owner
for owner != nil {
ret.metadata[services.AttrOwnerName] = owner.Name
ret.metadata[services.AttrOwnerName] = info.Owner.OwnerName()
switch owner.LabelName {
case kube.OwnerDaemonSet:
ret.metadata[services.AttrDaemonSetName] = owner.Name
ret.metadata[services.AttrDaemonSetName] = info.Owner.OwnerName()
case kube.OwnerReplicaSet:
ret.metadata[services.AttrReplicaSetName] = owner.Name
ret.metadata[services.AttrReplicaSetName] = info.Owner.OwnerName()
case kube.OwnerDeployment:
ret.metadata[services.AttrDeploymentName] = owner.Name
ret.metadata[services.AttrDeploymentName] = info.Owner.OwnerName()
case kube.OwnerStatefulSet:
ret.metadata[services.AttrStatefulSetName] = owner.Name
ret.metadata[services.AttrStatefulSetName] = info.Owner.OwnerName()
}
owner = owner.Owner
}
Expand Down
12 changes: 3 additions & 9 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
// to report as owner.
func (k *Metadata) FetchPodOwnerInfo(pod *PodInfo) {
if pod.Owner != nil && pod.Owner.LabelName == OwnerReplicaSet {
if rsi, ok := k.GetReplicaSetInfo(pod.Namespace, pod.Owner.Name); ok {
if rsi, ok := k.GetReplicaSetInfo(pod.Namespace, pod.Owner.OwnerName()); ok {
pod.Owner.Owner = rsi.Owner
}
}
Expand Down Expand Up @@ -430,14 +430,8 @@ func (k *Metadata) AddNodeEventHandler(h cache.ResourceEventHandler) error {
}

func (i *PodInfo) ServiceName() string {
if i.Owner != nil {
// we have two levels of ownership at most
if i.Owner.Owner != nil {
return i.Owner.Owner.Name
}

return i.Owner.Name
if on := i.Owner.OwnerName(); on != "" {
return on
}

return i.Name
}
28 changes: 28 additions & 0 deletions pkg/internal/kube/owner.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kube

import (
"regexp"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -21,6 +22,7 @@ const (
type Owner struct {
Kind string
LabelName OwnerLabel
// Name of the Owner, usually you will need to use the `OwnerName` method
Name string
// Owner of the owner. For example, a ReplicaSet might be owned by a Deployment
Owner *Owner
Expand Down Expand Up @@ -62,6 +64,32 @@ func unrecognizedOwner(or *metav1.OwnerReference) *Owner {
}
}

var rsOwnerRegexp = regexp.MustCompile(`^([\w\-]+)(-\w{8})?$`)

// OwnerName returns the top-level name of the owner chain.
// For example, if the owner is a ReplicaSet, it will return the Deployment name.
func (o *Owner) OwnerName() string {
if o == nil {
return ""
}
if o.LabelName == OwnerReplicaSet {
// we have two levels of ownership at most
if o.Owner != nil {
return o.Owner.Name
}
// if the replicaset informer is disabled, we can't get the owner deployment,
// so we will heuristically extract it from the Pod Name (and cache it)
deploymentNames := rsOwnerRegexp.FindStringSubmatch(o.Name)
if len(deploymentNames) > 1 {
o.Owner = &Owner{
Name: deploymentNames[1],
LabelName: OwnerUnknown,
}
}
}
return o.Name
}

func (o *Owner) String() string {
sb := strings.Builder{}
o.string(&sb)
Expand Down
2 changes: 1 addition & 1 deletion pkg/transform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (md *metadataDecorator) appendMetadata(span *request.Span, info *kube.PodIn
}
owner := info.Owner
for owner != nil {
span.ServiceID.Metadata[attr.Name(owner.LabelName)] = owner.Name
span.ServiceID.Metadata[attr.Name(owner.LabelName)] = info.Owner.OwnerName()
owner = owner.Owner
}
// override hostname by the Pod name
Expand Down
2 changes: 1 addition & 1 deletion test/integration/components/rusttestserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ mio = "0.8.11"
actix-http = "3.6.0"
mime = "0.3.17"
actix-files = "0.6.5"
async-stream = "0.3.5"
async-stream = "0.3.5"
10 changes: 8 additions & 2 deletions test/integration/components/rusttestserver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::fs;
use std::io::Read;
use reqwest;
use tokio;
use std::env;

#[derive(Debug, Serialize, Deserialize)]
struct MyObj {
Expand Down Expand Up @@ -120,7 +121,12 @@ async fn dist2() -> HttpResponse {
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

log::info!("starting HTTP server at http://localhost:8090");
let port = env::var("PORT")
.unwrap_or_else(|_| "8090".to_string()) // Default to 8090 if the environment variable is not set
.parse()
.expect("PORT must be a number");

log::info!("starting HTTP server at http://localhost:{}", port);

HttpServer::new(|| {
App::new()
Expand All @@ -136,7 +142,7 @@ async fn main() -> std::io::Result<()> {
.service(web::resource("/download1").route(web::get().to(download1)))
.service(web::resource("/download2").route(web::get().to(download2)))
})
.bind(("0.0.0.0", 8090))?
.bind(("0.0.0.0", port))?
.run()
.await
}

0 comments on commit 1b44502

Please sign in to comment.