Skip to content

Commit

Permalink
[CELEBORN-1371] Update ratis with internal port endpoint address as w…
Browse files Browse the repository at this point in the history
…ell (#2446)

* Update ratis with internal port endpoint address as well, and propagate it to workers, while keeping existing path for applications the same
---------

Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
  • Loading branch information
mridulm authored Apr 5, 2024
1 parent 5065dbe commit 186899f
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ private boolean shouldRetry(@Nullable RpcEndpointRef oldRef, Throwable e) {
// 'CelebornException: Exception thrown in awaitResult'
if (e.getCause() instanceof MasterNotLeaderException) {
MasterNotLeaderException exception = (MasterNotLeaderException) e.getCause();
String leaderAddr = exception.getSuggestedLeaderAddress();
String leaderAddr =
isWorker
? exception.getSuggestedInternalLeaderAddress()
: exception.getSuggestedLeaderAddress();
if (!leaderAddr.equals(MasterNotLeaderException.LEADER_NOT_PRESENTED)) {
setRpcEndpointRef(leaderAddr);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,42 @@ public class MasterNotLeaderException extends IOException {
private static final long serialVersionUID = -2552475565785098271L;

private final String leaderPeer;
private final String internalLeaderPeer;

public static final String LEADER_NOT_PRESENTED = "leader is not present";

public MasterNotLeaderException(
String currentPeer, String suggestedLeaderPeer, @Nullable Throwable cause) {
this(currentPeer, suggestedLeaderPeer, suggestedLeaderPeer, cause);
}

public MasterNotLeaderException(
String currentPeer,
String suggestedLeaderPeer,
String suggestedInternalLeaderPeer,
@Nullable Throwable cause) {
super(
String.format(
"Master:%s is not the leader.%s%s",
currentPeer,
currentPeer.equals(suggestedLeaderPeer)
? StringUtils.EMPTY
: String.format(" Suggested leader is Master:%s.", suggestedLeaderPeer),
: String.format(
" Suggested leader is Master:%s (%s).",
suggestedLeaderPeer, suggestedInternalLeaderPeer),
cause == null
? StringUtils.EMPTY
: String.format(" Exception:%s.", cause.getMessage())),
cause);
this.leaderPeer = suggestedLeaderPeer;
this.internalLeaderPeer = suggestedInternalLeaderPeer;
}

public String getSuggestedLeaderAddress() {
return leaderPeer;
}

public String getSuggestedInternalLeaderAddress() {
return internalLeaderPeer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import java.io.File;
import java.io.IOException;
import java.util.Optional;

import scala.Tuple2;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.protocol.Message;
Expand Down Expand Up @@ -50,11 +53,13 @@ public static void sendFailure(
RpcCallContext context, HARaftServer ratisServer, Throwable cause) {
if (context != null) {
if (ratisServer != null) {
if (ratisServer.getCachedLeaderPeerRpcEndpoint().isPresent()) {
Optional<Tuple2<String, String>> leaderPeer = ratisServer.getCachedLeaderPeerRpcEndpoint();
if (leaderPeer.isPresent()) {
context.sendFailure(
new MasterNotLeaderException(
ratisServer.getRpcEndpoint(),
ratisServer.getCachedLeaderPeerRpcEndpoint().get(),
leaderPeer.get()._1(),
leaderPeer.get()._2(),
cause));
} else {
context.sendFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ static long nextCallId() {

private final InetSocketAddress ratisAddr;
private final String rpcEndpoint;
private final String internalRpcEndpoint;
private final RaftServer server;
private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId;
Expand All @@ -89,7 +90,7 @@ static long nextCallId() {
private long roleCheckIntervalMs;
private final ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
private Optional<String> cachedLeaderPeerRpcEndpoint = Optional.empty();
private Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoints = Optional.empty();
private final CelebornConf conf;
private long workerTimeoutDeadline;
private long appTimeoutDeadline;
Expand All @@ -109,11 +110,13 @@ private HARaftServer(
RaftPeerId localRaftPeerId,
InetSocketAddress ratisAddr,
String rpcEndpoint,
String internalRpcEndpoint,
List<RaftPeer> raftPeers)
throws IOException {
this.metaHandler = metaHandler;
this.ratisAddr = ratisAddr;
this.rpcEndpoint = rpcEndpoint;
this.internalRpcEndpoint = internalRpcEndpoint;
this.raftPeerId = localRaftPeerId;
this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers);
this.masterStateMachine = getStateMachine();
Expand Down Expand Up @@ -163,6 +166,8 @@ public static HARaftServer newMasterRatisServer(
.setId(localRaftPeerId)
.setAddress(ratisAddr)
.setClientAddress(localNode.rpcEndpoint())
// We use admin address to host the internal rpc address
.setAdminAddress(localNode.internalRpcEndpoint())
.build();
List<RaftPeer> raftPeers = new ArrayList<>();
// Add this Ratis server to the Ratis ring
Expand All @@ -178,6 +183,8 @@ public static HARaftServer newMasterRatisServer(
.setId(raftPeerId)
.setAddress(peer.ratisEndpoint())
.setClientAddress(peer.rpcEndpoint())
// We use admin address to host the internal rpc address
.setAdminAddress(peer.internalRpcEndpoint())
.build();
} else {
InetSocketAddress peerRatisAddr = peer.ratisAddr();
Expand All @@ -186,14 +193,22 @@ public static HARaftServer newMasterRatisServer(
.setId(raftPeerId)
.setAddress(peerRatisAddr)
.setClientAddress(peer.rpcEndpoint())
// We use admin address to host the internal rpc address
.setAdminAddress(peer.internalRpcEndpoint())
.build();
}

// Add other nodes belonging to the same service to the Ratis ring
raftPeers.add(raftPeer);
});
return new HARaftServer(
metaHandler, conf, localRaftPeerId, ratisAddr, localNode.rpcEndpoint(), raftPeers);
metaHandler,
conf,
localRaftPeerId,
ratisAddr,
localNode.rpcEndpoint(),
localNode.internalRpcEndpoint(),
raftPeers);
}

public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
Expand Down Expand Up @@ -421,12 +436,12 @@ public boolean isLeader() {
/**
* Get the suggested leader peer id.
*
* @return RaftPeerId of the suggested leader node.
* @return RaftPeerId of the suggested leader node - Tuple2(rpc endpoint, internal rpc endpoint)
*/
public Optional<String> getCachedLeaderPeerRpcEndpoint() {
public Optional<Tuple2<String, String>> getCachedLeaderPeerRpcEndpoint() {
this.roleCheckLock.readLock().lock();
try {
return cachedLeaderPeerRpcEndpoint;
return cachedLeaderPeerRpcEndpoints;
} finally {
this.roleCheckLock.readLock().unlock();
}
Expand All @@ -442,35 +457,46 @@ public void updateServerRole() {
RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole();

if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) {
setServerRole(thisNodeRole, getRpcEndpoint());
setServerRole(thisNodeRole, getRpcEndpoint(), getInternalRpcEndpoint());
} else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
ByteString leaderNodeId = roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId();
// There may be a chance, here we get leaderNodeId as null. For
// example, in 3 node Ratis, if 2 nodes are down, there will
// be no leader.
String leaderPeerRpcEndpoint = null;
String leaderPeerInternalRpcEndpoint = null;
if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
leaderPeerRpcEndpoint =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getClientAddress();
// We use admin address to host the internal rpc address
if (conf.internalPortEnabled()) {
leaderPeerInternalRpcEndpoint =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getAdminAddress();
} else {
leaderPeerInternalRpcEndpoint = leaderPeerRpcEndpoint;
}
}

setServerRole(thisNodeRole, leaderPeerRpcEndpoint);
setServerRole(thisNodeRole, leaderPeerRpcEndpoint, leaderPeerInternalRpcEndpoint);

} else {
setServerRole(thisNodeRole, null);
setServerRole(thisNodeRole, null, null);
}
} catch (IOException e) {
LOG.error(
"Failed to retrieve RaftPeerRole. Setting cached role to "
+ "{} and resetting leader info.",
RaftProtos.RaftPeerRole.UNRECOGNIZED,
e);
setServerRole(null, null);
setServerRole(null, null, null);
}
}

/** Set the current server role and the leader peer rpc endpoint. */
private void setServerRole(RaftProtos.RaftPeerRole currentRole, String leaderPeerRpcEndpoint) {
private void setServerRole(
RaftProtos.RaftPeerRole currentRole,
String leaderPeerRpcEndpoint,
String leaderPeerInternalRpcEndpoint) {
this.roleCheckLock.writeLock().lock();
try {
boolean leaderChanged = false;
Expand All @@ -490,7 +516,12 @@ private void setServerRole(RaftProtos.RaftPeerRole currentRole, String leaderPee
}

this.cachedPeerRole = Optional.ofNullable(currentRole);
this.cachedLeaderPeerRpcEndpoint = Optional.ofNullable(leaderPeerRpcEndpoint);
if (null != leaderPeerRpcEndpoint) {
this.cachedLeaderPeerRpcEndpoints =
Optional.of(Tuple2.apply(leaderPeerRpcEndpoint, leaderPeerInternalRpcEndpoint));
} else {
this.cachedLeaderPeerRpcEndpoints = Optional.empty();
}
} finally {
this.roleCheckLock.writeLock().unlock();
}
Expand All @@ -510,6 +541,10 @@ public String getRpcEndpoint() {
return this.rpcEndpoint;
}

public String getInternalRpcEndpoint() {
return this.internalRpcEndpoint;
}

void stepDown() {
try {
TransferLeadershipRequest request =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ object MasterClusterInfo extends Logging {
val ratisPort = conf.haMasterRatisPort(nodeId)
val rpcHost = conf.haMasterNodeHost(nodeId)
val rpcPort = conf.haMasterNodePort(nodeId)
MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort)
val internalPort =
if (conf.internalPortEnabled) conf.haMasterNodeInternalPort(nodeId) else rpcPort
MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort, internalPort)
}

val (localNodes, peerNodes) = localNodeIdOpt match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ case class MasterNode(
ratisHost: String,
ratisPort: Int,
rpcHost: String,
rpcPort: Int) {
rpcPort: Int,
internalRpcPort: Int) {

def isRatisHostUnresolved: Boolean = ratisAddr.isUnresolved

Expand All @@ -39,6 +40,8 @@ case class MasterNode(

def rpcEndpoint: String = rpcHost + ":" + rpcPort

def internalRpcEndpoint: String = rpcHost + ":" + internalRpcPort

lazy val ratisAddr = MasterNode.createSocketAddr(ratisHost, ratisPort)

lazy val rpcAddr = MasterNode.createSocketAddr(rpcHost, rpcPort)
Expand All @@ -52,6 +55,7 @@ object MasterNode extends Logging {
private var ratisPort = 0
private var rpcHost: String = _
private var rpcPort = 0
private var internalRpcPort = 0

def setNodeId(nodeId: String): this.type = {
this.nodeId = nodeId
Expand Down Expand Up @@ -84,7 +88,13 @@ object MasterNode extends Logging {
this
}

def build: MasterNode = MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort)
def setInternalRpcPort(internalRpcPort: Int): this.type = {
this.internalRpcPort = internalRpcPort
this
}

def build: MasterNode =
MasterNode(nodeId, ratisHost, ratisPort, rpcHost, rpcPort, internalRpcPort)
}

private def createSocketAddr(host: String, port: Int): InetSocketAddress = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

import scala.Tuple2;

import org.junit.*;
import org.mockito.Mockito;

Expand Down Expand Up @@ -127,20 +129,23 @@ public static void resetRaftServer() throws IOException, InterruptedException {
.setHost(Utils.localHostName(conf1))
.setRatisPort(ratisPort1)
.setRpcPort(ratisPort1)
.setInternalRpcPort(ratisPort1)
.setNodeId(id1)
.build();
MasterNode masterNode2 =
new MasterNode.Builder()
.setHost(Utils.localHostName(conf2))
.setRatisPort(ratisPort2)
.setRpcPort(ratisPort2)
.setInternalRpcPort(ratisPort2)
.setNodeId(id2)
.build();
MasterNode masterNode3 =
new MasterNode.Builder()
.setHost(Utils.localHostName(conf3))
.setRatisPort(ratisPort3)
.setRpcPort(ratisPort3)
.setInternalRpcPort(ratisPort3)
.setNodeId(id3)
.build();

Expand Down Expand Up @@ -179,6 +184,26 @@ public void testLeaderAvaiable() {
boolean hasLeader =
RATISSERVER1.isLeader() || RATISSERVER2.isLeader() || RATISSERVER3.isLeader();
Assert.assertTrue(hasLeader);

// Check if the rpc endpoint and internal rpc endpoint of the leader is as expected.

HARaftServer leader =
RATISSERVER1.isLeader()
? RATISSERVER1
: (RATISSERVER2.isLeader() ? RATISSERVER2 : RATISSERVER3);
// one of them must be the follower given the three servers we have
HARaftServer follower = RATISSERVER1.isLeader() ? RATISSERVER2 : RATISSERVER1;

// This is expected to be false, but as a side effect, updates getCachedLeaderPeerRpcEndpoint
boolean isFollowerCurrentLeader = follower.isLeader();
Assert.assertFalse(isFollowerCurrentLeader);

Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoint =
follower.getCachedLeaderPeerRpcEndpoint();

Assert.assertTrue(cachedLeaderPeerRpcEndpoint.isPresent());
Assert.assertEquals(leader.getRpcEndpoint(), cachedLeaderPeerRpcEndpoint.get()._1());
Assert.assertEquals(leader.getInternalRpcEndpoint(), cachedLeaderPeerRpcEndpoint.get()._2());
}

private static final String HOSTNAME1 = "host1";
Expand Down

0 comments on commit 186899f

Please sign in to comment.