From 82d27231d95ff8027cf932aa0481e50d03d5aac9 Mon Sep 17 00:00:00 2001 From: Saolyn Date: Tue, 5 Nov 2024 18:00:06 +0100 Subject: [PATCH 1/6] modify v1 and add v2 --- api/server/structs/endpoints_beacon.go | 6 +- beacon-chain/rpc/eth/beacon/handlers_pool.go | 217 +++++++++++++++++- .../rpc/eth/beacon/handlers_pool_test.go | 1 + 3 files changed, 222 insertions(+), 2 deletions(-) diff --git a/api/server/structs/endpoints_beacon.go b/api/server/structs/endpoints_beacon.go index 273e9578530..9fc51c9ac8c 100644 --- a/api/server/structs/endpoints_beacon.go +++ b/api/server/structs/endpoints_beacon.go @@ -25,8 +25,12 @@ type ListAttestationsResponse struct { Data json.RawMessage `json:"data"` } +//type SubmitAttestationsRequest struct { +// Data []*Attestation `json:"data"` +//} + type SubmitAttestationsRequest struct { - Data []*Attestation `json:"data"` + Data json.RawMessage `json:"data"` } type ListVoluntaryExitsResponse struct { diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool.go b/beacon-chain/rpc/eth/beacon/handlers_pool.go index 6755e85d794..8a7e8c472d6 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "time" @@ -194,9 +195,26 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { return } + // Attempt to unmarshal Data into a slice of attestations + var sourceAttestations []*structs.Attestation + if err := json.Unmarshal(req.Data, &sourceAttestations); err != nil { + // If that fails, try unmarshaling into a single attestation + var singleAttestation *structs.Attestation + if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { + httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) + return + } + sourceAttestations = append(sourceAttestations, singleAttestation) + } + + if len(sourceAttestations) == 0 { + httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + return + } + var validAttestations []*eth.Attestation var attFailures []*server.IndexedVerificationFailure - for i, sourceAtt := range req.Data { + for i, sourceAtt := range sourceAttestations { att, err := sourceAtt.ToConsensus() if err != nil { attFailures = append(attFailures, &server.IndexedVerificationFailure{ @@ -241,6 +259,8 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil { log.WithError(err).Errorf("could not broadcast attestation at index %d", i) + failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) + continue } if corehelpers.IsAggregated(att) { @@ -272,6 +292,201 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { } } +// SubmitAttestationsV2 submits an attestation object to node. If the attestation passes all validation +// constraints, node MUST publish the attestation on an appropriate subnet. +func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { + ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2") + defer span.End() + + var req structs.SubmitAttestationsRequest + err := json.NewDecoder(r.Body).Decode(&req.Data) + switch { + case errors.Is(err, io.EOF): + httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + return + case err != nil: + httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest) + return + } + if len(req.Data) == 0 { + httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + return + } + + versionHeader := r.Header.Get(api.VersionHeader) + if versionHeader == "" { + httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest) + } + v, err := version.FromString(versionHeader) + if err != nil { + httputil.HandleError(w, "Invalid version: "+err.Error(), http.StatusBadRequest) + return + } + + var attFailures []*server.IndexedVerificationFailure + failedBroadcasts := make([]string, 0) + + if v >= version.Electra { + var sourceAttestations []*structs.AttestationElectra + if err = json.Unmarshal(req.Data, &sourceAttestations); err != nil { + // If that fails, try unmarshaling into a single attestation + var singleAttestation *structs.AttestationElectra + if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { + httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) + return + } + sourceAttestations = append(sourceAttestations, singleAttestation) + } + if len(sourceAttestations) == 0 { + httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + return + } + + var validAttestations []*eth.AttestationElectra + for i, sourceAtt := range sourceAttestations { + att, err := sourceAtt.ToConsensus() + if err != nil { + attFailures = append(attFailures, &server.IndexedVerificationFailure{ + Index: i, + Message: "Could not convert request attestation to consensus attestation: " + err.Error(), + }) + continue + } + if _, err = bls.SignatureFromBytes(att.Signature); err != nil { + attFailures = append(attFailures, &server.IndexedVerificationFailure{ + Index: i, + Message: "Incorrect attestation signature: " + err.Error(), + }) + continue + } + validAttestations = append(validAttestations, att) + } + + for i, att := range validAttestations { + if !corehelpers.IsAggregated(att) { + s.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.UnaggregatedAttReceived, + Data: &operation.UnAggregatedAttReceivedData{ + Attestation: att, + }, + }) + } + + wantedEpoch := slots.ToEpoch(att.Data.Slot) + vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) + if err != nil { + httputil.HandleError(w, "Could not get head validator indices: "+err.Error(), http.StatusInternalServerError) + return + } + subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) + + if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil { + log.WithError(err).Errorf("could not broadcast attestation at index %d", i) + failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) + continue + } + + if corehelpers.IsAggregated(att) { + if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil { + log.WithError(err).Error("could not save aggregated attestation") + } + } else { + if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { + log.WithError(err).Error("could not save unaggregated attestation") + } + } + } + } else { + var sourceAttestations []*structs.Attestation + if err = json.Unmarshal(req.Data, &sourceAttestations); err != nil { + // If that fails, try unmarshaling into a single attestation + var singleAttestation *structs.Attestation + if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { + httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) + return + } + sourceAttestations = append(sourceAttestations, singleAttestation) + } + if len(sourceAttestations) == 0 { + httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + return + } + + var validAttestations []*eth.Attestation + for i, sourceAtt := range sourceAttestations { + att, err := sourceAtt.ToConsensus() + if err != nil { + attFailures = append(attFailures, &server.IndexedVerificationFailure{ + Index: i, + Message: "Could not convert request attestation to consensus attestation: " + err.Error(), + }) + continue + } + if _, err = bls.SignatureFromBytes(att.Signature); err != nil { + attFailures = append(attFailures, &server.IndexedVerificationFailure{ + Index: i, + Message: "Incorrect attestation signature: " + err.Error(), + }) + continue + } + validAttestations = append(validAttestations, att) + } + + for i, att := range validAttestations { + if !corehelpers.IsAggregated(att) { + s.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.UnaggregatedAttReceived, + Data: &operation.UnAggregatedAttReceivedData{ + Attestation: att, + }, + }) + } + + wantedEpoch := slots.ToEpoch(att.Data.Slot) + vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) + if err != nil { + httputil.HandleError(w, "Could not get head validator indices: "+err.Error(), http.StatusInternalServerError) + return + } + subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) + + if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil { + log.WithError(err).Errorf("could not broadcast attestation at index %d", i) + failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) + continue + } + + if corehelpers.IsAggregated(att) { + if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil { + log.WithError(err).Error("could not save aggregated attestation") + } + } else { + if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { + log.WithError(err).Error("could not save unaggregated attestation") + } + } + } + } + + if len(failedBroadcasts) > 0 { + httputil.HandleError( + w, + fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")), + http.StatusInternalServerError, + ) + return + } + + if len(attFailures) > 0 { + failuresErr := &server.IndexedVerificationFailureError{ + Code: http.StatusBadRequest, + Message: "One or more attestations failed validation", + Failures: attFailures, + } + httputil.WriteError(w, failuresErr) + } +} + // ListVoluntaryExits retrieves voluntary exits known by the node but // not necessarily incorporated into any block. func (s *Server) ListVoluntaryExits(w http.ResponseWriter, r *http.Request) { diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go index 5c627acdd69..1ad1498918a 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go @@ -514,6 +514,7 @@ func TestSubmitAttestations(t *testing.T) { writer.Body = &bytes.Buffer{} s.SubmitAttestations(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) assert.Equal(t, 1, broadcaster.NumAttestations()) From 744aca0e5eaa68e163d600d2b2af9157430d2aab Mon Sep 17 00:00:00 2001 From: Saolyn Date: Tue, 5 Nov 2024 20:28:24 +0100 Subject: [PATCH 2/6] test --- .../rpc/eth/beacon/handlers_pool_test.go | 429 ++++++++++++++---- 1 file changed, 351 insertions(+), 78 deletions(-) diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go index 1ad1498918a..590eddb86e0 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go @@ -500,96 +500,290 @@ func TestSubmitAttestations(t *testing.T) { ChainInfoFetcher: chainService, OperationNotifier: &blockchainmock.MockOperationNotifier{}, } + t.Run("V1", func(t *testing.T) { + t.Run("single", func(t *testing.T) { + broadcaster := &p2pMock.MockBroadcaster{} + s.Broadcaster = broadcaster + s.AttestationsPool = attestations.NewPool() - t.Run("single", func(t *testing.T) { - broadcaster := &p2pMock.MockBroadcaster{} - s.Broadcaster = broadcaster - s.AttestationsPool = attestations.NewPool() + var body bytes.Buffer + _, err := body.WriteString(singleAtt) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} - var body bytes.Buffer - _, err := body.WriteString(singleAtt) - require.NoError(t, err) - request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) - writer := httptest.NewRecorder() - writer.Body = &bytes.Buffer{} + s.SubmitAttestations(writer, request) - s.SubmitAttestations(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) + assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) + assert.Equal(t, 1, broadcaster.NumAttestations()) + assert.Equal(t, "0x03", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetAggregationBits())) + assert.Equal(t, "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetSignature())) + assert.Equal(t, primitives.Slot(0), broadcaster.BroadcastAttestations[0].GetData().Slot) + assert.Equal(t, primitives.CommitteeIndex(0), broadcaster.BroadcastAttestations[0].GetData().CommitteeIndex) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().BeaconBlockRoot)) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Source.Root)) + assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root)) + assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch) + assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount()) + }) + t.Run("multiple", func(t *testing.T) { + broadcaster := &p2pMock.MockBroadcaster{} + s.Broadcaster = broadcaster + s.AttestationsPool = attestations.NewPool() - assert.Equal(t, http.StatusOK, writer.Code) - assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) - assert.Equal(t, 1, broadcaster.NumAttestations()) - assert.Equal(t, "0x03", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetAggregationBits())) - assert.Equal(t, "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetSignature())) - assert.Equal(t, primitives.Slot(0), broadcaster.BroadcastAttestations[0].GetData().Slot) - assert.Equal(t, primitives.CommitteeIndex(0), broadcaster.BroadcastAttestations[0].GetData().CommitteeIndex) - assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().BeaconBlockRoot)) - assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Source.Root)) - assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch) - assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root)) - assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch) - assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount()) - }) - t.Run("multiple", func(t *testing.T) { - broadcaster := &p2pMock.MockBroadcaster{} - s.Broadcaster = broadcaster - s.AttestationsPool = attestations.NewPool() + var body bytes.Buffer + _, err := body.WriteString(multipleAtts) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} - var body bytes.Buffer - _, err := body.WriteString(multipleAtts) - require.NoError(t, err) - request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) - writer := httptest.NewRecorder() - writer.Body = &bytes.Buffer{} + s.SubmitAttestations(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) + assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) + assert.Equal(t, 2, broadcaster.NumAttestations()) + assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount()) + }) + t.Run("no body", func(t *testing.T) { + request := httptest.NewRequest(http.MethodPost, "http://example.com", nil) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} - s.SubmitAttestations(writer, request) - assert.Equal(t, http.StatusOK, writer.Code) - assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) - assert.Equal(t, 2, broadcaster.NumAttestations()) - assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount()) - }) - t.Run("no body", func(t *testing.T) { - request := httptest.NewRequest(http.MethodPost, "http://example.com", nil) - writer := httptest.NewRecorder() - writer.Body = &bytes.Buffer{} + s.SubmitAttestations(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) + }) + t.Run("empty", func(t *testing.T) { + var body bytes.Buffer + _, err := body.WriteString("[]") + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} - s.SubmitAttestations(writer, request) - assert.Equal(t, http.StatusBadRequest, writer.Code) - e := &httputil.DefaultJsonError{} - require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) - assert.Equal(t, http.StatusBadRequest, e.Code) - assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) + s.SubmitAttestations(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) + }) + t.Run("invalid", func(t *testing.T) { + var body bytes.Buffer + _, err := body.WriteString(invalidAtt) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestations(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &server.IndexedVerificationFailureError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + require.Equal(t, 1, len(e.Failures)) + assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature")) + }) }) - t.Run("empty", func(t *testing.T) { - var body bytes.Buffer - _, err := body.WriteString("[]") - require.NoError(t, err) - request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) - writer := httptest.NewRecorder() - writer.Body = &bytes.Buffer{} + t.Run("V2-pre-electra", func(t *testing.T) { + t.Run("single", func(t *testing.T) { + broadcaster := &p2pMock.MockBroadcaster{} + s.Broadcaster = broadcaster + s.AttestationsPool = attestations.NewPool() - s.SubmitAttestations(writer, request) - assert.Equal(t, http.StatusBadRequest, writer.Code) - e := &httputil.DefaultJsonError{} - require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) - assert.Equal(t, http.StatusBadRequest, e.Code) - assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) + var body bytes.Buffer + _, err := body.WriteString(singleAtt) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Phase0)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + + assert.Equal(t, http.StatusOK, writer.Code) + assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) + assert.Equal(t, 1, broadcaster.NumAttestations()) + assert.Equal(t, "0x03", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetAggregationBits())) + assert.Equal(t, "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetSignature())) + assert.Equal(t, primitives.Slot(0), broadcaster.BroadcastAttestations[0].GetData().Slot) + assert.Equal(t, primitives.CommitteeIndex(0), broadcaster.BroadcastAttestations[0].GetData().CommitteeIndex) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().BeaconBlockRoot)) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Source.Root)) + assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root)) + assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch) + assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount()) + }) + t.Run("multiple", func(t *testing.T) { + broadcaster := &p2pMock.MockBroadcaster{} + s.Broadcaster = broadcaster + s.AttestationsPool = attestations.NewPool() + + var body bytes.Buffer + _, err := body.WriteString(multipleAtts) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Phase0)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) + assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) + assert.Equal(t, 2, broadcaster.NumAttestations()) + assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount()) + }) + t.Run("no body", func(t *testing.T) { + request := httptest.NewRequest(http.MethodPost, "http://example.com", nil) + request.Header.Set(api.VersionHeader, version.String(version.Phase0)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) + }) + t.Run("empty", func(t *testing.T) { + var body bytes.Buffer + _, err := body.WriteString("[]") + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Phase0)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) + }) + t.Run("invalid", func(t *testing.T) { + var body bytes.Buffer + _, err := body.WriteString(invalidAtt) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Phase0)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &server.IndexedVerificationFailureError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + require.Equal(t, 1, len(e.Failures)) + assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature")) + }) }) - t.Run("invalid", func(t *testing.T) { - var body bytes.Buffer - _, err := body.WriteString(invalidAtt) - require.NoError(t, err) - request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) - writer := httptest.NewRecorder() - writer.Body = &bytes.Buffer{} + t.Run("V2-post-electra", func(t *testing.T) { + t.Run("single", func(t *testing.T) { + broadcaster := &p2pMock.MockBroadcaster{} + s.Broadcaster = broadcaster + s.AttestationsPool = attestations.NewPool() - s.SubmitAttestations(writer, request) - assert.Equal(t, http.StatusBadRequest, writer.Code) - e := &server.IndexedVerificationFailureError{} - require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) - assert.Equal(t, http.StatusBadRequest, e.Code) - require.Equal(t, 1, len(e.Failures)) - assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature")) + var body bytes.Buffer + _, err := body.WriteString(singleAttElectra) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Electra)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + + assert.Equal(t, http.StatusOK, writer.Code) + assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) + assert.Equal(t, 1, broadcaster.NumAttestations()) + assert.Equal(t, "0x03", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetAggregationBits())) + assert.Equal(t, "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetSignature())) + assert.Equal(t, primitives.Slot(0), broadcaster.BroadcastAttestations[0].GetData().Slot) + assert.Equal(t, primitives.CommitteeIndex(0), broadcaster.BroadcastAttestations[0].GetData().CommitteeIndex) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().BeaconBlockRoot)) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Source.Root)) + assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch) + assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root)) + assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch) + assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount()) + }) + t.Run("multiple", func(t *testing.T) { + broadcaster := &p2pMock.MockBroadcaster{} + s.Broadcaster = broadcaster + s.AttestationsPool = attestations.NewPool() + + var body bytes.Buffer + _, err := body.WriteString(multipleAttsElectra) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Electra)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) + assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) + assert.Equal(t, 2, broadcaster.NumAttestations()) + assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount()) + }) + t.Run("no body", func(t *testing.T) { + request := httptest.NewRequest(http.MethodPost, "http://example.com", nil) + request.Header.Set(api.VersionHeader, version.String(version.Electra)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) + }) + t.Run("empty", func(t *testing.T) { + var body bytes.Buffer + _, err := body.WriteString("[]") + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Electra)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) + }) + t.Run("invalid", func(t *testing.T) { + var body bytes.Buffer + _, err := body.WriteString(invalidAttElectra) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Electra)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusBadRequest, writer.Code) + e := &server.IndexedVerificationFailureError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, e.Code) + require.Equal(t, 1, len(e.Failures)) + assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature")) + }) }) + } func TestListVoluntaryExits(t *testing.T) { @@ -2064,6 +2258,85 @@ var ( } } } +]` + singleAttElectra = `[ + { + "aggregation_bits": "0x03", + "committee_bits": "0x0100000000000000", + "signature": "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", + "data": { + "slot": "0", + "index": "0", + "beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", + "source": { + "epoch": "0", + "root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2" + }, + "target": { + "epoch": "0", + "root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2" + } + } + } +]` + multipleAttsElectra = `[ + { + "aggregation_bits": "0x03", + "committee_bits": "0x0100000000000000", + "signature": "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", + "data": { + "slot": "0", + "index": "0", + "beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", + "source": { + "epoch": "0", + "root": "0x736f75726365726f6f7431000000000000000000000000000000000000000000" + }, + "target": { + "epoch": "0", + "root": "0x746172676574726f6f7431000000000000000000000000000000000000000000" + } + } + }, + { + "aggregation_bits": "0x03", + "committee_bits": "0x0100000000000000", + "signature": "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", + "data": { + "slot": "0", + "index": "0", + "beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", + "source": { + "epoch": "0", + "root": "0x736f75726365726f6f7431000000000000000000000000000000000000000000" + }, + "target": { + "epoch": "0", + "root": "0x746172676574726f6f7432000000000000000000000000000000000000000000" + } + } + } +]` + // signature is invalid + invalidAttElectra = `[ + { + "aggregation_bits": "0x03", + "committee_bits": "0x0100000000000000", + "signature": "0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "data": { + "slot": "0", + "index": "0", + "beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", + "source": { + "epoch": "0", + "root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2" + }, + "target": { + "epoch": "0", + "root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2" + } + } + } ]` exit1 = `{ "message": { From d9cda32f98d0b51af081e6f0ff245e4648e5266c Mon Sep 17 00:00:00 2001 From: Saolyn Date: Wed, 6 Nov 2024 15:39:13 +0100 Subject: [PATCH 3/6] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1f62791968..12024afba8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve - Updated the `beacon-chain/monitor` package to Electra. [PR](https://github.com/prysmaticlabs/prysm/pull/14562) - Added ListAttestationsV2 endpoint. - Add ability to rollback node's internal state during processing. +- Added SubmitAttestationsV2 endpoint. ### Changed From c526fceb6edb3dad7e53ddf25bc8e268bd897a1a Mon Sep 17 00:00:00 2001 From: Saolyn Date: Wed, 6 Nov 2024 16:26:45 +0100 Subject: [PATCH 4/6] small fixes --- api/server/structs/endpoints_beacon.go | 4 --- beacon-chain/rpc/endpoints.go | 10 ++++++ beacon-chain/rpc/endpoints_test.go | 2 +- beacon-chain/rpc/eth/beacon/handlers_pool.go | 35 ++------------------ 4 files changed, 14 insertions(+), 37 deletions(-) diff --git a/api/server/structs/endpoints_beacon.go b/api/server/structs/endpoints_beacon.go index 9fc51c9ac8c..61b96f7a582 100644 --- a/api/server/structs/endpoints_beacon.go +++ b/api/server/structs/endpoints_beacon.go @@ -25,10 +25,6 @@ type ListAttestationsResponse struct { Data json.RawMessage `json:"data"` } -//type SubmitAttestationsRequest struct { -// Data []*Attestation `json:"data"` -//} - type SubmitAttestationsRequest struct { Data json.RawMessage `json:"data"` } diff --git a/beacon-chain/rpc/endpoints.go b/beacon-chain/rpc/endpoints.go index cdd46f11cec..668122fcc5a 100644 --- a/beacon-chain/rpc/endpoints.go +++ b/beacon-chain/rpc/endpoints.go @@ -650,6 +650,16 @@ func (s *Service) beaconEndpoints( handler: server.SubmitAttestations, methods: []string{http.MethodPost}, }, + { + template: "/eth/v2/beacon/pool/attestations", + name: namespace + ".SubmitAttestationsV2", + middleware: []middleware.Middleware{ + middleware.ContentTypeHandler([]string{api.JsonMediaType}), + middleware.AcceptHeaderHandler([]string{api.JsonMediaType}), + }, + handler: server.SubmitAttestationsV2, + methods: []string{http.MethodPost}, + }, { template: "/eth/v1/beacon/pool/voluntary_exits", name: namespace + ".ListVoluntaryExits", diff --git a/beacon-chain/rpc/endpoints_test.go b/beacon-chain/rpc/endpoints_test.go index 3270327ff53..85925ad4c54 100644 --- a/beacon-chain/rpc/endpoints_test.go +++ b/beacon-chain/rpc/endpoints_test.go @@ -41,7 +41,7 @@ func Test_endpoints(t *testing.T) { "/eth/v1/beacon/deposit_snapshot": {http.MethodGet}, "/eth/v1/beacon/blinded_blocks/{block_id}": {http.MethodGet}, "/eth/v1/beacon/pool/attestations": {http.MethodGet, http.MethodPost}, - "/eth/v2/beacon/pool/attestations": {http.MethodGet}, + "/eth/v2/beacon/pool/attestations": {http.MethodGet, http.MethodPost}, "/eth/v1/beacon/pool/attester_slashings": {http.MethodGet, http.MethodPost}, "/eth/v2/beacon/pool/attester_slashings": {http.MethodGet, http.MethodPost}, "/eth/v1/beacon/pool/proposer_slashings": {http.MethodGet, http.MethodPost}, diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool.go b/beacon-chain/rpc/eth/beacon/handlers_pool.go index 8a7e8c472d6..8b5c01642dc 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool.go @@ -195,20 +195,9 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { return } - // Attempt to unmarshal Data into a slice of attestations var sourceAttestations []*structs.Attestation if err := json.Unmarshal(req.Data, &sourceAttestations); err != nil { - // If that fails, try unmarshaling into a single attestation - var singleAttestation *structs.Attestation - if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { - httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) - return - } - sourceAttestations = append(sourceAttestations, singleAttestation) - } - - if len(sourceAttestations) == 0 { - httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + httputil.HandleError(w, fmt.Sprintf("Failed to unmarshal request: %v", err), http.StatusInternalServerError) return } @@ -329,16 +318,7 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { if v >= version.Electra { var sourceAttestations []*structs.AttestationElectra if err = json.Unmarshal(req.Data, &sourceAttestations); err != nil { - // If that fails, try unmarshaling into a single attestation - var singleAttestation *structs.AttestationElectra - if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { - httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) - return - } - sourceAttestations = append(sourceAttestations, singleAttestation) - } - if len(sourceAttestations) == 0 { - httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + httputil.HandleError(w, fmt.Sprintf("Failed to unmarshal request: %v", err), http.StatusInternalServerError) return } @@ -399,16 +379,7 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { } else { var sourceAttestations []*structs.Attestation if err = json.Unmarshal(req.Data, &sourceAttestations); err != nil { - // If that fails, try unmarshaling into a single attestation - var singleAttestation *structs.Attestation - if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { - httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) - return - } - sourceAttestations = append(sourceAttestations, singleAttestation) - } - if len(sourceAttestations) == 0 { - httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + httputil.HandleError(w, fmt.Sprintf("Failed to unmarshal request: %v", err), http.StatusInternalServerError) return } From d01183809ba5ebcaedc1d5f423e74e46c676b13e Mon Sep 17 00:00:00 2001 From: Saolyn Date: Wed, 6 Nov 2024 16:39:35 +0100 Subject: [PATCH 5/6] fix tests --- beacon-chain/rpc/eth/beacon/handlers_pool.go | 33 ++++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool.go b/beacon-chain/rpc/eth/beacon/handlers_pool.go index 8b5c01642dc..9ea961eb2ac 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool.go @@ -197,7 +197,16 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { var sourceAttestations []*structs.Attestation if err := json.Unmarshal(req.Data, &sourceAttestations); err != nil { - httputil.HandleError(w, fmt.Sprintf("Failed to unmarshal request: %v", err), http.StatusInternalServerError) + var singleAttestation *structs.Attestation + if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { + httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) + return + } + sourceAttestations = append(sourceAttestations, singleAttestation) + } + + if len(sourceAttestations) == 0 { + httputil.HandleError(w, "No data submitted", http.StatusBadRequest) return } @@ -318,7 +327,16 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { if v >= version.Electra { var sourceAttestations []*structs.AttestationElectra if err = json.Unmarshal(req.Data, &sourceAttestations); err != nil { - httputil.HandleError(w, fmt.Sprintf("Failed to unmarshal request: %v", err), http.StatusInternalServerError) + // If that fails, try unmarshaling into a single attestation + var singleAttestation *structs.AttestationElectra + if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { + httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) + return + } + sourceAttestations = append(sourceAttestations, singleAttestation) + } + if len(sourceAttestations) == 0 { + httputil.HandleError(w, "No data submitted", http.StatusBadRequest) return } @@ -379,7 +397,16 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { } else { var sourceAttestations []*structs.Attestation if err = json.Unmarshal(req.Data, &sourceAttestations); err != nil { - httputil.HandleError(w, fmt.Sprintf("Failed to unmarshal request: %v", err), http.StatusInternalServerError) + // If that fails, try unmarshaling into a single attestation + var singleAttestation *structs.Attestation + if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { + httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) + return + } + sourceAttestations = append(sourceAttestations, singleAttestation) + } + if len(sourceAttestations) == 0 { + httputil.HandleError(w, "No data submitted", http.StatusBadRequest) return } From 32c97e9309d38d5e1d8890de31dbacfe8d473ef1 Mon Sep 17 00:00:00 2001 From: Saolyn Date: Wed, 6 Nov 2024 17:39:16 +0100 Subject: [PATCH 6/6] simplify functions + remove duplication --- beacon-chain/rpc/eth/beacon/handlers_pool.go | 363 ++++++++----------- 1 file changed, 153 insertions(+), 210 deletions(-) diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool.go b/beacon-chain/rpc/eth/beacon/handlers_pool.go index 9ea961eb2ac..6b10c66d73f 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool.go @@ -190,28 +190,108 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest) return } - if len(req.Data) == 0 { - httputil.HandleError(w, "No data submitted", http.StatusBadRequest) + + attFailures, failedBroadcasts, err := s.handleAttestations(ctx, req.Data) + if err != nil { + httputil.HandleError(w, err.Error(), http.StatusBadRequest) return } - var sourceAttestations []*structs.Attestation - if err := json.Unmarshal(req.Data, &sourceAttestations); err != nil { - var singleAttestation *structs.Attestation - if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { - httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) - return + if len(failedBroadcasts) > 0 { + httputil.HandleError( + w, + fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")), + http.StatusInternalServerError, + ) + return + } + + if len(attFailures) > 0 { + failuresErr := &server.IndexedVerificationFailureError{ + Code: http.StatusBadRequest, + Message: "One or more attestations failed validation", + Failures: attFailures, } - sourceAttestations = append(sourceAttestations, singleAttestation) + httputil.WriteError(w, failuresErr) } +} - if len(sourceAttestations) == 0 { +// SubmitAttestationsV2 submits an attestation object to node. If the attestation passes all validation +// constraints, node MUST publish the attestation on an appropriate subnet. +func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { + ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2") + defer span.End() + + versionHeader := r.Header.Get(api.VersionHeader) + if versionHeader == "" { + httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest) + return + } + v, err := version.FromString(versionHeader) + if err != nil { + httputil.HandleError(w, "Invalid version: "+err.Error(), http.StatusBadRequest) + return + } + + var req structs.SubmitAttestationsRequest + err = json.NewDecoder(r.Body).Decode(&req.Data) + switch { + case errors.Is(err, io.EOF): httputil.HandleError(w, "No data submitted", http.StatusBadRequest) return + case err != nil: + httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest) + return } - var validAttestations []*eth.Attestation var attFailures []*server.IndexedVerificationFailure + var failedBroadcasts []string + + if v >= version.Electra { + attFailures, failedBroadcasts, err = s.handleAttestationsElectra(ctx, req.Data) + } else { + attFailures, failedBroadcasts, err = s.handleAttestations(ctx, req.Data) + } + if err != nil { + httputil.HandleError(w, err.Error(), http.StatusBadRequest) + return + } + + if len(failedBroadcasts) > 0 { + httputil.HandleError( + w, + fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")), + http.StatusInternalServerError, + ) + return + } + + if len(attFailures) > 0 { + failuresErr := &server.IndexedVerificationFailureError{ + Code: http.StatusBadRequest, + Message: "One or more attestations failed validation", + Failures: attFailures, + } + httputil.WriteError(w, failuresErr) + } +} + +func (s *Server) handleAttestationsElectra(ctx context.Context, data json.RawMessage) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) { + var sourceAttestations []*structs.AttestationElectra + + if err = json.Unmarshal(data, &sourceAttestations); err != nil { + var singleAttestation *structs.AttestationElectra + if err = json.Unmarshal(data, &singleAttestation); err != nil { + return nil, nil, errors.New("Failed to unmarshal attestation") + } + sourceAttestations = append(sourceAttestations, singleAttestation) + } + + if len(sourceAttestations) == 0 { + return nil, nil, errors.New("No data submitted") + } + + var validAttestations []*eth.AttestationElectra for i, sourceAtt := range sourceAttestations { att, err := sourceAtt.ToConsensus() if err != nil { @@ -228,7 +308,10 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { }) continue } + validAttestations = append(validAttestations, att) + } + for i, att := range validAttestations { // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node // of a received unaggregated attestation. // Note we can't send for aggregated att because we don't have selection proof. @@ -241,20 +324,14 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { }) } - validAttestations = append(validAttestations, att) - } - - failedBroadcasts := make([]string, 0) - for i, att := range validAttestations { - // Determine subnet to broadcast attestation to wantedEpoch := slots.ToEpoch(att.Data.Slot) vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) if err != nil { - httputil.HandleError(w, "Could not get head validator indices: "+err.Error(), http.StatusInternalServerError) - return + failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) + continue } - subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) + subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil { log.WithError(err).Errorf("could not broadcast attestation at index %d", i) failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) @@ -271,218 +348,84 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { } } } - if len(failedBroadcasts) > 0 { - httputil.HandleError( - w, - fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")), - http.StatusInternalServerError, - ) - return - } - if len(attFailures) > 0 { - failuresErr := &server.IndexedVerificationFailureError{ - Code: http.StatusBadRequest, - Message: "One or more attestations failed validation", - Failures: attFailures, - } - httputil.WriteError(w, failuresErr) - } + return attFailures, failedBroadcasts, nil } -// SubmitAttestationsV2 submits an attestation object to node. If the attestation passes all validation -// constraints, node MUST publish the attestation on an appropriate subnet. -func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { - ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2") - defer span.End() +func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) { + var sourceAttestations []*structs.Attestation - var req structs.SubmitAttestationsRequest - err := json.NewDecoder(r.Body).Decode(&req.Data) - switch { - case errors.Is(err, io.EOF): - httputil.HandleError(w, "No data submitted", http.StatusBadRequest) - return - case err != nil: - httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest) - return - } - if len(req.Data) == 0 { - httputil.HandleError(w, "No data submitted", http.StatusBadRequest) - return + if err = json.Unmarshal(data, &sourceAttestations); err != nil { + var singleAttestation *structs.Attestation + if err = json.Unmarshal(data, &singleAttestation); err != nil { + return nil, nil, errors.New("Failed to unmarshal attestation") + } + sourceAttestations = append(sourceAttestations, singleAttestation) } - versionHeader := r.Header.Get(api.VersionHeader) - if versionHeader == "" { - httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest) - } - v, err := version.FromString(versionHeader) - if err != nil { - httputil.HandleError(w, "Invalid version: "+err.Error(), http.StatusBadRequest) - return + if len(sourceAttestations) == 0 { + return nil, nil, errors.New("No data submitted") } - var attFailures []*server.IndexedVerificationFailure - failedBroadcasts := make([]string, 0) - - if v >= version.Electra { - var sourceAttestations []*structs.AttestationElectra - if err = json.Unmarshal(req.Data, &sourceAttestations); err != nil { - // If that fails, try unmarshaling into a single attestation - var singleAttestation *structs.AttestationElectra - if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { - httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) - return - } - sourceAttestations = append(sourceAttestations, singleAttestation) + var validAttestations []*eth.Attestation + for i, sourceAtt := range sourceAttestations { + att, err := sourceAtt.ToConsensus() + if err != nil { + attFailures = append(attFailures, &server.IndexedVerificationFailure{ + Index: i, + Message: "Could not convert request attestation to consensus attestation: " + err.Error(), + }) + continue } - if len(sourceAttestations) == 0 { - httputil.HandleError(w, "No data submitted", http.StatusBadRequest) - return + if _, err = bls.SignatureFromBytes(att.Signature); err != nil { + attFailures = append(attFailures, &server.IndexedVerificationFailure{ + Index: i, + Message: "Incorrect attestation signature: " + err.Error(), + }) + continue } + validAttestations = append(validAttestations, att) + } - var validAttestations []*eth.AttestationElectra - for i, sourceAtt := range sourceAttestations { - att, err := sourceAtt.ToConsensus() - if err != nil { - attFailures = append(attFailures, &server.IndexedVerificationFailure{ - Index: i, - Message: "Could not convert request attestation to consensus attestation: " + err.Error(), - }) - continue - } - if _, err = bls.SignatureFromBytes(att.Signature); err != nil { - attFailures = append(attFailures, &server.IndexedVerificationFailure{ - Index: i, - Message: "Incorrect attestation signature: " + err.Error(), - }) - continue - } - validAttestations = append(validAttestations, att) + for i, att := range validAttestations { + // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node + // of a received unaggregated attestation. + // Note we can't send for aggregated att because we don't have selection proof. + if !corehelpers.IsAggregated(att) { + s.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.UnaggregatedAttReceived, + Data: &operation.UnAggregatedAttReceivedData{ + Attestation: att, + }, + }) } - for i, att := range validAttestations { - if !corehelpers.IsAggregated(att) { - s.OperationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.UnaggregatedAttReceived, - Data: &operation.UnAggregatedAttReceivedData{ - Attestation: att, - }, - }) - } - - wantedEpoch := slots.ToEpoch(att.Data.Slot) - vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) - if err != nil { - httputil.HandleError(w, "Could not get head validator indices: "+err.Error(), http.StatusInternalServerError) - return - } - subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) - - if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil { - log.WithError(err).Errorf("could not broadcast attestation at index %d", i) - failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) - continue - } - - if corehelpers.IsAggregated(att) { - if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil { - log.WithError(err).Error("could not save aggregated attestation") - } - } else { - if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { - log.WithError(err).Error("could not save unaggregated attestation") - } - } - } - } else { - var sourceAttestations []*structs.Attestation - if err = json.Unmarshal(req.Data, &sourceAttestations); err != nil { - // If that fails, try unmarshaling into a single attestation - var singleAttestation *structs.Attestation - if err := json.Unmarshal(req.Data, &singleAttestation); err != nil { - httputil.HandleError(w, "Could not parse data into attestations: "+err.Error(), http.StatusBadRequest) - return - } - sourceAttestations = append(sourceAttestations, singleAttestation) - } - if len(sourceAttestations) == 0 { - httputil.HandleError(w, "No data submitted", http.StatusBadRequest) - return + wantedEpoch := slots.ToEpoch(att.Data.Slot) + vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) + if err != nil { + failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) + continue } - var validAttestations []*eth.Attestation - for i, sourceAtt := range sourceAttestations { - att, err := sourceAtt.ToConsensus() - if err != nil { - attFailures = append(attFailures, &server.IndexedVerificationFailure{ - Index: i, - Message: "Could not convert request attestation to consensus attestation: " + err.Error(), - }) - continue - } - if _, err = bls.SignatureFromBytes(att.Signature); err != nil { - attFailures = append(attFailures, &server.IndexedVerificationFailure{ - Index: i, - Message: "Incorrect attestation signature: " + err.Error(), - }) - continue - } - validAttestations = append(validAttestations, att) + subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) + if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil { + log.WithError(err).Errorf("could not broadcast attestation at index %d", i) + failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) + continue } - for i, att := range validAttestations { - if !corehelpers.IsAggregated(att) { - s.OperationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.UnaggregatedAttReceived, - Data: &operation.UnAggregatedAttReceivedData{ - Attestation: att, - }, - }) - } - - wantedEpoch := slots.ToEpoch(att.Data.Slot) - vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) - if err != nil { - httputil.HandleError(w, "Could not get head validator indices: "+err.Error(), http.StatusInternalServerError) - return - } - subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) - - if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil { - log.WithError(err).Errorf("could not broadcast attestation at index %d", i) - failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) - continue + if corehelpers.IsAggregated(att) { + if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil { + log.WithError(err).Error("could not save aggregated attestation") } - - if corehelpers.IsAggregated(att) { - if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil { - log.WithError(err).Error("could not save aggregated attestation") - } - } else { - if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { - log.WithError(err).Error("could not save unaggregated attestation") - } + } else { + if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { + log.WithError(err).Error("could not save unaggregated attestation") } } } - if len(failedBroadcasts) > 0 { - httputil.HandleError( - w, - fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")), - http.StatusInternalServerError, - ) - return - } - - if len(attFailures) > 0 { - failuresErr := &server.IndexedVerificationFailureError{ - Code: http.StatusBadRequest, - Message: "One or more attestations failed validation", - Failures: attFailures, - } - httputil.WriteError(w, failuresErr) - } + return attFailures, failedBroadcasts, nil } // ListVoluntaryExits retrieves voluntary exits known by the node but