diff --git a/.chloggen/awss3exporter-sending-queue.yaml b/.chloggen/awss3exporter-sending-queue.yaml new file mode 100644 index 000000000000..bf8a2250a51f --- /dev/null +++ b/.chloggen/awss3exporter-sending-queue.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awss3exporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement sending queue for S3 exporter + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37274, 36264] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/awss3exporter/README.md b/exporter/awss3exporter/README.md index 1a9dbc44c9ee..d93780c52b76 100644 --- a/exporter/awss3exporter/README.md +++ b/exporter/awss3exporter/README.md @@ -19,21 +19,22 @@ This exporter targets to support proto/json format. The following exporter configuration parameters are supported. -| Name | Description | Default | -|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------| -| `region` | AWS region. | "us-east-1" | -| `s3_bucket` | S3 bucket | | -| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | -| `s3_partition` | time granularity of S3 key: hour or minute | "minute" | -| `role_arn` | the Role ARN to be assumed | | -| `file_prefix` | file prefix defined by user | | -| `marshaler` | marshaler used to produce output data | `otlp_json` | -| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | | +| Name | Description | Default | +|:--------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------| +| `region` | AWS region. | "us-east-1" | +| `s3_bucket` | S3 bucket | | +| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | +| `s3_partition` | time granularity of S3 key: hour or minute | "minute" | +| `role_arn` | the Role ARN to be assumed | | +| `file_prefix` | file prefix defined by user | | +| `marshaler` | marshaler used to produce output data | `otlp_json` | +| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | | | `encoding_file_extension` | file format extension suffix when using the `encoding` configuration option. May be left empty for no suffix to be appended. | | -| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | -| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | -| `disable_ssl` | set this to `true` to disable SSL when sending requests | false | -| `compression` | should the file be compressed | none | +| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | +| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | +| `disable_ssl` | set this to `true` to disable SSL when sending requests | false | +| `compression` | should the file be compressed | none | +| `sending_queue` | [exporters common queuing](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | disabled | ### Marshaler @@ -68,6 +69,12 @@ exporters: s3_bucket: 'databucket' s3_prefix: 'metric' s3_partition: 'minute' + + # Optional (disabled by default) + sending_queue: + enabled: true + num_consumers: 10 + queue_size: 100 ``` Logs and traces will be stored inside 'databucket' in the following path format. diff --git a/exporter/awss3exporter/config.go b/exporter/awss3exporter/config.go index 4514d2bb7643..cac26c285746 100644 --- a/exporter/awss3exporter/config.go +++ b/exporter/awss3exporter/config.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/multierr" ) @@ -49,6 +50,8 @@ const ( // Config contains the main configuration options for the s3 exporter type Config struct { + QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"` + S3Uploader S3UploaderConfig `mapstructure:"s3uploader"` MarshalerName MarshalerType `mapstructure:"marshaler"` diff --git a/exporter/awss3exporter/config_test.go b/exporter/awss3exporter/config_test.go index cd08539581c5..ec72b167d3eb 100644 --- a/exporter/awss3exporter/config_test.go +++ b/exporter/awss3exporter/config_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/otelcol/otelcoltest" "go.uber.org/multierr" @@ -32,7 +33,12 @@ func TestLoadConfig(t *testing.T) { e := cfg.Exporters[component.MustNewID("awss3")].(*Config) encoding := component.MustNewIDWithName("foo", "bar") + + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + assert.Equal(t, &Config{ + QueueSettings: queueCfg, Encoding: &encoding, EncodingFileExtension: "baz", S3Uploader: S3UploaderConfig{ @@ -59,9 +65,16 @@ func TestConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) + queueCfg := exporterhelper.QueueConfig{ + Enabled: true, + NumConsumers: 23, + QueueSize: 42, + } + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "foo", @@ -88,9 +101,13 @@ func TestConfigForS3CompatibleSystems(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "foo", @@ -200,9 +217,13 @@ func TestMarshallerName(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "foo", @@ -215,6 +236,7 @@ func TestMarshallerName(t *testing.T) { e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "bar", @@ -239,9 +261,13 @@ func TestCompressionName(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "foo", @@ -255,6 +281,7 @@ func TestCompressionName(t *testing.T) { e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "bar", diff --git a/exporter/awss3exporter/factory.go b/exporter/awss3exporter/factory.go index da343d63ba0a..ef4464568c6a 100644 --- a/exporter/awss3exporter/factory.go +++ b/exporter/awss3exporter/factory.go @@ -26,7 +26,11 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + return &Config{ + QueueSettings: queueCfg, S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Partition: "minute", @@ -39,19 +43,31 @@ func createLogsExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Logs, error) { - s3Exporter := newS3Exporter(config.(*Config), "logs", params) + cfg, err := checkAndCastConfig(config) + if err != nil { + return nil, err + } + + s3Exporter := newS3Exporter(cfg, "logs", params) return exporterhelper.NewLogs(ctx, params, config, s3Exporter.ConsumeLogs, - exporterhelper.WithStart(s3Exporter.start)) + exporterhelper.WithStart(s3Exporter.start), + exporterhelper.WithQueue(cfg.QueueSettings), + ) } func createMetricsExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Metrics, error) { - s3Exporter := newS3Exporter(config.(*Config), "metrics", params) + cfg, err := checkAndCastConfig(config) + if err != nil { + return nil, err + } + + s3Exporter := newS3Exporter(cfg, "metrics", params) if config.(*Config).MarshalerName == SumoIC { return nil, fmt.Errorf("metrics are not supported by sumo_ic output format") @@ -60,14 +76,21 @@ func createMetricsExporter(ctx context.Context, return exporterhelper.NewMetrics(ctx, params, config, s3Exporter.ConsumeMetrics, - exporterhelper.WithStart(s3Exporter.start)) + exporterhelper.WithStart(s3Exporter.start), + exporterhelper.WithQueue(cfg.QueueSettings), + ) } func createTracesExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Traces, error) { - s3Exporter := newS3Exporter(config.(*Config), "traces", params) + cfg, err := checkAndCastConfig(config) + if err != nil { + return nil, err + } + + s3Exporter := newS3Exporter(cfg, "traces", params) if config.(*Config).MarshalerName == SumoIC { return nil, fmt.Errorf("traces are not supported by sumo_ic output format") @@ -77,5 +100,16 @@ func createTracesExporter(ctx context.Context, params, config, s3Exporter.ConsumeTraces, - exporterhelper.WithStart(s3Exporter.start)) + exporterhelper.WithStart(s3Exporter.start), + exporterhelper.WithQueue(cfg.QueueSettings), + ) +} + +// checkAndCastConfig checks the configuration type and casts it to the S3 exporter Config struct. +func checkAndCastConfig(c component.Config) (*Config, error) { + cfg, ok := c.(*Config) + if !ok { + return nil, fmt.Errorf("config structure is not of type *awss3exporter.Config") + } + return cfg, nil } diff --git a/exporter/awss3exporter/testdata/config.yaml b/exporter/awss3exporter/testdata/config.yaml index 3f0d5808033e..c373fe878cb9 100644 --- a/exporter/awss3exporter/testdata/config.yaml +++ b/exporter/awss3exporter/testdata/config.yaml @@ -3,6 +3,11 @@ receivers: exporters: awss3: + sending_queue: + enabled: true + num_consumers: 23 + queue_size: 42 + s3uploader: region: 'us-east-1' s3_bucket: 'foo'