Skip to content

Commit

Permalink
[CELEBORN-1391] Retry when MasterClient receiving a RpcTimeoutException
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Retry when MasterClient receiving a RpcTimeoutException

### Why are the changes needed?
When the MasterClient encounters an RpcTimeoutException, it may indicate that the current master is either busy or unavailable. In such cases, retrying with an alternative master endpoint could work.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test: org.apache.celeborn.common.client.MasterClientSuiteJ#testOneMasterTimeoutInHA

Closes #2466 from jiang13021/celeborn-1391.

Authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
jiang13021 authored and SteNicholas committed Apr 23, 2024
1 parent ed31df4 commit 1672887
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@
import org.apache.celeborn.common.protocol.message.ControlMessages.OneWayMessageResponse$;
import org.apache.celeborn.common.protocol.message.MasterRequestMessage;
import org.apache.celeborn.common.protocol.message.Message;
import org.apache.celeborn.common.rpc.RpcAddress;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.common.rpc.RpcTimeout;
import org.apache.celeborn.common.rpc.*;
import org.apache.celeborn.common.util.ThreadUtils;

public class MasterClient {
Expand Down Expand Up @@ -177,7 +174,7 @@ private boolean shouldRetry(@Nullable RpcEndpointRef oldRef, Throwable e) {
LOG.warn("Master leader is not present currently, please check masters' status!");
}
return true;
} else if (e.getCause() instanceof IOException) {
} else if (e.getCause() instanceof IOException || e instanceof RpcTimeoutException) {
resetRpcEndpointRef(oldRef);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

Expand All @@ -45,6 +46,7 @@
import org.apache.celeborn.common.rpc.RpcAddress;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.common.rpc.RpcTimeoutException;

public class MasterClientSuiteJ {
private static final Logger LOG = LoggerFactory.getLogger(MasterClientSuiteJ.class);
Expand Down Expand Up @@ -219,6 +221,11 @@ public void testOneMasterDownCausedByRuntimeExceptionInHA() {
checkOneMasterDownInHA(new RuntimeException("test"));
}

@Test
public void testOneMasterTimeoutInHA() {
checkOneMasterAskFailedInHA(new RpcTimeoutException("test", new TimeoutException("test")));
}

private void checkOneMasterDownInHA(Exception causedByException) {
final CelebornConf conf = prepareForCelebornConfWithHA();

Expand Down Expand Up @@ -268,6 +275,61 @@ private void checkOneMasterDownInHA(Exception causedByException) {
assertEquals(mockResponse, response);
}

private void checkOneMasterAskFailedInHA(Exception exception) {
final CelebornConf conf = prepareForCelebornConfWithHA();

final RpcEndpointRef master1 = Mockito.mock(RpcEndpointRef.class);
final RpcEndpointRef master2 = Mockito.mock(RpcEndpointRef.class);
final RpcEndpointRef master3 = Mockito.mock(RpcEndpointRef.class);

// master leader switch to host2
Mockito.doReturn(
Future$.MODULE$.failed(new MasterNotLeaderException("host1:9097", "host2:9097", null)))
.when(master1)
.ask(Mockito.any(), Mockito.any(), Mockito.any());

// Assume master2 get exception.
Mockito.doReturn(Future$.MODULE$.failed(exception))
.when(master2)
.ask(Mockito.any(), Mockito.any(), Mockito.any());

Mockito.doReturn(Future$.MODULE$.successful(mockResponse))
.when(master3)
.ask(Mockito.any(), Mockito.any(), Mockito.any());

Mockito.doAnswer(
(invocation) -> {
RpcAddress address = invocation.getArgument(0, RpcAddress.class);
switch (address.host()) {
case "host1":
return master1;
case "host2":
return master2;
case "host3":
return master3;
default:
fail(
"Should use master host1/host2/host3:" + masterPort + ", but use " + address);
}
return null;
})
.when(rpcEnv)
.setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());

MasterClient client = new MasterClient(rpcEnv, conf, false);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);

HeartbeatFromWorkerResponse response = null;
try {
response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should be no exceptions when sending one-way message.", t);
fail("It should be no exceptions when sending one-way message.");
}

assertEquals(mockResponse, response);
}

private void prepareForRpcEnvWithHA(final Supplier<Future<?>> supplier) {
final RpcEndpointRef ref1 = Mockito.mock(RpcEndpointRef.class);
final RpcEndpointRef ref2 = Mockito.mock(RpcEndpointRef.class);
Expand Down

0 comments on commit 1672887

Please sign in to comment.