feat(stream): add support of the CLAIM option for the XREADGROUP command#3278
Open
i18nsite wants to merge 2 commits intoapache:unstablefrom
Open
feat(stream): add support of the CLAIM option for the XREADGROUP command#3278i18nsite wants to merge 2 commits intoapache:unstablefrom
i18nsite wants to merge 2 commits intoapache:unstablefrom
Conversation
This patch implements the CLAIM option for XREADGROUP command, enabling consumers to claim idle pending messages from other consumers in a group. Key features: - Parse and validate CLAIM min-idle-time parameter - Extended reply format with idle_ms and delivery_count for claimed entries - Ordering guarantees: idle entries first (by idle time), then new entries - Correct interaction with NOACK option (claimed entries still in PEL) - Efficient PEL iteration respecting exclude_start option Implementation details: - New StreamReadGroupReadOptions struct to pass CLAIM parameters - Extended StreamEntry with idle_ms and delivery_count fields - Smart response format: 4-element for CLAIM mode, 2-element otherwise - Optimized consumer metadata management for ownership transfers Tests include comprehensive coverage: - Basic claiming and ownership transfer - Ordering guarantees validation - NOACK interaction behavior - Min-idle-time filtering - Delivery count increment tracking
Contributor
Author
|
redis XREADGROUP 有 CLAIM 的返回值会多出一些,官方文档没写 我还需要继续加工下 |
This patch fixes an issue where the delivery count returned in the XREADGROUP CLAIM response was not incremented to reflect the current claim attempt. It now correctly returns the incremented count. Additionally, the tests have been enhanced to: - Verify the delivery count increments correctly upon multiple claims. - Verify the returned idle time is accurate (>= elapsed time). - Verify standard XREADGROUP response format (2 elements) vs CLAIM format (4 elements).
Contributor
Author
fixed |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
https://redis.io/docs/latest/commands/xreadgroup/

feat(stream): Add support for XREADGROUP CLAIM option
English / 中文
English
Overview
This PR implements the
CLAIMoption for theXREADGROUPcommand, enabling consumers to claim idle pending messages from other consumers in a consumer group. This feature is essential for handling failed or slow consumers in distributed stream processing scenarios.Motivation
In distributed stream processing with consumer groups, consumers may fail or become slow, causing messages to remain in their Pending Entries List (PEL) indefinitely. The
CLAIMoption allows other consumers to take over these idle messages, improving fault tolerance and ensuring timely message processing.Changes
Core Implementation
Command Parsing (
src/commands/cmd_stream.cc)CLAIM min-idle-timeparameter parsing with validationAddStreamEntriesToResponsefor conditional formattingStream Processing (
src/types/redis_stream.cc)Stream::RangeWithPendingexclude_startoptionData Structures (
src/types/redis_stream_base.h,src/types/redis_stream.h)StreamEntrywithidle_msanddelivery_countfieldsStreamReadGroupReadOptionsstruct for CLAIM parametersRangeWithPendingsignatureKey Features
[id, fields, idle_ms, delivery_count]Test Coverage
Comprehensive test suite in
tests/gocase/unit/type/stream/xreadgroup_test.go:All tests passing ✅
Usage Example
Redis Specification Compliance
This implementation follows the Redis protocol specification:
>(latest) ID and NOACK optionTesting
Performance
中文
概述
本PR为
XREADGROUP命令实现了CLAIM选项,使消费者能够从消费者组中的其他消费者那里认领空闲的待处理消息。此功能对于处理分布式流处理场景中失败或缓慢的消费者至关重要。动机
在使用消费者组进行分布式流处理时,消费者可能会失败或变慢,导致消息无限期地保留在其待处理条目列表(PEL)中。
CLAIM选项允许其他消费者接管这些空闲消息,提高容错性并确保及时处理消息。更改内容
核心实现
命令解析 (
src/commands/cmd_stream.cc)CLAIM min-idle-time参数解析和验证AddStreamEntriesToResponse以支持条件格式化流处理 (
src/types/redis_stream.cc)Stream::RangeWithPending中实现了完整的认领逻辑exclude_start选项数据结构 (
src/types/redis_stream_base.h,src/types/redis_stream.h)StreamEntry增加idle_ms和delivery_count字段StreamReadGroupReadOptions结构体用于传递CLAIM参数RangeWithPending函数签名关键特性
[id, fields, idle_ms, delivery_count]测试覆盖
tests/gocase/unit/type/stream/xreadgroup_test.go中的完整测试套件:所有测试通过 ✅
使用示例
Redis规范合规性
此实现遵循Redis协议规范:
>(latest) ID和NOACK选项兼容测试
性能