Skip to content

Commit

Permalink
Remove wait_for_integration config
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Mar 18, 2024
1 parent 0a0ff99 commit 2ca8889
Show file tree
Hide file tree
Showing 8 changed files with 5 additions and 216 deletions.
18 changes: 1 addition & 17 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ type Runner struct {
rawConfig *agentconfig.C

config *config.Config
fleetConfig *config.Fleet
outputConfig agentconfig.Namespace
elasticsearchOutputConfig *agentconfig.C

Expand Down Expand Up @@ -113,7 +112,6 @@ func NewRunner(args RunnerParams) (*Runner, error) {
var unpackedConfig struct {
APMServer *agentconfig.C `config:"apm-server"`
Output agentconfig.Namespace `config:"output"`
Fleet *config.Fleet `config:"fleet"`
DataStream struct {
Namespace string `config:"namespace"`
} `config:"data_stream"`
Expand Down Expand Up @@ -147,7 +145,6 @@ func NewRunner(args RunnerParams) (*Runner, error) {
rawConfig: args.Config,

config: cfg,
fleetConfig: unpackedConfig.Fleet,
outputConfig: unpackedConfig.Output,
elasticsearchOutputConfig: elasticsearchOutputConfig,

Expand Down Expand Up @@ -328,7 +325,7 @@ func (s *Runner) Run(ctx context.Context) error {
publishReady := make(chan struct{})
drain := make(chan struct{})
g.Go(func() error {
if err := s.waitReady(ctx, kibanaClient, tracer); err != nil {
if err := s.waitReady(ctx, tracer); err != nil {
// One or more preconditions failed; drop events.
close(drain)
return errors.Wrap(err, "error waiting for server to be ready")
Expand Down Expand Up @@ -567,7 +564,6 @@ func linearScaledValue(perGBIncrement, memLimitGB, constant float64) int {
// waitReady waits until the server is ready to index events.
func (s *Runner) waitReady(
ctx context.Context,
kibanaClient *kibana.Client,
tracer *apm.Tracer,
) error {
var preconditions []func(context.Context) error
Expand Down Expand Up @@ -618,18 +614,6 @@ func (s *Runner) waitReady(
})
}

// When running standalone with data streams enabled, by default we will add
// a precondition that ensures the integration is installed.
fleetManaged := s.fleetConfig != nil
if !fleetManaged && s.config.DataStreams.WaitForIntegration {
if kibanaClient == nil && esOutputClient == nil {
return errors.New("cannot wait for integration without either Kibana or Elasticsearch config")
}
preconditions = append(preconditions, func(ctx context.Context) error {
return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger)
})
}

if len(preconditions) == 0 {
return nil
}
Expand Down
146 changes: 0 additions & 146 deletions internal/beater/checkintegration.go

This file was deleted.

9 changes: 3 additions & 6 deletions internal/beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,7 @@ func TestUnpackConfig(t *testing.T) {
},
DefaultServiceEnvironment: "overridden",
DataStreams: DataStreamsConfig{
Namespace: "default",
WaitForIntegration: true,
Namespace: "default",
},
WaitReadyInterval: 5 * time.Second,
},
Expand Down Expand Up @@ -419,8 +418,7 @@ func TestUnpackConfig(t *testing.T) {
"storage_limit": "1GB",
},
"data_streams": map[string]interface{}{
"namespace": "foo",
"wait_for_integration": false,
"namespace": "foo",
},
},
outCfg: &Config{
Expand Down Expand Up @@ -503,8 +501,7 @@ func TestUnpackConfig(t *testing.T) {
},
},
DataStreams: DataStreamsConfig{
Namespace: "foo",
WaitForIntegration: false,
Namespace: "foo",
},
WaitReadyInterval: 5 * time.Second,
},
Expand Down
13 changes: 1 addition & 12 deletions internal/beater/config/data_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,10 @@ package config
// DataStreamsConfig holds data streams configuration.
type DataStreamsConfig struct {
Namespace string `config:"namespace"`

// WaitForIntegration controls whether APM Server waits for the Fleet
// integration package to be installed before indexing events.
//
// This config is ignored when running under Elastic Agent; it is intended
// for running APM Server standalone, relying on Fleet to install the integration
// for creating Elasticsearch index templates, ILM policies, and ingest pipelines.
//
// This configuration requires either a connection to Kibana or Elasticsearch.
WaitForIntegration bool `config:"wait_for_integration"`
}

func defaultDataStreamsConfig() DataStreamsConfig {
return DataStreamsConfig{
Namespace: "default",
WaitForIntegration: true,
Namespace: "default",
}
}
30 changes: 0 additions & 30 deletions internal/beater/config/fleet.go

This file was deleted.

1 change: 0 additions & 1 deletion systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type Config struct {
Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"`
Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"`
RUM *RUMConfig `json:"apm-server.rum,omitempty"`
WaitForIntegration *bool `json:"apm-server.data_streams.wait_for_integration,omitempty"`
DefaultServiceEnvironment string `json:"apm-server.default_service_environment,omitempty"`
AgentConfig *AgentConfig `json:"apm-server.agent.config,omitempty"`
TLS *TLSConfig `json:"apm-server.ssl,omitempty"`
Expand Down
2 changes: 0 additions & 2 deletions systemtest/gencorpora/apmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ type APMServer struct {
// using the Elasticsearch output.
func NewAPMServer(ctx context.Context, esHost string) *apmservertest.Server {
srv := apmservertest.NewUnstartedServer()
waitForIntegration := false
srv.Config.WaitForIntegration = &waitForIntegration
srv.Config.Output.Elasticsearch.Hosts = []string{esHost}
srv.Config.Kibana = nil
return srv
Expand Down
2 changes: 0 additions & 2 deletions systemtest/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ func TestLogstashOutput(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, ls.Close()) })

waitForIntegration := false
srv := apmservertest.NewUnstartedServerTB(t,
"-E", "queue.mem.events=20000",
"-E", "queue.mem.flush.min_events=10000",
"-E", "queue.mem.flush.timeout=60s",
)
srv.Config.WaitForIntegration = &waitForIntegration
srv.Config.Output = apmservertest.OutputConfig{
Logstash: &apmservertest.LogstashOutputConfig{
Enabled: true,
Expand Down

0 comments on commit 2ca8889

Please sign in to comment.