Skip to content

Refactor action policy change handler #4563

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
}

h.log.Debugf("handlerPolicyChange: emit configuration for action %+v", a)
err = h.handleFleetServerHosts(ctx, c)
err = h.handlePolicyChange(ctx, c)
if err != nil {
return err
}
Expand All @@ -118,104 +118,172 @@ func (h *PolicyChangeHandler) Watch() <-chan coordinator.ConfigChange {
return h.ch
}

func (h *PolicyChangeHandler) handleFleetServerHosts(ctx context.Context, c *config.Config) (err error) {
func (h *PolicyChangeHandler) validateFleetServerHosts(ctx context.Context, cfg *configuration.Configuration) (*remote.Config, error) {
// do not update fleet-server host from policy; no setters provided with local Fleet Server
if len(h.setters) == 0 {
return nil
}
data, err := c.ToMapStr()
if err != nil {
return errors.New(err, "could not convert the configuration from the policy", errors.TypeConfig)
}
if _, ok := data["fleet"]; !ok {
// no fleet information in the configuration (skip checking client)
return nil
}

cfg, err := configuration.NewFromConfig(c)
if err != nil {
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
return nil, nil
}

if clientEqual(h.config.Fleet.Client, cfg.Fleet.Client) {
// already the same hosts
return nil
return nil, nil
}

// only set protocol/hosts as that is all Fleet currently sends
prevProtocol := h.config.Fleet.Client.Protocol
prevPath := h.config.Fleet.Client.Path
prevHost := h.config.Fleet.Client.Host
prevHosts := h.config.Fleet.Client.Hosts
prevProxy := h.config.Fleet.Client.Transport.Proxy
h.config.Fleet.Client.Protocol = cfg.Fleet.Client.Protocol
h.config.Fleet.Client.Path = cfg.Fleet.Client.Path
h.config.Fleet.Client.Host = cfg.Fleet.Client.Host
h.config.Fleet.Client.Hosts = cfg.Fleet.Client.Hosts
// make a copy the current client config and apply the changes in place on this copy
newFleetClientConfig := h.config.Fleet.Client
updateFleetConfig(h.log, cfg.Fleet.Client, &newFleetClientConfig)

// Empty proxies from fleet are ignored. That way a proxy set by --proxy-url
// it won't be overridden by an absent or empty proxy from fleet-server.
// However, if there is a proxy sent by fleet-server, it'll take precedence.
// Therefore, it's not possible to remove a proxy once it's set.
if cfg.Fleet.Client.Transport.Proxy.URL == nil ||
cfg.Fleet.Client.Transport.Proxy.URL.String() == "" {
h.log.Debug("proxy from fleet is empty or null, the proxy will not be changed")
} else {
h.config.Fleet.Client.Transport.Proxy = cfg.Fleet.Client.Transport.Proxy
h.log.Debug("received proxy from fleet, applying it")
}

// rollback on failure
defer func() {
if err != nil {
h.config.Fleet.Client.Protocol = prevProtocol
h.config.Fleet.Client.Path = prevPath
h.config.Fleet.Client.Host = prevHost
h.config.Fleet.Client.Hosts = prevHosts
h.config.Fleet.Client.Transport.Proxy = prevProxy
}
}()
// Test new config
err := testFleetConfig(ctx, h.log, newFleetClientConfig, h.config.Fleet.AccessAPIKey)
if err != nil {
return nil, fmt.Errorf("validating fleet client config: %w", err)
}

client, err := client.NewAuthWithConfig(
h.log, h.config.Fleet.AccessAPIKey, h.config.Fleet.Client)
return &newFleetClientConfig, nil
}

func testFleetConfig(ctx context.Context, log *logger.Logger, clientConfig remote.Config, apiKey string) error {
fleetClient, err := client.NewAuthWithConfig(
log, apiKey, clientConfig)
if err != nil {
return errors.New(
err, "fail to create API client with updated config",
errors.TypeConfig,
errors.M("hosts", append(
h.config.Fleet.Client.Hosts, h.config.Fleet.Client.Host)))
clientConfig.Hosts, clientConfig.Host)))
}

