Skip to content

Commit

Permalink
fix(tests): fix goroutines leakage in integration tests (#4047)
Browse files Browse the repository at this point in the history
Fix tests so they will make sure to close channels properly and clean goroutines.
  • Loading branch information
AlonZivony authored May 16, 2024
1 parent a83133e commit cd7ed0d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 26 deletions.
13 changes: 9 additions & 4 deletions tests/integration/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ import (
"unsafe"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"gotest.tools/assert"

"github.com/aquasecurity/tracee/tests/testutils"
)

func Test_TraceeCapture(t *testing.T) {
// Make sure we don't leak any goroutines since we run Tracee many times in this test.
// If a test case fails, ignore the leak since it's probably caused by the aborted test.
defer goleak.VerifyNone(t)

if !testutils.IsSudoCmdAvailableForThisUser() {
t.Skip("skipping: sudo command is not available for this user")
}
Expand Down Expand Up @@ -90,12 +95,12 @@ func Test_TraceeCapture(t *testing.T) {
err := tc.test(t, captureDir, homeDir)
if err != nil {
t.Errorf("test %s failed: %v", tc.name, err)
runErr = running.Stop() // stop tracee
require.NoError(t, runErr)
cmdErrs := running.Stop() // stop tracee
require.Empty(t, cmdErrs)
t.Fail()
}
runErr = running.Stop() // stop tracee
require.NoError(t, runErr)
cmdErrs := running.Stop() // stop tracee
require.Empty(t, cmdErrs)
})
}
}
Expand Down
13 changes: 8 additions & 5 deletions tests/perftests/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/aquasecurity/tracee/tests/testutils"
)
Expand Down Expand Up @@ -97,6 +98,10 @@ func checkIfPprofExist() error {

// TestMetricsExist tests if the metrics endpoint returns all metrics.
func TestMetricsandPprofExist(t *testing.T) {
// Make sure we don't leak any goroutines since we run Tracee many times in this test.
// If a test case fails, ignore the leak since it's probably caused by the aborted test.
defer goleak.VerifyNone(t)

if !testutils.IsSudoCmdAvailableForThisUser() {
t.Skip("skipping: sudo command is not available for this user")
}
Expand All @@ -108,11 +113,6 @@ func TestMetricsandPprofExist(t *testing.T) {
ready, runErr := running.Start(testutils.TraceeDefaultStartupTimeout)
require.NoError(t, runErr)

t.Cleanup(func() {
runErr = running.Stop() // stop tracee
require.NoError(t, runErr)
})

r := <-ready // block until tracee is ready (or not)
switch r {
case testutils.TraceeFailed:
Expand All @@ -130,4 +130,7 @@ func TestMetricsandPprofExist(t *testing.T) {
// check if all metrics exist
require.NoError(t, metricsErr)
require.NoError(t, pprofErr)

cmdErrs := running.Stop() // stop tracee
require.Empty(t, cmdErrs)
}
36 changes: 24 additions & 12 deletions tests/testutils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func ExecPinnedCmdWithTimeout(command string, timeout time.Duration) (int, error
// ExecCmdBgWithSudoAndCtx executes a command with sudo in the background, and returns the PID of
// the process and a channel to wait for the command to exit (Check RunningTracee object about how
// to use this).
func ExecCmdBgWithSudoAndCtx(ctx context.Context, command string) (int, chan error) {
// The function will return an error if the command execution fails
func ExecCmdBgWithSudoAndCtx(ctx context.Context, command string) (int, chan error, error) {
cmdStatus := make(chan error)

// Use sudo to raise privileges (sysattrs require capabilities).
Expand All @@ -107,20 +108,22 @@ func ExecCmdBgWithSudoAndCtx(ctx context.Context, command string) (int, chan err
command, args, err := ParseCmd(command)
if err != nil {
fmt.Printf("Failed to parse command\n")
cmdStatus <- &failedToParseCmd{command: command, err: err}
return -1, cmdStatus
close(cmdStatus)
return -1, cmdStatus, &failedToParseCmd{command: command, err: err}
}

cmd := exec.Command(command, args...) // CommandContext can't be used due to sudo privileges
cmd.Stderr = os.Stderr

pid := atomic.Int64{}
wg := sync.WaitGroup{}
commandStartWG := sync.WaitGroup{}
commandEndWG := sync.WaitGroup{}

// Start the command in a separate, pinned and locked goroutine (to a single CPU and OS thread).
// TODO: Adjust here so amount of CPUs is controlled ?

wg.Add(1)
var commandStartErr error
commandStartWG.Add(1)
go func(pid *atomic.Int64) {
// Will make the command to inherit the current process' CPU affinity.
_ = PinProccessToCPU() // pin this goroutine to a specific CPU
Expand All @@ -131,22 +134,30 @@ func ExecCmdBgWithSudoAndCtx(ctx context.Context, command string) (int, chan err
if err != nil {
// This isn't a cmd exec failed error, but rather a cmd start failed error.
pid.Store(-1)
cmdStatus <- &failedToStartCommand{command: command, err: err}
commandStartErr = &failedToStartCommand{command: command, err: err}
} else {
commandEndWG.Add(1)
go func() {
// Note: cmd exec failed errors are async and happen here on cmd.Wait().
pid.Store(int64(cmd.Process.Pid)) // store PID
err := cmd.Wait() // block until command exits
pid.Store(-1) // so PID is non positive on failed executions
cmdStatus <- err // signal command exited
if err != nil {
pid.Store(-1) // so PID is non-positive on failed executions
cmdStatus <- err
}
commandEndWG.Done()
}()
}

time.Sleep(1 * time.Second) // wait 1 sec for the command to start (or not)
wg.Done() // signal command started
commandStartWG.Done() // signal command started
}(&pid)

wg.Wait() // synchronize: wait for 1 sec feedback (cmd has started or not)
commandStartWG.Wait() // synchronize: wait for 1 sec feedback (cmd has started or not)
if commandStartErr != nil {
close(cmdStatus)
return -1, cmdStatus, commandStartErr
}

// Kill the command if the context is canceled (and signal that it was killed).

Expand All @@ -167,11 +178,12 @@ func ExecCmdBgWithSudoAndCtx(ctx context.Context, command string) (int, chan err
}
}
}
cmdStatus <- nil // signal command exited
commandEndWG.Wait()
close(cmdStatus) // signal command exited
}(&pid)

// Return the PID (or -1) and the channel to wait for the command to exit.
return int(pid.Load()), cmdStatus
return int(pid.Load()), cmdStatus, nil
}

// DiscoverChildProcesses discovers all child processes of a given PID.
Expand Down
13 changes: 8 additions & 5 deletions tests/testutils/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ func (r *RunningTracee) Start(timeout time.Duration) (<-chan TraceeStatus, error
goto exit
}

r.pid, r.cmdStatus = ExecCmdBgWithSudoAndCtx(r.ctx, r.cmdLine)
if r.pid < 0 {
err = <-r.cmdStatus // receive error from the command
r.pid, r.cmdStatus, err = ExecCmdBgWithSudoAndCtx(r.ctx, r.cmdLine)
if err != nil {
imReady(TraceeFailed) // ready: failed
goto exit
}
Expand All @@ -104,13 +103,17 @@ exit:
}

// Stop stops the tracee process.
func (r *RunningTracee) Stop() error {
func (r *RunningTracee) Stop() []error {
if r.pid == 0 {
return nil // cmd was never started
}

r.cancel()
return <-r.cmdStatus // will receive nil if the process exited successfully
var errs []error
for err := range r.cmdStatus {
errs = append(errs, err)
}
return errs
}

// IsReady checks if the tracee process is ready.
Expand Down

0 comments on commit cd7ed0d

Please sign in to comment.