Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8767 update blobs rpc methods #8823

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,63 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import static tech.pegasys.teku.infrastructure.async.Waiter.waitFor;
import static tech.pegasys.teku.spec.SpecMilestone.CAPELLA;
import static tech.pegasys.teku.spec.SpecMilestone.DENEB;
import static tech.pegasys.teku.spec.SpecMilestone.ELECTRA;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.TestSpecContext;
import tech.pegasys.teku.spec.TestSpecInvocationContextProvider;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.generator.ChainBuilder;

@TestSpecContext(milestone = {CAPELLA, DENEB, ELECTRA})
public class BlobSidecarsByRangeIntegrationTest extends AbstractRpcMethodIntegrationTest {

@Test
private Eth2Peer peer;
private SpecMilestone specMilestone;

@BeforeEach
public void setUp(final TestSpecInvocationContextProvider.SpecContext specContext) {
peer = createPeer(specContext.getSpec());
specMilestone = specContext.getSpecMilestone();
}

@TestTemplate
public void requestBlobSidecars_shouldFailBeforeDenebMilestone() {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalCapella());
assumeThat(specMilestone).isLessThan(SpecMilestone.DENEB);
assertThatThrownBy(() -> requestBlobSidecarsByRange(peer, UInt64.ONE, UInt64.valueOf(10)))
.hasRootCauseInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("BlobSidecarsByRange method is not supported");
}

@Test
public void requestBlobSidecars_shouldReturnEmptyBlobSidecarsOnDenebMilestone()
@TestTemplate
public void requestBlobSidecars_shouldReturnEmptyBlobSidecarsAfterDenebMilestone()
throws ExecutionException, InterruptedException, TimeoutException {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalDeneb());
assumeThat(specMilestone).isGreaterThanOrEqualTo(DENEB);
final List<BlobSidecar> blobSidecars =
requestBlobSidecarsByRange(peer, UInt64.ONE, UInt64.valueOf(10));
assertThat(blobSidecars).isEmpty();
}

@Test
@TestTemplate
public void requestBlobSidecars_shouldReturnEmptyBlobSidecarsWhenCountIsZero()
throws ExecutionException, InterruptedException, TimeoutException {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalDeneb());
assumeThat(specMilestone).isGreaterThanOrEqualTo(DENEB);

// finalize chain 2 blobs per block
finalizeChainWithBlobs(2);
Expand All @@ -65,10 +82,10 @@ public void requestBlobSidecars_shouldReturnEmptyBlobSidecarsWhenCountIsZero()
assertThat(blobSidecars).isEmpty();
}

@Test
@TestTemplate
public void requestBlobSidecars_shouldReturnCanonicalBlobSidecarsOnDenebMilestone()
throws ExecutionException, InterruptedException, TimeoutException {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalDeneb());
assumeThat(specMilestone).isGreaterThanOrEqualTo(DENEB);

// finalize chain 2 blobs per block
finalizeChainWithBlobs(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import static tech.pegasys.teku.infrastructure.async.Waiter.waitFor;
import static tech.pegasys.teku.spec.SpecMilestone.CAPELLA;
import static tech.pegasys.teku.spec.SpecMilestone.DENEB;
import static tech.pegasys.teku.spec.SpecMilestone.ELECTRA;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -24,46 +28,59 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.TestSpecContext;
import tech.pegasys.teku.spec.TestSpecInvocationContextProvider;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;

@TestSpecContext(milestone = {CAPELLA, DENEB, ELECTRA})
public class BlobSidecarsByRootIntegrationTest extends AbstractRpcMethodIntegrationTest {

@Test
private Eth2Peer peer;
private SpecMilestone specMilestone;

@BeforeEach
public void setUp(final TestSpecInvocationContextProvider.SpecContext specContext) {
peer = createPeer(specContext.getSpec());
specMilestone = specContext.getSpecMilestone();
}

@TestTemplate
public void requestBlobSidecars_shouldFailBeforeDenebMilestone() {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalCapella());
assumeThat(specMilestone).isLessThan(SpecMilestone.DENEB);
assertThatThrownBy(() -> requestBlobSidecarsByRoot(peer, List.of()))
.hasRootCauseInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("BlobSidecarsByRoot method is not supported");
}

@Test
@TestTemplate
public void requestBlobSidecar_shouldFailBeforeDenebMilestone() {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalCapella());
assumeThat(specMilestone).isLessThan(SpecMilestone.DENEB);
assertThatThrownBy(
() -> requestBlobSidecarByRoot(peer, new BlobIdentifier(Bytes32.ZERO, UInt64.ZERO)))
.hasRootCauseInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("BlobSidecarsByRoot method is not supported");
}

@Test
public void requestBlobSidecars_shouldReturnEmptyBlobSidecarsOnDenebMilestone()
@TestTemplate
public void requestBlobSidecars_shouldReturnEmptyBlobSidecarsAfterDenebMilestone()
throws ExecutionException, InterruptedException, TimeoutException {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalDeneb());
assumeThat(specMilestone).isGreaterThanOrEqualTo(DENEB);
final Optional<BlobSidecar> blobSidecar =
requestBlobSidecarByRoot(peer, new BlobIdentifier(Bytes32.ZERO, UInt64.ZERO));
assertThat(blobSidecar).isEmpty();
}

