From 5d42c560ed09e5c4fff9613286262da104af0e9d Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich Date: Mon, 10 Mar 2025 13:14:18 +0100 Subject: [PATCH] fix(transaction): Fix auto journaling in transaction Signed-off-by: Stepan Bagritsevich --- src/server/stream_family.cc | 4 +++- src/server/transaction.cc | 1 + tests/dragonfly/replication_test.py | 33 +++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 402fbe691b5d..1afd6746098a 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -697,9 +697,11 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& stream* stream_inst = (stream*)it->second.RObjPtr(); - streamID result_id; const auto& parsed_id = opts.parsed_id; streamID passed_id = parsed_id.val; + DCHECK(!op_args.shard->IsReplica() || (op_args.shard->IsReplica() && parsed_id.id_given)); + + streamID result_id; int res = StreamAppendItem(stream_inst, args, op_args.db_cntx.time_now_ms, &result_id, parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index d8b35dc2cd2d..cb631e3780fc 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -499,6 +499,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { kv_fp_.clear(); cid_ = cid; + re_enabled_auto_journal_ = false; cb_ptr_ = nullptr; for (auto& sd : shard_data_) { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 12b3e03475c6..b103107f0b8c 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2932,3 +2932,36 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa await wait_for_replicas_state(*c_replicas) await fill_task + + +async def test_autojournaling_in_multi_mode(df_factory): + master = df_factory.create(proactor_threads=1) + replica = df_factory.create(proactor_threads=1) + df_factory.start_all([master, replica]) + + c_master = master.client() + c_replica = replica.client() + + await c_master.execute_command("XADD stream * field value") + + for i in range(300): + await c_master.execute_command(f"SADD set {i}") + + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + # The first call to XTRIM triggers autojournaling. + # The SPOP command is executed with CO::NO_AUTOJOURNALING. + # This test ensures that the SPOP command is still properly replicated + await c_master.execute_command( + "EVAL", + "redis.call('XTRIM', KEYS[1], 'MINID', '0'); return redis.call('SPOP', KEYS[2]);", + 2, + "stream", + "set", + ) + + # Check replica data consistent + replica_data = await StaticSeeder.capture(c_replica) + master_data = await StaticSeeder.capture(c_master) + assert master_data == replica_data