From 69c48f77a0d0e871d786c813dfa65e62a1793d69 Mon Sep 17 00:00:00 2001 From: rkapka Date: Mon, 14 Oct 2024 15:33:19 +0200 Subject: [PATCH 1/7] Better attestation packing for Electra --- .../operations/attestations/mock/BUILD.bazel | 1 + .../operations/attestations/mock/mock.go | 55 +++++--- .../validator/proposer_attestations.go | 61 ++++++--- .../proposer_attestations_electra.go | 3 + .../validator/proposer_attestations_test.go | 129 ++++++++++++++++++ 5 files changed, 208 insertions(+), 41 deletions(-) diff --git a/beacon-chain/operations/attestations/mock/BUILD.bazel b/beacon-chain/operations/attestations/mock/BUILD.bazel index 1976c31f5e1e..cb20f6379bad 100644 --- a/beacon-chain/operations/attestations/mock/BUILD.bazel +++ b/beacon-chain/operations/attestations/mock/BUILD.bazel @@ -7,6 +7,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations/mock", visibility = ["//visibility:public"], deps = [ + "//beacon-chain/operations/attestations:go_default_library", "//consensus-types/primitives:go_default_library", "//proto/prysm/v1alpha1:go_default_library", ], diff --git a/beacon-chain/operations/attestations/mock/mock.go b/beacon-chain/operations/attestations/mock/mock.go index a4101bbe51ee..5d17ad7878ce 100644 --- a/beacon-chain/operations/attestations/mock/mock.go +++ b/beacon-chain/operations/attestations/mock/mock.go @@ -3,13 +3,17 @@ package mock import ( "context" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" ) +var _ attestations.Pool = &PoolMock{} + // PoolMock -- type PoolMock struct { - AggregatedAtts []*ethpb.Attestation + AggregatedAtts []ethpb.Att + UnaggregatedAtts []ethpb.Att } // AggregateUnaggregatedAttestations -- @@ -23,18 +27,18 @@ func (*PoolMock) AggregateUnaggregatedAttestationsBySlotIndex(_ context.Context, } // SaveAggregatedAttestation -- -func (*PoolMock) SaveAggregatedAttestation(_ *ethpb.Attestation) error { +func (*PoolMock) SaveAggregatedAttestation(_ ethpb.Att) error { panic("implement me") } // SaveAggregatedAttestations -- -func (m *PoolMock) SaveAggregatedAttestations(atts []*ethpb.Attestation) error { +func (m *PoolMock) SaveAggregatedAttestations(atts []ethpb.Att) error { m.AggregatedAtts = append(m.AggregatedAtts, atts...) return nil } // AggregatedAttestations -- -func (m *PoolMock) AggregatedAttestations() []*ethpb.Attestation { +func (m *PoolMock) AggregatedAttestations() []ethpb.Att { return m.AggregatedAtts } @@ -43,13 +47,18 @@ func (*PoolMock) AggregatedAttestationsBySlotIndex(_ context.Context, _ primitiv panic("implement me") } +// AggregatedAttestationsBySlotIndexElectra -- +func (*PoolMock) AggregatedAttestationsBySlotIndexElectra(_ context.Context, _ primitives.Slot, _ primitives.CommitteeIndex) []*ethpb.AttestationElectra { + panic("implement me") +} + // DeleteAggregatedAttestation -- -func (*PoolMock) DeleteAggregatedAttestation(_ *ethpb.Attestation) error { +func (*PoolMock) DeleteAggregatedAttestation(_ ethpb.Att) error { panic("implement me") } // HasAggregatedAttestation -- -func (*PoolMock) HasAggregatedAttestation(_ *ethpb.Attestation) (bool, error) { +func (*PoolMock) HasAggregatedAttestation(_ ethpb.Att) (bool, error) { panic("implement me") } @@ -59,18 +68,19 @@ func (*PoolMock) AggregatedAttestationCount() int { } // SaveUnaggregatedAttestation -- -func (*PoolMock) SaveUnaggregatedAttestation(_ *ethpb.Attestation) error { +func (*PoolMock) SaveUnaggregatedAttestation(_ ethpb.Att) error { panic("implement me") } // SaveUnaggregatedAttestations -- -func (*PoolMock) SaveUnaggregatedAttestations(_ []*ethpb.Attestation) error { - panic("implement me") +func (m *PoolMock) SaveUnaggregatedAttestations(atts []ethpb.Att) error { + m.UnaggregatedAtts = append(m.UnaggregatedAtts, atts...) + return nil } // UnaggregatedAttestations -- -func (*PoolMock) UnaggregatedAttestations() ([]*ethpb.Attestation, error) { - panic("implement me") +func (m *PoolMock) UnaggregatedAttestations() ([]ethpb.Att, error) { + return m.UnaggregatedAtts, nil } // UnaggregatedAttestationsBySlotIndex -- @@ -78,8 +88,13 @@ func (*PoolMock) UnaggregatedAttestationsBySlotIndex(_ context.Context, _ primit panic("implement me") } +// UnaggregatedAttestationsBySlotIndexElectra -- +func (*PoolMock) UnaggregatedAttestationsBySlotIndexElectra(_ context.Context, _ primitives.Slot, _ primitives.CommitteeIndex) []*ethpb.AttestationElectra { + panic("implement me") +} + // DeleteUnaggregatedAttestation -- -func (*PoolMock) DeleteUnaggregatedAttestation(_ *ethpb.Attestation) error { +func (*PoolMock) DeleteUnaggregatedAttestation(_ ethpb.Att) error { panic("implement me") } @@ -94,42 +109,42 @@ func (*PoolMock) UnaggregatedAttestationCount() int { } // SaveBlockAttestation -- -func (*PoolMock) SaveBlockAttestation(_ *ethpb.Attestation) error { +func (*PoolMock) SaveBlockAttestation(_ ethpb.Att) error { panic("implement me") } // SaveBlockAttestations -- -func (*PoolMock) SaveBlockAttestations(_ []*ethpb.Attestation) error { +func (*PoolMock) SaveBlockAttestations(_ []ethpb.Att) error { panic("implement me") } // BlockAttestations -- -func (*PoolMock) BlockAttestations() []*ethpb.Attestation { +func (*PoolMock) BlockAttestations() []ethpb.Att { panic("implement me") } // DeleteBlockAttestation -- -func (*PoolMock) DeleteBlockAttestation(_ *ethpb.Attestation) error { +func (*PoolMock) DeleteBlockAttestation(_ ethpb.Att) error { panic("implement me") } // SaveForkchoiceAttestation -- -func (*PoolMock) SaveForkchoiceAttestation(_ *ethpb.Attestation) error { +func (*PoolMock) SaveForkchoiceAttestation(_ ethpb.Att) error { panic("implement me") } // SaveForkchoiceAttestations -- -func (*PoolMock) SaveForkchoiceAttestations(_ []*ethpb.Attestation) error { +func (*PoolMock) SaveForkchoiceAttestations(_ []ethpb.Att) error { panic("implement me") } // ForkchoiceAttestations -- -func (*PoolMock) ForkchoiceAttestations() []*ethpb.Attestation { +func (*PoolMock) ForkchoiceAttestations() []ethpb.Att { panic("implement me") } // DeleteForkchoiceAttestation -- -func (*PoolMock) DeleteForkchoiceAttestation(_ *ethpb.Attestation) error { +func (*PoolMock) DeleteForkchoiceAttestation(_ ethpb.Att) error { panic("implement me") } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go index 4446f853f2e0..73aa5806a416 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go @@ -91,16 +91,26 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon var attsForInclusion proposerAtts if postElectra { - // TODO: hack for Electra devnet-1, take only one aggregate per ID - // (which essentially means one aggregate for an attestation_data+committee combination - topAggregates := make([]ethpb.Att, 0) - for _, v := range attsById { - topAggregates = append(topAggregates, v[0]) - } + idx := 0 + for { + topAggregates := make([]ethpb.Att, 0, len(attsById)) + for _, v := range attsById { + if len(v) > idx { + topAggregates = append(topAggregates, v[idx]) + } + } - attsForInclusion, err = computeOnChainAggregate(topAggregates) - if err != nil { - return nil, err + if len(topAggregates) == 0 { + break + } + + onChainAggregates, err := computeOnChainAggregate(topAggregates) + if err != nil { + return nil, err + } + attsForInclusion = append(attsForInclusion, onChainAggregates...) + + idx++ } } else { attsForInclusion = make([]ethpb.Att, 0) @@ -113,10 +123,20 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon if err != nil { return nil, err } - sorted, err := deduped.sort() - if err != nil { - return nil, err + + var sorted proposerAtts + if postElectra { + sorted, err = deduped.sortOnChainAggregates() + if err != nil { + return nil, err + } + } else { + sorted, err = deduped.sort() + if err != nil { + return nil, err + } } + atts = sorted.limitToMaxAttestations() return vs.filterAttestationBySignature(ctx, atts, latestState) } @@ -223,6 +243,14 @@ func (a proposerAtts) sort() (proposerAtts, error) { return a.sortBySlotAndCommittee() } +func (a proposerAtts) sortOnChainAggregates() (proposerAtts, error) { + if len(a) < 2 { + return a, nil + } + + return a.sortByProfitabilityUsingMaxCover() +} + // Separate attestations by slot, as slot number takes higher precedence when sorting. // Also separate by committee index because maxcover will prefer attestations for the same // committee with disjoint bits over attestations for different committees with overlapping @@ -231,7 +259,6 @@ func (a proposerAtts) sortBySlotAndCommittee() (proposerAtts, error) { type slotAtts struct { candidates map[primitives.CommitteeIndex]proposerAtts selected map[primitives.CommitteeIndex]proposerAtts - leftover map[primitives.CommitteeIndex]proposerAtts } var slots []primitives.Slot @@ -250,7 +277,6 @@ func (a proposerAtts) sortBySlotAndCommittee() (proposerAtts, error) { var err error for _, sa := range attsBySlot { sa.selected = make(map[primitives.CommitteeIndex]proposerAtts) - sa.leftover = make(map[primitives.CommitteeIndex]proposerAtts) for ci, committeeAtts := range sa.candidates { sa.selected[ci], err = committeeAtts.sortByProfitabilityUsingMaxCover_committeeAwarePacking() if err != nil { @@ -266,9 +292,6 @@ func (a proposerAtts) sortBySlotAndCommittee() (proposerAtts, error) { for _, slot := range slots { sortedAtts = append(sortedAtts, sortSlotAttestations(attsBySlot[slot].selected)...) } - for _, slot := range slots { - sortedAtts = append(sortedAtts, sortSlotAttestations(attsBySlot[slot].leftover)...) - } return sortedAtts, nil } @@ -287,15 +310,11 @@ func (a proposerAtts) sortByProfitabilityUsingMaxCover_committeeAwarePacking() ( return nil, err } } - // Add selected candidates on top, those that are not selected - append at bottom. selectedKeys, _, err := aggregation.MaxCover(candidates, len(candidates), true /* allowOverlaps */) if err != nil { log.WithError(err).Debug("MaxCover aggregation failed") return a, nil } - - // Pick selected attestations first, leftover attestations will be appended at the end. - // Both lists will be sorted by number of bits set. selected := make(proposerAtts, selectedKeys.Count()) for i, key := range selectedKeys.BitIndices() { selected[i] = a[key] diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_electra.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_electra.go index e15df73bcaa4..4c3fb63f33e3 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_electra.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_electra.go @@ -13,6 +13,9 @@ import ( // computeOnChainAggregate constructs a final aggregate form a list of network aggregates with equal attestation data. // It assumes that each network aggregate has exactly one committee bit set. // +// Our implementation allows to pass aggregates for different attestation data, in which case the function will return +// one final aggregate per attestation data. +// // Spec definition: // // def compute_on_chain_aggregate(network_aggregates: Sequence[Attestation]) -> Attestation: diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go index a4f9668861f2..d67f2c42b463 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go @@ -9,10 +9,12 @@ import ( "github.com/prysmaticlabs/go-bitfield" chainMock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations/mock" "github.com/prysmaticlabs/prysm/v5/config/features" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/crypto/bls/blst" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/assert" "github.com/prysmaticlabs/prysm/v5/testing/require" @@ -680,6 +682,133 @@ func Test_packAttestations(t *testing.T) { }) } +func Test_packAttestations_ElectraOnChainAggregates(t *testing.T) { + params.SetupTestConfigCleanup(t) + cfg := params.BeaconConfig().Copy() + cfg.ElectraForkEpoch = 1 + params.OverrideBeaconConfig(cfg) + + ctx := context.Background() + key, err := blst.RandKey() + require.NoError(t, err) + sig := key.Sign([]byte{'X'}) + + cb0 := primitives.NewAttestationCommitteeBits() + cb0.SetBitAt(0, true) + cb1 := primitives.NewAttestationCommitteeBits() + cb1.SetBitAt(1, true) + + data0 := util.HydrateAttestationData(ðpb.AttestationData{BeaconBlockRoot: bytesutil.PadTo([]byte{'0'}, 32)}) + data1 := util.HydrateAttestationData(ðpb.AttestationData{BeaconBlockRoot: bytesutil.PadTo([]byte{'1'}, 32)}) + + // Glossary: + // - Single Aggregate: aggregate with exactly one committee bit set, from which an On-Chain Aggregate is constructed + // - On-Chain Aggregate: final aggregate packed into a block + // + // We construct the following number of single aggregates: + // - data_root_0 and committee_index_0: 3 single aggregates + // - data_root_0 and committee_index_1: 2 single aggregates + // - data_root_1 and committee_index_0: 1 single aggregate + // - data_root_1 and committee_index_1: 3 single aggregates + // + // Because the function tries to aggregate attestations, we have to create attestations which are not aggregatable. + // It suffices that they have overlapping aggregation bits. + // + // The result should be the following six on-chain aggregates: + // - for data_root_0 combining single aggregates at index 0 for each committee + // - for data_root_0 combining single aggregates at index 1 for each committee + // - for data_root_0 constructed from the single aggregate at index 2 for committee_index_0 + // - for data_root_1 combining single aggregates at index 0 for each committee + // - for data_root_1 constructed from the single aggregate at index 1 for committee_index_1 + // - for data_root_1 constructed from the single aggregate at index 2 for committee_index_1 + // + // This test has no control over the index of a single aggregate because MaxCover may rearrange them. + // It means that even if we save d0_c0_a1, d0_c0_a2 and d0_c0_a3 to the pool in this exact order, + // at the time of constructing the on-chain aggregate they may be rearranged. This does not + // change the total number of on-chain aggregates constructed because the number of single aggregates + // remains the same. + + d0_c0_a1 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b11110}, + CommitteeBits: cb0, + Data: data0, + Signature: sig.Marshal(), + } + d0_c0_a2 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b11001}, + CommitteeBits: cb0, + Data: data0, + Signature: sig.Marshal(), + } + d0_c0_a3 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b10111}, + CommitteeBits: cb0, + Data: data0, + Signature: sig.Marshal(), + } + d0_c1_a1 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b11110}, + CommitteeBits: cb1, + Data: data0, + Signature: sig.Marshal(), + } + d0_c1_a2 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b10111}, + CommitteeBits: cb1, + Data: data0, + Signature: sig.Marshal(), + } + d1_c0_a1 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b11111}, + CommitteeBits: cb0, + Data: data1, + Signature: sig.Marshal(), + } + d1_c1_a1 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b11110}, + CommitteeBits: cb1, + Data: data1, + Signature: sig.Marshal(), + } + d1_c1_a2 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b11001}, + CommitteeBits: cb1, + Data: data1, + Signature: sig.Marshal(), + } + d1_c1_a3 := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b10111}, + CommitteeBits: cb1, + Data: data1, + Signature: sig.Marshal(), + } + + pool := &mock.PoolMock{} + require.NoError(t, pool.SaveAggregatedAttestations([]ethpb.Att{d0_c0_a1, d0_c0_a2, d0_c0_a3, d0_c1_a1, d0_c1_a2, d1_c0_a1, d1_c1_a1, d1_c1_a2, d1_c1_a3})) + slot := primitives.Slot(1) + s := &Server{AttPool: pool, HeadFetcher: &chainMock.ChainService{}, TimeFetcher: &chainMock.ChainService{Slot: &slot}} + st, _ := util.DeterministicGenesisStateElectra(t, 128) + require.NoError(t, st.SetSlot(params.BeaconConfig().SlotsPerEpoch+1)) + + atts, err := s.packAttestations(ctx, st, params.BeaconConfig().SlotsPerEpoch) + require.NoError(t, err) + require.Equal(t, 6, len(atts)) + + t.Run("slot takes precedence", func(t *testing.T) { + moreRecentAtt := ðpb.AttestationElectra{ + AggregationBits: bitfield.Bitlist{0b11000}, // we set only one bit for committee_index_0 + CommitteeBits: cb1, + Data: util.HydrateAttestationData(ðpb.AttestationData{Slot: 1, BeaconBlockRoot: bytesutil.PadTo([]byte{'0'}, 32)}), + Signature: sig.Marshal(), + } + require.NoError(t, pool.SaveUnaggregatedAttestations([]ethpb.Att{moreRecentAtt})) + atts, err = s.packAttestations(ctx, st, params.BeaconConfig().SlotsPerEpoch) + require.NoError(t, err) + require.Equal(t, 7, len(atts)) + assert.Equal(t, true, atts[0].GetData().Slot == 1) + }) +} + func Test_limitToMaxAttestations(t *testing.T) { t.Run("Phase 0", func(t *testing.T) { atts := make([]ethpb.Att, params.BeaconConfig().MaxAttestations+1) From fb0d395b540ef29db4da1dd0cb7fdec2de1fe4b4 Mon Sep 17 00:00:00 2001 From: rkapka Date: Mon, 14 Oct 2024 15:38:29 +0200 Subject: [PATCH 2/7] changelog <3 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed4472c6efea..91f8c744e7a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve - HTTP endpoint for PublishBlobs - GetBlockV2, GetBlindedBlock, ProduceBlockV2, ProduceBlockV3: add Electra case. - SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413) +- Better attestation packing for Electra. [PR](https://github.com/prysmaticlabs/prysm/pull/14534) ### Changed From fd2501bc47c75af64365467da17054519e8f8ccb Mon Sep 17 00:00:00 2001 From: rkapka Date: Mon, 14 Oct 2024 15:39:43 +0200 Subject: [PATCH 3/7] bzl --- beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel b/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel index 50acfb14486e..4eb081712a47 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel @@ -212,7 +212,9 @@ go_test( embed = [":go_default_library"], eth_network = "minimal", tags = ["minimal"], - deps = common_deps, + deps = common_deps + [ + "//beacon-chain/operations/attestations/mock:go_default_library", + ], ) go_test( From 60f29e39010bfc4720d5f60b1ae9ba877725ea94 Mon Sep 17 00:00:00 2001 From: rkapka Date: Fri, 25 Oct 2024 16:32:25 +0700 Subject: [PATCH 4/7] sort before constructing on-chain aggregates --- .../validator/proposer_attestations.go | 5 +- .../validator/proposer_attestations_test.go | 52 +++++++++++-------- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go index 73aa5806a416..7573411b2c3b 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go @@ -86,7 +86,10 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon if err != nil { return nil, err } - attsById[id] = as + attsById[id], err = proposerAtts(as).sort() + if err != nil { + return nil, err + } } var attsForInclusion proposerAtts diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go index d67f2c42b463..e6a0f1aec6e5 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go @@ -711,73 +711,69 @@ func Test_packAttestations_ElectraOnChainAggregates(t *testing.T) { // - data_root_1 and committee_index_0: 1 single aggregate // - data_root_1 and committee_index_1: 3 single aggregates // - // Because the function tries to aggregate attestations, we have to create attestations which are not aggregatable. - // It suffices that they have overlapping aggregation bits. + // Because the function tries to aggregate attestations, we have to create attestations which are not aggregatable + // and are not redundant when using MaxCover. + // The function should also sort attestation by ID before computing the On-Chain Aggregate, so we want unsorted aggregation bits + // to test the sorting part. // // The result should be the following six on-chain aggregates: - // - for data_root_0 combining single aggregates at index 0 for each committee - // - for data_root_0 combining single aggregates at index 1 for each committee + // - for data_root_0 combining the most profitable aggregate for each committee + // - for data_root_0 combining the second most profitable aggregate for each committee // - for data_root_0 constructed from the single aggregate at index 2 for committee_index_0 - // - for data_root_1 combining single aggregates at index 0 for each committee + // - for data_root_1 combining the most profitable aggregate for each committee // - for data_root_1 constructed from the single aggregate at index 1 for committee_index_1 // - for data_root_1 constructed from the single aggregate at index 2 for committee_index_1 - // - // This test has no control over the index of a single aggregate because MaxCover may rearrange them. - // It means that even if we save d0_c0_a1, d0_c0_a2 and d0_c0_a3 to the pool in this exact order, - // at the time of constructing the on-chain aggregate they may be rearranged. This does not - // change the total number of on-chain aggregates constructed because the number of single aggregates - // remains the same. d0_c0_a1 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b11110}, + AggregationBits: bitfield.Bitlist{0b1000011}, CommitteeBits: cb0, Data: data0, Signature: sig.Marshal(), } d0_c0_a2 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b11001}, + AggregationBits: bitfield.Bitlist{0b1100101}, CommitteeBits: cb0, Data: data0, Signature: sig.Marshal(), } d0_c0_a3 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b10111}, + AggregationBits: bitfield.Bitlist{0b1111000}, CommitteeBits: cb0, Data: data0, Signature: sig.Marshal(), } d0_c1_a1 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b11110}, + AggregationBits: bitfield.Bitlist{0b1111100}, CommitteeBits: cb1, Data: data0, Signature: sig.Marshal(), } d0_c1_a2 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b10111}, + AggregationBits: bitfield.Bitlist{0b1001111}, CommitteeBits: cb1, Data: data0, Signature: sig.Marshal(), } d1_c0_a1 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b11111}, + AggregationBits: bitfield.Bitlist{0b1111111}, CommitteeBits: cb0, Data: data1, Signature: sig.Marshal(), } d1_c1_a1 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b11110}, + AggregationBits: bitfield.Bitlist{0b1000011}, CommitteeBits: cb1, Data: data1, Signature: sig.Marshal(), } d1_c1_a2 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b11001}, + AggregationBits: bitfield.Bitlist{0b1100101}, CommitteeBits: cb1, Data: data1, Signature: sig.Marshal(), } d1_c1_a3 := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b10111}, + AggregationBits: bitfield.Bitlist{0b1111000}, CommitteeBits: cb1, Data: data1, Signature: sig.Marshal(), @@ -787,16 +783,28 @@ func Test_packAttestations_ElectraOnChainAggregates(t *testing.T) { require.NoError(t, pool.SaveAggregatedAttestations([]ethpb.Att{d0_c0_a1, d0_c0_a2, d0_c0_a3, d0_c1_a1, d0_c1_a2, d1_c0_a1, d1_c1_a1, d1_c1_a2, d1_c1_a3})) slot := primitives.Slot(1) s := &Server{AttPool: pool, HeadFetcher: &chainMock.ChainService{}, TimeFetcher: &chainMock.ChainService{Slot: &slot}} - st, _ := util.DeterministicGenesisStateElectra(t, 128) + + // We need the correct number of validators so that there are at least 2 committees per slot + // and each committee has exactly 6 validators (this is because we have 6 aggregation bits). + st, _ := util.DeterministicGenesisStateElectra(t, 192) + require.NoError(t, st.SetSlot(params.BeaconConfig().SlotsPerEpoch+1)) atts, err := s.packAttestations(ctx, st, params.BeaconConfig().SlotsPerEpoch) require.NoError(t, err) require.Equal(t, 6, len(atts)) + assert.Equal(t, true, + atts[0].GetAggregationBits().Count() >= atts[1].GetAggregationBits().Count() && + atts[1].GetAggregationBits().Count() >= atts[2].GetAggregationBits().Count() && + atts[2].GetAggregationBits().Count() >= atts[3].GetAggregationBits().Count() && + atts[3].GetAggregationBits().Count() >= atts[4].GetAggregationBits().Count() && + atts[4].GetAggregationBits().Count() >= atts[5].GetAggregationBits().Count(), + "on-chain aggregates are not sorted by aggregation bit count", + ) t.Run("slot takes precedence", func(t *testing.T) { moreRecentAtt := ðpb.AttestationElectra{ - AggregationBits: bitfield.Bitlist{0b11000}, // we set only one bit for committee_index_0 + AggregationBits: bitfield.Bitlist{0b1100000}, // we set only one bit for committee_index_0 CommitteeBits: cb1, Data: util.HydrateAttestationData(ðpb.AttestationData{Slot: 1, BeaconBlockRoot: bytesutil.PadTo([]byte{'0'}, 32)}), Signature: sig.Marshal(), From 42271cc30be5d55bda17b7820b68deba6b1c1146 Mon Sep 17 00:00:00 2001 From: rkapka Date: Fri, 25 Oct 2024 17:21:59 +0700 Subject: [PATCH 5/7] move ctx to top --- .../rpc/prysm/v1alpha1/validator/proposer_attestations_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go index e6a0f1aec6e5..753511d978f1 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go @@ -683,12 +683,13 @@ func Test_packAttestations(t *testing.T) { } func Test_packAttestations_ElectraOnChainAggregates(t *testing.T) { + ctx := context.Background() + params.SetupTestConfigCleanup(t) cfg := params.BeaconConfig().Copy() cfg.ElectraForkEpoch = 1 params.OverrideBeaconConfig(cfg) - ctx := context.Background() key, err := blst.RandKey() require.NoError(t, err) sig := key.Sign([]byte{'X'}) From b2d96d829573ec446454efea108ad5d8eb57437f Mon Sep 17 00:00:00 2001 From: rkapka Date: Fri, 25 Oct 2024 20:42:51 +0700 Subject: [PATCH 6/7] extract Electra logic and add comments --- .../validator/proposer_attestations.go | 72 ++++++++++++------- 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go index 7573411b2c3b..6f98ebd34b28 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations.go @@ -86,34 +86,14 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon if err != nil { return nil, err } - attsById[id], err = proposerAtts(as).sort() - if err != nil { - return nil, err - } + attsById[id] = as } var attsForInclusion proposerAtts if postElectra { - idx := 0 - for { - topAggregates := make([]ethpb.Att, 0, len(attsById)) - for _, v := range attsById { - if len(v) > idx { - topAggregates = append(topAggregates, v[idx]) - } - } - - if len(topAggregates) == 0 { - break - } - - onChainAggregates, err := computeOnChainAggregate(topAggregates) - if err != nil { - return nil, err - } - attsForInclusion = append(attsForInclusion, onChainAggregates...) - - idx++ + attsForInclusion, err = onChainAggregates(attsById) + if err != nil { + return nil, err } } else { attsForInclusion = make([]ethpb.Att, 0) @@ -144,6 +124,50 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon return vs.filterAttestationBySignature(ctx, atts, latestState) } +func onChainAggregates(attsById map[attestation.Id][]ethpb.Att) (proposerAtts, error) { + var result proposerAtts + var err error + + // When constructing on-chain aggregates, we want to combine the most profitable + // aggregate for each ID, then the second most profitable, and so on and so forth. + // Because of this we sort attestations at the beginning. + for id, as := range attsById { + attsById[id], err = proposerAtts(as).sort() + if err != nil { + return nil, err + } + } + + // We construct the first on-chain aggregate by taking the first aggregate for each ID. + // We construct the second on-chain aggregate by taking the second aggregate for each ID. + // We continue doing this until we run out of aggregates. + idx := 0 + for { + topAggregates := make([]ethpb.Att, 0, len(attsById)) + for _, as := range attsById { + // In case there are no more aggregates for an ID, we skip that ID. + if len(as) > idx { + topAggregates = append(topAggregates, as[idx]) + } + } + + // Once there are no more aggregates for any ID, we are done. + if len(topAggregates) == 0 { + break + } + + onChainAggs, err := computeOnChainAggregate(topAggregates) + if err != nil { + return nil, err + } + result = append(result, onChainAggs...) + + idx++ + } + + return result, nil +} + // filter separates attestation list into two groups: valid and invalid attestations. // The first group passes the all the required checks for attestation to be considered for proposing. // And attestations from the second group should be deleted. From a7258d3d40dc8b14b96179089d65e8443036c57c Mon Sep 17 00:00:00 2001 From: rkapka Date: Fri, 1 Nov 2024 20:53:05 +0700 Subject: [PATCH 7/7] benchmark --- .../validator/proposer_attestations_test.go | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go index 753511d978f1..fc1b7808b621 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_attestations_test.go @@ -3,11 +3,13 @@ package validator import ( "bytes" "context" + "math/rand" "sort" "testing" "github.com/prysmaticlabs/go-bitfield" chainMock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations/mock" "github.com/prysmaticlabs/prysm/v5/config/features" @@ -818,6 +820,64 @@ func Test_packAttestations_ElectraOnChainAggregates(t *testing.T) { }) } +func Benchmark_packAttestations_Electra(b *testing.B) { + ctx := context.Background() + + params.SetupTestConfigCleanup(b) + cfg := params.BeaconConfig().Copy() + cfg.ElectraForkEpoch = 1 + params.OverrideBeaconConfig(cfg) + + valCount := uint64(65536) + committeeCount := helpers.SlotCommitteeCount(valCount) + valsPerCommittee := valCount / committeeCount / uint64(params.BeaconConfig().SlotsPerEpoch) + + st, _ := util.DeterministicGenesisStateElectra(b, valCount) + + key, err := blst.RandKey() + require.NoError(b, err) + sig := key.Sign([]byte{'X'}) + + r := rand.New(rand.NewSource(123)) + + var atts []ethpb.Att + for i := uint64(0); i < uint64(params.BeaconConfig().SlotsPerEpoch); i++ { + for c := uint64(0); c < committeeCount; c++ { + for a := uint64(0); a < params.BeaconConfig().TargetAggregatorsPerCommittee; a++ { + cb := primitives.NewAttestationCommitteeBits() + cb.SetBitAt(c, true) + + att := ðpb.AttestationElectra{ + Data: util.HydrateAttestationData(ðpb.AttestationData{BeaconBlockRoot: bytesutil.PadTo(bytesutil.Uint64ToBytesLittleEndian(i), 32)}), + AggregationBits: bitfield.NewBitlist(valsPerCommittee), + CommitteeBits: cb, + Signature: sig.Marshal(), + } + + for bit := uint64(0); bit < valsPerCommittee; bit++ { + att.AggregationBits.SetBitAt(bit, r.Intn(2) == 1) + } + + atts = append(atts, att) + } + } + } + + pool := &mock.PoolMock{} + require.NoError(b, pool.SaveAggregatedAttestations(atts)) + + slot := primitives.Slot(1) + s := &Server{AttPool: pool, HeadFetcher: &chainMock.ChainService{}, TimeFetcher: &chainMock.ChainService{Slot: &slot}} + + require.NoError(b, st.SetSlot(params.BeaconConfig().SlotsPerEpoch+1)) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = s.packAttestations(ctx, st, params.BeaconConfig().SlotsPerEpoch) + require.NoError(b, err) + } +} + func Test_limitToMaxAttestations(t *testing.T) { t.Run("Phase 0", func(t *testing.T) { atts := make([]ethpb.Att, params.BeaconConfig().MaxAttestations+1)