Skip to content

Commit

Permalink
fix(transaction): Fix auto journaling in transaction
Browse files Browse the repository at this point in the history
Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
  • Loading branch information
BagritsevichStepan committed Mar 10, 2025
1 parent 74f5f14 commit 5d42c56
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -697,9 +697,11 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&

stream* stream_inst = (stream*)it->second.RObjPtr();

streamID result_id;
const auto& parsed_id = opts.parsed_id;
streamID passed_id = parsed_id.val;
DCHECK(!op_args.shard->IsReplica() || (op_args.shard->IsReplica() && parsed_id.id_given));

streamID result_id;
int res = StreamAppendItem(stream_inst, args, op_args.db_cntx.time_now_ms, &result_id,
parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq);

Expand Down
1 change: 1 addition & 0 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
kv_fp_.clear();

cid_ = cid;
re_enabled_auto_journal_ = false;
cb_ptr_ = nullptr;

for (auto& sd : shard_data_) {
Expand Down
33 changes: 33 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2932,3 +2932,36 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa
await wait_for_replicas_state(*c_replicas)

await fill_task


async def test_autojournaling_in_multi_mode(df_factory):
master = df_factory.create(proactor_threads=1)
replica = df_factory.create(proactor_threads=1)
df_factory.start_all([master, replica])

c_master = master.client()
c_replica = replica.client()

await c_master.execute_command("XADD stream * field value")

for i in range(300):
await c_master.execute_command(f"SADD set {i}")

await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)

# The first call to XTRIM triggers autojournaling.
# The SPOP command is executed with CO::NO_AUTOJOURNALING.
# This test ensures that the SPOP command is still properly replicated
await c_master.execute_command(
"EVAL",
"redis.call('XTRIM', KEYS[1], 'MINID', '0'); return redis.call('SPOP', KEYS[2]);",
2,
"stream",
"set",
)

# Check replica data consistent
replica_data = await StaticSeeder.capture(c_replica)
master_data = await StaticSeeder.capture(c_master)
assert master_data == replica_data

0 comments on commit 5d42c56

Please sign in to comment.