ctx, cancel := context.WithTimeout(ctx, apiStatusTimeout)
defer cancel()

resp, err := client.Send(ctx, http.MethodGet, "/api/status", nil, nil, nil)
// TODO: a HEAD should be enough as we need to test only the connectivity part
resp, err := fleetClient.Send(ctx, http.MethodGet, "/api/status", nil, nil, nil)
if err != nil {
return errors.New(
err, "fail to communicate with Fleet Server API client hosts",
errors.TypeNetwork, errors.M("hosts", h.config.Fleet.Client.Hosts))
errors.TypeNetwork, errors.M("hosts", clientConfig.Hosts))
}

// discard body for proper cancellation and connection reuse
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()

reader, err := fleetToReader(h.agentInfo, h.config)
return nil
}

// updateFleetConfig copies the relevant Fleet client settings from src on dst. The destination struct is modified in-place
func updateFleetConfig(log *logger.Logger, src remote.Config, dst *remote.Config) {
dst.Protocol = src.Protocol
dst.Path = src.Path
dst.Host = src.Host
dst.Hosts = src.Hosts

// Empty proxies from fleet are ignored. That way a proxy set by --proxy-url
// it won't be overridden by an absent or empty proxy from fleet-server.
// However, if there is a proxy sent by fleet-server, it'll take precedence.
// Therefore, it's not possible to remove a proxy once it's set.

if src.Transport.Proxy.URL == nil ||
src.Transport.Proxy.URL.String() == "" {
log.Debug("proxy from fleet is empty or null, the proxy will not be changed")
} else {
// copy the proxy struct
dst.Transport.Proxy = src.Transport.Proxy

// replace in dst the attributes that are passed by reference within the proxy struct

// Headers map
dst.Transport.Proxy.Headers = map[string]string{}
for k, v := range src.Transport.Proxy.Headers {
dst.Transport.Proxy.Headers[k] = v
}

// Proxy URL
urlCopy := *src.Transport.Proxy.URL
dst.Transport.Proxy.URL = &urlCopy

log.Debug("received proxy from fleet, applying it")
}
}

func (h *PolicyChangeHandler) handlePolicyChange(ctx context.Context, c *config.Config) (err error) {
cfg, err := configuration.NewFromConfig(c)
if err != nil {
return errors.New(
err, "fail to persist new Fleet Server API client hosts",
errors.TypeUnexpected, errors.M("hosts", h.config.Fleet.Client.Hosts))
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
}

// validate Fleet connectivity with the new configuration
var validatedConfig *remote.Config
validatedConfig, err = h.validateFleetServerHosts(ctx, cfg)
if err != nil {
return fmt.Errorf("error validating Fleet client config: %w", err)
}

if validatedConfig != nil {
// there's a change in the fleet client settings
backupFleetClientCfg := h.config.Fleet.Client
// rollback in case of error
defer func() {
if err != nil {
h.config.Fleet.Client = backupFleetClientCfg
}
}()

// modify runtime handler config before saving
h.config.Fleet.Client = *validatedConfig
}

// persist configuration
err = saveConfig(h.agentInfo, h.config, h.store)
if err != nil {
return fmt.Errorf("saving FleetClientConfig: %w", err)
}

// apply the new configuration to the current clients
err = h.applyFleetClientConfig(validatedConfig)
if err != nil {
return fmt.Errorf("applying FleetClientConfig: %w", err)
}

return nil
}

