diff --git a/pkg/osquery/runtime/osqueryinstance.go b/pkg/osquery/runtime/osqueryinstance.go index 573322b7b..8fea1616d 100644 --- a/pkg/osquery/runtime/osqueryinstance.go +++ b/pkg/osquery/runtime/osqueryinstance.go @@ -19,8 +19,14 @@ import ( "github.com/kolide/launcher/ee/agent/types" "github.com/kolide/launcher/pkg/backoff" "github.com/kolide/launcher/pkg/osquery/runtime/history" + "github.com/kolide/launcher/pkg/osquery/table" "github.com/kolide/launcher/pkg/traces" "github.com/osquery/osquery-go" + "github.com/osquery/osquery-go/plugin/config" + "github.com/osquery/osquery-go/plugin/distributed" + osquerylogger "github.com/osquery/osquery-go/plugin/logger" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) @@ -225,12 +231,16 @@ func (o osqueryOptions) requiredExtensions() []string { return requiredExtensions } -func newInstance(knapsack types.Knapsack) *OsqueryInstance { +func newInstance(knapsack types.Knapsack, opts ...OsqueryInstanceOption) *OsqueryInstance { i := &OsqueryInstance{ knapsack: knapsack, slogger: knapsack.Slogger().With("component", "osquery_instance"), } + for _, opt := range opts { + opt(i) + } + ctx, cancel := context.WithCancel(context.Background()) i.cancel = cancel i.errgroup, i.doneCtx = errgroup.WithContext(ctx) @@ -242,6 +252,388 @@ func newInstance(knapsack types.Knapsack) *OsqueryInstance { return i } +func (o *OsqueryInstance) launch() error { + ctx, span := traces.StartSpan(context.Background()) + defer span.End() + + // Based on the root directory, calculate the file names of all of the + // required osquery artifact files. 
+ paths, err := calculateOsqueryPaths(o.knapsack.RootDirectory(), o.opts) + if err != nil { + traces.SetError(span, fmt.Errorf("could not calculate osquery file paths: %w", err)) + return fmt.Errorf("could not calculate osquery file paths: %w", err) + } + + // Populate augeas lenses, if requested + if o.opts.augeasLensFunc != nil { + if err := os.MkdirAll(paths.augeasPath, 0755); err != nil { + traces.SetError(span, fmt.Errorf("making augeas lenses directory: %w", err)) + return fmt.Errorf("making augeas lenses directory: %w", err) + } + + if err := o.opts.augeasLensFunc(paths.augeasPath); err != nil { + traces.SetError(span, fmt.Errorf("setting up augeas lenses: %w", err)) + return fmt.Errorf("setting up augeas lenses: %w", err) + } + } + + // If a config plugin has not been set by the caller, then it is likely + // that the instance will just be used for executing queries, so we + // will use a minimal config plugin that basically is a no-op. + if o.opts.configPluginFlag == "" { + generateConfigs := func(ctx context.Context) (map[string]string, error) { + return map[string]string{}, nil + } + o.opts.extensionPlugins = append(o.opts.extensionPlugins, config.NewPlugin("internal_noop", generateConfigs)) + o.opts.configPluginFlag = "internal_noop" + } + + // If a logger plugin has not been set by the caller, we set a no-op + // logger plugin that accepts and discards every log line it is given. + if o.opts.loggerPluginFlag == "" { + logString := func(ctx context.Context, typ osquerylogger.LogType, logText string) error { + return nil + } + o.opts.extensionPlugins = append(o.opts.extensionPlugins, osquerylogger.NewPlugin("internal_noop", logString)) + o.opts.loggerPluginFlag = "internal_noop" + } + + // If a distributed plugin has not been set by the caller, we set a + // distributed plugin that returns no queries. 
+ if o.opts.distributedPluginFlag == "" { + getQueries := func(ctx context.Context) (*distributed.GetQueriesResult, error) { + return &distributed.GetQueriesResult{}, nil + } + writeResults := func(ctx context.Context, results []distributed.Result) error { + return nil + } + o.opts.extensionPlugins = append(o.opts.extensionPlugins, distributed.NewPlugin("internal_noop", getQueries, writeResults)) + o.opts.distributedPluginFlag = "internal_noop" + } + + // The knapsack will retrieve the correct version of osqueryd from the download library if available. + // If not available, it will fall back to the configured installed version of osqueryd. + currentOsquerydBinaryPath := o.knapsack.LatestOsquerydPath(ctx) + span.AddEvent("got_osqueryd_binary_path", trace.WithAttributes(attribute.String("path", currentOsquerydBinaryPath))) + + // Now that we have accepted options from the caller and/or determined what + // they should be due to them not being set, we are ready to create and start + // the *exec.Cmd instance that will run osqueryd. + o.cmd, err = o.createOsquerydCommand(currentOsquerydBinaryPath, paths) + if err != nil { + traces.SetError(span, fmt.Errorf("couldn't create osqueryd command: %w", err)) + return fmt.Errorf("couldn't create osqueryd command: %w", err) + } + + // Assign a PGID that matches the PID. This lets us kill the entire process group later. 
+ o.cmd.SysProcAttr = setpgid() + + o.slogger.Log(ctx, slog.LevelInfo, + "launching osqueryd", + "path", o.cmd.Path, + "args", strings.Join(o.cmd.Args, " "), + ) + + // remove any socket already at the extension socket path to ensure + // that it's not left over from a previous instance + if err := os.RemoveAll(paths.extensionSocketPath); err != nil { + o.slogger.Log(ctx, slog.LevelWarn, + "error removing osquery extension socket", + "path", paths.extensionSocketPath, + "err", err, + ) + } + + // Launch osquery process (async) + err = o.startFunc(o.cmd) + if err != nil { + // Failure here is indicative of a failure to exec. A missing + // binary? Bad permissions? TODO: Consider catching errors in the + // update system and falling back to an earlier version. + msgPairs := append( + getOsqueryInfoForLog(o.cmd.Path), + "err", err, + ) + + o.slogger.Log(ctx, slog.LevelWarn, + "fatal error starting osquery -- could not exec.", + msgPairs..., + ) + traces.SetError(span, fmt.Errorf("fatal error starting osqueryd process: %w", err)) + return fmt.Errorf("fatal error starting osqueryd process: %w", err) + } + + span.AddEvent("launched_osqueryd") + o.slogger.Log(ctx, slog.LevelInfo, + "launched osquery process", + "osqueryd_pid", o.cmd.Process.Pid, + ) + + // wait for osquery to create the socket before moving on, + // this is intended to serve as a kind of health check + // for osquery, if it's started successfully it will create + // a socket + if err := backoff.WaitFor(func() error { + _, err := os.Stat(paths.extensionSocketPath) + if err != nil { + o.slogger.Log(ctx, slog.LevelDebug, + "osquery extension socket not created yet ... 
will retry", + "path", paths.extensionSocketPath, + ) + } + return err + }, 1*time.Minute, 1*time.Second); err != nil { + traces.SetError(span, fmt.Errorf("timeout waiting for osqueryd to create socket at %s: %w", paths.extensionSocketPath, err)) + return fmt.Errorf("timeout waiting for osqueryd to create socket at %s: %w", paths.extensionSocketPath, err) + } + + span.AddEvent("socket_created") + o.slogger.Log(ctx, slog.LevelDebug, + "osquery socket created", + ) + + stats, err := history.NewInstance() + if err != nil { + o.slogger.Log(ctx, slog.LevelWarn, + "could not create new osquery instance history", + "err", err, + ) + } + o.stats = stats + + // This loop runs in the background when the process was + // successfully started. ("successful" is independent of exit + // code. eg: this runs if we could exec. Failure to exec is above.) + o.errgroup.Go(func() error { + defer o.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "monitor osquery process", + ) + + err := o.cmd.Wait() + switch { + case err == nil, isExitOk(err): + o.slogger.Log(ctx, slog.LevelInfo, + "osquery exited successfully", + ) + // TODO: should this return nil? 
+ return errors.New("osquery process exited successfully") + default: + msgPairs := append( + getOsqueryInfoForLog(o.cmd.Path), + "err", err, + ) + + o.slogger.Log(ctx, slog.LevelWarn, + "error running osquery command", + msgPairs..., + ) + return fmt.Errorf("running osqueryd command: %w", err) + } + }) + + // Kill osquery process on shutdown + o.errgroup.Go(func() error { + defer o.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "kill osquery process on shutdown", + ) + + <-o.doneCtx.Done() + o.slogger.Log(ctx, slog.LevelDebug, + "starting osquery shutdown", + ) + if o.cmd.Process != nil { + // kill osqueryd and children + if err := killProcessGroup(o.cmd); err != nil { + if strings.Contains(err.Error(), "process already finished") || strings.Contains(err.Error(), "no such process") { + o.slogger.Log(ctx, slog.LevelDebug, + "tried to stop osquery, but process already gone", + ) + } else { + o.slogger.Log(ctx, slog.LevelWarn, + "error killing osquery process", + "err", err, + ) + } + } + } + return o.doneCtx.Err() + }) + + // Here be dragons + // + // There are two thorny issues. First, we "invert" control of + // the osquery process. We don't really know when osquery will + // be running, so we need a bunch of retries on these connections + // + // Second, because launcher supplements the enroll + // information, this Start function must return fast enough + // that osquery can use the registered tables for + // enrollment. *But* there's been a lot of racy behaviors, + // likely due to time spent registering tables, and subtle + // ordering issues. + + // Start an extension manager for the extensions that osquery + // needs for config/log/etc. 
It's called `kolide_grpc` for mostly historic reasons + o.extensionManagerClient, err = o.StartOsqueryClient(paths) + if err != nil { + traces.SetError(span, fmt.Errorf("could not create an extension client: %w", err)) + return fmt.Errorf("could not create an extension client: %w", err) + } + span.AddEvent("extension_client_created") + + if len(o.opts.extensionPlugins) > 0 { + if err := o.StartOsqueryExtensionManagerServer("kolide_grpc", paths.extensionSocketPath, o.extensionManagerClient, o.opts.extensionPlugins); err != nil { + o.slogger.Log(ctx, slog.LevelInfo, + "unable to create initial extension server, stopping", + "err", err, + ) + traces.SetError(span, fmt.Errorf("could not create an extension server: %w", err)) + return fmt.Errorf("could not create an extension server: %w", err) + } + span.AddEvent("extension_server_created") + } + + // Now spawn an extension manager for the tables. We need to + // start this one in the background, because the runner.Start + // function needs to return promptly enough for osquery to use + // it to enroll. Very racy + // + // TODO: Consider chunking, if we find we can only have so + // many tables per extension manager + o.errgroup.Go(func() error { + defer o.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "kolide extension manager server launch", + ) + + plugins := table.PlatformTables(o.knapsack, o.knapsack.Slogger().With("component", "platform_tables"), currentOsquerydBinaryPath) + + if len(plugins) == 0 { + return nil + } + + if err := o.StartOsqueryExtensionManagerServer("kolide", paths.extensionSocketPath, o.extensionManagerClient, plugins); err != nil { + o.slogger.Log(ctx, slog.LevelWarn, + "unable to create tables extension server, stopping", + "err", err, + ) + return fmt.Errorf("could not create a table extension server: %w", err) + } + return nil + }) + + // All done with osquery setup! Mark instance as connected, then proceed + // with setting up remaining errgroups. 
+ if err := o.stats.Connected(o); err != nil { + o.slogger.Log(ctx, slog.LevelWarn, + "could not set connection time for osquery instance history", + "err", err, + ) + } + + // Health check on interval + o.errgroup.Go(func() error { + defer o.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "health check on interval", + ) + + if o.knapsack != nil && o.knapsack.OsqueryHealthcheckStartupDelay() != 0*time.Second { + o.slogger.Log(ctx, slog.LevelDebug, + "entering delay before starting osquery healthchecks", + ) + select { + case <-time.After(o.knapsack.OsqueryHealthcheckStartupDelay()): + o.slogger.Log(ctx, slog.LevelDebug, + "exiting delay before starting osquery healthchecks", + ) + case <-o.doneCtx.Done(): + return o.doneCtx.Err() + } + } + + ticker := time.NewTicker(healthCheckInterval) + defer ticker.Stop() + for { + select { + case <-o.doneCtx.Done(): + return o.doneCtx.Err() + case <-ticker.C: + // If device is sleeping, we do not want to perform unnecessary healthchecks that + // may force an unnecessary restart. + if o.knapsack != nil && o.knapsack.InModernStandby() { + break + } + + // Health check! Allow a couple + // failures before we tear everything + // down. This is pretty simple, it + // hardcodes the timing. 
Might be + // better for a Limiter + maxHealthChecks := 5 + for i := 1; i <= maxHealthChecks; i++ { + err := o.Healthy() + if err == nil { + // err was nil, clear failed attempts + if i > 1 { + o.slogger.Log(ctx, slog.LevelDebug, + "healthcheck passed, clearing error", + "attempt", i, + ) + } + break + } + + if i == maxHealthChecks { + o.slogger.Log(ctx, slog.LevelInfo, + "healthcheck failed, giving up", + "attempt", i, + "err", err, + ) + return fmt.Errorf("health check failed: %w", err) + } + + o.slogger.Log(ctx, slog.LevelDebug, + "healthcheck failed, will retry", + "attempt", i, + "err", err, + ) + time.Sleep(1 * time.Second) + } + } + } + }) + + // Clean up PID file on shutdown + o.errgroup.Go(func() error { + defer o.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "cleanup PID file", + ) + + <-o.doneCtx.Done() + // We do a couple retries -- on Windows, the PID file may still be in use + // and therefore unable to be removed. + if err := backoff.WaitFor(func() error { + if err := os.Remove(paths.pidfilePath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("removing PID file: %w", err) + } + return nil + }, 5*time.Second, 500*time.Millisecond); err != nil { + o.slogger.Log(ctx, slog.LevelInfo, + "could not remove PID file, despite retries", + "pid_file", paths.pidfilePath, + "err", err, + ) + } + return o.doneCtx.Err() + }) + + return nil +} + // osqueryFilePaths is a struct which contains the relevant file paths needed to // launch an osqueryd instance. 
type osqueryFilePaths struct { diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index b4907dd42..4e1e47285 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -2,26 +2,13 @@ package runtime import ( "context" - "errors" "fmt" "log/slog" - "os" - "strings" "sync" "time" "github.com/kolide/launcher/ee/agent/flags/keys" "github.com/kolide/launcher/ee/agent/types" - - "github.com/kolide/launcher/pkg/backoff" - "github.com/kolide/launcher/pkg/osquery/runtime/history" - "github.com/kolide/launcher/pkg/osquery/table" - "github.com/kolide/launcher/pkg/traces" - "github.com/osquery/osquery-go/plugin/config" - "github.com/osquery/osquery-go/plugin/distributed" - osquerylogger "github.com/osquery/osquery-go/plugin/logger" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) const ( @@ -50,9 +37,13 @@ type Runner struct { } func New(k types.Knapsack, opts ...OsqueryInstanceOption) *Runner { - runner := newRunner(k, opts...) 
- runner.slogger = k.Slogger().With("component", "osquery_runner") - runner.knapsack = k + runner := &Runner{ + instance: newInstance(k, opts...), + slogger: k.Slogger().With("component", "osquery_runner"), + knapsack: k, + shutdown: make(chan struct{}), + opts: opts, + } k.RegisterChangeObserver(runner, keys.WatchdogEnabled, keys.WatchdogMemoryLimitMB, keys.WatchdogUtilizationLimitPercent, keys.WatchdogDelaySec, @@ -64,7 +55,7 @@ func New(k types.Knapsack, opts ...OsqueryInstanceOption) *Runner { func (r *Runner) Run() error { // Ensure we don't try to restart the instance before it's launched r.instanceLock.Lock() - if err := r.launchOsqueryInstance(); err != nil { + if err := r.instance.launch(); err != nil { r.slogger.Log(context.TODO(), slog.LevelWarn, "failed to launch osquery instance", "err", err, @@ -113,13 +104,8 @@ func (r *Runner) Run() error { } r.instanceLock.Lock() - opts := r.instance.opts - r.instance = newInstance(r.knapsack) - r.instance.opts = opts - for _, opt := range r.opts { - opt(r.instance) - } - if err := r.launchOsqueryInstance(); err != nil { + r.instance = newInstance(r.knapsack, r.opts...) + if err := r.instance.launch(); err != nil { r.slogger.Log(context.TODO(), slog.LevelWarn, "fatal error restarting instance, shutting down", "err", err, @@ -229,403 +215,3 @@ func (r *Runner) Healthy() error { defer r.instanceLock.Unlock() return r.instance.Healthy() } - -func (r *Runner) launchOsqueryInstance() error { - ctx, span := traces.StartSpan(context.Background()) - defer span.End() - - o := r.instance - - // Based on the root directory, calculate the file names of all of the - // required osquery artifact files. 
- paths, err := calculateOsqueryPaths(o.knapsack.RootDirectory(), o.opts) - if err != nil { - traces.SetError(span, fmt.Errorf("could not calculate osquery file paths: %w", err)) - return fmt.Errorf("could not calculate osquery file paths: %w", err) - } - - // Populate augeas lenses, if requested - if o.opts.augeasLensFunc != nil { - if err := os.MkdirAll(paths.augeasPath, 0755); err != nil { - traces.SetError(span, fmt.Errorf("making augeas lenses directory: %w", err)) - return fmt.Errorf("making augeas lenses directory: %w", err) - } - - if err := o.opts.augeasLensFunc(paths.augeasPath); err != nil { - traces.SetError(span, fmt.Errorf("setting up augeas lenses: %w", err)) - return fmt.Errorf("setting up augeas lenses: %w", err) - } - } - - // If a config plugin has not been set by the caller, then it is likely - // that the instance will just be used for executing queries, so we - // will use a minimal config plugin that basically is a no-op. - if o.opts.configPluginFlag == "" { - generateConfigs := func(ctx context.Context) (map[string]string, error) { - return map[string]string{}, nil - } - o.opts.extensionPlugins = append(o.opts.extensionPlugins, config.NewPlugin("internal_noop", generateConfigs)) - o.opts.configPluginFlag = "internal_noop" - } - - // If a logger plugin has not been set by the caller, we set a logger - // plugin that outputs logs to the default application logger. - if o.opts.loggerPluginFlag == "" { - logString := func(ctx context.Context, typ osquerylogger.LogType, logText string) error { - return nil - } - o.opts.extensionPlugins = append(o.opts.extensionPlugins, osquerylogger.NewPlugin("internal_noop", logString)) - o.opts.loggerPluginFlag = "internal_noop" - } - - // If a distributed plugin has not been set by the caller, we set a - // distributed plugin that returns no queries. 
- if o.opts.distributedPluginFlag == "" { - getQueries := func(ctx context.Context) (*distributed.GetQueriesResult, error) { - return &distributed.GetQueriesResult{}, nil - } - writeResults := func(ctx context.Context, results []distributed.Result) error { - return nil - } - o.opts.extensionPlugins = append(o.opts.extensionPlugins, distributed.NewPlugin("internal_noop", getQueries, writeResults)) - o.opts.distributedPluginFlag = "internal_noop" - } - - // The knapsack will retrieve the correct version of osqueryd from the download library if available. - // If not available, it will fall back to the configured installed version of osqueryd. - currentOsquerydBinaryPath := o.knapsack.LatestOsquerydPath(ctx) - span.AddEvent("got_osqueryd_binary_path", trace.WithAttributes(attribute.String("path", currentOsquerydBinaryPath))) - - // Now that we have accepted options from the caller and/or determined what - // they should be due to them not being set, we are ready to create and start - // the *exec.Cmd instance that will run osqueryd. - o.cmd, err = o.createOsquerydCommand(currentOsquerydBinaryPath, paths) - if err != nil { - traces.SetError(span, fmt.Errorf("couldn't create osqueryd command: %w", err)) - return fmt.Errorf("couldn't create osqueryd command: %w", err) - } - - // Assign a PGID that matches the PID. This lets us kill the entire process group later. 
- o.cmd.SysProcAttr = setpgid() - - r.slogger.Log(ctx, slog.LevelInfo, - "launching osqueryd", - "path", o.cmd.Path, - "args", strings.Join(o.cmd.Args, " "), - ) - - // remove any socket already at the extension socket path to ensure - // that it's not left over from a previous instance - if err := os.RemoveAll(paths.extensionSocketPath); err != nil { - r.slogger.Log(ctx, slog.LevelWarn, - "error removing osquery extension socket", - "path", paths.extensionSocketPath, - "err", err, - ) - } - - // Launch osquery process (async) - err = o.startFunc(o.cmd) - if err != nil { - // Failure here is indicative of a failure to exec. A missing - // binary? Bad permissions? TODO: Consider catching errors in the - // update system and falling back to an earlier version. - msgPairs := append( - getOsqueryInfoForLog(o.cmd.Path), - "err", err, - ) - - r.slogger.Log(ctx, slog.LevelWarn, - "fatal error starting osquery -- could not exec.", - msgPairs..., - ) - traces.SetError(span, fmt.Errorf("fatal error starting osqueryd process: %w", err)) - return fmt.Errorf("fatal error starting osqueryd process: %w", err) - } - - span.AddEvent("launched_osqueryd") - r.slogger.Log(ctx, slog.LevelInfo, - "launched osquery process", - "osqueryd_pid", o.cmd.Process.Pid, - ) - - // wait for osquery to create the socket before moving on, - // this is intended to serve as a kind of health check - // for osquery, if it's started successfully it will create - // a socket - if err := backoff.WaitFor(func() error { - _, err := os.Stat(paths.extensionSocketPath) - if err != nil { - r.slogger.Log(ctx, slog.LevelDebug, - "osquery extension socket not created yet ... 
will retry", - "path", paths.extensionSocketPath, - ) - } - return err - }, 1*time.Minute, 1*time.Second); err != nil { - traces.SetError(span, fmt.Errorf("timeout waiting for osqueryd to create socket at %s: %w", paths.extensionSocketPath, err)) - return fmt.Errorf("timeout waiting for osqueryd to create socket at %s: %w", paths.extensionSocketPath, err) - } - - span.AddEvent("socket_created") - r.slogger.Log(ctx, slog.LevelDebug, - "osquery socket created", - ) - - stats, err := history.NewInstance() - if err != nil { - r.slogger.Log(ctx, slog.LevelWarn, - "could not create new osquery instance history", - "err", err, - ) - } - o.stats = stats - - // This loop runs in the background when the process was - // successfully started. ("successful" is independent of exit - // code. eg: this runs if we could exec. Failure to exec is above.) - o.errgroup.Go(func() error { - defer r.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "monitor osquery process", - ) - - err := o.cmd.Wait() - switch { - case err == nil, isExitOk(err): - r.slogger.Log(ctx, slog.LevelInfo, - "osquery exited successfully", - ) - // TODO: should this return nil? 
- return errors.New("osquery process exited successfully") - default: - msgPairs := append( - getOsqueryInfoForLog(o.cmd.Path), - "err", err, - ) - - r.slogger.Log(ctx, slog.LevelWarn, - "error running osquery command", - msgPairs..., - ) - return fmt.Errorf("running osqueryd command: %w", err) - } - }) - - // Kill osquery process on shutdown - o.errgroup.Go(func() error { - defer r.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "kill osquery process on shutdown", - ) - - <-o.doneCtx.Done() - r.slogger.Log(ctx, slog.LevelDebug, - "starting osquery shutdown", - ) - if o.cmd.Process != nil { - // kill osqueryd and children - if err := killProcessGroup(o.cmd); err != nil { - if strings.Contains(err.Error(), "process already finished") || strings.Contains(err.Error(), "no such process") { - r.slogger.Log(ctx, slog.LevelDebug, - "tried to stop osquery, but process already gone", - ) - } else { - r.slogger.Log(ctx, slog.LevelWarn, - "error killing osquery process", - "err", err, - ) - } - } - } - return o.doneCtx.Err() - }) - - // Here be dragons - // - // There are two thorny issues. First, we "invert" control of - // the osquery process. We don't really know when osquery will - // be running, so we need a bunch of retries on these connections - // - // Second, because launcher supplements the enroll - // information, this Start function must return fast enough - // that osquery can use the registered tables for - // enrollment. *But* there's been a lot of racy behaviors, - // likely due to time spent registering tables, and subtle - // ordering issues. - - // Start an extension manager for the extensions that osquery - // needs for config/log/etc. 
It's called `kolide_grpc` for mostly historic reasons - o.extensionManagerClient, err = o.StartOsqueryClient(paths) - if err != nil { - traces.SetError(span, fmt.Errorf("could not create an extension client: %w", err)) - return fmt.Errorf("could not create an extension client: %w", err) - } - span.AddEvent("extension_client_created") - - if len(o.opts.extensionPlugins) > 0 { - if err := o.StartOsqueryExtensionManagerServer("kolide_grpc", paths.extensionSocketPath, o.extensionManagerClient, o.opts.extensionPlugins); err != nil { - r.slogger.Log(ctx, slog.LevelInfo, - "unable to create initial extension server, stopping", - "err", err, - ) - traces.SetError(span, fmt.Errorf("could not create an extension server: %w", err)) - return fmt.Errorf("could not create an extension server: %w", err) - } - span.AddEvent("extension_server_created") - } - - // Now spawn an extension manager for the tables. We need to - // start this one in the background, because the runner.Start - // function needs to return promptly enough for osquery to use - // it to enroll. Very racy - // - // TODO: Consider chunking, if we find we can only have so - // many tables per extension manager - o.errgroup.Go(func() error { - defer r.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "kolide extension manager server launch", - ) - - plugins := table.PlatformTables(r.knapsack, r.knapsack.Slogger().With("component", "platform_tables"), currentOsquerydBinaryPath) - - if len(plugins) == 0 { - return nil - } - - if err := o.StartOsqueryExtensionManagerServer("kolide", paths.extensionSocketPath, o.extensionManagerClient, plugins); err != nil { - r.slogger.Log(ctx, slog.LevelWarn, - "unable to create tables extension server, stopping", - "err", err, - ) - return fmt.Errorf("could not create a table extension server: %w", err) - } - return nil - }) - - // All done with osquery setup! Mark instance as connected, then proceed - // with setting up remaining errgroups. 
- if err := o.stats.Connected(o); err != nil { - r.slogger.Log(ctx, slog.LevelWarn, - "could not set connection time for osquery instance history", - "err", err, - ) - } - - // Health check on interval - o.errgroup.Go(func() error { - defer r.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "health check on interval", - ) - - if o.knapsack != nil && o.knapsack.OsqueryHealthcheckStartupDelay() != 0*time.Second { - r.slogger.Log(ctx, slog.LevelDebug, - "entering delay before starting osquery healthchecks", - ) - select { - case <-time.After(o.knapsack.OsqueryHealthcheckStartupDelay()): - r.slogger.Log(ctx, slog.LevelDebug, - "exiting delay before starting osquery healthchecks", - ) - case <-o.doneCtx.Done(): - return o.doneCtx.Err() - } - } - - ticker := time.NewTicker(healthCheckInterval) - defer ticker.Stop() - for { - select { - case <-o.doneCtx.Done(): - return o.doneCtx.Err() - case <-ticker.C: - // If device is sleeping, we do not want to perform unnecessary healthchecks that - // may force an unnecessary restart. - if o.knapsack != nil && o.knapsack.InModernStandby() { - break - } - - // Health check! Allow a couple - // failures before we tear everything - // down. This is pretty simple, it - // hardcodes the timing. 
Might be - // better for a Limiter - maxHealthChecks := 5 - for i := 1; i <= maxHealthChecks; i++ { - err := r.Healthy() - if err == nil { - // err was nil, clear failed attempts - if i > 1 { - r.slogger.Log(ctx, slog.LevelDebug, - "healthcheck passed, clearing error", - "attempt", i, - ) - } - break - } - - if i == maxHealthChecks { - r.slogger.Log(ctx, slog.LevelInfo, - "healthcheck failed, giving up", - "attempt", i, - "err", err, - ) - return fmt.Errorf("health check failed: %w", err) - } - - r.slogger.Log(ctx, slog.LevelDebug, - "healthcheck failed, will retry", - "attempt", i, - "err", err, - ) - time.Sleep(1 * time.Second) - } - } - } - }) - - // Clean up PID file on shutdown - o.errgroup.Go(func() error { - defer r.slogger.Log(ctx, slog.LevelInfo, - "exiting errgroup", - "errgroup", "cleanup PID file", - ) - - <-o.doneCtx.Done() - // We do a couple retries -- on Windows, the PID file may still be in use - // and therefore unable to be removed. - if err := backoff.WaitFor(func() error { - if err := os.Remove(paths.pidfilePath); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("removing PID file: %w", err) - } - return nil - }, 5*time.Second, 500*time.Millisecond); err != nil { - r.slogger.Log(ctx, slog.LevelInfo, - "could not remove PID file, despite retries", - "pid_file", paths.pidfilePath, - "err", err, - ) - } - return o.doneCtx.Err() - }) - - return nil -} - -func newRunner(knapsack types.Knapsack, opts ...OsqueryInstanceOption) *Runner { - // Create an OsqueryInstance and apply the functional options supplied by the - // caller. 
- i := newInstance(knapsack) - - for _, opt := range opts { - opt(i) - } - - return &Runner{ - instance: i, - shutdown: make(chan struct{}), - opts: opts, - } -} diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 9bca50db6..0bcba652b 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -617,8 +617,9 @@ func TestNotStarted(t *testing.T) { k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("RootDirectory").Return(rootDirectory).Maybe() + k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() k.On("Slogger").Return(multislogger.NewNopLogger()) - runner := newRunner(k) + runner := New(k) assert.Error(t, runner.Healthy()) assert.NoError(t, runner.Shutdown())