Skip to content

Commit

Permalink
add monitoring filebeat test
Browse files Browse the repository at this point in the history
  • Loading branch information
khushijain21 committed Feb 14, 2025
1 parent 261d628 commit 5268e2e
Showing 1 changed file with 307 additions and 2 deletions.
309 changes: 307 additions & 2 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/testing/estools"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/control/v2/client"
aTesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools/estools"
"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
"github.com/elastic/go-elasticsearch/v8"
)
Expand Down Expand Up @@ -577,7 +578,7 @@ func TestOtelAPMIngestion(t *testing.T) {
"run",
"-e",
"-E", "output.elasticsearch.hosts=['" + esHost + "']",
"-E", "output.elasticsearch.api_key=" + fmt.Sprintf("%s:%s", esApiKey.ID, esApiKey.APIKey),
"-E", "output.elasticsearch.api_key=" + fmt.Sprintf("%s:%s", esApiKey.Id, esApiKey.APIKey),
"-E", "apm-server.host=127.0.0.1:8200",
"-E", "apm-server.ssl.enabled=false",
}
Expand Down Expand Up @@ -1839,6 +1840,310 @@ service:
cmd.Wait()
}

func TestMonitoringAgentE2E(t *testing.T) {
// This test compares the document ingested by filebeat started by the agent.monitoring section,
// with its otel.yml counterpart
info := define.Require(t, define.Requirements{
Group: Default,
Local: true,
OS: []define.OS{
{Type: define.Linux},
{Type: define.Darwin},
{Type: define.Windows},
},
Stack: &define.Stack{},
})

fbMonitoringIndex := "logs-elastic_agent-default"
fbReceiverMonitoringIndex := "logs-otel-default"
logsDrop := filepath.Dir(loggingPath("unit", "darwin"))
tmpDir := t.TempDir()
// endpoint := utils.SocketURLWithFallback(unit, paths.TempDir())

type configOptions struct {
InputPath string
// HttpEndpoint string
HomeDir string
ESEndpoint string
ESApiKey string
BeatsESApiKey string
FBReceiverIndex string
}
esEndpoint, err := getESHost()
require.NoError(t, err, "error getting elasticsearch endpoint")
esApiKey, err := createESApiKey(info.ESClient)
require.NoError(t, err, "error creating API key")
require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)

configTemplate := `receivers:
filebeatreceiver/filestream-monitoring-agent:
filebeat:
inputs:
- type: filestream
enabled: true
id: filestream-monitoring-agent
index: logs-elastic_agent-default
data_stream:
dataset: otel
namespace: default
type: logs
paths:
- {{.InputPath}}/elastic-agent-*.ndjson
- {{.InputPath}}/elastic-agent-watcher-*.ndjson
close:
on_state_change:
inactive: 5m
parsers:
- ndjson:
add_error_key: true
message_key: message
overwrite_keys: true
target: ""
processors:
- add_fields:
fields:
input_id: filestream-monitoring-agent
target: '@metadata'
- add_fields:
fields:
dataset: elastic_agent
namespace: default
type: logs
target: data_stream
- add_fields:
fields:
dataset: elastic_agent
target: event
- add_fields:
fields:
stream_id: filestream-monitoring-agent
target: '@metadata'
- add_fields:
fields:
id: 0ddca301-e7c0-4eac-8432-7dd05bc9cb06
snapshot: false
version: 8.19.0
target: elastic_agent
- add_fields:
fields:
id: 0879f47d-df41-464d-8462-bc2b8fef45bf
target: agent
- drop_event:
when:
regexp:
component:
id: .*-monitoring$
- drop_event:
when:
regexp:
message: ^Non-zero metrics in the last
- copy_fields:
fields:
- from: data_stream.dataset
to: data_stream.dataset_original
- drop_fields:
fields:
- data_stream.dataset
- copy_fields:
fail_on_error: false
fields:
- from: component.dataset
to: data_stream.dataset
ignore_missing: true
- copy_fields:
fail_on_error: false
fields:
- from: data_stream.dataset_original
to: data_stream.dataset
- drop_fields:
fields:
- data_stream.dataset_original
- event.dataset
- copy_fields:
fields:
- from: data_stream.dataset
to: event.dataset
- drop_fields:
fields:
- ecs.version
ignore_missing: true
- add_formatted_index:
index: '%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}'
output:
otelconsumer:
path:
data: {{.HomeDir}}/filestream-monitoring
queue:
mem:
flush:
timeout: 0s
logging:
level: info
selectors:
- '*'
processors:
- add_host_metadata:
when:
not:
contains:
tags: forwarded
- add_cloud_data: ~
- add_docker_metadata: ~
- add_kubernetes_metadata:~
setup.ilm.enabled: false
setup.template.enabled: false
filebeat.config.modules.enabled: false
logging.event_data.to_stderr: true
logging.event_data.to_files: false
exporters:
debug:
use_internal_logger: false
verbosity: detailed
elasticsearch/log:
endpoints:
- {{.ESEndpoint}}
compression: none
api_key: {{.ESApiKey}}
logs_index: {{.fbReceiverMonitoringIndex}}
batcher:
enabled: true
flush_timeout: 1s
mapping:
mode: bodymap
service:
pipelines:
logs:
receivers:
- filebeatreceiver/filestream-monitoring-agent
exporters:
- elasticsearch/log
`

