Skip to content

Commit e434f62

Browse files
authored
fix: crash for delconsumer during stream reading (#4513)
fix: crash for delconsumer during reading stream
1 parent 2d85f59 commit e434f62

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

src/server/stream_family.cc

+7-3
Original file line numberDiff line numberDiff line change
@@ -2335,17 +2335,21 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
23352335
if (sitem.group) {
23362336
range_opts.consumer =
23372337
FindOrAddConsumer(opts->consumer_name, sitem.group, GetCurrentTimeMs());
2338-
}
2338+
sitem.consumer = range_opts.consumer;
2339+
if (!sitem.consumer) {
2340+
return OpStatus::OUT_OF_MEMORY;
2341+
}
23392342

2340-
range_opts.noack = opts->noack;
2341-
if (sitem.consumer) {
23422343
if (sitem.consumer->pel->numnodes == 0) {
23432344
LOG(DFATAL) << "Internal error when accessing consumer data, seen_time "
23442345
<< sitem.consumer->seen_time;
23452346
result = OpStatus::CANCELLED;
23462347
return OpStatus::OK;
23472348
}
23482349
}
2350+
2351+
range_opts.noack = opts->noack;
2352+
23492353
result = OpRange(t->GetOpArgs(shard), *wake_key, range_opts);
23502354
key = *wake_key;
23512355
}

src/server/stream_family_test.cc

+19
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,25 @@ TEST_F(StreamFamilyTest, XReadGroupBlock) {
400400
EXPECT_THAT(resp0, ErrArg("consumer group this client was blocked on no longer exists"));
401401
}
402402

403+
TEST_F(StreamFamilyTest, XReadGroupBlockDelconsumer) {
404+
Run({"XGROUP", "CREATE", "foo", "group", "0", "MKSTREAM"});
405+
406+
RespExpr resp0;
407+
auto fb0 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] {
408+
resp0 = Run({"XREADGROUP", "GROUP", "group", "alice", "BLOCK", "0", "streams", "foo", ">"});
409+
});
410+
ThisFiber::SleepFor(50us);
411+
412+
// Del consumer while it's blocked
413+
RespExpr resp_del_consumer = Run({"XGROUP", "DELCONSUMER", "foo", "group", "alice"});
414+
415+
pp_->at(1)->Await([&] { return Run("xadd", {"XADD", "foo", "1-0", "k1", "v1"}); });
416+
fb0.Join();
417+
418+
EXPECT_THAT(resp0.GetVec(), ElementsAre("foo", ArrLen(1)));
419+
EXPECT_THAT(resp_del_consumer, IntArg(0));
420+
}
421+
403422
TEST_F(StreamFamilyTest, XReadInvalidArgs) {
404423
// Invalid COUNT value.
405424
auto resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "0", "0"});

0 commit comments

Comments
 (0)