Skip to content

Commit

Permalink
Implement get active validators (#12)
Browse files Browse the repository at this point in the history
* Implement get active validators

---------

Co-authored-by: Marketen <marcfont12@gmail.com>
  • Loading branch information
pablomendezroyo and Marketen authored May 10, 2024
1 parent a24c936 commit 1c06eb4
Show file tree
Hide file tree
Showing 12 changed files with 261 additions and 50 deletions.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ MONGO_DB_NAME=
MONGO_DB_API_PORT=
API_PORT=
LOG_LEVEL=
BEACON_NODE_URL=
BYPASS_VALIDATORS_FILTERING=
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
MONGO_DB_NAME: "${MONGO_DB_NAME}"
API_PORT: "${API_PORT}"
LOG_LEVEL: "${LOG_LEVEL}"
BYPASS_VALIDATORS_FILTERING: "${BYPASS_VALIDATORS_FILTERING}"
depends_on:
- mongo
container_name: listener
Expand Down
2 changes: 2 additions & 0 deletions listener/cmd/listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func main() {
s := api.NewApi(
config.Port,
config.MongoDBURI,
config.BeaconNodeURLs,
config.BypassValidatorsFiltering,
)

s.Start()
Expand Down
18 changes: 11 additions & 7 deletions listener/internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
)

type httpApi struct {
server *http.Server
port string
dbUri string
server *http.Server
port string
dbUri string
beaconNodeUrls map[string]string
bypassValidatorFiltering bool
}

// create a new api instance
func NewApi(port string, mongoDbUri string) *httpApi {
func NewApi(port string, mongoDbUri string, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) *httpApi {
return &httpApi{
port: port,
dbUri: mongoDbUri,
port: port,
dbUri: mongoDbUri,
beaconNodeUrls: beaconNodeUrls,
bypassValidatorFiltering: bypassValidatorFiltering,
}
}

Expand Down Expand Up @@ -47,7 +51,7 @@ func (s *httpApi) Start() {
// setup the http api
s.server = &http.Server{
Addr: ":" + s.port,
Handler: routes.SetupRouter(dbCollection),
Handler: routes.SetupRouter(dbCollection, s.beaconNodeUrls, s.bypassValidatorFiltering),
}

// start the api
Expand Down
60 changes: 34 additions & 26 deletions listener/internal/api/handlers/postNewSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type signatureRequest struct {
Payload string `json:"payload"`
Signature string `json:"signature"`
Network string `json:"network"`
Label string `json:"label"`
Tag string `json:"tag"`
}

// decodeAndValidateRequests decodes and validates incoming HTTP requests.
Expand All @@ -31,7 +31,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded

var validRequests []types.SignatureRequestDecoded
for _, req := range requests {
if req.Network == "" || req.Label == "" || req.Signature == "" || req.Payload == "" {
if req.Network == "" || req.Tag == "" || req.Signature == "" || req.Payload == "" {
logger.Debug("Skipping invalid signature from request, missing fields")
continue
}
Expand All @@ -55,7 +55,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded
Payload: req.Payload,
Signature: req.Signature,
Network: req.Network,
Label: req.Label,
Tag: req.Tag,
})
} else {
logger.Debug("Skipping invalid signature from request, invalid payload format")
Expand All @@ -65,9 +65,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded
return validRequests, nil
}

func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection *mongo.Collection, wg *sync.WaitGroup) {
defer wg.Done()

func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection *mongo.Collection) {
isValidSignature, err := validation.IsValidSignature(req)
if err != nil {
logger.Error("Failed to validate signature: " + err.Error())
Expand All @@ -85,7 +83,7 @@ func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection
"pubkey": req.DecodedPayload.Pubkey,
"signature": req.Signature,
"network": req.Network,
"label": req.Label,
"tag": req.Tag,
})
if err != nil {
logger.Error("Failed to insert signature into MongoDB: " + err.Error())
Expand All @@ -99,7 +97,7 @@ func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection
// 1. Decode and validate
// 2. Get active validators
// 3. Validate signature and insert into MongoDB
func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) {
func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) {
logger.Debug("Received new POST '/newSignature' request")

// Decode and validate incoming requests
Expand All @@ -111,31 +109,41 @@ func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mong
}
// Respond with an error if no valid requests were found
if len(validRequests) == 0 {
logger.Error("No valid requests")
respondError(w, http.StatusBadRequest, "No valid requests")
return
}

// Get active validators
requestsWithActiveValidators, err := validation.GetActiveValidators(validRequests)
if err != nil {
respondError(w, http.StatusInternalServerError, "Failed to validate active validators")
return
}
// Respond with an error if no active validators were found
if len(requestsWithActiveValidators) == 0 {
respondError(w, http.StatusInternalServerError, "No active validators found in request")
return
}

var wg sync.WaitGroup
// Insert into MongoDB if signature is valid
for _, req := range requestsWithActiveValidators {
// create a goroutine for each request
appendMutex := new(sync.Mutex) // Mutex for appending to the slice
dbMutex := new(sync.Mutex) // Mutex for database operations
allValidatedRequests := []types.SignatureRequestDecoded{} // This will collect all valid requests

// Iterate over all beacon nodes and get active validators
for _, url := range beaconNodeUrls {
wg.Add(1)
go validateAndInsertSignature(req, dbCollection, &wg)
go func(url string) {
defer wg.Done()
activeValidators := validation.GetActiveValidators(validRequests, url, bypassValidatorFiltering)
if len(activeValidators) == 0 {
return
}

appendMutex.Lock()
allValidatedRequests = append(allValidatedRequests, activeValidators...) // Only one goroutine can append to the slice at a time
appendMutex.Unlock()

for _, req := range activeValidators {
dbMutex.Lock()
validateAndInsertSignature(req, dbCollection) // Do we really need to lock the db insertions?
dbMutex.Unlock()
}
}(url) // Pass the URL to the goroutine
}
// Wait for all goroutines to finish

wg.Wait()
if len(allValidatedRequests) == 0 {
respondError(w, http.StatusInternalServerError, "No active validators found in any network")
return
}
respondOK(w, "Finished processing signatures")
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func PostSignaturesByValidatorAggr(w http.ResponseWriter, r *http.Request, dbCol
},
{
"$group": bson.M{
"_id": bson.M{"pubkey": "$pubkey", "network": "$network", "label": "$label"},
"_id": bson.M{"pubkey": "$pubkey", "network": "$network", "tag": "$tag"},
"signatures": bson.M{
"$push": bson.M{
"signature": "$signature",
Expand All @@ -64,7 +64,7 @@ func PostSignaturesByValidatorAggr(w http.ResponseWriter, r *http.Request, dbCol
"_id": 0,
"pubkey": "$_id.pubkey",
"network": "$_id.network",
"label": "$_id.label",
"tag": "$_id.tag",
"signatures": 1,
},
},
Expand Down
4 changes: 2 additions & 2 deletions listener/internal/api/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"go.mongodb.org/mongo-driver/mongo"
)

func SetupRouter(dbCollection *mongo.Collection) *mux.Router {
func SetupRouter(dbCollection *mongo.Collection, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) *mux.Router {
r := mux.NewRouter()

// Define routes
r.HandleFunc("/", handlers.GetHealthCheck).Methods(http.MethodGet)
// closure function to inject dbCollection into the handler
r.HandleFunc("/newSignature", func(w http.ResponseWriter, r *http.Request) {
handlers.PostNewSignature(w, r, dbCollection)
handlers.PostNewSignature(w, r, dbCollection, beaconNodeUrls, bypassValidatorFiltering)
}).Methods(http.MethodPost)
r.HandleFunc("/signaturesByValidator", func(w http.ResponseWriter, r *http.Request) {
handlers.PostSignaturesByValidator(w, r, dbCollection)
Expand Down
25 changes: 24 additions & 1 deletion listener/internal/api/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,34 @@ type SignatureRequestDecoded struct {
Payload string `json:"payload"`
Signature string `json:"signature"`
Network string `json:"network"`
Label string `json:"label"`
Tag string `json:"tag"`
}

type DecodedPayload struct {
Platform string `json:"platform"`
Timestamp string `json:"timestamp"`
Pubkey string `json:"pubkey"`
}

type ActiveValidator struct {
Pubkey string `json:"pubkey"`
WithdrawalCredentials string `json:"withdrawal_credentials"`
EffectiveBalance string `json:"effective_balance"`
Slashed bool `json:"slashed"`
ActivationEligibilityEpoch string `json:"activation_eligibility_epoch"`
ActivationEpoch string `json:"activation_epoch"`
ExitEpoch string `json:"exit_epoch"`
WithdrawableEpoch string `json:"withdrawable_epoch"`
}

// https://ethereum.github.io/beacon-APIs/#/Beacon /eth/v1/beacon/states/{state_id}/validators
type ActiveValidatorsApiResponse struct {
ExecutionOptimistic bool `json:"execution_optimistic"`
Finalized bool `json:"finalized"`
Data []struct {
Index string `json:"index"`
Balance string `json:"balance"`
Status string `json:"status"`
Validator ActiveValidator `json:"validator"`
} `json:"data"`
}
92 changes: 85 additions & 7 deletions listener/internal/api/validation/GetActiveValidators.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,90 @@
package validation

import "github.com/dappnode/validator-monitoring/listener/internal/api/types"
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"

// ValidatePubkeysWithConsensusClient checks if the given pubkeys from the requests are from active validators
// or not by making SINGLE API call to the consensus client. It returns an array of the active validators pubkeys.
func GetActiveValidators(requestsDecoded []types.SignatureRequestDecoded) ([]types.SignatureRequestDecoded, error) {
requestsActiveValidators := requestsDecoded
// make api call: GET /eth/v1/beacon/states/{state_id}/validators?id=validator_pubkey1,validator_pubkey2,validator_pubkey3
"github.com/dappnode/validator-monitoring/listener/internal/api/types"
)

return requestsActiveValidators, nil
// GetActiveValidators checks the active status of validators from a specific beacon node.
// If bypass is true, it simply returns all decoded requests.
func GetActiveValidators(requestsDecoded []types.SignatureRequestDecoded, beaconNodeUrl string, bypass bool) []types.SignatureRequestDecoded {

if len(requestsDecoded) == 0 {
fmt.Println("no requests to process")
return nil
}

ids := make([]string, 0, len(requestsDecoded))
for _, req := range requestsDecoded {
ids = append(ids, req.DecodedPayload.Pubkey)
}

if len(ids) == 0 {
fmt.Println("no valid public keys for network ", beaconNodeUrl, " to query")
return nil
}

// Serialize the request body to JSON
// See https://ethereum.github.io/beacon-APIs/#/Beacon/postStateValidators
jsonData, err := json.Marshal(struct {
Ids []string `json:"ids"`
Statuses []string `json:"statuses"`
}{
Ids: ids,
Statuses: []string{"active_ongoing"}, // Only interested in currently active validators
})
if err != nil {
fmt.Printf("error marshaling request data: %v\n", err)
return nil
}

// Create HTTP client with timeout
client := &http.Client{Timeout: 50 * time.Second}
apiUrl := fmt.Sprintf("%s/eth/v1/beacon/states/head/validators", beaconNodeUrl)

// Make API call
resp, err := client.Post(apiUrl, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
fmt.Printf("error making API call to %s: %v\n", apiUrl, err)
return nil
}
defer resp.Body.Close()

// Check the HTTP response status before reading the body
if resp.StatusCode != http.StatusOK {
fmt.Printf("unexpected response status: %d\n", resp.StatusCode)
return nil
}

// Decode the API response directly into the ApiResponse struct
var apiResponse types.ActiveValidatorsApiResponse
if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil {
fmt.Printf("error decoding response data: %v\n", err)
return nil
}

// Use a map to quickly lookup active validators
activeValidatorMap := make(map[string]bool)
for _, validator := range apiResponse.Data {
activeValidatorMap[validator.Validator.Pubkey] = true
}

// Filter the list of decoded requests to include only those that are active
var activeValidators []types.SignatureRequestDecoded
for _, req := range requestsDecoded {
if _, isActive := activeValidatorMap[req.DecodedPayload.Pubkey]; isActive {
activeValidators = append(activeValidators, req)
}
}

if bypass {
return requestsDecoded // do not return the filtered list
} else {
return activeValidators // return the filtered list (default behaviour)
}
}
44 changes: 44 additions & 0 deletions listener/internal/api/validation/GetActiveValidators_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package validation

import (
"testing"

"github.com/dappnode/validator-monitoring/listener/internal/api/types"
)

func TestGetActiveValidators(t *testing.T) {
// Setup the input data
beaconNodeUrls := map[string]string{
"holesky": "https://holeskyvals.53650f79ab75c6ff.dyndns.dappnode.io",
}

requestsDecoded := []types.SignatureRequestDecoded{
{
Network: "holesky",
DecodedPayload: types.DecodedPayload{
Pubkey: "0xa685beb5a1f317f5a01ecd6dade42113aad945b2ab53fb1b356334ab441323e538feadd2889894b17f8fa2babe1989ca",
},
},
{
Network: "holesky",
DecodedPayload: types.DecodedPayload{
Pubkey: "0xab31efdd97f32087e96d3262f6fb84a4480411d391689be0dfc931fd8a5c16c3f51f10b127040b1cb65eb955f2b78a63",
},
},
{
Network: "holesky",
DecodedPayload: types.DecodedPayload{
Pubkey: "0xa24a030d7d8ca3c5e1f5824760d0f4157a7a89bcca6414377cca97e6e63445bef0e1b63761ee35a0fc46bb317e31b34b",
},
},
}

// Call the function. "bypass" is set to false, so the function will do expected behaviour and filter out inactive validators
result := GetActiveValidators(requestsDecoded, beaconNodeUrls["holesky"], false)

// You may need to mock the server's response or adjust the expected values here according to your actual setup
expectedNumValidators := 3 // This should match the number of mock validators that are "active"
if len(result) != expectedNumValidators {
t.Errorf("Expected %d active validators, got %d", expectedNumValidators, len(result))
}
}
Loading

0 comments on commit 1c06eb4

Please sign in to comment.