From f48ee408fcb4aaaf42914773fa7a3d30bf5f11aa Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 19 Feb 2025 12:05:04 +0000 Subject: [PATCH 1/2] fix install prevention on Linux (#6866) (#6921) * fix install prevention on Linux * changelog * remove changelog fragment as the bug didn't land in any release yet * upgate go-sysinfo module * go mod tidy * notice * CRLF to LF (cherry picked from commit f7fc674f6994d69fe45491bb428d6e1694afdf7c) Co-authored-by: Leszek Kubik <39905449+intxgo@users.noreply.github.com> --- NOTICE.txt | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- specs/endpoint-security.spec.yml | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 601eb612ae4..83962719506 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -2375,11 +2375,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-licenser@v0. -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-sysinfo -Version: v1.15.0 +Version: v1.15.1 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sysinfo@v1.15.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sysinfo@v1.15.1/LICENSE.txt: Apache License diff --git a/go.mod b/go.mod index 4c71bc3deda..c26c1149d5f 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/elastic/elastic-transport-go/v8 v8.6.0 github.com/elastic/go-elasticsearch/v8 v8.17.0 github.com/elastic/go-licenser v0.4.2 - github.com/elastic/go-sysinfo v1.15.0 + github.com/elastic/go-sysinfo v1.15.1 github.com/elastic/go-ucfg v0.8.8 github.com/elastic/mock-es v0.0.0-20241101195702-0a41fa3d30d9 github.com/elastic/opentelemetry-collector-components/connector/signaltometricsconnector v0.3.0 diff --git a/go.sum b/go.sum index abc3ce6aca2..f4237af0424 100644 --- a/go.sum +++ b/go.sum @@ -482,8 +482,8 @@ github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727 h1:yuiN60oaQUz2PtN github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727/go.mod h1:sw1pzz4pIqzDQxFWt3dFoG2uIUFAfThxlMfWpjH590E= github.com/elastic/go-structform v0.0.12 h1:HXpzlAKyej8T7LobqKDThUw7BMhwV6Db24VwxNtgxCs= github.com/elastic/go-structform v0.0.12/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= -github.com/elastic/go-sysinfo v1.15.0 h1:54pRFlAYUlVNQ2HbXzLVZlV+fxS7Eax49stzg95M4Xw= -github.com/elastic/go-sysinfo v1.15.0/go.mod h1:jPSuTgXG+dhhh0GKIyI2Cso+w5lPJ5PvVqKlL8LV/Hk= +github.com/elastic/go-sysinfo v1.15.1 h1:zBmTnFEXxIQ3iwcQuk7MzaUotmKRp3OabbbWM8TdzIQ= +github.com/elastic/go-sysinfo v1.15.1/go.mod h1:jPSuTgXG+dhhh0GKIyI2Cso+w5lPJ5PvVqKlL8LV/Hk= github.com/elastic/go-ucfg v0.8.8 h1:54KIF/2zFKfl0MzsSOCGOsZ3O2bnjFQJ0nDJcLhviyk= github.com/elastic/go-ucfg v0.8.8/go.mod h1:4E8mPOLSUV9hQ7sgLEJ4bvt0KhMuDJa8joDT2QGAEKA= github.com/elastic/go-windows v1.0.2 h1:yoLLsAsV5cfg9FLhZ9EXZ2n2sQFKeDYrHenkcivY4vI= diff --git a/specs/endpoint-security.spec.yml b/specs/endpoint-security.spec.yml index c8dd247d23f..37b744b4f6f 100644 --- a/specs/endpoint-security.spec.yml +++ b/specs/endpoint-security.spec.yml @@ -14,7 +14,7 @@ inputs: - UPGRADE runtime: preventions: - - condition: ${runtime.arch} != ${runtime.native_arch} + - condition: ${runtime.native_arch} != '' and ${runtime.arch} != ${runtime.native_arch} message: "Elastic Defend cannot run on an emulated architecture" - condition: ${runtime.arch} == 'arm64' and ${runtime.family} == 'redhat' and ${runtime.major} == 7 message: "Elastic Defend doesn't support RHEL7 on arm64" From 003c9231ced26eec4aaca7fa0f0121b143dddec6 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 19 Feb 2025 17:35:38 +0000 Subject: [PATCH 2/2] otel: add test for document equivalence between agent filestream input and filebeat receiver (#6681) (#6867) * otel: add test for agent hybrid * add missing fingerprint config options to index small files * ensure new lines are written to the input file * disable compression level so we inspect the requests * add queue timeout to normal beats and separate path.home for fbreceiver * add path.home for normal filebeat * fix agent not starting * print elastic-agent output if test fails * add note for metadata field * fail if ignored field is not present in both maps * add host metadata processor, remove 'host.*' from ignored fields * Use global processor * more strict rules for ignored fields to avoid false positives * use add_fields to add missing fields, use a single index * add add_cloud_metadata processor * avoid checking cmd.Wait error since it is bogus (cherry picked from commit 41882ac0c5ca1caa39b7de00b4732bf460efc484) Co-authored-by: Mauri de Souza Meneguzzo --- pkg/testing/fixture.go | 2 +- testing/integration/otel_test.go | 271 +++++++++++++++++++++++++++++++ 2 files changed, 272 insertions(+), 1 deletion(-) diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 89968451747..794d82c67a3 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -726,7 +726,7 @@ func (f *Fixture) ExecStatus(ctx context.Context, opts ...process.CmdOption) (Ag status := AgentStatusOutput{} if uerr := json.Unmarshal(out, &status); uerr != nil { return AgentStatusOutput{}, - fmt.Errorf("could not unmarshal agent status output: %w", errors.Join(uerr, err)) + fmt.Errorf("could not unmarshal agent status output: %w:\n%s", errors.Join(uerr, err), out) } else if status.IsZero() { return status, fmt.Errorf("agent status output is empty: %w", err) } diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 937dbdd3656..ac9977da2d2 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -9,6 +9,7 @@ package integration import ( "bytes" "context" + "encoding/base64" "errors" "fmt" "net/url" @@ -21,9 +22,11 @@ import ( "text/template" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" "github.com/elastic/elastic-agent/pkg/control/v2/client" aTesting "github.com/elastic/elastic-agent/pkg/testing" @@ -1101,6 +1104,274 @@ service: require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) } +func TestHybridAgentE2E(t *testing.T) { + // This test is a hybrid agent test that ingests a single log with + // filebeat and fbreceiver. It then compares the final documents in + // Elasticsearch to ensure they have no meaningful differences. + info := define.Require(t, define.Requirements{ + Group: Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + numEvents := 1 + fbIndex := "logs-generic-default" + fbReceiverIndex := "logs-generic-default" + + inputFile, err := os.CreateTemp(tmpDir, "input-*.log") + require.NoError(t, err, "failed to create input log file") + inputFilePath := inputFile.Name() + for i := 0; i < numEvents; i++ { + _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d", i))) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + _, err = inputFile.Write([]byte("\n")) + require.NoErrorf(t, err, "failed to write newline to input file") + time.Sleep(100 * time.Millisecond) + } + err = inputFile.Close() + require.NoError(t, err, "failed to close data input file") + + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(inputFilePath) + if err != nil { + t.Logf("no data file to import at %s", inputFilePath) + return + } + t.Logf("contents of input file: %s\n", string(contents)) + } + }) + + type configOptions struct { + InputPath string + HomeDir string + ESEndpoint string + ESApiKey string + BeatsESApiKey string + FBReceiverIndex string + } + esEndpoint, err := getESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + + configTemplate := `agent.logging.level: info +agent.logging.to_stderr: true +inputs: + - id: filestream-filebeat + type: filestream + paths: + - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + use_output: default + queue.mem.flush.timeout: 0s + path.home: {{.HomeDir}}/filebeat +outputs: + default: + type: elasticsearch + hosts: [{{.ESEndpoint}}] + api_key: {{.BeatsESApiKey}} + compression_level: 0 +receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_fields: + fields: + dataset: generic + namespace: default + type: logs + target: data_stream + - add_fields: + fields: + dataset: generic + target: event + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + path.home: {{.HomeDir}}/fbreceiver + queue.mem.flush.timeout: 0s +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - {{.ESEndpoint}} + compression: none + api_key: {{.ESApiKey}} + logs_index: {{.FBReceiverIndex}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch/log + - debug +` + + beatsApiKey, err := base64.StdEncoding.DecodeString(esApiKey.Encoded) + require.NoError(t, err, "error decoding api key") + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + configOptions{ + InputPath: inputFilePath, + HomeDir: tmpDir, + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + BeatsESApiKey: string(beatsApiKey), + FBReceiverIndex: fbReceiverIndex, + })) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Contents of agent config file:\n%s\n", string(configContents)) + } + }) + + // Now we can actually create the fixture and run it + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + defer cancel() + + err = fixture.Prepare(ctx) + require.NoError(t, err) + err = fixture.Configure(ctx, configContents) + require.NoError(t, err) + + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + require.NoError(t, err) + cmd.WaitDelay = 1 * time.Second + + var output strings.Builder + cmd.Stderr = &output + cmd.Stdout = &output + + err = cmd.Start() + require.NoError(t, err) + + t.Cleanup(func() { + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 1*time.Minute, 1*time.Second) + + var docs estools.Documents + actualHits := &struct { + Hits int + }{} + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + docs, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbIndex+"*", map[string]interface{}{ + "log.file.path": inputFilePath, + }) + require.NoError(t, err) + + actualHits.Hits = docs.Hits.Total.Value + + return actualHits.Hits == numEvents*2 // filebeat + fbreceiver + }, + 1*time.Minute, 1*time.Second, + "Expected %d logs in elasticsearch, got: %v", numEvents, actualHits) + + doc1 := docs.Hits.Hits[0].Source + doc2 := docs.Hits.Hits[1].Source + ignoredFields := []string{ + // Expected to change between filebeat and fbreceiver + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "agent.version", + + // Missing from fbreceiver doc + "elastic_agent.id", + "elastic_agent.snapshot", + "elastic_agent.version", + + // TODO: fbreceiver adds metadata fields that are internal in filebeat. + // Remove this once https://github.com/elastic/beats/pull/42412 + // is available in agent. + "@metadata.beat", + "@metadata.type", + "@metadata.version", + } + + assertMapsEqual(t, doc1, doc2, ignoredFields, "expected documents to be equal") + cancel() + cmd.Wait() +} + +func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { + t.Helper() + + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + for _, f := range ignoredFields { + hasKeyM1, _ := flatM1.HasKey(f) + hasKeyM2, _ := flatM2.HasKey(f) + + if !hasKeyM1 && !hasKeyM2 { + assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) + } + + // If the ignored field exists and is equal in both maps then it shouldn't be ignored + if hasKeyM1 && hasKeyM2 { + valM1, _ := flatM1.GetValue(f) + valM2, _ := flatM2.GetValue(f) + if valM1 == valM2 { + assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f) + } + } + + flatM1.Delete(f) + flatM2.Delete(f) + } + require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") +} + func TestFBOtelRestartE2E(t *testing.T) { // This test ensures that filebeatreceiver is able to deliver logs even // in advent of a collector restart.