Skip to content

Commit

Permalink
Avoid registration of non-active validators
Browse files Browse the repository at this point in the history
  • Loading branch information
zilm13 committed Aug 15, 2023
1 parent b54882a commit a86e148
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,10 @@ public enum ValidatorStatus {
public boolean hasExited() {
return hasExited;
}

public boolean isActive() {
return this.equals(active_ongoing)
|| this.equals(active_exiting)
|| this.equals(active_slashed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public static ValidatorClientService create(
proposerConfigManager.get(),
new ValidatorRegistrationBatchSender(
validatorConfig.getBuilderRegistrationSendingBatchSize(),
validatorApiChannel)));
validatorApiChannel),
validatorApiChannel));
} else {
proposerConfigManager = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static tech.pegasys.teku.infrastructure.logging.ValidatorLogger.VALIDATOR_LOGGER;

import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -24,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
Expand All @@ -40,6 +42,7 @@
import tech.pegasys.teku.spec.datastructures.builder.ValidatorRegistration;
import tech.pegasys.teku.spec.schemas.ApiSchemas;
import tech.pegasys.teku.spec.signatures.Signer;
import tech.pegasys.teku.validator.api.ValidatorApiChannel;
import tech.pegasys.teku.validator.api.ValidatorTimingChannel;
import tech.pegasys.teku.validator.client.loader.OwnedValidators;

Expand All @@ -59,18 +62,21 @@ public class ValidatorRegistrator implements ValidatorTimingChannel {
private final OwnedValidators ownedValidators;
private final ProposerConfigPropertiesProvider validatorRegistrationPropertiesProvider;
private final ValidatorRegistrationBatchSender validatorRegistrationBatchSender;
private final ValidatorApiChannel validatorApiChannel;

public ValidatorRegistrator(
final Spec spec,
final TimeProvider timeProvider,
final OwnedValidators ownedValidators,
final ProposerConfigPropertiesProvider validatorRegistrationPropertiesProvider,
final ValidatorRegistrationBatchSender validatorRegistrationBatchSender) {
final ValidatorRegistrationBatchSender validatorRegistrationBatchSender,
final ValidatorApiChannel validatorApiChannel) {
this.spec = spec;
this.timeProvider = timeProvider;
this.ownedValidators = ownedValidators;
this.validatorRegistrationPropertiesProvider = validatorRegistrationPropertiesProvider;
this.validatorRegistrationBatchSender = validatorRegistrationBatchSender;
this.validatorApiChannel = validatorApiChannel;
}

@Override
Expand Down Expand Up @@ -158,13 +164,13 @@ private void registerValidators() {
"Validator registration(s) is still in progress. Will skip sending registration(s).");
return;
}
final List<Validator> activeValidators = ownedValidators.getActiveValidators();
registerValidators(activeValidators)
final List<Validator> managedValidators = ownedValidators.getActiveValidators();
registerValidators(managedValidators)
.handleException(VALIDATOR_LOGGER::registeringValidatorsFailed)
.always(
() -> {
registrationInProgress.set(false);
cleanupCache(activeValidators);
cleanupCache(managedValidators);
});
}

Expand All @@ -177,28 +183,46 @@ private SafeFuture<Void> registerValidators(final List<Validator> validators) {
.refresh()
.thenCompose(
__ -> {
final Stream<SafeFuture<SignedValidatorRegistration>> validatorRegistrationsFutures =
createValidatorRegistrations(validators);
return SafeFuture.collectAllSuccessful(validatorRegistrationsFutures)
.thenCompose(validatorRegistrationBatchSender::sendInBatches);
final SafeFuture<List<SignedValidatorRegistration>> validatorRegistrations =
filterActiveValidators(validators)
.thenCompose(this::createValidatorRegistrations);
return validatorRegistrations.thenCompose(
validatorRegistrationBatchSender::sendInBatches);
});
}

private Stream<SafeFuture<SignedValidatorRegistration>> createValidatorRegistrations(
private SafeFuture<List<Validator>> filterActiveValidators(final List<Validator> validators) {
final Map<BLSPublicKey, Validator> validatorMap =
validators.stream().collect(Collectors.toMap(Validator::getPublicKey, Function.identity()));
return validatorApiChannel
.getValidatorStatuses(validators.stream().map(Validator::getPublicKey).toList())
.thenApply(
maybeValidatorStatuses ->
maybeValidatorStatuses.map(Map::entrySet).stream()
.flatMap(Collection::stream)
.filter(statusEntry -> statusEntry.getValue().isActive())
.map(statusEntry -> Optional.ofNullable(validatorMap.get(statusEntry.getKey())))
.flatMap(Optional::stream)
.toList());
}

private SafeFuture<List<SignedValidatorRegistration>> createValidatorRegistrations(
final List<Validator> validators) {
return validators.stream()
.map(
validator ->
createSignedValidatorRegistration(
validator,
throwable -> {
final String errorMessage =
String.format(
"Exception while creating a validator registration for %s. Creation will be attempted again next epoch.",
validator.getPublicKey());
LOG.warn(errorMessage, throwable);
}))
.flatMap(Optional::stream);
final Stream<SafeFuture<SignedValidatorRegistration>> validatorRegistrationsFutures =
validators.stream()
.map(
validator ->
createSignedValidatorRegistration(
validator,
throwable -> {
final String errorMessage =
String.format(
"Exception while creating a validator registration for %s. Creation will be attempted again next epoch.",
validator.getPublicKey());
LOG.warn(errorMessage, throwable);
}))
.flatMap(Optional::stream);
return SafeFuture.collectAllSuccessful(validatorRegistrationsFutures);
}

private Optional<SafeFuture<SignedValidatorRegistration>> createSignedValidatorRegistration(
Expand Down Expand Up @@ -315,14 +339,14 @@ public boolean registrationNeedsUpdating(
|| cachedTimestampIsDifferentThanOverride;
}

private void cleanupCache(final List<Validator> activeValidators) {
private void cleanupCache(final List<Validator> managedValidators) {
if (cachedValidatorRegistrations.isEmpty()
|| cachedValidatorRegistrations.size() == activeValidators.size()) {
|| cachedValidatorRegistrations.size() == managedValidators.size()) {
return;
}

final Set<BLSPublicKey> activeValidatorsPublicKeys =
activeValidators.stream()
final Set<BLSPublicKey> managedValidatorsPublicKeys =
managedValidators.stream()
.map(Validator::getPublicKey)
.collect(Collectors.toCollection(HashSet::new));

Expand All @@ -331,10 +355,10 @@ private void cleanupCache(final List<Validator> activeValidators) {
.removeIf(
cachedPublicKey -> {
final boolean requiresRemoving =
!activeValidatorsPublicKeys.contains(cachedPublicKey);
!managedValidatorsPublicKeys.contains(cachedPublicKey);
if (requiresRemoving) {
LOG.debug(
"Removing cached registration for {} because validator is no longer active.",
"Removing cached registration for {} because validator is no longer owned.",
cachedPublicKey);
}
return requiresRemoving;
Expand Down
Loading

0 comments on commit a86e148

Please sign in to comment.