Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions trafficUtil/kafkaUtil/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func UpdateDebugStringsFromFile() {

func checkDebugUrlAndPrint(url string, host string, message string) {
// url or host. [array string]
message = "mannakto: " + message
if len(DebugStrings) > 0 {
for _, debugString := range DebugStrings {
if strings.Contains(url, debugString) {
Expand Down Expand Up @@ -434,7 +435,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d
// Process id was captured from the eBPF program using bpf_get_current_pid_tgid()
// Shifting by 32 gives us the process id on host machine.
var pid = idfd >> 32
log := fmt.Sprintf("pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v",
log := fmt.Sprintf("direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v",
direction,
reqHeaderStr["host"],
value["path"],
Expand All @@ -444,27 +445,29 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d
pid,
hostName,
)
utils.PrintLog(log)
checkDebugUrlAndPrint(url, req.Host, log)
slog.Warn("before resolving labels pod direction log" + log)
checkDebugUrlAndPrint(url, req.Host, "before resolving labels pod direction log" + log)

if PodInformerInstance != nil && direction == utils.DirectionInbound {

if hostName == "" {
checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid))
slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName)
checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid) + log)
slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName, "log", log)
} else {
checkDebugUrlAndPrint(url, req.Host, "Resolving Pod Name to labels podName: " + hostName + "log: " + log)
podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, req.Host)
if err != nil {
slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err)
checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName)
checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName + "log: " + log)
} else {
value["tag"] = podLabels
checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName)
checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName + "log: " + log)
slog.Debug("Pod labels", "podName", hostName, "labels", podLabels)
}
}
} else {
checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction))
slog.Warn("Pod labels not resolved, PodInformerInstance is nil or direction is not inbound", "log", log)
checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound" + "log: " + log)
}

out, _ := json.Marshal(value)
Expand Down
11 changes: 8 additions & 3 deletions trafficUtil/kafkaUtil/podinformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,13 @@ func (w *PodInformer) GetPodNameByProcessId(pid int32) string {
if hostName, ok := w.pidHostNameMap[pid]; ok {
return hostName
}
slog.Warn("Hostname not found for", "processId", pid)
cmd := exec.Command("sh", "-c", fmt.Sprintf("ps -p %d -o comm=", pid))
output, err := cmd.Output()
if err != nil {
slog.Error("Failed to get process name", "pid", pid)
}
slog.Warn("Hostname not found for", "processId", pid, "commandName", output)

return ""
}

Expand Down Expand Up @@ -152,7 +158,6 @@ func (w *PodInformer) BuildPidHostNameMap() {
}

func (w *PodInformer) ResolvePodLabels(podName string, url, reqHost string) (string, error) {
slog.Debug("Resolving Pod Name to labels", "podName", podName)
checkDebugUrlAndPrint(url, reqHost, "Resolving Pod Name to labels for "+podName)

// Step 1: Use the pod name as the key to find labels in podNameLabelsMap
Expand Down Expand Up @@ -345,4 +350,4 @@ func (w *PodInformer) handlePodDelete(obj interface{}) {
// Build the PID to Hostname map again to ensure it is up-to-date
// TODO: Optimize this ? What's the rate of pod add events?
w.BuildPidHostNameMap()
}
}
Loading