func (h *PolicyChangeHandler) applyFleetClientConfig(validatedConfig *remote.Config) error {
if validatedConfig == nil || len(h.setters) == 0 {
// nothing to do for fleet hosts
return nil
}

err = h.store.Save(reader)
// the config has already been validated, no need for error handling
fleetClient, err := client.NewAuthWithConfig(
h.log, h.config.Fleet.AccessAPIKey, *validatedConfig)
if err != nil {
return fmt.Errorf("creating new fleet client with updated config: %w", err)
}
for _, setter := range h.setters {
setter.SetClient(fleetClient)
}

return nil
}

func saveConfig(agentInfo info.Agent, validatedConfig *configuration.Configuration, store storage.Store) error {
if validatedConfig == nil {
// nothing to do for fleet hosts
return nil
}
reader, err := fleetToReader(agentInfo.AgentID(), agentInfo.Headers(), validatedConfig)
if err != nil {
return errors.New(
err, "fail to persist new Fleet Server API client hosts",
errors.TypeFilesystem, errors.M("hosts", h.config.Fleet.Client.Hosts))
errors.TypeUnexpected, errors.M("hosts", validatedConfig.Fleet.Client.Hosts))
}

for _, setter := range h.setters {
setter.SetClient(client)
err = store.Save(reader)
if err != nil {
return errors.New(
err, "fail to persist new Fleet Server API client hosts",
errors.TypeFilesystem, errors.M("hosts", validatedConfig.Fleet.Client.Hosts))
}
return nil
}
Expand Down Expand Up @@ -264,12 +332,12 @@ func clientEqual(k1 remote.Config, k2 remote.Config) bool {
return true
}

func fleetToReader(agentInfo info.Agent, cfg *configuration.Configuration) (io.Reader, error) {
func fleetToReader(agentID string, headers map[string]string, cfg *configuration.Configuration) (io.Reader, error) {
configToStore := map[string]interface{}{
"fleet": cfg.Fleet,
"agent": map[string]interface{}{
"id": agentInfo.AgentID(),
"headers": agentInfo.Headers(),
"id": agentID,
"headers": headers,
"logging.level": cfg.Settings.LoggingConfig.Level,
"monitoring.http": cfg.Settings.MonitoringConfig.HTTP,
"monitoring.pprof": cfg.Settings.MonitoringConfig.Pprof,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestPolicyAcked(t *testing.T) {
})
}

func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) {
mockProxy := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write(nil)
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
"fleet.proxy_url": "http://some.proxy",
})

err := h.handleFleetServerHosts(context.Background(), cfg)
err := h.handlePolicyChange(context.Background(), cfg)
require.Error(t, err) // it needs to fail to rollback

assert.Equal(t, 0, setterCalledCount)
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
"fleet.proxy_url": "http://some.proxy",
})

err := h.handleFleetServerHosts(context.Background(), cfg)
err := h.handlePolicyChange(context.Background(), cfg)
require.Error(t, err) // it needs to fail to rollback

assert.Equal(t, 0, setterCalledCount)
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
map[string]interface{}{
"fleet.host": fleetServer.URL})

err := h.handleFleetServerHosts(context.Background(), cfg)
err := h.handlePolicyChange(context.Background(), cfg)
require.NoError(t, err)

assert.Equal(t, 1, setterCalledCount)
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
map[string]interface{}{
"fleet.hosts": wantHosts})

err := h.handleFleetServerHosts(context.Background(), cfg)
err := h.handlePolicyChange(context.Background(), cfg)
require.NoError(t, err)

assert.Equal(t, 1, setterCalledCount)
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
"fleet.proxy_url": mockProxy.URL,
"fleet.host": fleetServer.URL})

err := h.handleFleetServerHosts(context.Background(), cfg)
err := h.handlePolicyChange(context.Background(), cfg)
require.NoError(t, err)

assert.Equal(t, 1, setterCalledCount)
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
"fleet.proxy_url": "",
"fleet.host": fleetServer.URL})

err = h.handleFleetServerHosts(context.Background(), cfg)
err = h.handlePolicyChange(context.Background(), cfg)
require.NoError(t, err)

assert.Equal(t, 1, setterCalledCount)
Expand Down