Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 62 additions & 11 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
*/

#include <algorithm>
#include <limits>
#include <memory>
#include <stdexcept>

Expand Down Expand Up @@ -54,9 +53,20 @@ CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint3
void AddStreamEntriesToResponse(std::string *output, const std::vector<StreamEntry> &entries) {
output->append(redis::MultiLen(entries.size()));
for (const auto &entry : entries) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(entry.key));
output->append(redis::ArrayOfBulkStrings(entry.values));
// Check if this entry has CLAIM metadata (idle_ms >= 0 means it was claimed)
if (entry.idle_ms >= 0) {
// Extended format for CLAIM: [id, [fields...], idle_ms, delivery_count]
output->append(redis::MultiLen(4));
output->append(redis::BulkString(entry.key));
output->append(redis::ArrayOfBulkStrings(entry.values));
output->append(redis::Integer(entry.idle_ms));
output->append(redis::Integer(entry.delivery_count));
} else {
// Standard format: [id, [fields...]]
output->append(redis::MultiLen(2));
output->append(redis::BulkString(entry.key));
output->append(redis::ArrayOfBulkStrings(entry.values));
}
}
}

Expand Down Expand Up @@ -1411,6 +1421,8 @@ class CommandXRead : public Commander,
int blocked_default_count_ = 1000;
bool with_count_ = false;
bool block_ = false;
bool noack_ = false;
int64_t min_idle_time_ms_ = -1;

void unblockAll() { srv_->UnblockOnStreams(streams_, conn_); }
};
Expand Down Expand Up @@ -1475,6 +1487,24 @@ class CommandXReadGroup : public Commander,

if (arg == "noack") {
noack_ = true;
++i;
continue;
}

if (arg == "claim") {
if (i + 1 >= args.size()) {
return {Status::RedisParseErr, errInvalidSyntax};
}
auto parse_result = ParseInt<int64_t>(args[i + 1], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (*parse_result < 0) {
return {Status::RedisParseErr, "min-idle-time must be non-negative"};
}
min_idle_time_ms_ = *parse_result;
i += 2;
continue;
}

++i;
Expand Down Expand Up @@ -1527,8 +1557,14 @@ class CommandXReadGroup : public Commander,
options.exclude_end = false;

std::vector<StreamEntry> result;
auto s = stream_db.RangeWithPending(ctx, streams_[i], options, &result, group_name_, consumer_name_, noack_,
latest_marks_[i]);
redis::StreamReadGroupReadOptions read_options;
read_options.group_name = group_name_;
read_options.consumer_name = consumer_name_;
read_options.noack = noack_;
read_options.latest = latest_marks_[i];
read_options.min_idle_time_ms = min_idle_time_ms_;

auto s = stream_db.RangeWithPending(ctx, streams_[i], options, &result, read_options);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down Expand Up @@ -1573,13 +1609,21 @@ class CommandXReadGroup : public Commander,
output->append(redis::BulkString(result.name));
output->append(redis::MultiLen(result.entries.size()));
for (const auto &entry : result.entries) {
output->append(redis::MultiLen(2));
if (entry.idle_ms >= 0) {
output->append(redis::MultiLen(4));
} else {
output->append(redis::MultiLen(2));
}
output->append(redis::BulkString(entry.key));
if (entry.values.size() == 0 && !latest_marks_[id]) {
output->append(conn->NilString());
continue;
} else {
output->append(conn->MultiBulkString(entry.values));
}
if (entry.idle_ms >= 0) {
output->append(redis::Integer(entry.idle_ms));
output->append(redis::Integer(entry.delivery_count));
}
output->append(conn->MultiBulkString(entry.values));
}
++id;
}
Expand Down Expand Up @@ -1650,8 +1694,14 @@ class CommandXReadGroup : public Commander,
options.exclude_end = false;

std::vector<StreamEntry> result;
auto s = stream_db.RangeWithPending(ctx, streams_[i], options, &result, group_name_, consumer_name_, noack_,
latest_marks_[i]);
redis::StreamReadGroupReadOptions read_options;
read_options.group_name = group_name_;
read_options.consumer_name = consumer_name_;
read_options.noack = noack_;
read_options.latest = latest_marks_[i];
read_options.min_idle_time_ms = min_idle_time_ms_;

