Description
Feature Request
Currently, the `receive_batch` method in the YDB Python SDK returns a batch of messages as soon as any messages are available, regardless of the batch size. The `max_messages` parameter sets an upper limit, and `timeout` only applies when no messages are present at all. This behavior makes it difficult to implement efficient batching strategies where the consumer waits for either:
- A full batch of `max_messages`, or
- A certain amount of time (e.g., 100 ms) to elapse,

whichever comes first.
This feature is important for optimizing downstream processing performance by aggregating more messages per batch, reducing per-message overhead and improving throughput.
Without this capability, users are forced to either:
- Use `receive` in a loop (inefficient due to higher latency and more round-trips), or
- Accept suboptimal batch sizes, leading to underutilized processing capacity.
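The loop-based workaround can be sketched as follows. Note that `FakeConsumer` and its `receive(timeout=...)` signature are illustrative stand-ins, not the real ydb API; the real `receive` blocks on the server, while this stub returns `None` immediately when empty:

```python
import time
from collections import deque

class FakeConsumer:
    """Stand-in for a consumer object; only illustrates the loop shape."""

    def __init__(self, messages):
        self._queue = deque(messages)

    def receive(self, timeout=None):
        # Stub semantics: return one message, or None when none are queued.
        if self._queue:
            return self._queue.popleft()
        return None

def receive_batch_client_side(consumer, max_messages, batching_timeout):
    """Accumulate up to max_messages, waiting at most batching_timeout overall."""
    deadline = time.monotonic() + batching_timeout
    batch = []
    while len(batch) < max_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        msg = consumer.receive(timeout=remaining)
        if msg is None:
            break
        batch.append(msg)
    return batch

consumer = FakeConsumer(["m1", "m2", "m3"])
print(receive_batch_client_side(consumer, max_messages=2, batching_timeout=0.1))
```

Even in this simple form, each message costs a separate `receive` call, which is exactly the per-message RPC overhead the feature request aims to avoid.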
Preferred Solution
Introduce a new parameter (e.g., `batching_timeout` or `wait_for_full_batch`) in the `receive_batch` method that defines the maximum time to wait for the batch to fill up to `max_messages`. The method would:
- Return immediately if `max_messages` are available.
- Otherwise, wait up to `batching_timeout` for more messages to arrive before returning the current batch.
- Still respect the overall `timeout` for cases where no messages are available at all.
Example:

```python
batch = consumer.receive_batch(
    max_messages=1000,
    batching_timeout=0.1,  # Wait up to 100 ms to fill the batch
    timeout=5.0,           # Overall wait if no messages at all
)
```

Alternatives
- Client-side batching with `receive()`: Manually call `receive()` in a loop with a small timeout and accumulate messages. However, this increases RPC overhead and is significantly slower, especially under high load (as noted in the performance issues with `commit_with_ack`).
- Polling with delays: Not efficient and introduces unnecessary latency or load.
- Using lower-level gRPC streaming directly: Possible, but it defeats the purpose of using a high-level SDK and increases complexity.
None of these alternatives match the efficiency and simplicity of server-side or SDK-level support for batch aggregation with timeout.
```python
# Current usage
batch = consumer.receive_batch(max_messages=1000, timeout=5)

# Desired behavior with batching timeout
batch = consumer.receive_batch(
    max_messages=1000,
    batching_timeout=0.2,  # Wait up to 200 ms to fill the batch
    timeout=5,
)
```
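Until such a parameter exists, the desired semantics can be approximated on top of the current API by calling `receive_batch` repeatedly against a deadline. This is a sketch only: `StubReader` mimics the current return-as-soon-as-any-messages behavior, and `receive_full_batch` is a hypothetical helper, not part of the SDK; the polling sleep it needs is precisely the latency cost the feature would eliminate:

```python
import time
from collections import deque

class StubReader:
    """Mimics current SDK semantics: return whatever is available, up to the cap."""

    def __init__(self, messages):
        self._queue = deque(messages)

    def receive_batch(self, max_messages, timeout=None):
        batch = []
        while self._queue and len(batch) < max_messages:
            batch.append(self._queue.popleft())
        return batch

def receive_full_batch(reader, max_messages, batching_timeout, poll_interval=0.01):
    """Keep calling receive_batch until the batch is full or the deadline passes."""
    deadline = time.monotonic() + batching_timeout
    batch = []
    while len(batch) < max_messages and time.monotonic() < deadline:
        batch.extend(reader.receive_batch(max_messages - len(batch)))
        if len(batch) < max_messages:
            time.sleep(poll_interval)  # polling delay: the overhead the feature avoids
    return batch

reader = StubReader(["m1", "m2", "m3"])
print(receive_full_batch(reader, max_messages=2, batching_timeout=0.05))
```

A native `batching_timeout` could instead park the caller on the reader's internal message queue and wake it on arrival, returning a full batch with no polling at all.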