1 change: 1 addition & 0 deletions .gitignore
@@ -40,6 +40,7 @@ Makefile.dep
.idea/*
.ccls
.ccls-cache/*
.DS_Store
compile_commands.json
redis.code-workspace
.cache
90 changes: 59 additions & 31 deletions src/cluster_legacy.c
@@ -2929,11 +2929,12 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
clusterNode *migration_source_node = NULL;

for (j = 0; j < CLUSTER_SLOTS; j++) {
clusterNode *slot_owner = server.cluster->slots[j];
if (bitmapTestBit(slots, j)) {
sender_slots++;

/* The slot is already bound to the sender of this message. */
if (server.cluster->slots[j] == sender) {
if (slot_owner == sender) {
bitmapClearBit(server.cluster->owner_not_claiming_slot, j);
continue;
}
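The hunks in this function lean on the slot bitmap helpers (bitmapTestBit, bitmapClearBit) that operate on the 16384-slot bitmap carried in cluster messages. As a quick orientation, here is a minimal sketch of that bit-per-slot pattern; the helper names, signatures, and bodies below are illustrative assumptions, not the definitions from cluster_legacy.c.

```c
/* Sketch only: one bit per slot in a 16384/8-byte array, as assumed by the
 * bitmapTestBit()/bitmapClearBit() calls above. Names are illustrative. */
static int slot_bitmap_test(const unsigned char *bitmap, int slot) {
    return (bitmap[slot / 8] >> (slot & 7)) & 1;
}

static void slot_bitmap_clear(unsigned char *bitmap, int slot) {
    bitmap[slot / 8] &= (unsigned char)~(1 << (slot & 7));
}
```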
@@ -2949,15 +2950,15 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* migration, we will accept the topology update regardless of the
* epoch. */
if (isSlotUnclaimed(j) ||
server.cluster->slots[j]->configEpoch < senderConfigEpoch ||
slot_owner->configEpoch < senderConfigEpoch ||
clusterSlotFailoverGranted(j)) {
if (!isSlotUnclaimed(j) && !areInSameShard(server.cluster->slots[j], sender)) {
if (!isSlotUnclaimed(j) && !areInSameShard(slot_owner, sender)) {
if (first_migrated_slot == -1) {
/* Delay-initialize the range of migrated slots. */
first_migrated_slot = j;
last_migrated_slot = j;
migration_source_node = server.cluster->slots[j];
} else if (migration_source_node == server.cluster->slots[j] && j == last_migrated_slot + 1) {
migration_source_node = slot_owner;
} else if (migration_source_node == slot_owner && j == last_migrated_slot + 1) {
/* Extend the range of migrated slots. */
last_migrated_slot = j;
} else {
@@ -2968,13 +2969,13 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
/* Reset the range for the next slot. */
first_migrated_slot = j;
last_migrated_slot = j;
migration_source_node = server.cluster->slots[j];
migration_source_node = slot_owner;
}
}

/* Was this slot mine, and still contains keys? Mark it as
* a dirty slot. */
if (server.cluster->slots[j] == myself && countKeysInSlot(j) && sender != myself) {
if (slot_owner == myself && countKeysInSlot(j)) {
dirty_slots[dirty_slots_count] = j;
dirty_slots_count++;
}
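The first_migrated_slot / last_migrated_slot / migration_source_node bookkeeping above coalesces consecutive slots taken over from the same node, so a single log line can describe each contiguous range. Below is a stand-alone sketch of that delay-initialize / extend / flush pattern, with a hypothetical slot_range type and a report() callback standing in for serverLog().

```c
/* Sketch of the range-coalescing logic used above; types and the report()
 * callback are illustrative stand-ins, not code from the patch. */
typedef struct {
    int first, last;    /* inclusive slot range, first == -1 means "empty" */
    const void *source; /* node the range is being migrated from */
} slot_range;

static void range_add(slot_range *r, int slot, const void *source,
                      void (*report)(const slot_range *)) {
    if (r->first == -1) {
        /* Delay-initialize the range on the first migrated slot. */
        r->first = r->last = slot;
        r->source = source;
    } else if (source == r->source && slot == r->last + 1) {
        /* Extend the current contiguous range. */
        r->last = slot;
    } else {
        /* Flush the finished range, then start a new one. */
        report(r);
        r->first = r->last = slot;
        r->source = source;
    }
}
```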
@@ -2983,7 +2984,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc

if (clusterIsSlotImporting(j)) importing_slots_count++;

if (server.cluster->slots[j] == cur_primary) {
if (slot_owner == cur_primary) {
new_primary = sender;
migrated_our_slots++;
}
@@ -3005,15 +3006,15 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc

/* Handle the case where we are importing this slot and the ownership changes */
clusterNode *in = getImportingSlotSource(j);
if (in != NULL &&
in != sender) {
if (in != NULL && in != sender) {
/* Update importing_slots_from to point to the sender, if it is in the
* same shard as the previous slot owner */
if (areInSameShard(sender, in)) {
serverLog(LL_VERBOSE,
serverLog(LL_NOTICE,
"Failover occurred in migration source. Update importing "
"source for slot %d to node %.40s (%s) in shard %.40s.",
j, sender->name, sender->human_nodename, sender->shard_id);
"source for slot %d from node %.40s (%s) to node %.40s (%s) in shard %.40s.",
j, in->name, in->human_nodename,
sender->name, sender->human_nodename, sender->shard_id);
setImportingSlotSource(j, sender);
} else {
/* If the sender is from a different shard, it must be a result
@@ -3031,7 +3032,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}
} else {
if (server.cluster->slots[j] == sender) {
if (slot_owner == sender) {
/* The slot is currently bound to the sender but the sender is no longer
* claiming it. We don't want to unbind the slot yet as it can cause the cluster
* to move to FAIL state and also throw client error. Keeping the slot bound to
@@ -3051,7 +3052,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
(mn->configEpoch < senderConfigEpoch ||
nodeIsReplica(mn)) &&
areInSameShard(mn, sender)) {
serverLog(LL_VERBOSE,
serverLog(LL_NOTICE,
"Failover occurred in migration target."
" Slot %d is now being migrated to node %.40s (%s) in shard %.40s.",
j, sender->name, sender->human_nodename, sender->shard_id);
@@ -3082,11 +3083,11 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
j, sender->name, sender->human_nodename, sender->shard_id);
setImportingSlotSource(j, NULL);
/* Take over the slot ownership if I am not the owner yet*/
if (server.cluster->slots[j] != myself) {
if (slot_owner != myself) {
/* A primary reason why we are here is likely due to my primary crashing during the
* slot finalization process, leading me to become the new primary without
* inheriting the slot ownership, while the source shard continued and relinquished
* theslot to its old primary. Under such circumstances, the node would undergo
* the slot to its old primary. Under such circumstances, the node would undergo
* an election and have its config epoch increased with consensus. That said, we
* will still explicitly bump the config epoch here to be consistent with the
* existing practice.
@@ -3128,7 +3129,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* its slots, this node should become a replica of the sender if
* one of the following conditions is true:
*
* 1. cluster-allow-replication-migration is enabled
* 1. cluster-allow-replica-migration is enabled
* 2. all the lost slots go to the sender and the sender belongs
* to this node's shard
*
@@ -3156,7 +3157,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG |
CLUSTER_TODO_BROADCAST_ALL);
} else if (nodeIsPrimary(myself) && (sender_slots >= migrated_our_slots) && !are_in_same_shard) {
} else if (nodeIsPrimary(myself) && sender_slots >= migrated_our_slots) {
/* When all our slots are lost to the sender and the sender belongs to
* a different shard, this is likely due to a client triggered slot
* migration. Don't reconfigure this node to migrate to the new shard
@@ -3648,8 +3649,8 @@ int clusterProcessPacket(clusterLink *link) {
uint16_t type = ntohs(hdr->type);
if (server.debug_cluster_close_link_on_packet_drop &&
(type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2)) {
freeClusterLink(link);
serverLog(LL_WARNING, "Closing link for matching packet type %s", clusterGetMessageTypeString(type));
freeClusterLink(link);
return 0;
}
return 1;
@@ -4093,10 +4094,13 @@ int clusterProcessPacket(clusterLink *link) {
* Or a replica's primary can turn itself into a replica of its other
* replica during a failover. In this case, they are in the same shard,
* so we can try a psync. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1,
!areInSameShard(myself->replicaof->replicaof, myself));
clusterNode *ultimate_primary = myself->replicaof->replicaof;
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a "
"replica of %.40s (%s) from %.40s (%s)",
ultimate_primary->name, ultimate_primary->human_nodename,
myself->replicaof->name, myself->replicaof->human_nodename);
clusterSetPrimary(ultimate_primary, 1,
!areInSameShard(ultimate_primary, myself));
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE |
CLUSTER_TODO_FSYNC_CONFIG | CLUSTER_TODO_BROADCAST_ALL);
}
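Both the sub-replica hunk above and the same-shard check added later in this diff need a node's top-level primary (the diff uses clusterNodeGetPrimary() for the latter). Here is a hedged sketch of what such a helper typically does, using a simplified stand-in type; the real definition elsewhere in the tree may differ.

```c
/* Sketch only: follow the replicaof chain up to the top-level primary.
 * node_sketch is a simplified stand-in for clusterNode. */
typedef struct node_sketch {
    struct node_sketch *replicaof; /* NULL when this node is a primary */
} node_sketch;

static node_sketch *get_top_level_primary(node_sketch *node) {
    while (node->replicaof != NULL) node = node->replicaof;
    return node;
}
```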
@@ -4120,15 +4124,14 @@ int clusterProcessPacket(clusterLink *link) {
* instance claims is different compared to the set of slots we have
* for it or if there was a failover in the sender's shard. Check
* this ASAP to avoid other computational expensive checks later.*/

if (sender && sender_claims_to_be_primary &&
(sender_last_reported_as_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
/* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */
serverAssert(nodeIsPrimary(sender));

serverLog(LL_NOTICE, "Mismatch in topology information for sender node %.40s (%s) in shard %.40s", sender->name,
sender->human_nodename, sender->shard_id);

serverLog(LL_NOTICE, "%s. Sender node %.40s (%s) in shard %.40s",
sender_last_reported_as_replica ? "Sender last reported as replica" : "Sender changed slots",
sender->name, sender->human_nodename, sender->shard_id);
/* 1) If the sender of the message is a primary, and we detected that
* the set of slots it claims changed, scan the slots to see if we
* need to update our configuration. */
@@ -4179,6 +4182,21 @@ int clusterProcessPacket(clusterLink *link) {
}
}

/* In some corner cases our primary might have failed while one of its
* replicas was already promoted to primary. We should start following
* the new primary; otherwise we would start a failover ourselves and
* eventually become an empty primary.
*/
if (sender && sender_claims_to_be_primary &&
nodeFailed(clusterNodeGetPrimary(myself)) &&
areInSameShard(sender, myself) &&
nodeIsReplica(myself) &&
sender != clusterNodeGetPrimary(myself)) {
serverLog(LL_NOTICE, "Sender %.40s (%s) and I are in the same shard and I should follow it",
sender->name, sender->human_nodename);
clusterSetPrimary(sender, 1, 1);
}

/* If our config epoch collides with the sender's try to fix
* the problem. */
if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) &&
@@ -4331,7 +4349,8 @@ void clusterLinkConnectHandler(connection *conn) {

/* Check if connection succeeded */
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s", node->name, node->ip, node->cport,
serverLog(LL_VERBOSE, "Connection with Node %.40s (%s) at %s:%d failed: %s",
node->name, node->human_nodename, node->ip, node->cport,
connGetLastError(conn));
freeClusterLink(link);
return;
@@ -5418,6 +5437,15 @@ void clusterHandleReplicaFailover(void) {
}
}

/* A replica might not properly start its replication (e.g. the
* primary didn't reply to the replication request yet).
* In this case, it's not a valid candidate for promotion. */
const long long repl_offset = replicationGetReplicaOffset();
if (repl_offset == 0) {
serverLog(LL_NOTICE, "Cannot start election with zero replication offset");
return;
}
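The guard above assumes replicationGetReplicaOffset() reports 0 for a replica that has not yet processed anything from its primary. Roughly, such a helper reads the offset from the live primary link and falls back to a cached primary; the sketch below only illustrates that shape with simplified, hypothetical types and is not the definition from replication.c.

```c
/* Sketch only: simplified stand-ins for the real server/client structures. */
typedef struct { long long reploff; } replica_link;
typedef struct {
    const char *primary_host;     /* NULL when this node is not a replica */
    replica_link *primary;        /* live link to the primary, may be NULL */
    replica_link *cached_primary; /* kept for partial resync, may be NULL */
} server_state;

static long long replica_offset(const server_state *s) {
    long long offset = 0;
    if (s->primary_host != NULL) {
        if (s->primary != NULL) offset = s->primary->reploff;
        else if (s->cached_primary != NULL) offset = s->cached_primary->reploff;
    }
    return offset < 0 ? 0 : offset; /* 0 means "never completed a sync" */
}
```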

/* If the previous failover attempt timeout and the retry time has
* elapsed, we can setup a new one. */
if (auth_age > auth_retry_time) {
@@ -5456,9 +5484,9 @@ void clusterHandleReplicaFailover(void) {
"Start of election delayed for %lld milliseconds "
"(rank #%d, primary rank #%d, offset %lld).",
server.cluster->failover_auth_time - now, server.cluster->failover_auth_rank,
server.cluster->failover_failed_primary_rank, replicationGetReplicaOffset());
server.cluster->failover_failed_primary_rank, repl_offset);
/* Now that we have a scheduled election, broadcast our offset
* to all the other replicas so that they'll updated their offsets
* to all the other replicas so that they'll update their offsets
* if our offset is better. */
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_REPLICAS);

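Several of the cluster_legacy.c hunks above (the importing-source update on failover, the replica-migration decision, and the new follow-the-new-primary check) hinge on areInSameShard(). Conceptually this is a shard-id comparison between two nodes; the sketch below captures that idea with assumed field names and lengths, and is not the definition from the source tree.

```c
/* Sketch only (assumed layout): two nodes are in the same shard when their
 * shard ids match. The real clusterNode has many more fields. */
#include <string.h>

#define SHARD_ID_LEN 40

typedef struct {
    char shard_id[SHARD_ID_LEN];
} cluster_node_sketch;

static int in_same_shard(const cluster_node_sketch *a, const cluster_node_sketch *b) {
    return memcmp(a->shard_id, b->shard_id, SHARD_ID_LEN) == 0;
}
```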
6 changes: 3 additions & 3 deletions src/replication.c
@@ -3907,13 +3907,13 @@ void syncWithPrimaryHandleError(connection **conn) {
* * Motivation *
* - Reduce primary memory load. We do that by moving the COB tracking to the replica side. This also decrease
* the chance for COB overruns. Note that primary's input buffer limits at the replica side are less restricted
* then primary's COB as the replica plays less critical part in the replication group. While increasing the
* than primary's COB as the replica plays less critical part in the replication group. While increasing the
* primary's COB may end up with primary reaching swap and clients suffering, at replica side we're more at
* ease with it. Larger COB means better chance to sync successfully.
* - Reduce primary main process CPU load. By opening a new, dedicated channel for the RDB transfer, child
* processes can have direct access to the new channel. Due to TLS connection restrictions, this was not
* possible using one main channel. We eliminate the need for the child process to use the primary's
* child-proc -> main-proc pipeline, thus freeing up the main process to process clients queries.
* child-proc -> main-proc pipeline, thus freeing up the main process to handle clients queries.
*
* * High level interface design *
* - Dual channel sync begins when the replica sends a REPLCONF capa dual-channel to the primary during initial
@@ -4094,7 +4094,7 @@ void syncWithPrimary(connection *conn) {
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) {
serverLog(LL_WARNING,
"syncWithPrimary(): state machine error, "
"state should be RECEIVE_PSYNC but is %d",
"state should be RECEIVE_PSYNC_REPLY but is %d",
server.repl_state);
syncWithPrimaryHandleError(&conn);
return;
2 changes: 1 addition & 1 deletion src/socket.c
@@ -327,7 +327,7 @@ static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int
if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE, "Accepted %s:%d", cip, cport);
serverLog(LL_VERBOSE, "Accepted client connection %s:%d", cip, cport);

if (server.tcpkeepalive) anetKeepAlive(NULL, cfd, server.tcpkeepalive);
acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL), flags, cip);
2 changes: 1 addition & 1 deletion tests/instances.tcl
@@ -421,7 +421,7 @@ proc pause_on_error {} {
# We redefine 'test' as for Sentinel we don't use the server-client
# architecture for the test, everything is sequential.
proc test {descr code} {
set ts [clock format [clock seconds] -format %H:%M:%S]
set ts [get_current_ts]
puts -nonewline "$ts> $descr: "
flush stdout

42 changes: 38 additions & 4 deletions tests/support/cluster_util.tcl
@@ -89,20 +89,46 @@ proc fix_cluster {addr} {
# Check if cluster configuration is consistent.
# All the nodes in the cluster should show same slots configuration and have health
# state "online" to be considered as consistent.
proc cluster_config_consistent {} {
# We can optionally provide the node index to ignore. This is useful
# when we deliberately stop a server and wait for the other nodes to
# converge without it.
proc cluster_config_consistent { ignored_idx } {
set base_cfg {}

set ignored_port -1
if { $ignored_idx != {} } {
set ignored_port [srv $ignored_idx port]
}

for {set j 0} {$j < [llength $::servers]} {incr j} {
# Check if all the nodes are online
if { $j == $ignored_idx } {
continue
}

# Ensure all nodes believe the cluster is ok
set state [CI $j cluster_state]
if { $state ne "ok" } {
return 0
}

# Check if all the nodes are online, except the one
# we optionally ignore.
set shards_cfg [R $j CLUSTER SHARDS]
foreach shard_cfg $shards_cfg {
set nodes [dict get $shard_cfg nodes]
foreach node $nodes {
set node_port [dict get $node port]
if { $node_port == $ignored_port } {
continue
}
if {[dict get $node health] ne "online"} {
return 0
}
}
}

if {$j == 0} {
if {$base_cfg eq {}} {
set base_cfg [R $j cluster slots]
} else {
if {[R $j cluster slots] != $base_cfg} {
@@ -126,8 +152,16 @@ proc cluster_size_consistent {cluster_size} {

# Wait for cluster configuration to propagate and be consistent across nodes.
proc wait_for_cluster_propagation {} {
wait_for_cluster_propagation_except_node {}
}

proc wait_for_cluster_propagation_except_node { ignored_idx } {
# wait_for_condition evaluates the code in a different scope,
# so we need to embed the argument first.
set condition_script [format {cluster_config_consistent {%s}} [join $ignored_idx " "]]

wait_for_condition 1000 50 {
[cluster_config_consistent] eq 1
[eval $condition_script] eq 1
} else {
for {set j 0} {$j < [llength $::servers]} {incr j} {
puts "R $j cluster slots output: [R $j cluster slots]"