Skip to content

Commit

Permalink
Implement a key-based process table, rebuild interface
Browse files Browse the repository at this point in the history
Processes are now uniquely identified by a ProcessKey instead of by
process ID, because process IDs may be recycled quickly.

Most of the code interfacing with the process table has been pulled
into the early part of Coalesce::transform_event that handles SYSCALL
messages.

The test introduced with #168 now passes.

Fixes #146

Performance-wise, this is a slight improvement over current master.

This branch:

test parse_only      ... bench:  20,237,394 ns/iter (+/- 235,818)
test parse_serialize ... bench:  27,426,770 ns/iter (+/- 255,343)

master (f237694):

test parse_only      ... bench:  22,737,572 ns/iter (+/- 397,532)
test parse_serialize ... bench:  30,010,731 ns/iter (+/- 369,789)
  • Loading branch information
hillu committed Nov 6, 2023
1 parent c5cf5e1 commit 2cdc8df
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 306 deletions.
262 changes: 140 additions & 122 deletions src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use serde_json::json;
use crate::constants::{msg_type::*, ARCH_NAMES, SYSCALL_NAMES};
use crate::label_matcher::LabelMatcher;
use crate::parser::parse;
use crate::proc::{ProcTable, Process};
use crate::proc::{ContainerInfo, ProcTable, Process, ProcessKey};
#[cfg(feature = "procfs")]
use crate::procfs;
use crate::sockaddr::SocketAddr;
Expand Down Expand Up @@ -424,15 +424,15 @@ impl<'a> Coalesce<'a> {
match &k {
Key::Name(r) if r.ends_with(b"pid") => {
let key = Key::NameTranslated(r.clone());
#[cfg(feature = "procfs")]
let proc = self.processes.get_or_insert_from_procfs(pid as _)?;
#[cfg(not(feature = "procfs"))]
let proc = self.processes.get_process(pid as _)?;
if proc.event_id.is_none() && proc.exe.is_none() && proc.ppid == 0 {
let proc = self.processes.get_or_retrieve(pid as _)?;

if let (ProcessKey::Observed { time: _, pid: _ }, None, 0) =
(proc.key, &proc.exe, &proc.ppid)
{
None
} else {
let mut m = Vec::with_capacity(3);
if let Some(id) = proc.event_id {
if let ProcessKey::Event(id) = proc.key {
m.push((
SimpleKey::Literal("ID"),
SimpleValue::Str(rv.put(format!("{}", id))),
Expand Down Expand Up @@ -467,18 +467,17 @@ impl<'a> Coalesce<'a> {
fn transform_event(&mut self, ev: &mut Event) {
let mut arch: Option<u32> = None;
let mut syscall: Option<u32> = None;
let mut pid: Option<u32> = None;
let mut ppid: Option<u32> = None;
let mut comm: Option<NVec> = None;
let mut exe: Option<NVec> = None;
let mut key: Option<NVec> = None;

let mut arch_name: Option<&'static str> = None;
let mut syscall_name: Option<&'static str> = None;

let mut syscall_is_exec = false;
let mut current_process: Option<Process> = None;
let mut parent: Option<Process> = None;

if let Some(EventValues::Single(rv)) = ev.body.get_mut(&SYSCALL) {
let mut proc = Process::default();
let mut extra = 0;
if self.settings.translate_universal {
extra += 16 // syscall, arch
Expand All @@ -503,14 +502,14 @@ impl<'a> Coalesce<'a> {
(Common::Syscall, Number::Dec(n)) if syscall.is_none() => {
syscall = Some(*n as u32)
}
(Common::Pid, Number::Dec(n)) if pid.is_none() => pid = Some(*n as u32),
(Common::PPid, Number::Dec(n)) if ppid.is_none() => ppid = Some(*n as u32),
(Common::Pid, Number::Dec(n)) => proc.pid = *n as u32,
(Common::PPid, Number::Dec(n)) => proc.ppid = *n as u32,
_ => (),
},
(Key::Common(c), Value::Str(r, _)) => match c {
Common::Comm if comm.is_none() => comm = Some(rv.raw[r.clone()].into()),
Common::Exe if exe.is_none() => exe = Some(rv.raw[r.clone()].into()),
Common::Key if key.is_none() => key = Some(rv.raw[r.clone()].into()),
Common::Comm => proc.comm = Some(rv.raw[r.clone()].into()),
Common::Exe => proc.exe = Some(rv.raw[r.clone()].into()),
Common::Key => key = Some(rv.raw[r.clone()].into()),
_ => (),
},
(Key::Name(name), Value::Str(_, _)) => {
Expand All @@ -530,6 +529,97 @@ impl<'a> Coalesce<'a> {
new.push((Key::Literal("ARGV"), Value::List(argv)));
rv.elems = new;
rv.extend(nrv);

if let (Some(arch), Some(syscall)) = (arch, syscall) {
if let Some(an) = ARCH_NAMES.get(&arch) {
arch_name = Some(*an);
if let Some(sn) = SYSCALL_NAMES
.get(*an)
.and_then(|syscall_tbl| syscall_tbl.get(&syscall))
{
syscall_name = Some(sn);

// If we are processing an execve or execveat
// syscall, we'll create a new Process
// instance, assuming that the current process
// table entry for ppid holds the parent.
//
// For non-execve calls, we inspect the
// process table for the current pid entry. If
// the entry and the syscall do not match, we
// assume that we are dealing with a new
// process and create a new Process instance.
//
// If the entry and the syscall match, we keep
// using (and possibly updating) the existing
// Process entry.
syscall_is_exec = sn.contains("execve");
let pr = if !syscall_is_exec {
self.processes.get_or_retrieve(proc.pid)
} else {
None
};
match pr {
Some(pr) if proc.ppid == pr.ppid && proc.exe == pr.exe => {
// existing, plausible process in table
proc.key = pr.key;
proc.parent = pr.parent;
proc.labels = pr.labels.clone();
#[cfg(feature = "procfs")]
{
proc.container_info = pr.container_info.clone();
}
}
_ => {
// first syscall in new process
proc.key = ProcessKey::Event(ev.id);
let pa = self.processes.get_or_retrieve(proc.ppid);
if let Some(pa) = pa {
parent = Some(pa.clone());
proc.parent = Some(pa.key);
let propagated_labels = self
.settings
.proc_propagate_labels
.intersection(&pa.labels)
.cloned();
proc.labels.extend(propagated_labels);
}
#[cfg(feature = "procfs")]
if self.settings.enrich_container {
if let Ok(Some(id)) = procfs::parse_proc_pid_cgroup(proc.pid) {
proc.container_info = Some(ContainerInfo { id });
}
}
self.processes.insert(proc.clone());
}
};

// Apply executable-path-based label rules to the current process.
//
// The rules are only evaluated when the SYSCALL record actually
// carried an `exe` field; unwrapping here would panic on records
// that lack it. Borrowing `proc.exe` while mutating `proc.labels`
// is fine because they are disjoint fields, so no clone is needed.
if let Some(exe) = &proc.exe {
    // Add every label whose pattern matches the executable path.
    if let Some(label_exe) = &self.settings.label_exe {
        for label in label_exe.matches(exe) {
            proc.labels.insert(label.into());
        }
    }
    // Remove every label whose "unlabel" pattern matches. This must
    // be a removal (not an insert), mirroring the script-based
    // unlabel handling further down in this function.
    if let Some(unlabel_exe) = &self.settings.unlabel_exe {
        for label in unlabel_exe.matches(exe) {
            proc.labels.remove(label);
        }
    }
}
}
}
}

if let Some(key) = &key {
if self.settings.filter_keys.contains(key.as_ref()) {
ev.filter = true;
}
if self.settings.proc_label_keys.contains(key.as_ref()) {
proc.labels.insert(key.to_vec());
}
} else if self.settings.filter_null_keys {
ev.filter = true;
}

current_process = Some(proc);
}

if let Some(EventValues::Single(rv)) = ev.body.get_mut(&EXECVE) {
Expand Down Expand Up @@ -614,97 +704,27 @@ impl<'a> Coalesce<'a> {

// ENV
#[cfg(feature = "procfs")]
match pid {
Some(pid) if !self.settings.execve_env.is_empty() => {
if let Ok(vars) =
procfs::get_environ(pid, |k| self.settings.execve_env.contains(k))
{
let map = vars
.iter()
.map(|(k, v)| (SimpleKey::Str(rv.put(k)), SimpleValue::Str(rv.put(v))))
.collect();
new.push((Key::Literal("ENV"), Value::Map(map)));
}
}
_ => (),
};

rv.elems = new;
}

if let (Some(arch), Some(syscall)) = (arch, syscall) {
if let Some(an) = ARCH_NAMES.get(&(arch as u32)) {
arch_name = Some(*an);
if let Some(sn) = SYSCALL_NAMES
.get(*an)
.and_then(|syscall_tbl| syscall_tbl.get(&(syscall as u32)))
if let (Some(proc), false) = (&current_process, self.settings.execve_env.is_empty()) {
if let Ok(vars) =
procfs::get_environ(proc.pid, |k| self.settings.execve_env.contains(k))
{
syscall_name = Some(sn);
if sn.contains("execve") {
syscall_is_exec = true;
}
}
}
}

// register process, add propagated labels from
// parent if applicable
#[cfg(feature = "procfs")]
let parent: Option<Process> =
ppid.and_then(|ppid| self.processes.get_or_insert_from_procfs(ppid));
#[cfg(not(feature = "procfs"))]
let parent: Option<Process> = ppid.and_then(|ppid| self.processes.get_process(ppid));

if let (Some(pid), Some(ppid)) = (pid, ppid) {
#[cfg(feature = "procfs")]
let cond = syscall_is_exec || self.processes.get_or_insert_from_procfs(pid).is_none();
#[cfg(not(feature = "procfs"))]
let cond = syscall_is_exec || self.processes.get_process(pid).is_none();
if cond {
self.processes.add_process(
pid,
ppid,
ev.id,
comm.as_ref().map(|s| s.to_vec()),
exe.as_ref().map(|s| s.to_vec()),
);

if let Some(parent) = &parent {
for l in self
.settings
.proc_propagate_labels
.intersection(&parent.labels)
{
self.processes.add_label(pid, l);
}
let map = vars
.iter()
.map(|(k, v)| (SimpleKey::Str(rv.put(k)), SimpleValue::Str(rv.put(v))))
.collect();
new.push((Key::Literal("ENV"), Value::Map(map)));
}
}
}

if let (Some(pid), Some(key)) = (&pid, &key) {
if self.settings.proc_label_keys.contains(key.as_ref()) {
self.processes.add_label(*pid, key);
}
}

if let (Some(exe), Some(pid), true) = (&exe, &pid, syscall_is_exec) {
if let Some(label_exe) = &self.settings.label_exe {
for label in label_exe.matches(exe) {
self.processes.add_label(*pid, label);
}
}
if let Some(unlabel_exe) = &self.settings.unlabel_exe {
for label in unlabel_exe.matches(exe) {
self.processes.remove_label(*pid, label);
}
}
rv.elems = new;
}

// Handle script enrichment
#[cfg(feature = "procfs")]
let script: Option<NVec> = match (self.settings.enrich_script, self.settings.label_script) {
(false, None) => None,
_ => match (&exe, pid, ev.body.get(&PATH), syscall_is_exec) {
(Some(exe), Some(pid), Some(EventValues::Multi(paths)), true) => {
_ => match (&current_process, ev.body.get(&PATH), syscall_is_exec) {
(Some(proc), Some(EventValues::Multi(paths)), true) => {
let mut cwd = &b"/"[..];
if let Some(EventValues::Single(r)) = ev.body.get(&CWD) {
if let Some(rv) = r.get("cwd") {
Expand All @@ -713,43 +733,42 @@ impl<'a> Coalesce<'a> {
}
}
};
path_script_name(&paths[0], pid, cwd, exe)
path_script_name(
&paths[0],
proc.pid,
cwd,
&proc.exe.clone().unwrap_or_default(),
)
}
_ => None,
},
};

#[cfg(feature = "procfs")]
if let (Some(pid), Some(script)) = (pid, &script) {
if let (Some(ref mut proc), Some(script)) = (&mut current_process, &script) {
if let Some(label_script) = self.settings.label_script {
for label in label_script.matches(script.as_ref()) {
self.processes.add_label(pid, label);
proc.labels.insert(label.into());
}
}
if let Some(unlabel_script) = self.settings.unlabel_script {
for label in unlabel_script.matches(script.as_ref()) {
self.processes.remove_label(pid, label);
proc.labels.remove(label);
}
}
}

// Since the event may be dropped here, manipulation of any
// other state should not occur below.
if let Some(key) = &key {
if self.settings.filter_keys.contains(key.as_ref()) {
ev.filter = true;
return;
}
} else if self.settings.filter_null_keys {
ev.filter = true;
if let Some(proc) = &current_process {
self.processes.set_labels(&proc.key, &proc.labels)
}

if ev.filter {
return;
}

#[cfg(feature = "procfs")]
let proc: Option<Process> =
pid.and_then(|pid| self.processes.get_or_insert_from_procfs(pid));
#[cfg(not(feature = "procfs"))]
let proc: Option<Process> = pid.and_then(|pid| self.processes.get_process(pid));
// Since the event may have been dropped here, don't
// manipulate the current process below.
let current_process = current_process;

for tv in ev.body.iter_mut() {
match tv {
Expand Down Expand Up @@ -826,7 +845,7 @@ impl<'a> Coalesce<'a> {
// PARENT_INFO
if let (true, Some(parent)) = (self.settings.enrich_parent_info, &parent) {
let mut pi = Record::default();
if let Some(id) = parent.event_id {
if let ProcessKey::Event(id) = parent.key {
let r = pi.put(format!("{}", id));
pi.elems
.push((Key::Literal("ID"), Value::Str(r, Quote::None)));
Expand Down Expand Up @@ -859,7 +878,7 @@ impl<'a> Coalesce<'a> {

if let (true, Some(parent)) = (self.settings.enrich_pid, &parent) {
let mut m = Vec::with_capacity(4);
if let Some(id) = &parent.event_id {
if let ProcessKey::Event(id) = &parent.key {
m.push((
SimpleKey::Literal("EVENT_ID"),
SimpleValue::Str(sc.put(format!("{}", id))),
Expand Down Expand Up @@ -889,11 +908,11 @@ impl<'a> Coalesce<'a> {
sc.elems.push((k, v));
}

if let Some(proc) = proc {
if let (true, Some(event_id)) = (self.settings.enrich_pid, proc.event_id) {
if let Some(proc) = current_process {
if let (true, ProcessKey::Event(id)) = (self.settings.enrich_pid, proc.key) {
let m = Value::Map(vec![(
SimpleKey::Literal("EVENT_ID"),
SimpleValue::Str(sc.put(format!("{}", event_id))),
SimpleValue::Str(sc.put(format!("{}", id))),
)]);
sc.elems.push((Key::Literal("PID"), m));
}
Expand Down Expand Up @@ -1517,7 +1536,6 @@ mod test {
}

#[test]
#[should_panic(expected = "Did not get correct parent for 1697091526.357:2638035")]
fn shell_proc_trace_confusion() {
let s1 = Settings {
proc_label_keys: [b"test-script".to_vec()].into(),
Expand Down
Loading

0 comments on commit 2cdc8df

Please sign in to comment.