From 32995224fc663f2152947a80a109c8eed0d425c0 Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Wed, 30 Oct 2024 14:58:37 -0400 Subject: [PATCH 1/3] Move extension creation into osqueryinstance --- cmd/launcher/extension.go | 68 ---------- cmd/launcher/launcher.go | 15 +-- pkg/osquery/runtime/osqueryinstance.go | 157 ++++++++++++++++------ pkg/osquery/runtime/runner.go | 31 +++-- pkg/osquery/runtime/runtime_posix_test.go | 10 ++ pkg/osquery/runtime/runtime_test.go | 83 ++++++++++-- 6 files changed, 216 insertions(+), 148 deletions(-) delete mode 100644 cmd/launcher/extension.go diff --git a/cmd/launcher/extension.go b/cmd/launcher/extension.go deleted file mode 100644 index cdebe0c60..000000000 --- a/cmd/launcher/extension.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log/slog" - - "github.com/kolide/launcher/ee/agent/types" - "github.com/kolide/launcher/pkg/osquery" - "github.com/kolide/launcher/pkg/service" - "github.com/kolide/launcher/pkg/traces" -) - -func createExtensionRuntime(ctx context.Context, k types.Knapsack, launcherClient service.KolideService) (*osquery.Extension, error) { - ctx, span := traces.StartSpan(ctx) - defer span.End() - - slogger := k.Slogger().With("component", "osquery_extension_creator") - - // create the osquery extension - extOpts := osquery.ExtensionOpts{ - LoggingInterval: k.LoggingInterval(), - RunDifferentialQueriesImmediately: k.EnableInitialRunner(), - } - - // Setting MaxBytesPerBatch is a tradeoff. If it's too low, we - // can never send a large result. But if it's too high, we may - // not be able to send the data over a low bandwidth - // connection before the connection is timed out. - // - // The logic for setting this is spread out. The underlying - // extension defaults to 3mb, to support GRPC's hardcoded 4MB - // limit. But as we're transport aware here. we can set it to - // 5MB for others. - if k.LogMaxBytesPerBatch() != 0 { - if k.Transport() == "grpc" && k.LogMaxBytesPerBatch() > 3 { - slogger.Log(ctx, slog.LevelInfo, - "LogMaxBytesPerBatch is set above the grpc recommended maximum of 3. Expect errors", - "log_max_bytes_per_batch", k.LogMaxBytesPerBatch(), - ) - } - extOpts.MaxBytesPerBatch = k.LogMaxBytesPerBatch() << 20 - } else if k.Transport() == "grpc" { - extOpts.MaxBytesPerBatch = 3 << 20 - } else if k.Transport() != "grpc" { - extOpts.MaxBytesPerBatch = 5 << 20 - } - - // create the extension - ext, err := osquery.NewExtension(ctx, launcherClient, k, extOpts) - if err != nil { - return nil, fmt.Errorf("creating new extension: %w", err) - } - - // Immediately attempt enrollment in the background - go func() { - _, nodeInvalid, err := ext.Enroll(ctx) - if nodeInvalid || err != nil { - slogger.Log(ctx, slog.LevelWarn, - "could not perform initial attempt at enrollment, will retry later", - "node_invalid", nodeInvalid, - "err", err, - ) - } - }() - - return ext, nil -} diff --git a/cmd/launcher/launcher.go b/cmd/launcher/launcher.go index fe850d249..954f777b0 100644 --- a/cmd/launcher/launcher.go +++ b/cmd/launcher/launcher.go @@ -61,9 +61,6 @@ import ( "github.com/kolide/launcher/pkg/service" "github.com/kolide/launcher/pkg/traces" "github.com/kolide/launcher/pkg/traces/exporter" - "github.com/osquery/osquery-go/plugin/config" - "github.com/osquery/osquery-go/plugin/distributed" - osquerylogger "github.com/osquery/osquery-go/plugin/logger" "go.etcd.io/bbolt" ) @@ -362,15 +359,10 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl return fmt.Errorf("error initializing osquery instance history: %w", err) } - // create the osquery extension - extension, err := createExtensionRuntime(ctx, k, client) - if err != nil { - return fmt.Errorf("create extension with runtime: %w", err) - } - runGroup.Add("osqueryExtension", extension.Execute, extension.Shutdown) // create the runner that will launch osquery osqueryRunner := osqueryruntime.New( k, + client, osqueryruntime.WithStdout(kolidelog.NewOsqueryLogAdapter( k.Slogger().With( "component", "osquery", @@ -388,11 +380,6 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl kolidelog.WithLevel(slog.LevelInfo), )), osqueryruntime.WithAugeasLensFunction(augeas.InstallLenses), - osqueryruntime.WithOsqueryExtensionPlugins( - config.NewPlugin(osqueryruntime.KolideSaasExtensionName, extension.GenerateConfigs), - distributed.NewPlugin(osqueryruntime.KolideSaasExtensionName, extension.GetQueries, extension.WriteResults), - osquerylogger.NewPlugin(osqueryruntime.KolideSaasExtensionName, extension.LogString), - ), ) runGroup.Add("osqueryRunner", osqueryRunner.Run, osqueryRunner.Interrupt) diff --git a/pkg/osquery/runtime/osqueryinstance.go b/pkg/osquery/runtime/osqueryinstance.go index 4630aa210..af1ee1859 100644 --- a/pkg/osquery/runtime/osqueryinstance.go +++ b/pkg/osquery/runtime/osqueryinstance.go @@ -17,11 +17,17 @@ import ( "github.com/kolide/kit/ulid" "github.com/kolide/launcher/ee/agent/types" + "github.com/kolide/launcher/ee/gowrapper" "github.com/kolide/launcher/pkg/backoff" + launcherosq "github.com/kolide/launcher/pkg/osquery" "github.com/kolide/launcher/pkg/osquery/runtime/history" "github.com/kolide/launcher/pkg/osquery/table" + "github.com/kolide/launcher/pkg/service" "github.com/kolide/launcher/pkg/traces" "github.com/osquery/osquery-go" + "github.com/osquery/osquery-go/plugin/config" + "github.com/osquery/osquery-go/plugin/distributed" + osquerylogger "github.com/osquery/osquery-go/plugin/logger" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -45,16 +51,6 @@ const ( // https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis type OsqueryInstanceOption func(*OsqueryInstance) -// WithOsqueryExtensionPlugins is a functional option which allows the user to -// declare a number of osquery plugins (ie: config plugin, logger plugin, tables, -// etc) which can be loaded when calling LaunchOsqueryInstance. You can load as -// many plugins as you'd like. -func WithOsqueryExtensionPlugins(plugins ...osquery.OsqueryPlugin) OsqueryInstanceOption { - return func(i *OsqueryInstance) { - i.opts.extensionPlugins = append(i.opts.extensionPlugins, plugins...) - } -} - // WithExtensionSocketPath is a functional option which allows the user to // define the path of the extension socket path that osqueryd will open to // communicate with other processes. @@ -93,12 +89,14 @@ func WithAugeasLensFunction(f func(dir string) error) OsqueryInstanceOption { // OsqueryInstance is the type which represents a currently running instance // of osqueryd. type OsqueryInstance struct { - opts osqueryOptions - knapsack types.Knapsack - slogger *slog.Logger + opts osqueryOptions + knapsack types.Knapsack + slogger *slog.Logger + serviceClient service.KolideService // the following are instance artifacts that are created and held as a result // of launching an osqueryd process errgroup *errgroup.Group + saasExtension *launcherosq.Extension doneCtx context.Context // nolint:containedctx cancel context.CancelFunc cmd *exec.Cmd @@ -173,16 +171,16 @@ type osqueryOptions struct { // the following are options which may or may not be set by the functional // options included by the caller of LaunchOsqueryInstance augeasLensFunc func(dir string) error - extensionPlugins []osquery.OsqueryPlugin extensionSocketPath string stderr io.Writer stdout io.Writer } -func newInstance(knapsack types.Knapsack, opts ...OsqueryInstanceOption) *OsqueryInstance { +func newInstance(knapsack types.Knapsack, serviceClient service.KolideService, opts ...OsqueryInstanceOption) *OsqueryInstance { i := &OsqueryInstance{ - knapsack: knapsack, - slogger: knapsack.Slogger().With("component", "osquery_instance"), + knapsack: knapsack, + slogger: knapsack.Slogger().With("component", "osquery_instance"), + serviceClient: serviceClient, } for _, opt := range opts { @@ -204,6 +202,12 @@ func (i *OsqueryInstance) launch() error { ctx, span := traces.StartSpan(context.Background()) defer span.End() + // Create SaaS extension immediately + if err := i.startKolideSaasExtension(ctx); err != nil { + traces.SetError(span, fmt.Errorf("could not create Kolide SaaS extension: %w", err)) + return fmt.Errorf("creating Kolide SaaS extension: %w", err) + } + // Based on the root directory, calculate the file names of all of the // required osquery artifact files. paths, err := calculateOsqueryPaths(i.knapsack.RootDirectory(), i.opts) @@ -375,19 +379,6 @@ func (i *OsqueryInstance) launch() error { return i.doneCtx.Err() }) - // Here be dragons - // - // There are two thorny issues. First, we "invert" control of - // the osquery process. We don't really know when osquery will - // be running, so we need a bunch of retries on these connections - // - // Second, because launcher supplements the enroll - // information, this Start function must return fast enough - // that osquery can use the registered tables for - // enrollment. *But* there's been a lot of racy behaviors, - // likely due to time spent registering tables, and subtle - // ordering issues. - // Start an extension manager for the extensions that osquery // needs for config/log/etc. i.extensionManagerClient, err = i.StartOsqueryClient(paths) @@ -397,17 +388,21 @@ func (i *OsqueryInstance) launch() error { } span.AddEvent("extension_client_created") - if len(i.opts.extensionPlugins) > 0 { - if err := i.StartOsqueryExtensionManagerServer(KolideSaasExtensionName, paths.extensionSocketPath, i.extensionManagerClient, i.opts.extensionPlugins); err != nil { - i.slogger.Log(ctx, slog.LevelInfo, - "unable to create Kolide SaaS extension server, stopping", - "err", err, - ) - traces.SetError(span, fmt.Errorf("could not create Kolide SaaS extension server: %w", err)) - return fmt.Errorf("could not create Kolide SaaS extension server: %w", err) - } - span.AddEvent("extension_server_created") + kolideSaasPlugins := []osquery.OsqueryPlugin{ + config.NewPlugin(KolideSaasExtensionName, i.saasExtension.GenerateConfigs), + distributed.NewPlugin(KolideSaasExtensionName, i.saasExtension.GetQueries, i.saasExtension.WriteResults), + osquerylogger.NewPlugin(KolideSaasExtensionName, i.saasExtension.LogString), + } + + if err := i.StartOsqueryExtensionManagerServer(KolideSaasExtensionName, paths.extensionSocketPath, i.extensionManagerClient, kolideSaasPlugins); err != nil { + i.slogger.Log(ctx, slog.LevelInfo, + "unable to create Kolide SaaS extension server, stopping", + "err", err, + ) + traces.SetError(span, fmt.Errorf("could not create Kolide SaaS extension server: %w", err)) + return fmt.Errorf("could not create Kolide SaaS extension server: %w", err) } + span.AddEvent("extension_server_created") // Now spawn an extension manager for the tables. We need to // start this one in the background, because the runner.Start @@ -549,6 +544,88 @@ func (i *OsqueryInstance) launch() error { return nil } +// startKolideSaasExtension creates the Kolide SaaS extension, which provides configuration, +// distributed queries, and a log destination for the osquery process. +func (i *OsqueryInstance) startKolideSaasExtension(ctx context.Context) error { + ctx, span := traces.StartSpan(ctx) + defer span.End() + + // create the osquery extension + extOpts := launcherosq.ExtensionOpts{ + LoggingInterval: i.knapsack.LoggingInterval(), + } + + // Setting MaxBytesPerBatch is a tradeoff. If it's too low, we + // can never send a large result. But if it's too high, we may + // not be able to send the data over a low bandwidth + // connection before the connection is timed out. + // + // The logic for setting this is spread out. The underlying + // extension defaults to 3mb, to support GRPC's hardcoded 4MB + // limit. But as we're transport aware here. we can set it to + // 5MB for others. + if i.knapsack.LogMaxBytesPerBatch() != 0 { + if i.knapsack.Transport() == "grpc" && i.knapsack.LogMaxBytesPerBatch() > 3 { + i.slogger.Log(ctx, slog.LevelInfo, + "LogMaxBytesPerBatch is set above the grpc recommended maximum of 3. Expect errors", + "log_max_bytes_per_batch", i.knapsack.LogMaxBytesPerBatch(), + ) + } + extOpts.MaxBytesPerBatch = i.knapsack.LogMaxBytesPerBatch() << 20 + } else if i.knapsack.Transport() == "grpc" { + extOpts.MaxBytesPerBatch = 3 << 20 + } else if i.knapsack.Transport() != "grpc" { + extOpts.MaxBytesPerBatch = 5 << 20 + } + + // Create the extension + var err error + i.saasExtension, err = launcherosq.NewExtension(ctx, i.serviceClient, i.knapsack, extOpts) + if err != nil { + return fmt.Errorf("creating new extension: %w", err) + } + + // Immediately attempt enrollment in the background. We don't want to put this in our errgroup + // because we don't need to shut down the whole instance if we can't enroll -- we can always + // retry later. + gowrapper.Go(ctx, i.slogger, func() { + _, nodeInvalid, err := i.saasExtension.Enroll(ctx) + if nodeInvalid || err != nil { + i.slogger.Log(ctx, slog.LevelWarn, + "could not perform initial attempt at enrollment, will retry later", + "node_invalid", nodeInvalid, + "err", err, + ) + } + }, func(r any) {}) + + // Run extension + i.errgroup.Go(func() error { + defer i.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "saas extension execute", + ) + if err := i.saasExtension.Execute(); err != nil { + return fmt.Errorf("kolide_grpc extension returned error: %w", err) + } + return nil + }) + + // Register shutdown group for extension + i.errgroup.Go(func() error { + defer i.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "saas extension cleanup", + ) + <-i.doneCtx.Done() + + i.saasExtension.Shutdown(i.doneCtx.Err()) + return nil + }) + + return nil +} + // osqueryFilePaths is a struct which contains the relevant file paths needed to // launch an osqueryd instance. type osqueryFilePaths struct { diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 4e1e47285..a463d66ab 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -9,6 +9,7 @@ import ( "github.com/kolide/launcher/ee/agent/flags/keys" "github.com/kolide/launcher/ee/agent/types" + "github.com/kolide/launcher/pkg/service" ) const ( @@ -27,22 +28,24 @@ const ( ) type Runner struct { - instance *OsqueryInstance - instanceLock sync.Mutex - slogger *slog.Logger - knapsack types.Knapsack - shutdown chan struct{} - interrupted bool - opts []OsqueryInstanceOption + instance *OsqueryInstance + instanceLock sync.Mutex + slogger *slog.Logger + knapsack types.Knapsack + serviceClient service.KolideService + shutdown chan struct{} + interrupted bool + opts []OsqueryInstanceOption } -func New(k types.Knapsack, opts ...OsqueryInstanceOption) *Runner { +func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryInstanceOption) *Runner { runner := &Runner{ - instance: newInstance(k, opts...), - slogger: k.Slogger().With("component", "osquery_runner"), - knapsack: k, - shutdown: make(chan struct{}), - opts: opts, + instance: newInstance(k, serviceClient, opts...), + slogger: k.Slogger().With("component", "osquery_runner"), + knapsack: k, + serviceClient: serviceClient, + shutdown: make(chan struct{}), + opts: opts, } k.RegisterChangeObserver(runner, @@ -104,7 +107,7 @@ func (r *Runner) Run() error { } r.instanceLock.Lock() - r.instance = newInstance(r.knapsack, r.opts...) + r.instance = newInstance(r.knapsack, r.serviceClient, r.opts...) if err := r.instance.launch(); err != nil { r.slogger.Log(context.TODO(), slog.LevelWarn, "fatal error restarting instance, shutting down", diff --git a/pkg/osquery/runtime/runtime_posix_test.go b/pkg/osquery/runtime/runtime_posix_test.go index 7b7bdd169..82ad06275 100644 --- a/pkg/osquery/runtime/runtime_posix_test.go +++ b/pkg/osquery/runtime/runtime_posix_test.go @@ -53,10 +53,15 @@ func TestOsquerySlowStart(t *testing.T) { slogger := multislogger.New(slog.NewJSONHandler(&logBytes, &slog.HandlerOptions{Level: slog.LevelDebug})) k.On("Slogger").Return(slogger.Logger) k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) runner := New( k, + mockServiceClient(), WithStartFunc(func(cmd *exec.Cmd) error { err := cmd.Start() if err != nil { @@ -102,11 +107,16 @@ func TestExtensionSocketPath(t *testing.T) { k.On("OsqueryVerbose").Return(true).Maybe() k.On("OsqueryFlags").Return([]string{}).Maybe() k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) extensionSocketPath := filepath.Join(rootDirectory, "sock") runner := New( k, + mockServiceClient(), WithExtensionSocketPath(extensionSocketPath), ) go runner.Run() diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 30b2540b3..c92e0d0e7 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -27,7 +27,11 @@ import ( "github.com/kolide/launcher/pkg/log/multislogger" "github.com/kolide/launcher/pkg/osquery/runtime/history" "github.com/kolide/launcher/pkg/packaging" + "github.com/kolide/launcher/pkg/service" + servicemock "github.com/kolide/launcher/pkg/service/mock" "github.com/kolide/launcher/pkg/threadsafebuffer" + "github.com/osquery/osquery-go/plugin/distributed" + "github.com/osquery/osquery-go/plugin/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -131,7 +135,7 @@ func TestCreateOsqueryCommand(t *testing.T) { k.On("OsqueryFlags").Return([]string{}) k.On("Slogger").Return(multislogger.NewNopLogger()) - i := newInstance(k) + i := newInstance(k, mockServiceClient()) i.opts = *osqOpts i.knapsack = k @@ -155,7 +159,7 @@ func TestCreateOsqueryCommandWithFlags(t *testing.T) { k.On("OsqueryVerbose").Return(true) k.On("Slogger").Return(multislogger.NewNopLogger()) - i := newInstance(k) + i := newInstance(k, mockServiceClient()) i.opts = *osqOpts i.knapsack = k @@ -191,7 +195,7 @@ func TestCreateOsqueryCommand_SetsEnabledWatchdogSettingsAppropriately(t *testin k.On("OsqueryVerbose").Return(true) k.On("OsqueryFlags").Return([]string{}) - i := newInstance(k) + i := newInstance(k, mockServiceClient()) i.opts = *osqOpts i.knapsack = k @@ -243,7 +247,7 @@ func TestCreateOsqueryCommand_SetsDisabledWatchdogSettingsAppropriately(t *testi k.On("OsqueryVerbose").Return(true) k.On("OsqueryFlags").Return([]string{}) - i := newInstance(k) + i := newInstance(k, mockServiceClient()) i.opts = *osqOpts i.knapsack = k @@ -323,9 +327,13 @@ func TestBadBinaryPath(t *testing.T) { k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryVerbose").Return(true) k.On("OsqueryFlags").Return([]string{}) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) - runner := New(k) + runner := New(k, mockServiceClient()) assert.Error(t, runner.Run()) k.AssertExpectations(t) @@ -350,9 +358,13 @@ func TestWithOsqueryFlags(t *testing.T) { k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryFlags").Return([]string{"verbose=false"}) k.On("OsqueryVerbose").Return(false) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) - runner := New(k) + runner := New(k, mockServiceClient()) go runner.Run() waitHealthy(t, runner, &logBytes) waitShutdown(t, runner, &logBytes) @@ -383,10 +395,14 @@ func TestFlagsChanged(t *testing.T) { k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryFlags").Return([]string{"verbose=false"}) k.On("OsqueryVerbose").Return(false) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) // Start the runner - runner := New(k) + runner := New(k, mockServiceClient()) go runner.Run() // Wait for the instance to start @@ -513,9 +529,13 @@ func TestSimplePath(t *testing.T) { k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryFlags").Return([]string{}) k.On("OsqueryVerbose").Return(true) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) - runner := New(k) + runner := New(k, mockServiceClient()) go runner.Run() waitHealthy(t, runner, &logBytes) @@ -545,9 +565,13 @@ func TestMultipleShutdowns(t *testing.T) { k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryFlags").Return([]string{}) k.On("OsqueryVerbose").Return(true) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) - runner := New(k) + runner := New(k, mockServiceClient()) go runner.Run() waitHealthy(t, runner, &logBytes) @@ -576,9 +600,13 @@ func TestOsqueryDies(t *testing.T) { k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryFlags").Return([]string{}) k.On("OsqueryVerbose").Return(true) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) - runner := New(k) + runner := New(k, mockServiceClient()) go runner.Run() waitHealthy(t, runner, &logBytes) @@ -607,7 +635,7 @@ func TestNotStarted(t *testing.T) { k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() k.On("Slogger").Return(multislogger.NewNopLogger()) - runner := New(k) + runner := New(k, mockServiceClient()) assert.Error(t, runner.Healthy()) assert.NoError(t, runner.Shutdown()) @@ -674,9 +702,13 @@ func setupOsqueryInstanceForTests(t *testing.T) (runner *Runner, logBytes *threa k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryFlags").Return([]string{}).Maybe() k.On("OsqueryVerbose").Return(true).Maybe() + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() setUpMockStores(t, k) - runner = New(k) + runner = New(k, mockServiceClient()) go runner.Run() waitHealthy(t, runner, logBytes) @@ -698,5 +730,32 @@ func setUpMockStores(t *testing.T, k *typesMocks.Knapsack) { k.On("ServerProvidedDataStore").Return(inmemory.NewStore()).Maybe() k.On("AgentFlagsStore").Return(inmemory.NewStore()).Maybe() k.On("AutoupdateErrorsStore").Return(inmemory.NewStore()).Maybe() + k.On("StatusLogsStore").Return(inmemory.NewStore()).Maybe() + k.On("ResultLogsStore").Return(inmemory.NewStore()).Maybe() k.On("BboltDB").Return(storageci.SetupDB(t)).Maybe() } + +// mockServiceClient returns a mock KolideService that returns the minimum possible response +// for all methods. +func mockServiceClient() *servicemock.KolideService { + return &servicemock.KolideService{ + RequestEnrollmentFunc: func(ctx context.Context, enrollSecret, hostIdentifier string, details service.EnrollmentDetails) (string, bool, error) { + return "testnodekey", false, nil + }, + RequestConfigFunc: func(ctx context.Context, nodeKey string) (string, bool, error) { + return "", false, errors.New("transport") + }, + PublishLogsFunc: func(ctx context.Context, nodeKey string, logType logger.LogType, logs []string) (string, string, bool, error) { + return "", "", false, nil + }, + RequestQueriesFunc: func(ctx context.Context, nodeKey string) (*distributed.GetQueriesResult, bool, error) { + return nil, false, errors.New("transport") + }, + PublishResultsFunc: func(ctx context.Context, nodeKey string, results []distributed.Result) (string, string, bool, error) { + return "", "", false, nil + }, + CheckHealthFunc: func(ctx context.Context) (int32, error) { + return 0, nil + }, + } +} From 4fb41aaff4918743d337b19b81a67f55a2e3f084 Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Wed, 30 Oct 2024 15:19:15 -0400 Subject: [PATCH 2/3] Standardize adding routines to errgroup --- pkg/osquery/runtime/osqueryinstance.go | 130 +++++++++---------------- 1 file changed, 47 insertions(+), 83 deletions(-) diff --git a/pkg/osquery/runtime/osqueryinstance.go b/pkg/osquery/runtime/osqueryinstance.go index af1ee1859..8c5bf7a5d 100644 --- a/pkg/osquery/runtime/osqueryinstance.go +++ b/pkg/osquery/runtime/osqueryinstance.go @@ -322,19 +322,13 @@ func (i *OsqueryInstance) launch() error { // This loop runs in the background when the process was // successfully started. ("successful" is independent of exit // code. eg: this runs if we could exec. Failure to exec is above.) - i.errgroup.Go(func() error { - defer i.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "monitor osquery process", - ) - + i.addGoroutineToErrgroup(ctx, "monitor_osquery_process", func() error { err := i.cmd.Wait() switch { case err == nil, isExitOk(err): i.slogger.Log(ctx, slog.LevelInfo, "osquery exited successfully", ) - // TODO: should this return nil? return errors.New("osquery process exited successfully") default: msgPairs := append( @@ -351,16 +345,7 @@ func (i *OsqueryInstance) launch() error { }) // Kill osquery process on shutdown - i.errgroup.Go(func() error { - defer i.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "kill osquery process on shutdown", - ) - - <-i.doneCtx.Done() - i.slogger.Log(ctx, slog.LevelDebug, - "starting osquery shutdown", - ) + i.addShutdownGoroutineToErrgroup(ctx, "kill_osquery_process", func() error { if i.cmd.Process != nil { // kill osqueryd and children if err := killProcessGroup(i.cmd); err != nil { @@ -411,12 +396,7 @@ func (i *OsqueryInstance) launch() error { // // TODO: Consider chunking, if we find we can only have so // many tables per extension manager - i.errgroup.Go(func() error { - defer i.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "kolide tables extension manager server launch", - ) - + i.addGoroutineToErrgroup(ctx, "kolide_extension_launch", func() error { plugins := table.PlatformTables(i.knapsack, i.knapsack.Slogger().With("component", "platform_tables"), currentOsquerydBinaryPath) plugins = append(plugins, table.LauncherTables(i.knapsack)...) @@ -444,12 +424,7 @@ func (i *OsqueryInstance) launch() error { } // Health check on interval - i.errgroup.Go(func() error { - defer i.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "health check on interval", - ) - + i.addGoroutineToErrgroup(ctx, "healthcheck", func() error { if i.knapsack != nil && i.knapsack.OsqueryHealthcheckStartupDelay() != 0*time.Second { i.slogger.Log(ctx, slog.LevelDebug, "entering delay before starting osquery healthchecks", @@ -517,13 +492,7 @@ func (i *OsqueryInstance) launch() error { }) // Clean up PID file on shutdown - i.errgroup.Go(func() error { - defer i.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "cleanup PID file", - ) - - <-i.doneCtx.Done() + i.addShutdownGoroutineToErrgroup(ctx, "remove_pid_file", func() error { // We do a couple retries -- on Windows, the PID file may still be in use // and therefore unable to be removed. if err := backoff.WaitFor(func() error { @@ -600,11 +569,7 @@ func (i *OsqueryInstance) startKolideSaasExtension(ctx context.Context) error { }, func(r any) {}) // Run extension - i.errgroup.Go(func() error { - defer i.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "saas extension execute", - ) + i.addGoroutineToErrgroup(ctx, "saas_extension_execute", func() error { if err := i.saasExtension.Execute(); err != nil { return fmt.Errorf("kolide_grpc extension returned error: %w", err) } @@ -612,18 +577,50 @@ func (i *OsqueryInstance) startKolideSaasExtension(ctx context.Context) error { }) // Register shutdown group for extension + i.addShutdownGoroutineToErrgroup(ctx, "saas_extension_cleanup", func() error { + i.saasExtension.Shutdown(i.doneCtx.Err()) + return i.doneCtx.Err() + }) + + return nil +} + +// addGoroutineToErrgroup adds the given goroutine to the errgroup, ensuring that we log its start and exit. +func (i *OsqueryInstance) addGoroutineToErrgroup(ctx context.Context, goroutineName string, goroutine func() error) { i.errgroup.Go(func() error { defer i.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "saas extension cleanup", + "exiting goroutine in errgroup", + "goroutine_name", goroutineName, ) - <-i.doneCtx.Done() - i.saasExtension.Shutdown(i.doneCtx.Err()) - return nil + i.slogger.Log(ctx, slog.LevelInfo, + "starting goroutine in errgroup", + "goroutine_name", goroutineName, + ) + + return goroutine() }) +} - return nil +// addShutdownGoroutineToErrgroup adds the given goroutine to the errgroup, ensuring that we log its start and exit. +// The goroutine will not execute until the instance has received a signal to exit. +func (i *OsqueryInstance) addShutdownGoroutineToErrgroup(ctx context.Context, goroutineName string, goroutine func() error) { + i.errgroup.Go(func() error { + defer i.slogger.Log(ctx, slog.LevelInfo, + "exiting shutdown goroutine in errgroup", + "goroutine_name", goroutineName, + ) + + // Wait for errgroup to exit + <-i.doneCtx.Done() + + i.slogger.Log(ctx, slog.LevelInfo, + "starting shutdown goroutine in errgroup", + "goroutine_name", goroutineName, + ) + + return goroutine() + }) } // osqueryFilePaths is a struct which contains the relevant file paths needed to @@ -773,11 +770,6 @@ func (i *OsqueryInstance) StartOsqueryClient(paths *osqueryFilePaths) (*osquery. // startOsqueryExtensionManagerServer takes a set of plugins, creates // an osquery.NewExtensionManagerServer for them, and then starts it. func (i *OsqueryInstance) StartOsqueryExtensionManagerServer(name string, socketPath string, client *osquery.ExtensionManagerClient, plugins []osquery.OsqueryPlugin) error { - i.slogger.Log(context.TODO(), slog.LevelDebug, - "starting startOsqueryExtensionManagerServer", - "extension_name", name, - ) - var extensionManagerServer *osquery.ExtensionManagerServer if err := backoff.WaitFor(func() error { var newErr error @@ -789,11 +781,6 @@ func (i *OsqueryInstance) StartOsqueryExtensionManagerServer(name string, socket ) return newErr }, socketOpenTimeout, socketOpenInterval); err != nil { - i.slogger.Log(context.TODO(), slog.LevelDebug, - "could not create an extension server", - "extension_name", name, - "err", err, - ) return fmt.Errorf("could not create an extension server: %w", err) } @@ -805,13 +792,7 @@ func (i *OsqueryInstance) StartOsqueryExtensionManagerServer(name string, socket i.extensionManagerServers = append(i.extensionManagerServers, extensionManagerServer) // Start! - i.errgroup.Go(func() error { - defer i.slogger.Log(context.TODO(), slog.LevelDebug, - "exiting errgroup", - "errgroup", "run extension manager server", - "extension_name", name, - ) - + i.addGoroutineToErrgroup(context.TODO(), name, func() error { if err := extensionManagerServer.Start(); err != nil { i.slogger.Log(context.TODO(), slog.LevelInfo, "extension manager server startup got error", @@ -820,24 +801,12 @@ func (i *OsqueryInstance) StartOsqueryExtensionManagerServer(name string, socket ) return fmt.Errorf("running extension server: %w", err) } + return errors.New("extension manager server exited") }) // register a shutdown routine - i.errgroup.Go(func() error { - defer i.slogger.Log(context.TODO(), slog.LevelDebug, - "exiting errgroup", - "errgroup", "shut down extension manager server", - "extension_name", name, - ) - - <-i.doneCtx.Done() - - i.slogger.Log(context.TODO(), slog.LevelDebug, - "starting extension shutdown", - "extension_name", name, - ) - + i.addShutdownGoroutineToErrgroup(context.TODO(), fmt.Sprintf("%s_cleanup", name), func() error { if err := extensionManagerServer.Shutdown(context.TODO()); err != nil { i.slogger.Log(context.TODO(), slog.LevelInfo, "got error while shutting down extension server", @@ -848,11 +817,6 @@ func (i *OsqueryInstance) StartOsqueryExtensionManagerServer(name string, socket return i.doneCtx.Err() }) - i.slogger.Log(context.TODO(), slog.LevelDebug, - "clean finish startOsqueryExtensionManagerServer", - "extension_name", name, - ) - return nil } From f5403d4779c03cd6f5e9dee331b4a7365c7f5afb Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Wed, 30 Oct 2024 15:25:20 -0400 Subject: [PATCH 3/3] Combine extensions --- pkg/osquery/runtime/osqueryinstance.go | 33 ++++---------------------- 1 file changed, 4 insertions(+), 29 deletions(-) diff --git a/pkg/osquery/runtime/osqueryinstance.go b/pkg/osquery/runtime/osqueryinstance.go index 8c5bf7a5d..3a131f721 100644 --- a/pkg/osquery/runtime/osqueryinstance.go +++ b/pkg/osquery/runtime/osqueryinstance.go @@ -36,13 +36,11 @@ import ( const ( // KolideSaasExtensionName is the name of the extension that provides the config, - // distributed queries, and log destination for the osquery process. This extension + // distributed queries, and log destination for the osquery process. It also provides + // provides Kolide's additional tables: platform tables and launcher tables. This extension // is required for osquery startup. It is called kolide_grpc for mostly historic reasons; // communication with Kolide SaaS happens over JSONRPC. KolideSaasExtensionName = "kolide_grpc" - // kolideTablesExtensionName is the name of the extension that provides Kolide's additional - // tables: platform tables and launcher tables. - kolideTablesExtensionName = "kolide" ) // OsqueryInstanceOption is a functional option pattern for defining how an @@ -378,6 +376,8 @@ func (i *OsqueryInstance) launch() error { distributed.NewPlugin(KolideSaasExtensionName, i.saasExtension.GetQueries, i.saasExtension.WriteResults), osquerylogger.NewPlugin(KolideSaasExtensionName, i.saasExtension.LogString), } + kolideSaasPlugins = append(kolideSaasPlugins, table.PlatformTables(i.knapsack, i.knapsack.Slogger().With("component", "platform_tables"), currentOsquerydBinaryPath)...) + kolideSaasPlugins = append(kolideSaasPlugins, table.LauncherTables(i.knapsack)...) if err := i.StartOsqueryExtensionManagerServer(KolideSaasExtensionName, paths.extensionSocketPath, i.extensionManagerClient, kolideSaasPlugins); err != nil { i.slogger.Log(ctx, slog.LevelInfo, @@ -389,31 +389,6 @@ func (i *OsqueryInstance) launch() error { } span.AddEvent("extension_server_created") - // Now spawn an extension manager for the tables. We need to - // start this one in the background, because the runner.Start - // function needs to return promptly enough for osquery to use - // it to enroll. Very racy - // - // TODO: Consider chunking, if we find we can only have so - // many tables per extension manager - i.addGoroutineToErrgroup(ctx, "kolide_extension_launch", func() error { - plugins := table.PlatformTables(i.knapsack, i.knapsack.Slogger().With("component", "platform_tables"), currentOsquerydBinaryPath) - plugins = append(plugins, table.LauncherTables(i.knapsack)...) - - if len(plugins) == 0 { - return nil - } - - if err := i.StartOsqueryExtensionManagerServer(kolideTablesExtensionName, paths.extensionSocketPath, i.extensionManagerClient, plugins); err != nil { - i.slogger.Log(ctx, slog.LevelWarn, - "unable to create Kolide tables extension server, stopping", - "err", err, - ) - return fmt.Errorf("could not create Kolide tables extension server: %w", err) - } - return nil - }) - // All done with osquery setup! Mark instance as connected, then proceed // with setting up remaining errgroups. if err := i.stats.Connected(i); err != nil {