Skip to content

Commit caa86cf

Browse files
jmacdmwear
andauthored
[otelarrowexporter] Support the new experimental batcher (open-telemetry#34802)
**Description:** Add exportbatcher.BatcherConfig to OTel-Arrow exporter. Follows open-telemetry/opentelemetry-collector#10846 as we intend to maintain parity between OTLP/gRPC and OTel-Arrow exporters. **Link to tracking Issue:** **Testing:** ✅ **Documentation:** README updated with reference to new batcher and [concurrent batch processor](https://github.com/open-telemetry/otel-arrow/tree/main/collector/processor/concurrentbatchprocessor) in the otel-arrow repo. --------- Co-authored-by: Matthew Wear <matthew.wear@gmail.com>
1 parent cb7f220 commit caa86cf

File tree

6 files changed

+98
-0
lines changed

6 files changed

+98
-0
lines changed

.chloggen/otelarrow-batcher.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: otelarrowexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add BatcherConfig field following similar in OTLP exporter.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34802]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/otelarrowexporter/README.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,3 +241,47 @@ exporters:
241241
zstd:
242242
level: 1 # 1 is the "fastest" compression level
243243
```
244+
245+
### Batching Configuration
246+
247+
This exporter includes a new, experimental `batcher` configuration for
248+
batching in the `exporterhelper` module, but this mode is disabled by
249+
default. This batching support works when combined with
250+
`queue_sender` functionality.
251+
252+
```
253+
exporters:
254+
otelarrow:
255+
batcher:
256+
enabled: true
257+
sending_queue:
258+
enabled: true
259+
storage: file_storage/otc
260+
extensions:
261+
file_storage/otc:
262+
directory: /var/lib/storage/otc
263+
```
264+
265+
The built-in batcher is only recommended with a persistent queue,
266+
otherwise it cannot provide back-pressure to the caller. If building
267+
a custom build of the OpenTelemetry Collector, we recommend using the
268+
[Concurrent Batch
269+
Processor](https://github.com/open-telemetry/otel-arrow/blob/main/collector/processor/concurrentbatchprocessor/README.md)
270+
to provide simultaneous back-pressure, concurrency, and batching
271+
functionality. See [more discussion on this
272+
issue](https://github.com/open-telemetry/opentelemetry-collector/issues/10368).
273+
274+
```
275+
exporters:
276+
otelarrow:
277+
batcher:
278+
enabled: false
279+
sending_queue:
280+
enabled: false
281+
processors:
282+
concurrentbatch:
283+
send_batch_max_size: 1500
284+
send_batch_size: 1000
285+
timeout: 1s
286+
max_in_flight_size_mib: 128
287+
```

exporter/otelarrowexporter/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.opentelemetry.io/collector/config/configcompression"
1313
"go.opentelemetry.io/collector/config/configgrpc"
1414
"go.opentelemetry.io/collector/config/configretry"
15+
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1516
"go.opentelemetry.io/collector/exporter/exporterhelper"
1617
"google.golang.org/grpc"
1718

@@ -32,6 +33,10 @@ type Config struct {
3233

3334
configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
3435

36+
// Experimental: This configuration is at the early stage of development and may change without backward compatibility
37+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved
38+
BatcherConfig exporterbatcher.Config `mapstructure:"batcher"`
39+
3540
// Arrow includes settings specific to OTel Arrow.
3641
Arrow ArrowConfig `mapstructure:"arrow"`
3742

exporter/otelarrowexporter/config_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"go.opentelemetry.io/collector/config/configretry"
2121
"go.opentelemetry.io/collector/config/configtls"
2222
"go.opentelemetry.io/collector/confmap/confmaptest"
23+
"go.opentelemetry.io/collector/exporter/exporterbatcher"
2324
"go.opentelemetry.io/collector/exporter/exporterhelper"
2425

2526
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/arrow"
@@ -84,6 +85,16 @@ func TestUnmarshalConfig(t *testing.T) {
8485
BalancerName: "experimental",
8586
Auth: &configauth.Authentication{AuthenticatorID: component.NewID(component.MustNewType("nop"))},
8687
},
88+
BatcherConfig: exporterbatcher.Config{
89+
Enabled: true,
90+
FlushTimeout: 200 * time.Millisecond,
91+
MinSizeConfig: exporterbatcher.MinSizeConfig{
92+
MinSizeItems: 1000,
93+
},
94+
MaxSizeConfig: exporterbatcher.MaxSizeConfig{
95+
MaxSizeItems: 10000,
96+
},
97+
},
8798
Arrow: ArrowConfig{
8899
NumStreams: 2,
89100
MaxStreamLifetime: 2 * time.Hour,

exporter/otelarrowexporter/factory.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"go.opentelemetry.io/collector/config/configretry"
1717
"go.opentelemetry.io/collector/consumer"
1818
"go.opentelemetry.io/collector/exporter"
19+
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1920
"go.opentelemetry.io/collector/exporter/exporterhelper"
2021
"google.golang.org/grpc"
2122

@@ -37,10 +38,14 @@ func NewFactory() exporter.Factory {
3738
}
3839

3940
func createDefaultConfig() component.Config {
41+
batcherCfg := exporterbatcher.NewDefaultConfig()
42+
batcherCfg.Enabled = false
43+
4044
return &Config{
4145
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
4246
RetryConfig: configretry.NewDefaultBackOffConfig(),
4347
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
48+
BatcherConfig: batcherCfg,
4449
ClientConfig: configgrpc.ClientConfig{
4550
Headers: map[string]configopaque.String{},
4651
// Default to zstd compression
@@ -74,6 +79,7 @@ func (exp *baseExporter) helperOptions() []exporterhelper.Option {
7479
exporterhelper.WithRetry(exp.config.RetryConfig),
7580
exporterhelper.WithQueue(exp.config.QueueSettings),
7681
exporterhelper.WithStart(exp.start),
82+
exporterhelper.WithBatcher(exp.config.BatcherConfig),
7783
exporterhelper.WithShutdown(exp.shutdown),
7884
}
7985
}

exporter/otelarrowexporter/testdata/config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ keepalive:
2525
timeout: 30s
2626
permit_without_stream: true
2727
balancer_name: "experimental"
28+
batcher:
29+
enabled: true
30+
flush_timeout: 200ms
31+
min_size_items: 1000
32+
max_size_items: 10000
2833
arrow:
2934
num_streams: 2
3035
disabled: false

0 commit comments

Comments
 (0)