Skip to content

Commit

Permalink
update blobs rpc methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdi-aouadi committed Nov 6, 2024
1 parent 4dc1d6a commit 37c3683
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage;
Expand Down Expand Up @@ -284,8 +283,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
RpcContextCodec.forkDigest(spec, recentChainData, ForkDigestPayloadContext.BLOB_SIDECAR);

final BlobSidecarsByRootMessageHandler blobSidecarsByRootHandler =
new BlobSidecarsByRootMessageHandler(
spec, getSpecConfigDeneb(spec), metricsSystem, combinedChainDataClient);
new BlobSidecarsByRootMessageHandler(spec, metricsSystem, combinedChainDataClient);
final BlobSidecarsByRootRequestMessageSchema blobSidecarsByRootRequestMessageSchema =
SchemaDefinitionsDeneb.required(
spec.forMilestone(SpecMilestone.DENEB).getSchemaDefinitions())
Expand Down Expand Up @@ -326,8 +324,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
RpcContextCodec.forkDigest(spec, recentChainData, ForkDigestPayloadContext.BLOB_SIDECAR);

final BlobSidecarsByRangeMessageHandler blobSidecarsByRangeHandler =
new BlobSidecarsByRangeMessageHandler(
spec, getSpecConfigDeneb(spec), metricsSystem, combinedChainDataClient);
new BlobSidecarsByRangeMessageHandler(spec, metricsSystem, combinedChainDataClient);

return Optional.of(
new SingleProtocolEth2RpcMethod<>(
Expand Down Expand Up @@ -426,10 +423,6 @@ private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
spec.getNetworkingConfig());
}

private static SpecConfigDeneb getSpecConfigDeneb(final Spec spec) {
return SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig());
}

