Skip to content

Commit

Permalink
Trigger primary BN readiness check when failover has issues (#8193)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Apr 12, 2024
1 parent 8ac4353 commit c862af7
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public Iterator<? extends RemoteValidatorApiChannel> getFailoversInOrderOfReadin
.iterator();
}

public SafeFuture<Void> performPrimaryReadinessCheck() {
return performReadinessCheck(primaryBeaconNodeApi, true);
}

@Override
protected SafeFuture<?> doStart() {
return performReadinessCheckAgainstAllNodes();
Expand Down Expand Up @@ -136,10 +140,6 @@ private SafeFuture<Void> performReadinessCheckAgainstAllNodes() {
return SafeFuture.allOf(primaryReadinessCheck, SafeFuture.allOf(failoverReadinessChecks));
}

private SafeFuture<Void> performPrimaryReadinessCheck() {
return performReadinessCheck(primaryBeaconNodeApi, true);
}

private SafeFuture<Void> performFailoverReadinessCheck(final RemoteValidatorApiChannel failover) {
return performReadinessCheck(failover, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import okhttp3.HttpUrl;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class EventSourceBeaconChainEventAdapter
private final CountDownLatch runningLatch = new CountDownLatch(1);

private volatile BackgroundEventSource eventSource;
private volatile RemoteValidatorApiChannel currentBeaconNodeUsedForEventStreaming;
@VisibleForTesting volatile RemoteValidatorApiChannel currentBeaconNodeUsedForEventStreaming;

private final BeaconNodeReadinessManager beaconNodeReadinessManager;
private final RemoteValidatorApiChannel primaryBeaconNodeApi;
Expand Down Expand Up @@ -120,9 +121,14 @@ public void onPrimaryNodeNotReady() {
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onFailoverNodeNotReady(final RemoteValidatorApiChannel failoverNotInSync) {
if (currentEventStreamHasSameEndpoint(failoverNotInSync)) {
switchToFailoverEventStreamIfAvailable();
if (failoverBeaconNodeApis.size() == 1 || !switchToFailoverEventStreamIfAvailable()) {
// No failover switching is available, and we are currently connected to a failover node
// with issues, so trigger the readiness check against the primary BN immediately
beaconNodeReadinessManager.performPrimaryReadinessCheck();
}
}
}

Expand Down Expand Up @@ -170,26 +176,28 @@ private HttpUrl createEventStreamSourceUrl(
}

// synchronized because of the ConnectionErrorHandler and the BeaconNodeReadinessChannel callbacks
private synchronized void switchToFailoverEventStreamIfAvailable() {
private synchronized boolean switchToFailoverEventStreamIfAvailable() {
if (failoverBeaconNodeApis.isEmpty()) {
return;
return false;
}
findReadyFailoverAndSwitch();
return findReadyFailoverAndSwitch();
}

private void findReadyFailoverAndSwitch() {
failoverBeaconNodeApis.stream()
.filter(beaconNodeReadinessManager::isReady)
.findFirst()
.ifPresentOrElse(
this::switchToFailoverEventStream,
validatorLogger::noFailoverBeaconNodesAvailableForEventStreaming);
private boolean findReadyFailoverAndSwitch() {
final Optional<? extends RemoteValidatorApiChannel> readyFailover =
failoverBeaconNodeApis.stream()
.filter(beaconNodeReadinessManager::isReady)
.filter(failover -> !currentEventStreamHasSameEndpoint(failover))
.findFirst();
if (readyFailover.isPresent()) {
switchToFailoverEventStream(readyFailover.get());
return true;
}
validatorLogger.noFailoverBeaconNodesAvailableForEventStreaming();
return false;
}

private void switchToFailoverEventStream(final RemoteValidatorApiChannel beaconNodeApi) {
if (currentEventStreamHasSameEndpoint(beaconNodeApi)) {
return;
}
eventSource.close();
eventSource = createEventSource(beaconNodeApi);
currentBeaconNodeUsedForEventStreaming = beaconNodeApi;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import tech.pegasys.teku.api.response.v1.EventType;
Expand Down Expand Up @@ -69,6 +71,32 @@ public void shouldSubscribeToSlashingEvents(final boolean shutdownWhenValidatorS
verifyEventSourceSubscriptionUrl(httpUrlMock, shutdownWhenValidatorSlashedEnabled);
}

@Test
public void performsPrimaryReadinessCheckWhenFailoverNotReadyAndNoOtherFailoversAvailable() {
final BeaconNodeReadinessManager beaconNodeReadinessManager =
mock(BeaconNodeReadinessManager.class);
final RemoteValidatorApiChannel failover = mock(RemoteValidatorApiChannel.class);
final EventSourceBeaconChainEventAdapter eventSourceBeaconChainEventAdapter =
new EventSourceBeaconChainEventAdapter(
beaconNodeReadinessManager,
mock(RemoteValidatorApiChannel.class),
List.of(failover),
mock(OkHttpClient.class),
mock(ValidatorLogger.class),
mock(BeaconChainEventAdapter.class),
mock(ValidatorTimingChannel.class),
metricsSystemMock,
true,
false,
mock(Spec.class));

eventSourceBeaconChainEventAdapter.currentBeaconNodeUsedForEventStreaming = failover;

eventSourceBeaconChainEventAdapter.onFailoverNodeNotReady(failover);

verify(beaconNodeReadinessManager).performPrimaryReadinessCheck();
}

private EventSourceBeaconChainEventAdapter initEventSourceBeaconChainEventAdapter(
final boolean shutdownWhenValidatorSlashedEnabled) {
return new EventSourceBeaconChainEventAdapter(
Expand Down

0 comments on commit c862af7

Please sign in to comment.