Skip to content

Commit

Permalink
fix(server): deadlock with replicaof inside multi (#4685)
Browse files Browse the repository at this point in the history
* fix server: fix deadlock with replicaof inside multi

Signed-off-by: adi_holden <adi@dragonflydb.io>
  • Loading branch information
adiholden authored and romange committed Mar 9, 2025
1 parent cf47d68 commit 4f91030
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
const ScriptMgr& script_mgr) {
// Check if script most LIKELY has global eval transactions
bool contains_global = false;
bool contains_admin_cmd = false;
Transaction::MultiMode multi_mode = Transaction::LOCK_AHEAD;

if (state == ExecScriptUse::SCRIPT_RUN) {
Expand All @@ -670,6 +671,7 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
transactional |= scmd.Cid()->IsTransactional();
}
contains_global |= scmd.Cid()->opt_mask() & CO::GLOBAL_TRANS;
contains_admin_cmd |= scmd.Cid()->opt_mask() & CO::ADMIN;

// We can't run no-key-transactional commands in lock-ahead mode currently,
// because it means we have to schedule on all shards
Expand All @@ -685,9 +687,13 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
if (!transactional && exec_info.watched_keys.empty())
return Transaction::NOT_DETERMINED;

if (contains_admin_cmd) {
multi_mode = Transaction::NON_ATOMIC;
}
// Atomic modes fall back to GLOBAL if they contain global commands.
if (contains_global && multi_mode == Transaction::LOCK_AHEAD)
else if (contains_global && multi_mode == Transaction::LOCK_AHEAD) {
multi_mode = Transaction::GLOBAL;
}

return multi_mode;
}
Expand Down
32 changes: 32 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2799,3 +2799,35 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa
await wait_for_replicas_state(*c_replicas)

await fill_task


@dfly_args({"proactor_threads": 2})
async def test_replicaof_inside_multi(df_factory):
master = df_factory.create()
replica = df_factory.create()
df_factory.start_all([master, replica])

async def replicate_inside_multi():
try:
c_master = master.client()
p = c_master.pipeline(transaction=True)
for i in range(5):
p.execute_command("dbsize")
p.execute_command(f"replicaof localhost {replica.port}")
await p.execute()
return True
except redis.exceptions.ResponseError:
return False

MULTI_COMMANDS_TO_ISSUE = 30
replication_commands = [
asyncio.create_task(replicate_inside_multi()) for _ in range(MULTI_COMMANDS_TO_ISSUE)
]

num_successes = 0
for result in asyncio.as_completed(replication_commands, timeout=80):
num_successes += await result

logging.info(f"succeses: {num_successes}")
assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
assert num_successes > 0, "At least one REPLICAOF must success"

0 comments on commit 4f91030

Please sign in to comment.