@Test
@TestTemplate
public void requestBlobSidecars_shouldReturnBlobSidecarsOnDenebMilestone()
throws ExecutionException, InterruptedException, TimeoutException {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalDeneb());
assumeThat(specMilestone).isGreaterThanOrEqualTo(DENEB);

// generate 4 blobs per block
peerStorage.chainUpdater().blockOptions.setGenerateRandomBlobs(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage;
Expand Down Expand Up @@ -284,8 +283,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
RpcContextCodec.forkDigest(spec, recentChainData, ForkDigestPayloadContext.BLOB_SIDECAR);

final BlobSidecarsByRootMessageHandler blobSidecarsByRootHandler =
new BlobSidecarsByRootMessageHandler(
spec, getSpecConfigDeneb(spec), metricsSystem, combinedChainDataClient);
new BlobSidecarsByRootMessageHandler(spec, metricsSystem, combinedChainDataClient);
final BlobSidecarsByRootRequestMessageSchema blobSidecarsByRootRequestMessageSchema =
SchemaDefinitionsDeneb.required(
spec.forMilestone(SpecMilestone.DENEB).getSchemaDefinitions())
Expand Down Expand Up @@ -326,8 +324,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
RpcContextCodec.forkDigest(spec, recentChainData, ForkDigestPayloadContext.BLOB_SIDECAR);

final BlobSidecarsByRangeMessageHandler blobSidecarsByRangeHandler =
new BlobSidecarsByRangeMessageHandler(
spec, getSpecConfigDeneb(spec), metricsSystem, combinedChainDataClient);
new BlobSidecarsByRangeMessageHandler(spec, metricsSystem, combinedChainDataClient);

return Optional.of(
new SingleProtocolEth2RpcMethod<>(
Expand Down Expand Up @@ -426,10 +423,6 @@ private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
spec.getNetworkingConfig());
}

private static SpecConfigDeneb getSpecConfigDeneb(final Spec spec) {
return SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig());
}

