diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 93a3d67abc0..15e5a145387 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -85,7 +85,6 @@ type Runner struct { rawConfig *agentconfig.C config *config.Config - fleetConfig *config.Fleet outputConfig agentconfig.Namespace elasticsearchOutputConfig *agentconfig.C @@ -113,7 +112,6 @@ func NewRunner(args RunnerParams) (*Runner, error) { var unpackedConfig struct { APMServer *agentconfig.C `config:"apm-server"` Output agentconfig.Namespace `config:"output"` - Fleet *config.Fleet `config:"fleet"` DataStream struct { Namespace string `config:"namespace"` } `config:"data_stream"` @@ -147,7 +145,6 @@ func NewRunner(args RunnerParams) (*Runner, error) { rawConfig: args.Config, config: cfg, - fleetConfig: unpackedConfig.Fleet, outputConfig: unpackedConfig.Output, elasticsearchOutputConfig: elasticsearchOutputConfig, @@ -328,7 +325,7 @@ func (s *Runner) Run(ctx context.Context) error { publishReady := make(chan struct{}) drain := make(chan struct{}) g.Go(func() error { - if err := s.waitReady(ctx, kibanaClient, tracer); err != nil { + if err := s.waitReady(ctx, tracer); err != nil { // One or more preconditions failed; drop events. close(drain) return errors.Wrap(err, "error waiting for server to be ready") @@ -567,7 +564,6 @@ func linearScaledValue(perGBIncrement, memLimitGB, constant float64) int { // waitReady waits until the server is ready to index events. func (s *Runner) waitReady( ctx context.Context, - kibanaClient *kibana.Client, tracer *apm.Tracer, ) error { var preconditions []func(context.Context) error @@ -618,18 +614,6 @@ func (s *Runner) waitReady( }) } - // When running standalone with data streams enabled, by default we will add - // a precondition that ensures the integration is installed. - fleetManaged := s.fleetConfig != nil - if !fleetManaged && s.config.DataStreams.WaitForIntegration { - if kibanaClient == nil && esOutputClient == nil { - return errors.New("cannot wait for integration without either Kibana or Elasticsearch config") - } - preconditions = append(preconditions, func(ctx context.Context) error { - return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger) - }) - } - if len(preconditions) == 0 { return nil } diff --git a/internal/beater/checkintegration.go b/internal/beater/checkintegration.go deleted file mode 100644 index f0fe62ebc59..00000000000 --- a/internal/beater/checkintegration.go +++ /dev/null @@ -1,146 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package beater - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - - "github.com/pkg/errors" - "golang.org/x/sync/errgroup" - - "github.com/elastic/elastic-agent-libs/logp" - - "github.com/elastic/apm-server/internal/elasticsearch" - "github.com/elastic/apm-server/internal/kibana" - "github.com/elastic/go-elasticsearch/v8/esapi" -) - -// checkIntegrationInstalled checks if the APM integration is installed by querying Kibana -// and/or Elasticsearch, returning nil if and only if it is installed. -func checkIntegrationInstalled( - ctx context.Context, - kibanaClient *kibana.Client, - esClient *elasticsearch.Client, - logger *logp.Logger, -) (err error) { - defer func() { - if err != nil { - // We'd like to include some remediation actions when the APM Integration isn't installed. - err = &actionableError{ - Err: err, - Name: "apm integration installed", - Remediation: "please install the apm integration: https://ela.st/apm-integration-quickstart", - } - } - }() - if kibanaClient != nil { - installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger) - if err != nil { - // We only return the Kibana error if we have no Elasticsearch client, - // as we may not have sufficient privileges to query the Fleet API. - if esClient == nil { - return fmt.Errorf("error querying Kibana for integration package status: %w", err) - } - } else if !installed { - // We were able to query Kibana, but the package is not yet installed. - // We should continue querying the package status via Kibana, as it is - // more authoritative than checking for index template installation. - return errors.New("integration package not yet installed") - } - // Fall through and query Elasticsearch (if we have a client). Kibana may prematurely - // report packages as installed: https://github.com/elastic/kibana/issues/108649 - } - if esClient != nil { - installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger) - if err != nil { - return fmt.Errorf("error querying Elasticsearch for integration index templates: %w", err) - } else if !installed { - return errors.New("integration index templates not installed") - } - } - return nil -} - -// checkIntegrationInstalledKibana checks if the APM integration package -// is installed by querying Kibana. -func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) { - resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil) - if err != nil { - return false, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return false, fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status, bytes.TrimSpace(body)) - } - var result struct { - Response struct { - Status string `json:"status"` - } `json:"response"` - } - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return false, errors.Wrap(err, "error decoding integration package response") - } - logger.Infof("integration package status: %s", result.Response.Status) - return result.Response.Status == "installed", nil -} - -func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient *elasticsearch.Client, _ *logp.Logger) (bool, error) { - // TODO(axw) generate the list of expected index templates. - templates := []string{ - "traces-apm", - "traces-apm.sampled", - "metrics-apm.app", - "metrics-apm.internal", - "logs-apm.error", - } - for _, intervals := range []string{"1m", "10m", "60m"} { - for _, ds := range []string{"metrics-apm.transaction", "metrics-apm.service_transaction", "metrics-apm.service_destination", "metrics-apm.service_summary"} { - templates = append(templates, fmt.Sprintf("%s.%s", ds, intervals)) - } - } - // IndicesGetIndexTemplateRequest accepts a slice of template names, - // but the REST API expects just one index template name. Query them - // in parallel. - g, ctx := errgroup.WithContext(ctx) - for _, template := range templates { - template := template // copy for closure - g.Go(func() error { - req := esapi.IndicesGetIndexTemplateRequest{Name: template} - resp, err := req.Do(ctx, esClient) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.IsError() { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body)) - } - return nil - }) - } - err := g.Wait() - return err == nil, err -} diff --git a/internal/beater/config/config_test.go b/internal/beater/config/config_test.go index ed99f0caea1..9b136f95b72 100644 --- a/internal/beater/config/config_test.go +++ b/internal/beater/config/config_test.go @@ -370,8 +370,7 @@ func TestUnpackConfig(t *testing.T) { }, DefaultServiceEnvironment: "overridden", DataStreams: DataStreamsConfig{ - Namespace: "default", - WaitForIntegration: true, + Namespace: "default", }, WaitReadyInterval: 5 * time.Second, }, @@ -419,8 +418,7 @@ func TestUnpackConfig(t *testing.T) { "storage_limit": "1GB", }, "data_streams": map[string]interface{}{ - "namespace": "foo", - "wait_for_integration": false, + "namespace": "foo", }, }, outCfg: &Config{ @@ -503,8 +501,7 @@ func TestUnpackConfig(t *testing.T) { }, }, DataStreams: DataStreamsConfig{ - Namespace: "foo", - WaitForIntegration: false, + Namespace: "foo", }, WaitReadyInterval: 5 * time.Second, }, diff --git a/internal/beater/config/data_streams.go b/internal/beater/config/data_streams.go index 0a0456eaa4c..0dbc6d53267 100644 --- a/internal/beater/config/data_streams.go +++ b/internal/beater/config/data_streams.go @@ -20,21 +20,10 @@ package config // DataStreamsConfig holds data streams configuration. type DataStreamsConfig struct { Namespace string `config:"namespace"` - - // WaitForIntegration controls whether APM Server waits for the Fleet - // integration package to be installed before indexing events. - // - // This config is ignored when running under Elastic Agent; it is intended - // for running APM Server standalone, relying on Fleet to install the integration - // for creating Elasticsearch index templates, ILM policies, and ingest pipelines. - // - // This configuration requires either a connection to Kibana or Elasticsearch. - WaitForIntegration bool `config:"wait_for_integration"` } func defaultDataStreamsConfig() DataStreamsConfig { return DataStreamsConfig{ - Namespace: "default", - WaitForIntegration: true, + Namespace: "default", } } diff --git a/internal/beater/config/fleet.go b/internal/beater/config/fleet.go deleted file mode 100644 index 3b584fe3b08..00000000000 --- a/internal/beater/config/fleet.go +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package config - -import ( - "github.com/elastic/elastic-agent-libs/transport/tlscommon" -) - -// Fleet holds configuration required for communicating with fleet-server. -type Fleet struct { - Hosts []string `config:"hosts"` - Protocol string `config:"protocol"` - AccessAPIKey string `config:"access_api_key"` - TLS *tlscommon.Config `config:"ssl"` -} diff --git a/systemtest/apmservertest/config.go b/systemtest/apmservertest/config.go index 48253f332b5..77bd71d2ae3 100644 --- a/systemtest/apmservertest/config.go +++ b/systemtest/apmservertest/config.go @@ -46,7 +46,6 @@ type Config struct { Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"` Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"` RUM *RUMConfig `json:"apm-server.rum,omitempty"` - WaitForIntegration *bool `json:"apm-server.data_streams.wait_for_integration,omitempty"` DefaultServiceEnvironment string `json:"apm-server.default_service_environment,omitempty"` AgentConfig *AgentConfig `json:"apm-server.agent.config,omitempty"` TLS *TLSConfig `json:"apm-server.ssl,omitempty"` diff --git a/systemtest/gencorpora/apmserver.go b/systemtest/gencorpora/apmserver.go index 18720e28786..ed9cc6fd94f 100644 --- a/systemtest/gencorpora/apmserver.go +++ b/systemtest/gencorpora/apmserver.go @@ -38,8 +38,6 @@ type APMServer struct { // using the Elasticsearch output. func NewAPMServer(ctx context.Context, esHost string) *apmservertest.Server { srv := apmservertest.NewUnstartedServer() - waitForIntegration := false - srv.Config.WaitForIntegration = &waitForIntegration srv.Config.Output.Elasticsearch.Hosts = []string{esHost} srv.Config.Kibana = nil return srv diff --git a/systemtest/logstash_test.go b/systemtest/logstash_test.go index 299333a494d..114a1b26377 100644 --- a/systemtest/logstash_test.go +++ b/systemtest/logstash_test.go @@ -38,13 +38,11 @@ func TestLogstashOutput(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, ls.Close()) }) - waitForIntegration := false srv := apmservertest.NewUnstartedServerTB(t, "-E", "queue.mem.events=20000", "-E", "queue.mem.flush.min_events=10000", "-E", "queue.mem.flush.timeout=60s", ) - srv.Config.WaitForIntegration = &waitForIntegration srv.Config.Output = apmservertest.OutputConfig{ Logstash: &apmservertest.LogstashOutputConfig{ Enabled: true,