kq: 支持 kafka.Writer BatchTimeout/BatchSize/BatchBytes 配置 #96
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
涉及文件:
Greptile Overview
Updated On: 2025-11-07 08:35:18 UTC
Greptile Summary
This PR adds three new configuration options to the Kafka pusher component to expose native kafka-go writer batch configuration settings. The changes introduce
WithBatchTimeout,WithBatchSize, andWithBatchBytesoptions that allow users to control when message batches are flushed based on time intervals, message counts, or byte sizes respectively. These new options follow the existing functional options pattern used throughout thekqpackage, maintaining consistency with other configuration functions likeWithBalancerandWithChunkSize. The implementation adds the new fields to thepushOptionsstruct and applies them to the underlyingkafka.Writerduring initialization, only setting values when they are positive to avoid overriding defaults with zero values.Important Files Changed
Confidence score: 4/5
syncPushunconditionally overrides user-configuredbatchSizeWithSyncPushandWithBatchSizeoptions in kq/pusher.goSequence Diagram
sequenceDiagram participant User participant Pusher participant kafka.Writer participant ChunkExecutor User->>Pusher: NewPusher(addrs, topic, opts) Pusher->>kafka.Writer: new kafka.Writer Note over Pusher: Apply batch options (timeout, size, bytes) alt syncPush enabled Note over Pusher: Set BatchSize=1, no ChunkExecutor else async push (default) Pusher->>ChunkExecutor: NewChunkExecutor with options end Pusher-->>User: return Pusher instance User->>Pusher: Push(ctx, value) or PushWithKey(ctx, key, value) Note over Pusher: Create kafka.Message with key/value Note over Pusher: Inject trace context via OpenTelemetry alt has ChunkExecutor (async) Pusher->>ChunkExecutor: Add(msg, len(value)) Note over ChunkExecutor: Buffer messages until batch ready ChunkExecutor->>kafka.Writer: WriteMessages(ctx, batch...) else no ChunkExecutor (sync) Pusher->>kafka.Writer: WriteMessages(ctx, msg) end kafka.Writer-->>Pusher: return error/nil Pusher-->>User: return error/nil User->>Pusher: Close() alt has ChunkExecutor Pusher->>ChunkExecutor: Flush() ChunkExecutor->>kafka.Writer: WriteMessages(ctx, remaining_batch...) end Pusher->>kafka.Writer: Close() kafka.Writer-->>Pusher: return error/nil Pusher-->>User: return error/nil