auto s = stream_db.RangeWithPending(ctx, streams_[i], options, &result, read_options);
if (!s.ok() && !s.IsNotFound()) {
conn_->Reply(redis::Error({Status::NotOK, s.ToString()}));
return;
Expand Down Expand Up @@ -1716,6 +1766,7 @@ class CommandXReadGroup : public Commander,
std::string group_name_;
std::string consumer_name_;
bool noack_ = false;
int64_t min_idle_time_ms_ = -1;

Server *srv_ = nullptr;
Connection *conn_ = nullptr;
Expand Down
155 changes: 139 additions & 16 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1428,8 +1428,8 @@ rocksdb::Status Stream::Range(engine::Context &ctx, const Slice &stream_name, co
}

rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stream_name, StreamRangeOptions &options,
std::vector<StreamEntry> *entries, std::string &group_name,
std::string &consumer_name, bool noack, bool latest) {
std::vector<StreamEntry> *entries,
const StreamReadGroupReadOptions &read_options) {
entries->clear();

if (options.with_count && options.count == 0) {
Expand All @@ -1452,20 +1452,22 @@ rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stre
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}

std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
std::string group_key = internalKeyFromGroupName(ns_key, metadata, read_options.group_name);
std::string get_group_value;
s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value);
if (!s.ok()) return s;

std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
std::string consumer_key =
internalKeyFromConsumerName(ns_key, metadata, read_options.group_name, read_options.consumer_name);
std::string get_consumer_value;
s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
int created_number = 0;
s = createConsumerWithoutLock(ctx, stream_name, group_name, consumer_name, &created_number);
s = createConsumerWithoutLock(ctx, stream_name, read_options.group_name, read_options.consumer_name,
&created_number);
if (!s.ok()) {
return s;
}
Expand All @@ -1487,7 +1489,8 @@ rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stre
consumer_metadata.last_attempted_interaction_ms = now_ms;
consumer_metadata.last_successful_interaction_ms = now_ms;

if (latest) {
if (read_options.latest && read_options.min_idle_time_ms < 0) {
// No CLAIM, just consume new messages
options.start = consumergroup_metadata.last_delivered_id;
s = range(ctx, ns_key, metadata, options, entries);
if (!s.ok()) {
Expand All @@ -1503,9 +1506,9 @@ rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stre
if (id > maxid) {
maxid = id;
}
if (!noack) {
std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
StreamPelEntry pel_entry = {now_ms, 1, consumer_name};
if (!read_options.noack) {
std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, read_options.group_name, id);
StreamPelEntry pel_entry = {now_ms, 1, read_options.consumer_name};
std::string pel_value = encodeStreamPelEntryValue(pel_entry);
s = batch->Put(stream_cf_handle_, pel_key, pel_value);
if (!s.ok()) return s;
Expand All @@ -1517,23 +1520,143 @@ rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stre
if (maxid > consumergroup_metadata.last_delivered_id) {
consumergroup_metadata.last_delivered_id = maxid;
}
} else if (read_options.min_idle_time_ms >= 0) {
// CLAIM option: First try to claim idle pending entries
std::string prefix_key =
internalPelKeyFromGroupAndEntryId(ns_key, metadata, read_options.group_name, StreamEntryID::Minimum());
std::string end_key =
internalPelKeyFromGroupAndEntryId(ns_key, metadata, read_options.group_name, StreamEntryID::Maximum());

rocksdb::ReadOptions read_options_db = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(end_key);
read_options_db.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
read_options_db.iterate_lower_bound = &lower_bound;

auto iter = util::UniqueIterator(ctx, read_options_db, stream_cf_handle_);
uint64_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());

if (now_ms - pel_entry.last_delivery_time_ms < static_cast<uint64_t>(read_options.min_idle_time_ms)) {
continue;
}

std::string raw_value;
rocksdb::Status st = getEntryRawValue(ctx, ns_key, metadata, entry_id, &raw_value);
if (!st.ok()) {
if (st.IsNotFound()) continue; // Entry might have been deleted
return st;
}

std::vector<std::string> values;
auto rv = DecodeRawStreamEntryValue(raw_value, &values);
if (!rv.IsOK()) {
return rocksdb::Status::InvalidArgument(rv.Msg());
}
// Include CLAIM metadata: idle_ms and delivery_count
// Include CLAIM metadata: idle_ms and delivery_count
int64_t idle_ms = static_cast<int64_t>(now_ms - pel_entry.last_delivery_time_ms);

// Claim the entry
if (pel_entry.consumer_name != read_options.consumer_name) {
// Update old consumer's pending count
std::string old_consumer_key =
internalKeyFromConsumerName(ns_key, metadata, read_options.group_name, pel_entry.consumer_name);
std::string get_old_consumer_value;
s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, old_consumer_key, &get_old_consumer_value);
if (s.ok()) {
StreamConsumerMetadata old_consumer_metadata = decodeStreamConsumerMetadataValue(get_old_consumer_value);
if (old_consumer_metadata.pending_number > 0) {
old_consumer_metadata.pending_number -= 1;
s = batch->Put(stream_cf_handle_, old_consumer_key,
encodeStreamConsumerMetadataValue(old_consumer_metadata));
if (!s.ok()) return s;
}
}

pel_entry.consumer_name = read_options.consumer_name;
consumer_metadata.pending_number += 1;
}

pel_entry.last_delivery_count += 1;
entries->emplace_back(entry_id.ToString(), std::move(values), idle_ms, pel_entry.last_delivery_count);
pel_entry.last_delivery_time_ms = now_ms;
s = batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(pel_entry));
if (!s.ok()) return s;

++count;
if (count >= options.count) break;
}
if (auto s = iter->status(); !s.ok()) {
return s;
}