public Collection<RpcMethod<?, ?, ?>> all() {
return Collections.unmodifiableCollection(allMethods);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@
import tech.pegasys.teku.networking.eth2.rpc.core.RpcException.ResourceUnavailableException;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRangeRequestMessage;
import tech.pegasys.teku.spec.datastructures.util.SlotAndBlockRootAndBlobIndex;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;

/**
Expand All @@ -58,18 +59,15 @@ public class BlobSidecarsByRangeMessageHandler
private static final Logger LOG = LogManager.getLogger();

private final Spec spec;
private final SpecConfigDeneb specConfigDeneb;
private final CombinedChainDataClient combinedChainDataClient;
private final LabelledMetric<Counter> requestCounter;
private final Counter totalBlobSidecarsRequestedCounter;

public BlobSidecarsByRangeMessageHandler(
final Spec spec,
final SpecConfigDeneb specConfigDeneb,
final MetricsSystem metricsSystem,
final CombinedChainDataClient combinedChainDataClient) {
this.spec = spec;
this.specConfigDeneb = specConfigDeneb;
this.combinedChainDataClient = combinedChainDataClient;
requestCounter =
metricsSystem.createLabelledCounter(
Expand All @@ -88,16 +86,23 @@ public BlobSidecarsByRangeMessageHandler(
public Optional<RpcException> validateRequest(
final String protocolId, final BlobSidecarsByRangeRequestMessage request) {

final long requestedCount = calculateRequestedCount(request);
final SpecMilestone latestMilestoneRequested =
spec.getForkSchedule().getSpecMilestoneAtSlot(request.getMaxSlot());
final MiscHelpers miscHelpers = spec.forMilestone(latestMilestoneRequested).miscHelpers();

if (requestedCount > specConfigDeneb.getMaxRequestBlobSidecars()) {
final int maxRequestBlobSidecars = miscHelpers.getMaxRequestBlobSidecars();
final int maxBlobsPerBlock = miscHelpers.getMaxBlobsPerBlock();

final long requestedCount = calculateRequestedCount(request, maxBlobsPerBlock);

if (requestedCount > maxRequestBlobSidecars) {
requestCounter.labels("count_too_big").inc();
return Optional.of(
new RpcException(
INVALID_REQUEST_CODE,
String.format(
"Only a maximum of %s blob sidecars can be requested per request",
specConfigDeneb.getMaxRequestBlobSidecars())));
maxRequestBlobSidecars)));
}

return Optional.empty();
Expand All @@ -118,7 +123,10 @@ public void onIncomingMessage(
message.getCount(),
startSlot);

final long requestedCount = calculateRequestedCount(message);
final SpecMilestone latestMilestoneRequested =
spec.getForkSchedule().getSpecMilestoneAtSlot(endSlot);
final MiscHelpers miscHelpers = spec.forMilestone(latestMilestoneRequested).miscHelpers();
final long requestedCount = calculateRequestedCount(message, miscHelpers.getMaxBlobsPerBlock());

final Optional<RequestApproval> blobSidecarsRequestApproval =
peer.approveBlobSidecarsRequest(callback, requestedCount);
Expand Down Expand Up @@ -159,9 +167,15 @@ public void onIncomingMessage(
} else {
canonicalHotRoots = ImmutableSortedMap.of();
}

final int maxRequestBlobSidecars = miscHelpers.getMaxRequestBlobSidecars();
final RequestState initialState =
new RequestState(callback, startSlot, endSlot, canonicalHotRoots, finalizedSlot);
new RequestState(
callback,
startSlot,
endSlot,
canonicalHotRoots,
finalizedSlot,
maxRequestBlobSidecars);
if (message.getCount().isZero()) {
return SafeFuture.completedFuture(initialState);
}
Expand All @@ -182,8 +196,9 @@ public void onIncomingMessage(
});
}

private long calculateRequestedCount(final BlobSidecarsByRangeRequestMessage message) {
return specConfigDeneb.getMaxBlobsPerBlock() * message.getCount().longValue();
private long calculateRequestedCount(
final BlobSidecarsByRangeRequestMessage message, final int maxBlobsPerBlock) {
return maxBlobsPerBlock * message.getCount().longValue();
}

private boolean checkBlobSidecarsAreAvailable(
Expand Down Expand Up @@ -234,6 +249,7 @@ class RequestState {
private final UInt64 endSlot;
private final UInt64 finalizedSlot;
private final Map<UInt64, Bytes32> canonicalHotRoots;
private final int maxRequestBlobSidecars;

private final AtomicInteger sentBlobSidecars = new AtomicInteger(0);

Expand All @@ -247,12 +263,14 @@ class RequestState {
final UInt64 startSlot,
final UInt64 endSlot,
final Map<UInt64, Bytes32> canonicalHotRoots,
final UInt64 finalizedSlot) {
final UInt64 finalizedSlot,
final int maxRequestBlobSidecars) {
this.callback = callback;
this.startSlot = startSlot;
this.endSlot = endSlot;
this.finalizedSlot = finalizedSlot;
this.canonicalHotRoots = canonicalHotRoots;
this.maxRequestBlobSidecars = maxRequestBlobSidecars;
}

SafeFuture<Void> sendBlobSidecar(final BlobSidecar blobSidecar) {
Expand All @@ -262,7 +280,7 @@ SafeFuture<Void> sendBlobSidecar(final BlobSidecar blobSidecar) {
SafeFuture<Optional<BlobSidecar>> loadNextBlobSidecar() {
if (blobSidecarKeysIterator.isEmpty()) {
return combinedChainDataClient
.getBlobSidecarKeys(startSlot, endSlot, specConfigDeneb.getMaxRequestBlobSidecars())
.getBlobSidecarKeys(startSlot, endSlot, maxRequestBlobSidecars)
.thenCompose(
keys -> {
blobSidecarKeysIterator = Optional.of(keys.iterator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import tech.pegasys.teku.networking.eth2.rpc.core.RpcException;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
Expand All @@ -52,19 +52,16 @@ public class BlobSidecarsByRootMessageHandler
private static final Logger LOG = LogManager.getLogger();

private final Spec spec;
private final SpecConfigDeneb specConfigDeneb;
private final CombinedChainDataClient combinedChainDataClient;

private final LabelledMetric<Counter> requestCounter;
private final Counter totalBlobSidecarsRequestedCounter;

public BlobSidecarsByRootMessageHandler(
final Spec spec,
final SpecConfigDeneb specConfigDeneb,
final MetricsSystem metricsSystem,
final CombinedChainDataClient combinedChainDataClient) {
this.spec = spec;
this.specConfigDeneb = specConfigDeneb;
this.combinedChainDataClient = combinedChainDataClient;
requestCounter =
metricsSystem.createLabelledCounter(
Expand All @@ -82,7 +79,7 @@ public BlobSidecarsByRootMessageHandler(
@Override
public Optional<RpcException> validateRequest(
final String protocolId, final BlobSidecarsByRootRequestMessage request) {
final int maxRequestBlobSidecars = specConfigDeneb.getMaxRequestBlobSidecars();
final int maxRequestBlobSidecars = getMaxRequestBlobSidecars();
if (request.size() > maxRequestBlobSidecars) {
requestCounter.labels("count_too_big").inc();
return Optional.of(
Expand Down Expand Up @@ -156,6 +153,13 @@ public void onIncomingMessage(
});
}

private int getMaxRequestBlobSidecars() {
final UInt64 epoch =
combinedChainDataClient.getRecentChainData().getCurrentEpoch().orElse(UInt64.ZERO);
final SpecMilestone specMilestone = spec.getForkSchedule().getSpecMilestoneAtEpoch(epoch);
return spec.forMilestone(specMilestone).miscHelpers().getMaxRequestBlobSidecars();
}

private UInt64 getFinalizedEpoch() {
return combinedChainDataClient
.getFinalizedBlock()
Expand Down
Loading