From 186899f53f8fa807b429684a7703c9b5ee797ec2 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan <1591700+mridulm@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:42:03 -0500 Subject: [PATCH] [CELEBORN-1371] Update ratis with internal port endpoint address as well (#2446) * Update ratis with internal port endpoint address as well, and propagate it to workers, while keeping existing path for applications the same --------- Co-authored-by: Mridul Muralidharan --- .../celeborn/common/client/MasterClient.java | 5 +- .../client/MasterNotLeaderException.java | 18 +++++- .../master/clustermeta/ha/HAHelper.java | 9 ++- .../master/clustermeta/ha/HARaftServer.java | 57 +++++++++++++++---- .../clustermeta/ha/MasterClusterInfo.scala | 4 +- .../master/clustermeta/ha/MasterNode.scala | 14 ++++- .../ha/RatisMasterStatusSystemSuiteJ.java | 25 ++++++++ 7 files changed, 114 insertions(+), 18 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java index c34e1050bf..94e4201a99 100644 --- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java @@ -175,7 +175,10 @@ private boolean shouldRetry(@Nullable RpcEndpointRef oldRef, Throwable e) { // 'CelebornException: Exception thrown in awaitResult' if (e.getCause() instanceof MasterNotLeaderException) { MasterNotLeaderException exception = (MasterNotLeaderException) e.getCause(); - String leaderAddr = exception.getSuggestedLeaderAddress(); + String leaderAddr = + isWorker + ? 
exception.getSuggestedInternalLeaderAddress() + : exception.getSuggestedLeaderAddress(); if (!leaderAddr.equals(MasterNotLeaderException.LEADER_NOT_PRESENTED)) { setRpcEndpointRef(leaderAddr); } else { diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java index 38198cf786..b613e1041f 100644 --- a/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java @@ -28,26 +28,42 @@ public class MasterNotLeaderException extends IOException { private static final long serialVersionUID = -2552475565785098271L; private final String leaderPeer; + private final String internalLeaderPeer; public static final String LEADER_NOT_PRESENTED = "leader is not present"; public MasterNotLeaderException( String currentPeer, String suggestedLeaderPeer, @Nullable Throwable cause) { + this(currentPeer, suggestedLeaderPeer, suggestedLeaderPeer, cause); + } + + public MasterNotLeaderException( + String currentPeer, + String suggestedLeaderPeer, + String suggestedInternalLeaderPeer, + @Nullable Throwable cause) { super( String.format( "Master:%s is not the leader.%s%s", currentPeer, currentPeer.equals(suggestedLeaderPeer) ? StringUtils.EMPTY - : String.format(" Suggested leader is Master:%s.", suggestedLeaderPeer), + : String.format( + " Suggested leader is Master:%s (%s).", + suggestedLeaderPeer, suggestedInternalLeaderPeer), cause == null ? 
StringUtils.EMPTY : String.format(" Exception:%s.", cause.getMessage())), cause); this.leaderPeer = suggestedLeaderPeer; + this.internalLeaderPeer = suggestedInternalLeaderPeer; } public String getSuggestedLeaderAddress() { return leaderPeer; } + + public String getSuggestedInternalLeaderAddress() { + return internalLeaderPeer; + } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java index fdd22443e8..8d2669f207 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java @@ -19,6 +19,9 @@ import java.io.File; import java.io.IOException; +import java.util.Optional; + +import scala.Tuple2; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.protocol.Message; @@ -50,11 +53,13 @@ public static void sendFailure( RpcCallContext context, HARaftServer ratisServer, Throwable cause) { if (context != null) { if (ratisServer != null) { - if (ratisServer.getCachedLeaderPeerRpcEndpoint().isPresent()) { + Optional<Tuple2<String, String>> leaderPeer = ratisServer.getCachedLeaderPeerRpcEndpoint(); + if (leaderPeer.isPresent()) { context.sendFailure( new MasterNotLeaderException( ratisServer.getRpcEndpoint(), - ratisServer.getCachedLeaderPeerRpcEndpoint().get(), + leaderPeer.get()._1(), + leaderPeer.get()._2(), cause)); } else { context.sendFailure( diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java index 9e502058f0..b9d690d337 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -74,6 +74,7 @@ static long nextCallId() { private final InetSocketAddress ratisAddr; private final String rpcEndpoint; + private final String internalRpcEndpoint; private final RaftServer server; private final RaftGroup raftGroup; private final RaftPeerId raftPeerId; @@ -89,7 +90,7 @@ static long nextCallId() { private long roleCheckIntervalMs; private final ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock(); private Optional cachedPeerRole = Optional.empty(); - private Optional cachedLeaderPeerRpcEndpoint = Optional.empty(); + private Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoints = Optional.empty(); private final CelebornConf conf; private long workerTimeoutDeadline; private long appTimeoutDeadline; @@ -109,11 +110,13 @@ private HARaftServer( RaftPeerId localRaftPeerId, InetSocketAddress ratisAddr, String rpcEndpoint, + String internalRpcEndpoint, List raftPeers) throws IOException { this.metaHandler = metaHandler; this.ratisAddr = ratisAddr; this.rpcEndpoint = rpcEndpoint; + this.internalRpcEndpoint = internalRpcEndpoint; this.raftPeerId = localRaftPeerId; this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers); this.masterStateMachine = getStateMachine(); @@ -163,6 +166,8 @@ public static HARaftServer newMasterRatisServer( .setId(localRaftPeerId) .setAddress(ratisAddr) .setClientAddress(localNode.rpcEndpoint()) + // We use admin address to host the internal rpc address + .setAdminAddress(localNode.internalRpcEndpoint()) .build(); List raftPeers = new ArrayList<>(); // Add this Ratis server to the Ratis ring @@ -178,6 +183,8 @@ public static HARaftServer newMasterRatisServer( .setId(raftPeerId) .setAddress(peer.ratisEndpoint()) .setClientAddress(peer.rpcEndpoint()) + // We use admin address to host the internal rpc address + .setAdminAddress(peer.internalRpcEndpoint()) .build(); } else { InetSocketAddress peerRatisAddr = peer.ratisAddr(); @@ -186,6 +193,8 @@ public static HARaftServer newMasterRatisServer( .setId(raftPeerId)
.setAddress(peerRatisAddr) .setClientAddress(peer.rpcEndpoint()) + // We use admin address to host the internal rpc address + .setAdminAddress(peer.internalRpcEndpoint()) .build(); } @@ -193,7 +202,13 @@ public static HARaftServer newMasterRatisServer( raftPeers.add(raftPeer); }); return new HARaftServer( - metaHandler, conf, localRaftPeerId, ratisAddr, localNode.rpcEndpoint(), raftPeers); + metaHandler, + conf, + localRaftPeerId, + ratisAddr, + localNode.rpcEndpoint(), + localNode.internalRpcEndpoint(), + raftPeers); } public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request) @@ -421,12 +436,12 @@ public boolean isLeader() { /** * Get the suggested leader peer id. * - * @return RaftPeerId of the suggested leader node. + * @return Endpoints of the suggested leader node - Tuple2(rpc endpoint, internal rpc endpoint) */ - public Optional getCachedLeaderPeerRpcEndpoint() { + public Optional<Tuple2<String, String>> getCachedLeaderPeerRpcEndpoint() { this.roleCheckLock.readLock().lock(); try { - return cachedLeaderPeerRpcEndpoint; + return cachedLeaderPeerRpcEndpoints; } finally { this.roleCheckLock.readLock().unlock(); } @@ -442,22 +457,30 @@ public void updateServerRole() { RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole(); if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) { - setServerRole(thisNodeRole, getRpcEndpoint()); + setServerRole(thisNodeRole, getRpcEndpoint(), getInternalRpcEndpoint()); } else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) { ByteString leaderNodeId = roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId(); // There may be a chance, here we get leaderNodeId as null. For // example, in 3 node Ratis, if 2 nodes are down, there will // be no leader.
String leaderPeerRpcEndpoint = null; + String leaderPeerInternalRpcEndpoint = null; if (leaderNodeId != null && !leaderNodeId.isEmpty()) { leaderPeerRpcEndpoint = roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getClientAddress(); + // We use admin address to host the internal rpc address + if (conf.internalPortEnabled()) { + leaderPeerInternalRpcEndpoint = + roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getAdminAddress(); + } else { + leaderPeerInternalRpcEndpoint = leaderPeerRpcEndpoint; + } } - setServerRole(thisNodeRole, leaderPeerRpcEndpoint); + setServerRole(thisNodeRole, leaderPeerRpcEndpoint, leaderPeerInternalRpcEndpoint); } else { - setServerRole(thisNodeRole, null); + setServerRole(thisNodeRole, null, null); } } catch (IOException e) { LOG.error( @@ -465,12 +488,15 @@ public void updateServerRole() { + "{} and resetting leader info.", RaftProtos.RaftPeerRole.UNRECOGNIZED, e); - setServerRole(null, null); + setServerRole(null, null, null); } } /** Set the current server role and the leader peer rpc endpoint. 
*/ - private void setServerRole(RaftProtos.RaftPeerRole currentRole, String leaderPeerRpcEndpoint) { + private void setServerRole( + RaftProtos.RaftPeerRole currentRole, + String leaderPeerRpcEndpoint, + String leaderPeerInternalRpcEndpoint) { this.roleCheckLock.writeLock().lock(); try { boolean leaderChanged = false; @@ -490,7 +516,12 @@ private void setServerRole(RaftProtos.RaftPeerRole currentRole, String leaderPee } this.cachedPeerRole = Optional.ofNullable(currentRole); - this.cachedLeaderPeerRpcEndpoint = Optional.ofNullable(leaderPeerRpcEndpoint); + if (null != leaderPeerRpcEndpoint) { + this.cachedLeaderPeerRpcEndpoints = + Optional.of(Tuple2.apply(leaderPeerRpcEndpoint, leaderPeerInternalRpcEndpoint)); + } else { + this.cachedLeaderPeerRpcEndpoints = Optional.empty(); + } } finally { this.roleCheckLock.writeLock().unlock(); } @@ -510,6 +541,10 @@ public String getRpcEndpoint() { return this.rpcEndpoint; } + public String getInternalRpcEndpoint() { + return this.internalRpcEndpoint; + } + void stepDown() { try { TransferLeadershipRequest request = diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala index a661c6acaf..1042f46273 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterClusterInfo.scala @@ -43,7 +43,9 @@ object MasterClusterInfo extends Logging { val ratisPort = conf.haMasterRatisPort(nodeId) val rpcHost = conf.haMasterNodeHost(nodeId) val rpcPort = conf.haMasterNodePort(nodeId) - MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort) + val internalPort = + if (conf.internalPortEnabled) conf.haMasterNodeInternalPort(nodeId) else rpcPort + MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort, internalPort) } val 
(localNodes, peerNodes) = localNodeIdOpt match { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala index a1ac2b67ea..b60b76eda5 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala @@ -29,7 +29,8 @@ case class MasterNode( ratisHost: String, ratisPort: Int, rpcHost: String, - rpcPort: Int) { + rpcPort: Int, + internalRpcPort: Int) { def isRatisHostUnresolved: Boolean = ratisAddr.isUnresolved @@ -39,6 +40,8 @@ case class MasterNode( def rpcEndpoint: String = rpcHost + ":" + rpcPort + def internalRpcEndpoint: String = rpcHost + ":" + internalRpcPort + lazy val ratisAddr = MasterNode.createSocketAddr(ratisHost, ratisPort) lazy val rpcAddr = MasterNode.createSocketAddr(rpcHost, rpcPort) @@ -52,6 +55,7 @@ object MasterNode extends Logging { private var ratisPort = 0 private var rpcHost: String = _ private var rpcPort = 0 + private var internalRpcPort = 0 def setNodeId(nodeId: String): this.type = { this.nodeId = nodeId @@ -84,7 +88,13 @@ object MasterNode extends Logging { this } - def build: MasterNode = MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort) + def setInternalRpcPort(internalRpcPort: Int): this.type = { + this.internalRpcPort = internalRpcPort + this + } + + def build: MasterNode = + MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort, internalRpcPort) } private def createSocketAddr(host: String, port: Int): InetSocketAddress = { diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index d7cf715a0f..9dbcff797d 100644 --- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -25,6 +25,8 @@ import java.util.*; import java.util.concurrent.atomic.AtomicLong; +import scala.Tuple2; + import org.junit.*; import org.mockito.Mockito; @@ -127,6 +129,7 @@ public static void resetRaftServer() throws IOException, InterruptedException { .setHost(Utils.localHostName(conf1)) .setRatisPort(ratisPort1) .setRpcPort(ratisPort1) + .setInternalRpcPort(ratisPort1) .setNodeId(id1) .build(); MasterNode masterNode2 = @@ -134,6 +137,7 @@ public static void resetRaftServer() throws IOException, InterruptedException { .setHost(Utils.localHostName(conf2)) .setRatisPort(ratisPort2) .setRpcPort(ratisPort2) + .setInternalRpcPort(ratisPort2) .setNodeId(id2) .build(); MasterNode masterNode3 = @@ -141,6 +145,7 @@ public static void resetRaftServer() throws IOException, InterruptedException { .setHost(Utils.localHostName(conf3)) .setRatisPort(ratisPort3) .setRpcPort(ratisPort3) + .setInternalRpcPort(ratisPort3) .setNodeId(id3) .build(); @@ -179,6 +184,26 @@ public void testLeaderAvaiable() { boolean hasLeader = RATISSERVER1.isLeader() || RATISSERVER2.isLeader() || RATISSERVER3.isLeader(); Assert.assertTrue(hasLeader); + + // Check if the rpc endpoint and internal rpc endpoint of the leader are as expected. + + HARaftServer leader = + RATISSERVER1.isLeader() + ? RATISSERVER1 + : (RATISSERVER2.isLeader() ? RATISSERVER2 : RATISSERVER3); + // one of them must be the follower given the three servers we have + HARaftServer follower = RATISSERVER1.isLeader() ?
RATISSERVER2 : RATISSERVER1; + + // This is expected to be false, but as a side effect, updates getCachedLeaderPeerRpcEndpoint + boolean isFollowerCurrentLeader = follower.isLeader(); + Assert.assertFalse(isFollowerCurrentLeader); + + Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoint = + follower.getCachedLeaderPeerRpcEndpoint(); + + Assert.assertTrue(cachedLeaderPeerRpcEndpoint.isPresent()); + Assert.assertEquals(leader.getRpcEndpoint(), cachedLeaderPeerRpcEndpoint.get()._1()); + Assert.assertEquals(leader.getInternalRpcEndpoint(), cachedLeaderPeerRpcEndpoint.get()._2()); } private static final String HOSTNAME1 = "host1";