From 4117a4c1117818db19f2817096d5d60f1c1bdeb3 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Wed, 18 Oct 2023 07:52:11 -0700 Subject: [PATCH 01/32] [cmd/otecontribcol] Add sending data step to exporter lifecycle tests (#27824) As the next step, I'll update it to send readonly data to immutable exporters --- cmd/otelcontribcol/exporters_test.go | 134 +++++++++++++++++++++++++-- 1 file changed, 124 insertions(+), 10 deletions(-) diff --git a/cmd/otelcontribcol/exporters_test.go b/cmd/otelcontribcol/exporters_test.go index 530d05a666f2..8ab6de82c640 100644 --- a/cmd/otelcontribcol/exporters_test.go +++ b/cmd/otelcontribcol/exporters_test.go @@ -49,6 +49,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/mezmoexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sentryexporter" @@ -60,6 +61,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tencentcloudlogserviceexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" ) func TestDefaultExporters(t *testing.T) { @@ -70,9 +72,10 @@ func TestDefaultExporters(t *testing.T) { endpoint := testutil.GetAvailableLocalAddress(t) tests := []struct { - getConfigFn getExporterConfigFn - exporter component.Type - skipLifecycle bool + getConfigFn getExporterConfigFn + exporter component.Type + 
skipLifecycle bool + expectConsumeErr bool }{ { exporter: "awscloudwatchlogs", @@ -80,12 +83,16 @@ func TestDefaultExporters(t *testing.T) { cfg := expFactories["awscloudwatchlogs"].CreateDefaultConfig().(*awscloudwatchlogsexporter.Config) cfg.Endpoint = "http://" + endpoint cfg.Region = "local" + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, - skipLifecycle: true, + expectConsumeErr: true, }, { - exporter: "awss3", + exporter: "awss3", + expectConsumeErr: true, }, { exporter: "file", @@ -102,8 +109,12 @@ func TestDefaultExporters(t *testing.T) { cfg.Brokers = []string{"invalid:9092"} // this disables contacting the broker so we can successfully create the exporter cfg.Metadata.Full = false + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "debug", @@ -118,8 +129,12 @@ func TestDefaultExporters(t *testing.T) { cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ Endpoint: endpoint, } + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "otlp", @@ -128,16 +143,24 @@ func TestDefaultExporters(t *testing.T) { cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ Endpoint: endpoint, } + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "otlphttp", getConfigFn: func() component.Config { cfg := expFactories["otlphttp"].CreateDefaultConfig().(*otlphttpexporter.Config) cfg.Endpoint = "http://" + endpoint + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + 
cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "prometheus", @@ -149,6 +172,14 @@ func TestDefaultExporters(t *testing.T) { }, { exporter: "prometheusremotewrite", + getConfigFn: func() component.Config { + cfg := expFactories["prometheusremotewrite"].CreateDefaultConfig().(*prometheusremotewriteexporter.Config) + // disable queue/retry to validate passing the test data synchronously + cfg.RemoteWriteQueue.Enabled = false + cfg.RetrySettings.Enabled = false + return cfg + }, + expectConsumeErr: true, }, { exporter: "pulsar", @@ -164,8 +195,12 @@ func TestDefaultExporters(t *testing.T) { getConfigFn: func() component.Config { cfg := expFactories["sapm"].CreateDefaultConfig().(*sapmexporter.Config) cfg.Endpoint = "http://" + endpoint + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "signalfx", @@ -174,8 +209,12 @@ func TestDefaultExporters(t *testing.T) { cfg.AccessToken = "my_fake_token" cfg.IngestURL = "http://" + endpoint cfg.APIURL = "http://" + endpoint + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "splunk_hec", @@ -183,16 +222,24 @@ func TestDefaultExporters(t *testing.T) { cfg := expFactories["splunk_hec"].CreateDefaultConfig().(*splunkhecexporter.Config) cfg.Token = "my_fake_token" cfg.Endpoint = "http://" + endpoint + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "zipkin", getConfigFn: func() component.Config { cfg := expFactories["zipkin"].CreateDefaultConfig().(*zipkinexporter.Config) cfg.Endpoint = endpoint + // disable queue/retry to validate passing the test data 
synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "awskinesis", @@ -221,6 +268,7 @@ func TestDefaultExporters(t *testing.T) { cfg.Region = "local" return cfg }, + expectConsumeErr: true, }, { exporter: "awsxray", @@ -230,6 +278,7 @@ func TestDefaultExporters(t *testing.T) { cfg.Region = "local" return cfg }, + expectConsumeErr: true, }, { exporter: "azuredataexplorer", @@ -259,6 +308,7 @@ func TestDefaultExporters(t *testing.T) { cfg.Endpoint = "http://" + endpoint return cfg }, + expectConsumeErr: true, }, { exporter: "clickhouse", @@ -285,16 +335,24 @@ func TestDefaultExporters(t *testing.T) { cfg.Traces.Endpoint = endpoint cfg.Logs.Endpoint = endpoint cfg.Metrics.Endpoint = endpoint + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "datadog", getConfigFn: func() component.Config { cfg := expFactories["datadog"].CreateDefaultConfig().(*datadogexporter.Config) cfg.API.Key = "cutedogsgotoheaven" + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "dataset", @@ -302,8 +360,13 @@ func TestDefaultExporters(t *testing.T) { cfg := expFactories["dataset"].CreateDefaultConfig().(*datasetexporter.Config) cfg.DatasetURL = "https://" + endpoint cfg.APIKey = "secret-key" + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, + skipLifecycle: true, // shutdown fails if there is buffered data }, { exporter: "dynatrace", @@ -311,14 +374,20 @@ func TestDefaultExporters(t *testing.T) { cfg := expFactories["dynatrace"].CreateDefaultConfig().(*dtconf.Config) cfg.Endpoint = 
"http://" + endpoint cfg.APIToken = "dynamictracing" + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "elasticsearch", getConfigFn: func() component.Config { cfg := expFactories["elasticsearch"].CreateDefaultConfig().(*elasticsearchexporter.Config) cfg.Endpoints = []string{"http://" + endpoint} + // disable queue to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false return cfg }, }, @@ -329,7 +398,9 @@ func TestDefaultExporters(t *testing.T) { cfg.Endpoint = "http://" + endpoint cfg.Source = "magic-source" cfg.AuthConfig.CredentialFile = filepath.Join(t.TempDir(), "random.file") - + // disable queue/retry to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, skipLifecycle: true, @@ -351,6 +422,9 @@ func TestDefaultExporters(t *testing.T) { getConfigFn: func() component.Config { cfg := expFactories["influxdb"].CreateDefaultConfig().(*influxdbexporter.Config) cfg.Endpoint = "http://" + endpoint + // disable queue to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, skipLifecycle: true, @@ -363,6 +437,7 @@ func TestDefaultExporters(t *testing.T) { cfg.AgentKey = "Key1" return cfg }, + expectConsumeErr: true, }, { exporter: "loadbalancing", @@ -371,11 +446,15 @@ func TestDefaultExporters(t *testing.T) { cfg.Resolver = loadbalancingexporter.ResolverSettings{Static: &loadbalancingexporter.StaticResolver{Hostnames: []string{"127.0.0.1"}}} return cfg }, + expectConsumeErr: true, // the exporter requires traces with service.name resource attribute }, { exporter: "logicmonitor", getConfigFn: func() component.Config { cfg := expFactories["logicmonitor"].CreateDefaultConfig().(*logicmonitorexporter.Config) + // disable queue to validate 
passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, skipLifecycle: true, @@ -385,22 +464,33 @@ func TestDefaultExporters(t *testing.T) { getConfigFn: func() component.Config { cfg := expFactories["logzio"].CreateDefaultConfig().(*logzioexporter.Config) cfg.Endpoint = "http://" + endpoint + // disable queue to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "loki", getConfigFn: func() component.Config { cfg := expFactories["loki"].CreateDefaultConfig().(*lokiexporter.Config) cfg.Endpoint = "http://" + endpoint + // disable queue to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "mezmo", getConfigFn: func() component.Config { cfg := expFactories["mezmo"].CreateDefaultConfig().(*mezmoexporter.Config) cfg.Endpoint = "http://" + endpoint + // disable queue to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, }, @@ -416,6 +506,9 @@ func TestDefaultExporters(t *testing.T) { exporter: "skywalking", getConfigFn: func() component.Config { cfg := expFactories["skywalking"].CreateDefaultConfig().(*skywalkingexporter.Config) + // disable queue to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, skipLifecycle: true, @@ -425,9 +518,12 @@ func TestDefaultExporters(t *testing.T) { getConfigFn: func() component.Config { cfg := expFactories["sumologic"].CreateDefaultConfig().(*sumologicexporter.Config) cfg.Endpoint = "http://" + endpoint - + // disable queue to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: 
true, }, { exporter: "tanzuobservability", @@ -435,16 +531,20 @@ func TestDefaultExporters(t *testing.T) { cfg := expFactories["tanzuobservability"].CreateDefaultConfig().(*tanzuobservabilityexporter.Config) cfg.Traces.Endpoint = "http://" + endpoint cfg.Metrics.Endpoint = "http://" + endpoint + // disable queue to validate passing the test data synchronously + cfg.QueueSettings.Enabled = false + cfg.RetrySettings.Enabled = false return cfg }, + expectConsumeErr: true, }, { exporter: "tencentcloud_logservice", getConfigFn: func() component.Config { cfg := expFactories["tencentcloud_logservice"].CreateDefaultConfig().(*tencentcloudlogserviceexporter.Config) - return cfg }, + expectConsumeErr: true, }, } @@ -460,7 +560,7 @@ func TestDefaultExporters(t *testing.T) { if tt.skipLifecycle { t.SkipNow() } - verifyExporterLifecycle(t, factory, tt.getConfigFn) + verifyExporterLifecycle(t, factory, tt.getConfigFn, tt.expectConsumeErr) }) }) } @@ -474,7 +574,7 @@ type getExporterConfigFn func() component.Config // verifyExporterLifecycle is used to test if an exporter type can handle the typical // lifecycle of a component. The getConfigFn parameter only need to be specified if // the test can't be done with the default configuration for the component. 
-func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn getExporterConfigFn) { +func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn getExporterConfigFn, expectErr bool) { ctx := context.Background() host := newAssertNoErrorHost(t) expCreateSettings := exportertest.NewNopCreateSettings() @@ -502,6 +602,20 @@ func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn exps = append(exps, exp) } for _, exp := range exps { + var err error + assert.NotPanics(t, func() { + switch e := exp.(type) { + case exporter.Logs: + err = e.ConsumeLogs(ctx, testdata.GenerateLogsManyLogRecordsSameResource(2)) + case exporter.Metrics: + err = e.ConsumeMetrics(ctx, testdata.GenerateMetricsTwoMetrics()) + case exporter.Traces: + err = e.ConsumeTraces(ctx, testdata.GenerateTracesTwoSpansSameResource()) + } + }) + if !expectErr { + assert.NoError(t, err) + } assert.NoError(t, exp.Shutdown(ctx)) } } From b517424c9400d1498ff8287befe6c641852b075b Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Wed, 18 Oct 2023 17:19:50 +0200 Subject: [PATCH 02/32] [chore][exporter/syslog] docs: describe default values, add examples (#27831) Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21242 --- exporter/syslogexporter/README.md | 126 ++++++++++++++++++++++++++++-- 1 file changed, 118 insertions(+), 8 deletions(-) diff --git a/exporter/syslogexporter/README.md b/exporter/syslogexporter/README.md index d68c1af15723..02de6fbf9d3b 100644 --- a/exporter/syslogexporter/README.md +++ b/exporter/syslogexporter/README.md @@ -11,12 +11,10 @@ [development]: https://github.com/open-telemetry/opentelemetry-collector#development -The syslog exporter supports sending messages to a remote syslog server. - -- This exporter can forward syslog messages to syslog server using [RFC5424][RFC5424] and [RFC3164][RFC3164]. 
-- It is recommended that this syslog exporter be used with the [syslog receiver][syslog_receiver] or with [filelog receiver][filelog_receiver] along with [syslog_parser][syslog_parser] configured in the receiver, please see [examples](./examples/) - This ensures that all the syslog message headers are populated with the expected values. -- Not using the `syslog_parser` will result in the syslog message being populated with default header values. +The Syslog exporter sends logs in [syslog][syslog_wikipedia] format to a remote syslog server. +It supports syslog protocols [RFC5424][RFC5424] and [RFC3164][RFC3164] and can send data over `TCP` or `UDP`. +The exporter aims to be compatible with the [Syslog receiver][syslog_receiver]. +This means that syslog messages received via the Syslog receiver and exported via the Syslog exporter should be unchanged. ## Configuration @@ -52,12 +50,124 @@ The syslog exporter supports sending messages to a remote syslog server. - `storage` (default = `none`): When set, enables persistence and uses the component specified as a storage extension for the [persistent queue][persistent_queue] - `timeout` (default = 5s) Time to wait per individual attempt to send data to a backend +## Examples + +### RFC5424 + +When configured with `protocol: rfc5424`, the exporter creates one syslog message for each log record, +based on the following record-level attributes of the log. +If an attribute is missing, the default value is used. +The log's timestamp field is used for the syslog message's time. 
+ +| Attribute name | Type | Default value | +| ----------------- | ------ | -------------- | +| `appname` | string | `-` | +| `hostname` | string | `-` | +| `message` | string | empty string | +| `msg_id` | string | `-` | +| `priority` | int | `165` | +| `proc_id` | string | `-` | +| `structured_data` | map | `-` | +| `version` | int | `1` | + +Here's a simplified representation of an input log record: + +```json +{ + "body": "", + "timeUnixNano": 1065903255003000000, + "attributes": + { + "appname": "su", + "hostname": "mymachine.example.com", + "message": "'su root' failed for lonvick on /dev/pts/8", + "priority": 34, + } +} +``` + +And here's the output message based on the above log record: + +```console +<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - - - 'su root' failed for lonvick on /dev/pts/8 +``` + +Here'a another example, this includes the structured data and other attributes: + +```json +{ + "body": "", + "timeUnixNano": 1438811939693012000, + "attributes": + { + "appname": "SecureAuth0", + "hostname": "192.168.2.132", + "message": "Found the user for retrieving user's profile", + "msg_id": "ID52020", + "priority": 86, + "proc_id": "23108", + "structured_data": + { + "SecureAuth@27389": + { + "UserHostAddress":"192.168.2.132", + "Realm":"SecureAuth0", + "UserID":"Tester2", + "PEN":"27389" + } + }, + "version": 1 + } +} +``` + +Output: + +```console +<86>1 2015-08-05T21:58:59.693012Z 192.168.2.132 SecureAuth0 23108 ID52020 [SecureAuth@27389 UserHostAddress="192.168.2.132" Realm="SecureAuth0" UserID="Tester2" PEN="27389"] Found the user for retrieving user's profile +``` + +### RFC3164 + +When configured with `protocol: rfc3164`, the exporter creates one syslog message for each log record, +based on the following record-level attributes of the log. +If an attribute is missing, the default value is used. +The log's timestamp field is used for the syslog message's time. 
+ +| Attribute name | Type | Default value | +| ----------------- | ------ | -------------- | +| `appname` | string | empty string | +| `hostname` | string | `-` | +| `message` | string | empty string | +| `priority` | int | `165` | + +Here's a simplified representation of an input log record: + +```json +{ + "body": "", + "timeUnixNano": 1697062455000000000, + "attributes": + { + "appname": "su", + "hostname": "mymachine", + "message": "'su root' failed for lonvick on /dev/pts/8", + "priority": 34 + } +} +``` + +Output: + +```console +<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8 +``` + Please see [example configurations](./examples/). +[syslog_wikipedia]: https://en.wikipedia.org/wiki/Syslog [RFC5424]: https://www.rfc-editor.org/rfc/rfc5424 [RFC3164]: https://www.rfc-editor.org/rfc/rfc3164 -[syslog_parser]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/docs/operators/syslog_parser.md [syslog_receiver]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/syslogreceiver -[filelog_receiver]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/filelogreceiver [cryptoTLS]: https://github.com/golang/go/blob/518889b35cb07f3e71963f2ccfc0f96ee26a51ce/src/crypto/tls/common.go#L706-L709 [persistent_queue]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md#persistent-queue From bf0a3f4ca94a86475f92d8a82a1831f4dbcabb91 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Wed, 18 Oct 2023 10:24:59 -0700 Subject: [PATCH 03/32] [chore] Send readonly data to immutable exporters in lifecycle tests (#27825) This should help to catch exporters that are incorrectly claimed as not mutating. 
--- cmd/otelcontribcol/exporters_test.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cmd/otelcontribcol/exporters_test.go b/cmd/otelcontribcol/exporters_test.go index 8ab6de82c640..949e7e41048d 100644 --- a/cmd/otelcontribcol/exporters_test.go +++ b/cmd/otelcontribcol/exporters_test.go @@ -606,11 +606,23 @@ func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn assert.NotPanics(t, func() { switch e := exp.(type) { case exporter.Logs: - err = e.ConsumeLogs(ctx, testdata.GenerateLogsManyLogRecordsSameResource(2)) + logs := testdata.GenerateLogsManyLogRecordsSameResource(2) + if !e.Capabilities().MutatesData { + logs.MarkReadOnly() + } + err = e.ConsumeLogs(ctx, logs) case exporter.Metrics: - err = e.ConsumeMetrics(ctx, testdata.GenerateMetricsTwoMetrics()) + metrics := testdata.GenerateMetricsTwoMetrics() + if !e.Capabilities().MutatesData { + metrics.MarkReadOnly() + } + err = e.ConsumeMetrics(ctx, metrics) case exporter.Traces: - err = e.ConsumeTraces(ctx, testdata.GenerateTracesTwoSpansSameResource()) + traces := testdata.GenerateTracesTwoSpansSameResource() + if !e.Capabilities().MutatesData { + traces.MarkReadOnly() + } + err = e.ConsumeTraces(ctx, traces) } }) if !expectErr { From c2f343b392058e8bdc86cd191451bac994e69dfb Mon Sep 17 00:00:00 2001 From: hovavza <147598197+hovavza@users.noreply.github.com> Date: Wed, 18 Oct 2023 20:55:22 +0300 Subject: [PATCH 04/32] UDP input operator - async mode - separate between readers & processors (#27805) **Description:** adding a feature - when async mode is enabled in the UDP receiver (udp input operator), separating reading from processing operations. This is important to reduce data-loss in high scale UDP scenarios. See original issue for more details. The async config block is changed now. 
Instead of a single readers field (determining the concurrency level of how many threads the udp receiver is running, all reading from the UDP port, processing, and sending downstream), it will now have 3 fields: - readers - determines the concurrency level of threads only reading from the UDP port and pushing the packets to a channel. - processors - determines the concurrency level of threads reading from the channel, processing the packets, and sending downstream. - max_queue_length - determines the max size of the channel between the readers & the processors. Setting it high enough helps prevent data loss during temporary downstream latency. Once the channel is full, the reader threads will block until there's room in the queue (to prevent unbounded memory usage). This improves performance and reduces UDP packet loss in high-scale scenarios. Note that async mode only supports this separation of readers from processors. If the async config block isn't included, the default state is a single thread that reads and processes messages synchronously. **Link to tracking Issue:** 27613 **Testing:** Local stress tests ran all types of async config (no 'async', with 'async', etc.). Updated the existing udp test accordingly. Also ran scale tests and saw a reduction in data loss. **Documentation:** Updated md file for both udplogreceiver & stanza udp_input operator with the new flags. 
--------- Co-authored-by: Daniel Jaglowski --- ...er-to-reader-and-processor-with-async.yaml | 27 +++ pkg/stanza/docs/operators/udp_input.md | 6 +- pkg/stanza/operator/input/udp/config_test.go | 7 +- .../operator/input/udp/testdata/config.yaml | 4 +- pkg/stanza/operator/input/udp/udp.go | 156 +++++++++++++----- pkg/stanza/operator/input/udp/udp_test.go | 7 +- receiver/udplogreceiver/README.md | 6 +- receiver/udplogreceiver/udp_test.go | 7 +- 8 files changed, 171 insertions(+), 49 deletions(-) create mode 100644 .chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml diff --git a/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml b/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml new file mode 100644 index 000000000000..bd33138b4325 --- /dev/null +++ b/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: When async is enabled for udp receiver, separate logic into readers (only read logs from udp port and push to channel), and processors (read logs from channel and process; decode, split, add attributes, and push downstream), allowing to change concurrency level for both readers and processors separately. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27613] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. 
+# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/pkg/stanza/docs/operators/udp_input.md b/pkg/stanza/docs/operators/udp_input.md index 2a08d16716bb..555ddc97faba 100644 --- a/pkg/stanza/docs/operators/udp_input.md +++ b/pkg/stanza/docs/operators/udp_input.md @@ -50,11 +50,13 @@ for other encodings available. If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently. -**note** If `async` is not set at all, a single thread will read lines synchronously. +**note** If `async` is not set at all, a single thread will read & process lines synchronously. | Field | Default | Description | | --- | --- | --- | -| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). | +| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). | +| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. | +| `max_queue_length` | 100 | Determines max number of messages which may be waiting for a processor. While the queue is full, the readers will wait until there's room (readers will not drop messages, but they will not read additional incoming messages during that period). 
| ### Example Configurations diff --git a/pkg/stanza/operator/input/udp/config_test.go b/pkg/stanza/operator/input/udp/config_test.go index 7b806ed61985..a65f8ea6cc3d 100644 --- a/pkg/stanza/operator/input/udp/config_test.go +++ b/pkg/stanza/operator/input/udp/config_test.go @@ -43,8 +43,11 @@ func TestUnmarshal(t *testing.T) { cfg.Encoding = "utf-8" cfg.SplitConfig.LineStartPattern = "ABC" cfg.SplitConfig.LineEndPattern = "" - cfg.AsyncConfig = NewAsyncConfig() - cfg.AsyncConfig.Readers = 2 + cfg.AsyncConfig = &AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } return cfg }(), }, diff --git a/pkg/stanza/operator/input/udp/testdata/config.yaml b/pkg/stanza/operator/input/udp/testdata/config.yaml index 4acbf3621df8..4353dd894ee4 100644 --- a/pkg/stanza/operator/input/udp/testdata/config.yaml +++ b/pkg/stanza/operator/input/udp/testdata/config.yaml @@ -17,4 +17,6 @@ all_with_async: line_start_pattern: ABC line_end_pattern: "" async: - readers: 2 \ No newline at end of file + readers: 2 + processors: 2 + max_queue_length: 100 diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index cc2b69952df3..4593f9ead5e5 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -27,6 +27,10 @@ const ( // Maximum UDP packet size MaxUDPSize = 64 * 1024 + + defaultReaders = 1 + defaultProcessors = 1 + defaultMaxQueueLength = 100 ) func init() { @@ -59,14 +63,9 @@ type Config struct { } type AsyncConfig struct { - Readers int `mapstructure:"readers,omitempty"` -} - -// NewAsyncConfig creates a new AsyncConfig with default values. -func NewAsyncConfig() *AsyncConfig { - return &AsyncConfig{ - Readers: 1, - } + Readers int `mapstructure:"readers,omitempty"` + Processors int `mapstructure:"processors,omitempty"` + MaxQueueLength int `mapstructure:"max_queue_length,omitempty"` } // BaseConfig is the details configuration of a udp input operator. 
@@ -113,12 +112,16 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { resolver = helper.NewIPResolver() } - if c.AsyncConfig == nil { - c.AsyncConfig = NewAsyncConfig() - } - - if c.AsyncConfig.Readers <= 0 { - return nil, fmt.Errorf("async readers must be greater than 0") + if c.AsyncConfig != nil { + if c.AsyncConfig.Readers <= 0 { + c.AsyncConfig.Readers = defaultReaders + } + if c.AsyncConfig.Processors <= 0 { + c.AsyncConfig.Processors = defaultProcessors + } + if c.AsyncConfig.MaxQueueLength <= 0 { + c.AsyncConfig.MaxQueueLength = defaultMaxQueueLength + } } udpInput := &Input{ @@ -132,6 +135,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { OneLogPerPacket: c.OneLogPerPacket, AsyncConfig: c.AsyncConfig, } + + if c.AsyncConfig != nil { + udpInput.messageQueue = make(chan messageAndAddress, c.AsyncConfig.MaxQueueLength) + } return udpInput, nil } @@ -151,6 +158,14 @@ type Input struct { encoding encoding.Encoding splitFunc bufio.SplitFunc resolver *helper.IPResolver + + messageQueue chan messageAndAddress + stopOnce sync.Once +} + +type messageAndAddress struct { + Message []byte + RemoteAddr net.Addr } // Start will start listening for messages on a socket. @@ -170,9 +185,20 @@ func (u *Input) Start(_ operator.Persister) error { // goHandleMessages will handle messages from a udp connection. 
func (u *Input) goHandleMessages(ctx context.Context) { - for i := 0; i < u.AsyncConfig.Readers; i++ { + if u.AsyncConfig == nil { u.wg.Add(1) go u.readAndProcessMessages(ctx) + return + } + + for i := 0; i < u.AsyncConfig.Readers; i++ { + u.wg.Add(1) + go u.readMessagesAsync(ctx) + } + + for i := 0; i < u.AsyncConfig.Processors; i++ { + u.wg.Add(1) + go u.processMessagesAsync(ctx) } } @@ -193,23 +219,69 @@ func (u *Input) readAndProcessMessages(ctx context.Context) { break } - if u.OneLogPerPacket { - log := truncateMaxLog(message) - u.handleMessage(ctx, remoteAddr, dec, log) - continue - } + u.processMessage(ctx, message, remoteAddr, dec, buf) + } +} - scanner := bufio.NewScanner(bytes.NewReader(message)) - scanner.Buffer(buf, MaxUDPSize) +func (u *Input) processMessage(ctx context.Context, message []byte, remoteAddr net.Addr, dec *decode.Decoder, buf []byte) { + if u.OneLogPerPacket { + log := truncateMaxLog(message) + u.handleMessage(ctx, remoteAddr, dec, log) + return + } + + scanner := bufio.NewScanner(bytes.NewReader(message)) + scanner.Buffer(buf, MaxUDPSize) + + scanner.Split(u.splitFunc) - scanner.Split(u.splitFunc) + for scanner.Scan() { + u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes()) + } + if err := scanner.Err(); err != nil { + u.Errorw("Scanner error", zap.Error(err)) + } +} - for scanner.Scan() { - u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes()) +func (u *Input) readMessagesAsync(ctx context.Context) { + defer u.wg.Done() + + for { + message, remoteAddr, err := u.readMessage() + if err != nil { + select { + case <-ctx.Done(): + return + default: + u.Errorw("Failed reading messages", zap.Error(err)) + } + break } - if err := scanner.Err(); err != nil { - u.Errorw("Scanner error", zap.Error(err)) + + messageAndAddr := messageAndAddress{ + Message: message, + RemoteAddr: remoteAddr, } + + // Send the message to the message queue for processing + u.messageQueue <- messageAndAddr + } +} + +func (u *Input) processMessagesAsync(ctx 
context.Context) { + defer u.wg.Done() + + dec := decode.New(u.encoding) + buf := make([]byte, 0, MaxUDPSize) + + for { + // Read a message from the message queue. + messageAndAddr, ok := <-u.messageQueue + if !ok { + return // Channel closed, exit the goroutine. + } + + u.processMessage(ctx, messageAndAddr.Message, messageAndAddr.RemoteAddr, dec, buf) } } @@ -274,18 +346,24 @@ func (u *Input) readMessage() ([]byte, net.Addr, error) { // Stop will stop listening for udp messages. func (u *Input) Stop() error { - if u.cancel == nil { - return nil - } - u.cancel() - if u.connection != nil { - if err := u.connection.Close(); err != nil { - u.Errorf("failed to close UDP connection: %s", err) + u.stopOnce.Do(func() { + if u.AsyncConfig != nil { + close(u.messageQueue) } - } - u.wg.Wait() - if u.resolver != nil { - u.resolver.Stop() - } + + if u.cancel == nil { + return + } + u.cancel() + if u.connection != nil { + if err := u.connection.Close(); err != nil { + u.Errorf("failed to close UDP connection: %s", err) + } + } + u.wg.Wait() + if u.resolver != nil { + u.resolver.Stop() + } + }) return nil } diff --git a/pkg/stanza/operator/input/udp/udp_test.go b/pkg/stanza/operator/input/udp/udp_test.go index aba8eeb28e59..7695b734af90 100644 --- a/pkg/stanza/operator/input/udp/udp_test.go +++ b/pkg/stanza/operator/input/udp/udp_test.go @@ -143,8 +143,11 @@ func TestInput(t *testing.T) { t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}, cfg)) t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}, cfg)) - cfg.AsyncConfig = NewAsyncConfig() - cfg.AsyncConfig.Readers = 2 + cfg.AsyncConfig = &AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } t.Run("SimpleAsync", udpInputTest([]byte("message1"), []string{"message1"}, cfg)) } diff --git a/receiver/udplogreceiver/README.md b/receiver/udplogreceiver/README.md index 4d6c9ed85751..8be89e306df1 100644 --- 
a/receiver/udplogreceiver/README.md +++ b/receiver/udplogreceiver/README.md @@ -24,7 +24,7 @@ Receives logs over UDP. | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | | `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][hhttps://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes] | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention](https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes) | | `multiline` | | A `multiline` configuration block. See below for details | | `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options | | `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details | @@ -78,7 +78,9 @@ If set, the `async` configuration block instructs the `udp_input` operator to re | Field | Default | Description | | --- | --- | --- | -| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). | +| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). | +| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. | +| `max_queue_length` | 100 | Determines max length of channel being used by async reader routines. 
When channel reaches max number, reader routine will block until channel has room. | ## Example Configurations diff --git a/receiver/udplogreceiver/udp_test.go b/receiver/udplogreceiver/udp_test.go index b3cbe39d7b58..e632100a127f 100644 --- a/receiver/udplogreceiver/udp_test.go +++ b/receiver/udplogreceiver/udp_test.go @@ -32,7 +32,12 @@ func TestUdp(t *testing.T) { func TestUdpAsync(t *testing.T) { listenAddress := "127.0.0.1:29019" cfg := testdataConfigYaml(listenAddress) - cfg.InputConfig.AsyncConfig = udp.NewAsyncConfig() + cfg.InputConfig.AsyncConfig = &udp.AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } + cfg.InputConfig.AsyncConfig.Readers = 2 testUDP(t, testdataConfigYaml(listenAddress), listenAddress) } From c44ad3c81b7670b19edae3abef8a9416f04eecd7 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 18 Oct 2023 17:11:00 -0400 Subject: [PATCH 05/32] [receiver/filelog] Implement specifying top n files to track when ordering (#27844) **Description:** * Add a new `ordering_criteria.top_n` option, which allows a user to specify the number of files to track after ordering. * Default is 1, which was the existing behavior. **Link to tracking Issue:** #23788 **Testing:** Unit tests added. **Documentation:** Added new parameter to existing documentation. 
--- .chloggen/feat_top_n_file_sorting.yaml | 22 ++++ pkg/stanza/fileconsumer/config_test.go | 10 ++ pkg/stanza/fileconsumer/matcher/matcher.go | 23 +++- .../fileconsumer/matcher/matcher_test.go | 114 ++++++++++++++++++ pkg/stanza/fileconsumer/testdata/config.yaml | 4 + receiver/filelogreceiver/README.md | 1 + 6 files changed, 171 insertions(+), 3 deletions(-) create mode 100755 .chloggen/feat_top_n_file_sorting.yaml diff --git a/.chloggen/feat_top_n_file_sorting.yaml b/.chloggen/feat_top_n_file_sorting.yaml new file mode 100755 index 000000000000..1a4e678bae36 --- /dev/null +++ b/.chloggen/feat_top_n_file_sorting.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add a new "top_n" option to specify the number of files to track when using ordering criteria + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23788] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: ["user"] diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 9d83118aa4bd..43171be5c96c 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -412,6 +412,16 @@ func TestUnmarshal(t *testing.T) { return newMockOperatorConfig(cfg) }(), }, + { + Name: "ordering_criteria_top_n", + Expect: func() *mockOperatorConfig { + cfg := NewConfig() + cfg.OrderingCriteria = matcher.OrderingCriteria{ + TopN: 10, + } + return newMockOperatorConfig(cfg) + }(), + }, }, }.Run(t) } diff --git a/pkg/stanza/fileconsumer/matcher/matcher.go b/pkg/stanza/fileconsumer/matcher/matcher.go index 0a7a0628edac..76cdd1bd4feb 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher.go +++ b/pkg/stanza/fileconsumer/matcher/matcher.go @@ -18,6 +18,10 @@ const ( sortTypeAlphabetical = "alphabetical" ) +const ( + defaultOrderingCriteriaTopN = 1 +) + type Criteria struct { Include []string `mapstructure:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty"` @@ -26,6 +30,7 @@ type Criteria struct { type OrderingCriteria struct { Regex string `mapstructure:"regex,omitempty"` + TopN int `mapstructure:"top_n,omitempty"` SortBy []Sort `mapstructure:"sort_by,omitempty"` } @@ -62,6 +67,14 @@ func New(c Criteria) (*Matcher, error) { return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified") } + if c.OrderingCriteria.TopN < 0 { + return nil, fmt.Errorf("'top_n' must be a positive integer") + } + + if c.OrderingCriteria.TopN == 0 { + c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN + } + regex, err := regexp.Compile(c.OrderingCriteria.Regex) if err != nil { return nil, fmt.Errorf("compile regex: %w", err) @@ -97,6 +110,7 @@ func New(c Criteria) (*Matcher, error) { include: c.Include, exclude: c.Exclude, regex: regex, + topN: c.OrderingCriteria.TopN, filterOpts: filterOpts, }, nil } @@ -105,6 +119,7 @@ type Matcher struct { include []string exclude 
[]string regex *regexp.Regexp + topN int filterOpts []filter.Option } @@ -127,7 +142,9 @@ func (m Matcher) MatchFiles() ([]string, error) { return result, errors.Join(err, errs) } - // Return only the first item. - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23788 - return result[:1], errors.Join(err, errs) + if len(result) <= m.topN { + return result, errors.Join(err, errs) + } + + return result[:m.topN], errors.Join(err, errs) } diff --git a/pkg/stanza/fileconsumer/matcher/matcher_test.go b/pkg/stanza/fileconsumer/matcher/matcher_test.go index c838962a4699..1d9de6f17f87 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher_test.go +++ b/pkg/stanza/fileconsumer/matcher/matcher_test.go @@ -98,6 +98,23 @@ func TestNew(t *testing.T) { }, expectedErr: "compile regex: error parsing regexp: missing closing ]: `[a-z`", }, + { + name: "TopN is negative", + criteria: Criteria{ + Include: []string{"*.log"}, + OrderingCriteria: OrderingCriteria{ + Regex: "[a-z]", + TopN: -1, + SortBy: []Sort{ + { + SortType: "numeric", + RegexKey: "key", + }, + }, + }, + }, + expectedErr: "'top_n' must be a positive integer", + }, { name: "SortTypeEmpty", criteria: Criteria{ @@ -249,6 +266,46 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.2023020612.log"}, }, + { + name: "TopN > number of files", + files: []string{"err.2023020611.log", "err.2023020612.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P<value>\d{4}\d{2}\d{2}\d{2}).*log`, + TopN: 3, + SortBy: []Sort{ + { + SortType: sortTypeTimestamp, + RegexKey: "value", + Ascending: false, + Location: "UTC", + Layout: `%Y%m%d%H`, + }, + }, + }, + expected: []string{"err.2023020612.log", "err.2023020611.log"}, + }, + { + name: "TopN == number of files", + files: []string{"err.2023020611.log", "err.2023020612.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: 
`err\.(?P\d{4}\d{2}\d{2}\d{2}).*log`, + TopN: 2, + SortBy: []Sort{ + { + SortType: sortTypeTimestamp, + RegexKey: "value", + Ascending: false, + Location: "UTC", + Layout: `%Y%m%d%H`, + }, + }, + }, + expected: []string{"err.2023020612.log", "err.2023020611.log"}, + }, { name: "Timestamp Sorting Ascending", files: []string{"err.2023020612.log", "err.2023020611.log", "err.2023020609.log", "err.2023020610.log"}, @@ -319,6 +376,24 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.d.log"}, }, + { + name: "Alphabetical Sorting - Top 2", + files: []string{"err.a.log", "err.d.log", "err.b.log", "err.c.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P[a-zA-Z]+).*log`, + TopN: 2, + SortBy: []Sort{ + { + SortType: sortTypeAlphabetical, + RegexKey: "value", + Ascending: false, + }, + }, + }, + expected: []string{"err.d.log", "err.c.log"}, + }, { name: "Alphabetical Sorting Ascending", files: []string{"err.b.log", "err.a.log", "err.c.log", "err.d.log"}, @@ -336,6 +411,45 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.a.log"}, }, + { + name: "Multiple Sorting - timestamp priority sort - Top 4", + files: []string{ + "err.b.1.2023020601.log", + "err.b.2.2023020601.log", + "err.a.1.2023020601.log", + "err.a.2.2023020601.log", + "err.b.1.2023020602.log", + "err.a.2.2023020602.log", + "err.b.2.2023020602.log", + "err.a.1.2023020602.log", + }, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P[a-zA-Z])\.(?P\d+)\.(?P