beatsApiKey, err := base64.StdEncoding.DecodeString(esApiKey.Encoded)
require.NoError(t, err, "error decoding api key")

var configBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
configOptions{
InputPath: logsDrop,
// HttpEndpoint: endpoint,
HomeDir: tmpDir,
ESEndpoint: esEndpoint,
ESApiKey: esApiKey.Encoded,
BeatsESApiKey: string(beatsApiKey),
FBReceiverIndex: fbReceiverMonitoringIndex,
}))
configContents := configBuffer.Bytes()
fmt.Println(string(configContents))
t.Cleanup(func() {
if t.Failed() {
t.Logf("Contents of agent config file:\n%s\n", string(configContents))
}
})

// Now we can actually create the fixture and run it
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
defer cancel()

err = fixture.Prepare(ctx)
require.NoError(t, err)
err = fixture.Configure(ctx, configContents)
require.NoError(t, err)

cmd, err := fixture.PrepareAgentCommand(ctx, nil)
require.NoError(t, err)
cmd.WaitDelay = 1 * time.Second

var output strings.Builder
cmd.Stderr = &output
cmd.Stdout = &output

err = cmd.Start()
require.NoError(t, err)

t.Cleanup(func() {
if t.Failed() {
t.Log("Elastic-Agent output:")
t.Log(output.String())
}
})

require.Eventually(t, func() bool {
err = fixture.IsHealthy(ctx)
if err != nil {
t.Logf("waiting for agent healthy: %s", err.Error())
return false
}
return true
}, 1*time.Minute, 1*time.Second)

var docs estools.Documents
var docs2 estools.Documents
actualHits := &struct {
Hits int
}{}
require.Eventually(t,
func() bool {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

docs, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbMonitoringIndex+"*", map[string]interface{}{
"data_stream.dataset": "elastic_agent",
})

docs2, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbReceiverMonitoringIndex+"*", map[string]interface{}{
"data_stream.dataset": "otel",
})
require.NoError(t, err)

actualHits.Hits = docs.Hits.Total.Value

return actualHits.Hits == docs2.Hits.Total.Value
},
1*time.Minute, 1*time.Second, "the number of logs in both index are not same")

doc1 := docs.Hits.Hits[0].Source
doc2 := docs.Hits.Hits[1].Source
ignoredFields := []string{
// Expected to change between filebeat and fbreceiver
"@timestamp",
"agent.ephemeral_id",
"agent.id",

// Missing from fbreceiver doc
"elastic_agent.id",
"elastic_agent.snapshot",
"elastic_agent.version",

// TODO: fbreceiver adds metadata fields that are internal in filebeat.
// Remove this once https://github.com/elastic/beats/pull/42412
// is available in agent.
"@metadata.beat",
"@metadata.type",
"@metadata.version",
}

assertMapsEqual(t, doc1, doc2, ignoredFields, "expected documents to be equal")
cancel()
cmd.Wait()
}

var logFileFormatWin = "%s\\logs\\%s"
var logFileFormat = "%s/logs/%s"

func loggingPath(id, operatingSystem string) string {
id = strings.ReplaceAll(id, string(filepath.Separator), "-")
if operatingSystem == "windows" {
return fmt.Sprintf(logFileFormatWin, paths.Home(), id)
}

return fmt.Sprintf(logFileFormat, paths.Home(), id)
}

func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) {
t.Helper()

Expand Down

0 comments on commit 5268e2e

Please sign in to comment.