// If latest=true and we haven't reached the count, also consume new messages
if (read_options.latest && count < options.count) {
options.start = consumergroup_metadata.last_delivered_id;
std::vector<StreamEntry> new_entries;
StreamRangeOptions new_options = options;
new_options.count = options.count - count;
s = range(ctx, ns_key, metadata, new_options, &new_entries);
if (!s.ok()) {
return s;
}
StreamEntryID maxid = {0, 0};
for (const auto &entry : new_entries) {
StreamEntryID id;
Status st = ParseStreamEntryID(entry.key, &id);
if (!st.IsOK()) {
return rocksdb::Status::InvalidArgument(st.Msg());
}
if (id > maxid) {
maxid = id;
}
if (!read_options.noack) {
std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, read_options.group_name, id);
StreamPelEntry pel_entry = {now_ms, 1, read_options.consumer_name};
std::string pel_value = encodeStreamPelEntryValue(pel_entry);
s = batch->Put(stream_cf_handle_, pel_key, pel_value);
if (!s.ok()) return s;
consumergroup_metadata.entries_read += 1;
consumergroup_metadata.pending_number += 1;
consumer_metadata.pending_number += 1;
}
// New messages in CLAIM mode should also have 4-element format with idle_ms=0, delivery_count=0
entries->emplace_back(entry.key, std::move(entry.values), 0, 0);
}
if (maxid > consumergroup_metadata.last_delivered_id) {
consumergroup_metadata.last_delivered_id = maxid;
}
}
} else {
std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start);
std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());
// No CLAIM, read from pending list
std::string prefix_key =
internalPelKeyFromGroupAndEntryId(ns_key, metadata, read_options.group_name, options.start);
std::string end_key =
internalPelKeyFromGroupAndEntryId(ns_key, metadata, read_options.group_name, StreamEntryID::Maximum());

rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::ReadOptions read_options_db = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(end_key);
read_options.iterate_upper_bound = &upper_bound;
read_options_db.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
read_options.iterate_lower_bound = &lower_bound;
read_options_db.iterate_lower_bound = &lower_bound;

auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
auto iter = util::UniqueIterator(ctx, read_options_db, stream_cf_handle_);
uint64_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);

// Skip entry if exclude_start is true and it matches options.start
if (options.exclude_start && entry_id == options.start) {
continue;
}

StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
if (pel_entry.consumer_name != consumer_name) continue;
if (pel_entry.consumer_name != read_options.consumer_name) continue;
std::string raw_value;
rocksdb::Status st = getEntryRawValue(ctx, ns_key, metadata, entry_id, &raw_value);
if (!st.ok() && !st.IsNotFound()) {
Expand Down
3 changes: 1 addition & 2 deletions src/types/redis_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ class Stream : public SubKeyScanner {
rocksdb::Status Range(engine::Context &ctx, const Slice &stream_name, const StreamRangeOptions &options,
std::vector<StreamEntry> *entries);
rocksdb::Status RangeWithPending(engine::Context &ctx, const Slice &stream_name, StreamRangeOptions &options,
std::vector<StreamEntry> *entries, std::string &group_name,
std::string &consumer_name, bool noack, bool latest);
std::vector<StreamEntry> *entries, const StreamReadGroupReadOptions &read_options);
rocksdb::Status Trim(engine::Context &ctx, const Slice &stream_name, const StreamTrimOptions &options,
uint64_t *delete_cnt);
rocksdb::Status GetMetadata(engine::Context &ctx, const Slice &stream_name, StreamMetadata *metadata);
Expand Down
14 changes: 14 additions & 0 deletions src/types/redis_stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,13 @@ struct StreamEntry {
std::string key;
std::vector<std::string> values;

// Optional metadata for CLAIM reply extension
int64_t idle_ms = -1; // Milliseconds since last delivery
uint64_t delivery_count = 0; // Number of times previously delivered

StreamEntry(std::string k, std::vector<std::string> vv) : key(std::move(k)), values(std::move(vv)) {}
StreamEntry(std::string k, std::vector<std::string> vv, int64_t idle, uint64_t count)
: key(std::move(k)), values(std::move(vv)), idle_ms(idle), delivery_count(count) {}
};

struct StreamTrimOptions {
Expand Down Expand Up @@ -267,6 +273,14 @@ struct StreamNACK {
StreamPelEntry pel_entry;
};

struct StreamReadGroupReadOptions {
std::string group_name;
std::string consumer_name;
bool noack = false;
bool latest = false;
int64_t min_idle_time_ms = -1;
};

Status IncrementStreamEntryID(StreamEntryID *id);
Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> ParseNextStreamEntryIDStrategy(const std::string &input);
Expand Down
Loading
Loading