public Collection<RpcMethod<?, ?, ?>> all() {
return Collections.unmodifiableCollection(allMethods);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@
import tech.pegasys.teku.networking.eth2.rpc.core.RpcException.ResourceUnavailableException;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRangeRequestMessage;
import tech.pegasys.teku.spec.datastructures.util.SlotAndBlockRootAndBlobIndex;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;

/**
Expand All @@ -58,18 +59,15 @@ public class BlobSidecarsByRangeMessageHandler
private static final Logger LOG = LogManager.getLogger();

private final Spec spec;
private final SpecConfigDeneb specConfigDeneb;
private final CombinedChainDataClient combinedChainDataClient;
private final LabelledMetric<Counter> requestCounter;
private final Counter totalBlobSidecarsRequestedCounter;

public BlobSidecarsByRangeMessageHandler(
final Spec spec,
final SpecConfigDeneb specConfigDeneb,
final MetricsSystem metricsSystem,
final CombinedChainDataClient combinedChainDataClient) {
this.spec = spec;
this.specConfigDeneb = specConfigDeneb;
this.combinedChainDataClient = combinedChainDataClient;
requestCounter =
metricsSystem.createLabelledCounter(
Expand All @@ -88,16 +86,23 @@ public BlobSidecarsByRangeMessageHandler(
public Optional<RpcException> validateRequest(
final String protocolId, final BlobSidecarsByRangeRequestMessage request) {

final long requestedCount = calculateRequestedCount(request);
final SpecMilestone latestMilestoneRequested =
spec.getForkSchedule().getSpecMilestoneAtSlot(request.getMaxSlot());
final MiscHelpers miscHelpers = spec.forMilestone(latestMilestoneRequested).miscHelpers();

if (requestedCount > specConfigDeneb.getMaxRequestBlobSidecars()) {
final int maxRequestBlobSidecars = miscHelpers.getMaxRequestBlobSidecars();
final int maxBlobsPerBlock = miscHelpers.getMaxBlobsPerBlock();

final long requestedCount = calculateRequestedCount(request, maxBlobsPerBlock);

if (requestedCount > maxRequestBlobSidecars) {
requestCounter.labels("count_too_big").inc();
return Optional.of(
new RpcException(
INVALID_REQUEST_CODE,
String.format(
"Only a maximum of %s blob sidecars can be requested per request",
specConfigDeneb.getMaxRequestBlobSidecars())));
maxRequestBlobSidecars)));
}

return Optional.empty();
Expand All @@ -118,7 +123,10 @@ public void onIncomingMessage(
message.getCount(),
startSlot);

final long requestedCount = calculateRequestedCount(message);
final SpecMilestone latestMilestoneRequested =
spec.getForkSchedule().getSpecMilestoneAtSlot(endSlot);
final MiscHelpers miscHelpers = spec.forMilestone(latestMilestoneRequested).miscHelpers();
final long requestedCount = calculateRequestedCount(message, miscHelpers.getMaxBlobsPerBlock());

final Optional<RequestApproval> blobSidecarsRequestApproval =
peer.approveBlobSidecarsRequest(callback, requestedCount);
Expand Down Expand Up @@ -159,9 +167,15 @@ public void onIncomingMessage(
} else {
canonicalHotRoots = ImmutableSortedMap.of();
}

final int maxRequestBlobSidecars = miscHelpers.getMaxRequestBlobSidecars();
final RequestState initialState =
new RequestState(callback, startSlot, endSlot, canonicalHotRoots, finalizedSlot);
new RequestState(
callback,
startSlot,
endSlot,
canonicalHotRoots,
finalizedSlot,
maxRequestBlobSidecars);
if (message.getCount().isZero()) {
return SafeFuture.completedFuture(initialState);
}
Expand All @@ -182,8 +196,9 @@ public void onIncomingMessage(
});
}

private long calculateRequestedCount(final BlobSidecarsByRangeRequestMessage message) {
return specConfigDeneb.getMaxBlobsPerBlock() * message.getCount().longValue();
private long calculateRequestedCount(
final BlobSidecarsByRangeRequestMessage message, final int maxBlobsPerBlock) {
return maxBlobsPerBlock * message.getCount().longValue();
}

private boolean checkBlobSidecarsAreAvailable(
Expand Down Expand Up @@ -234,6 +249,7 @@ class RequestState {
private final UInt64 endSlot;
private final UInt64 finalizedSlot;
private final Map<UInt64, Bytes32> canonicalHotRoots;
private final int maxRequestBlobSidecars;

private final AtomicInteger sentBlobSidecars = new AtomicInteger(0);

Expand All @@ -247,12 +263,14 @@ class RequestState {
final UInt64 startSlot,
final UInt64 endSlot,
final Map<UInt64, Bytes32> canonicalHotRoots,
final UInt64 finalizedSlot) {
final UInt64 finalizedSlot,
final int maxRequestBlobSidecars) {
this.callback = callback;
this.startSlot = startSlot;
this.endSlot = endSlot;
this.finalizedSlot = finalizedSlot;
this.canonicalHotRoots = canonicalHotRoots;
this.maxRequestBlobSidecars = maxRequestBlobSidecars;
}

SafeFuture<Void> sendBlobSidecar(final BlobSidecar blobSidecar) {
Expand All @@ -262,7 +280,7 @@ SafeFuture<Void> sendBlobSidecar(final BlobSidecar blobSidecar) {
SafeFuture<Optional<BlobSidecar>> loadNextBlobSidecar() {
if (blobSidecarKeysIterator.isEmpty()) {
return combinedChainDataClient
.getBlobSidecarKeys(startSlot, endSlot, specConfigDeneb.getMaxRequestBlobSidecars())
.getBlobSidecarKeys(startSlot, endSlot, maxRequestBlobSidecars)
.thenCompose(
keys -> {
blobSidecarKeysIterator = Optional.of(keys.iterator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import tech.pegasys.teku.networking.eth2.rpc.core.RpcException;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
Expand All @@ -52,19 +52,16 @@ public class BlobSidecarsByRootMessageHandler
private static final Logger LOG = LogManager.getLogger();

private final Spec spec;
private final SpecConfigDeneb specConfigDeneb;
private final CombinedChainDataClient combinedChainDataClient;

private final LabelledMetric<Counter> requestCounter;
private final Counter totalBlobSidecarsRequestedCounter;

public BlobSidecarsByRootMessageHandler(
final Spec spec,
final SpecConfigDeneb specConfigDeneb,
final MetricsSystem metricsSystem,
final CombinedChainDataClient combinedChainDataClient) {
this.spec = spec;
this.specConfigDeneb = specConfigDeneb;
this.combinedChainDataClient = combinedChainDataClient;
requestCounter =
metricsSystem.createLabelledCounter(
Expand All @@ -82,7 +79,7 @@ public BlobSidecarsByRootMessageHandler(
@Override
public Optional<RpcException> validateRequest(
final String protocolId, final BlobSidecarsByRootRequestMessage request) {
final int maxRequestBlobSidecars = specConfigDeneb.getMaxRequestBlobSidecars();
final int maxRequestBlobSidecars = getMaxRequestBlobSidecars();
if (request.size() > maxRequestBlobSidecars) {
requestCounter.labels("count_too_big").inc();
return Optional.of(
Expand Down Expand Up @@ -156,6 +153,13 @@ public void onIncomingMessage(
});
}

private int getMaxRequestBlobSidecars() {
final UInt64 epoch =
combinedChainDataClient.getRecentChainData().getCurrentEpoch().orElse(UInt64.ZERO);
final SpecMilestone specMilestone = spec.getForkSchedule().getSpecMilestoneAtEpoch(epoch);
return spec.forMilestone(specMilestone).miscHelpers().getMaxRequestBlobSidecars();
}

private UInt64 getFinalizedEpoch() {
return combinedChainDataClient
.getFinalizedBlock()
Expand Down
Loading

0 comments on commit 37c3683

Please sign in to comment.