From 189ec6ced8d03b298e1a0293d894b10e7fa1fa9c Mon Sep 17 00:00:00 2001 From: mudit-saxena Date: Sat, 8 Mar 2025 20:07:02 +0530 Subject: [PATCH] Prioritization - Take % of remoteReplicas to mimic prioritization --- .../com/github/ambry/config/ReplicationConfig.java | 13 +++++++++++++ .../com/github/ambry/replication/ReplicaThread.java | 12 ++++++++++-- .../replication/continuous/DataNodeTracker.java | 10 ++++++++-- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java b/ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java index 7d4e13806d..1ce4fb2fec 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java @@ -350,6 +350,17 @@ public class ReplicationConfig { @Config(BACKUP_CHECKER_REPORT_DIR) public final int maxBackupCheckerReportFd; + @Config(REPLICATION_ENABLE_PRIORITIZATION) + @Default("false") + public final boolean replicationEnablePrioritzation; + public final static String REPLICATION_ENABLE_PRIORITIZATION = "replication.enable.prioritization"; + + @Config(REPLICATION_MAX_PRIORITIZED_REPLICAS_PERCENT) + @Default("100") + public final int replicationMaxPrioritizedReplicasPercent; + public final static String REPLICATION_MAX_PRIORITIZED_REPLICAS_PERCENT = "replication.max.prioritized.replicas.percent"; + + public ReplicationConfig(VerifiableProperties verifiableProperties) { maxReplicationRetryCount = @@ -428,5 +439,7 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) { verifiableProperties.getBoolean(REPLICATION_USING_NONBLOCKING_NETWORK_CLIENT_FOR_REMOTE_COLO, false); replicationUsingNonblockingNetworkClientForLocalColo = verifiableProperties.getBoolean(REPLICATION_USING_NONBLOCKING_NETWORK_CLIENT_FOR_LOCAL_COLO, false); + replicationEnablePrioritzation = verifiableProperties.getBoolean(REPLICATION_ENABLE_PRIORITIZATION, false); + replicationMaxPrioritizedReplicasPercent = verifiableProperties.getInt(REPLICATION_MAX_PRIORITIZED_REPLICAS_PERCENT, 100); } } diff --git a/ambry-replication/src/main/java/com/github/ambry/replication/ReplicaThread.java b/ambry-replication/src/main/java/com/github/ambry/replication/ReplicaThread.java index 454433b33c..7d0c75badc 100644 --- a/ambry-replication/src/main/java/com/github/ambry/replication/ReplicaThread.java +++ b/ambry-replication/src/main/java/com/github/ambry/replication/ReplicaThread.java @@ -593,12 +593,20 @@ public void replicateCyclic() { standbyReplicasTimedOutOnNoProgress); if (activeReplicasPerNode.size() > 0) { + + // maxReplicaCountPerRequest = 20 List> activeReplicaSubLists = maxReplicaCountPerRequest > 0 ? Utils.partitionList(activeReplicasPerNode, maxReplicaCountPerRequest) : Collections.singletonList(activeReplicasPerNode); for (List replicaSubList : activeReplicaSubLists) { + int size = replicaSubList.size(); + if (replicationConfig.replicationEnablePrioritzation) { + int maxSize = replicationConfig.replicationMaxPrioritizedReplicasPercent/100 * size; + size = Math.min(size, maxSize); + } + RemoteReplicaGroup group = - new RemoteReplicaGroup(replicaSubList, remoteNode, false, remoteReplicaGroupId++); + new RemoteReplicaGroup(replicaSubList.subList(0, size), remoteNode, false, remoteReplicaGroupId++); remoteReplicaGroups.add(group); } } @@ -2226,7 +2234,7 @@ private void fillDataNodeTrackers() { DataNodeTracker dataNodeTracker = new DataNodeTracker(remoteHost, remoteReplicasPerNode, maxReplicaCountPerRequest, currentStartGroupId, time, - threadThrottleDurationMs); + threadThrottleDurationMs, replicationConfig.replicationEnablePrioritzation, replicationConfig.replicationMaxPrioritizedReplicasPercent); logger.trace("Thread name: {} for datanode {} create datanode tracker {}", threadName, remoteHost, dataNodeTracker); diff --git a/ambry-replication/src/main/java/com/github/ambry/replication/continuous/DataNodeTracker.java b/ambry-replication/src/main/java/com/github/ambry/replication/continuous/DataNodeTracker.java index 185ca9e054..f0a5a1f0db 100644 --- a/ambry-replication/src/main/java/com/github/ambry/replication/continuous/DataNodeTracker.java +++ b/ambry-replication/src/main/java/com/github/ambry/replication/continuous/DataNodeTracker.java @@ -48,7 +48,7 @@ public class DataNodeTracker { * @param replicaThrottleDurationMs throttle duration for replicas */ public DataNodeTracker(DataNodeId dataNodeId, List remoteReplicas, int maxActiveGroupSize, - int startGroupId, Time time, long replicaThrottleDurationMs) { + int startGroupId, Time time, long replicaThrottleDurationMs, boolean isReplicaPrioritzationEnabled, int replicationMaxPrioritizedReplicas) { this.dataNodeId = dataNodeId; this.activeGroupTrackers = new ArrayList<>(); @@ -61,7 +61,13 @@ public DataNodeTracker(DataNodeId dataNodeId, List remoteRepl // for each of smaller array of remote replicas create active group trackers with consecutive group ids for (List remoteReplicaList : remoteReplicaSegregatedList) { - ActiveGroupTracker activeGroupTracker = new ActiveGroupTracker(currentGroupId, remoteReplicaList.stream() + int size = remoteReplicaList.size(); + if (isReplicaPrioritzationEnabled) { + int maxSize = replicationMaxPrioritizedReplicas/100 * size; + size = Math.min(size, maxSize); + } + + ActiveGroupTracker activeGroupTracker = new ActiveGroupTracker(currentGroupId, remoteReplicaList.subList(0, size).stream() .map(remoteReplicaInfo -> new ReplicaTracker(remoteReplicaInfo, time, replicaThrottleDurationMs)) .collect(Collectors.toList())); activeGroupTrackers.add(activeGroupTracker);