Skip to content

Commit

Permalink
Return to frontend raw data (#390)
Browse files Browse the repository at this point in the history
* return to frontend raw data

* fix type export

* use single map

* fix tests

* fix trackValidatorsPerformance test

* rename to fetchAndInsertEpochValidatorsData

* write null if undefined

* store db initialized

* use keys with type of number instead of string in maps
  • Loading branch information
pablomendezroyo authored Oct 11, 2024
1 parent 58db880 commit 725e61b
Show file tree
Hide file tree
Showing 31 changed files with 404 additions and 1,428 deletions.
21 changes: 5 additions & 16 deletions packages/brain/src/calls/fetchValidatorsPerformanceData.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,23 @@
import { fetchAndProcessValidatorsData } from "../modules/validatorsDataIngest/index.js";
import { minGenesisTime, secondsPerSlot } from "../index.js";
import type {
ValidatorsDataProcessed,
Granularity,
NumberOfDaysToQuery
} from "../modules/validatorsDataIngest/types.js";
import { PostgresClient } from "../modules/apiClients/index.js";
import type { NumberOfDaysToQuery } from "../modules/validatorsDataIngest/types.js";
import type { EpochsValidatorsMap } from "../modules/apiClients/postgres/types.js";

export async function fetchValidatorsPerformanceData({
postgresClient,
validatorIndexes,
numberOfDaysToQuery,
granularity
numberOfDaysToQuery
}: {
postgresClient: PostgresClient;
validatorIndexes: string[];
numberOfDaysToQuery?: NumberOfDaysToQuery;
granularity?: Granularity;
}): Promise<
Map<
number, // validatorIndex
ValidatorsDataProcessed // processed data of the validator
>
> {
}): Promise<EpochsValidatorsMap> {
return await fetchAndProcessValidatorsData({
validatorIndexes,
postgresClient,
minGenesisTime,
secondsPerSlot,
numberOfDaysToQuery,
granularity
numberOfDaysToQuery
});
}
43 changes: 22 additions & 21 deletions packages/brain/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
BlockExplorerApi,
ValidatorApi,
DappnodeSignatureVerifier,
DappmanagerApi
DappmanagerApi,
PostgresClient
} from "./modules/apiClients/index.js";
import { startUiServer, startLaunchpadApi } from "./modules/apiServers/index.js";
import * as dotenv from "dotenv";
Expand All @@ -17,7 +18,8 @@ import {
CronJob,
reloadValidators,
// trackValidatorsPerformanceCron,
sendProofsOfValidation
sendProofsOfValidation,
trackValidatorsPerformanceCron
} from "./modules/cron/index.js";
// import { PostgresClient } from "./modules/apiClients/index.js";
import { brainConfig } from "./modules/config/index.js";
Expand Down Expand Up @@ -84,8 +86,7 @@ export const brainDb = new BrainDataBase(
);

// Create postgres client
// export const postgresClient = new PostgresClient(postgresUrl);
// await postgresClient.initialize().catch((err) => logger.error(`Error initializing table in postgres db`, err)); // TODO: handle error. Consider attempting to initialize on every cron iteration
export const postgresClient = new PostgresClient(postgresUrl);

// Start server APIs
const uiServer = startUiServer(path.resolve(__dirname, params.uiBuildDirName), network);
Expand All @@ -105,30 +106,30 @@ const proofOfValidationCron = new CronJob(shareCronInterval, () =>
proofOfValidationCron.start();

// execute the performance cron task every 1/4 of an epoch
// export const trackValidatorsPerformanceCronTask = new CronJob(
// ((slotsPerEpoch * secondsPerSlot) / 4) * 1000,
// async () => {
// await trackValidatorsPerformanceCron({
// brainDb,
// postgresClient,
// beaconchainApi,
// executionClient,
// consensusClient,
// dappmanagerApi,
// sendNotification: true
// });
// }
// );
// trackValidatorsPerformanceCronTask.start();
export const trackValidatorsPerformanceCronTask = new CronJob(
((slotsPerEpoch * secondsPerSlot) / 4) * 1000,
async () => {
await trackValidatorsPerformanceCron({
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient,
dappmanagerApi,
sendNotification: true
});
}
);
trackValidatorsPerformanceCronTask.start();

// Graceful shutdown
function handle(signal: string): void {
logger.info(`${signal} received. Shutting down...`);
reloadValidatorsCron.stop();
proofOfValidationCron.stop();
// trackValidatorsPerformanceCronTask.stop();
trackValidatorsPerformanceCronTask.stop();
brainDb.close();
// postgresClient.close().catch((err) => logger.error(`Error closing postgres client`, err)); // postgresClient db connection is the only external resource that needs to be closed
postgresClient.close().catch((err) => logger.error(`Error closing postgres client`, err)); // postgresClient db connection is the only external resource that needs to be closed
uiServer.close();
launchpadServer.close();
logger.debug(`Stopped all cron jobs and closed all connections.`);
Expand Down
208 changes: 56 additions & 152 deletions packages/brain/src/modules/apiClients/postgres/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import postgres from "postgres";
import logger from "../../logger/index.js";
import { BlockProposalStatus, Columns, ValidatorPerformance, ValidatorPerformancePostgres } from "./types.js";
import { ConsensusClient, ExecutionClient } from "@stakingbrain/common";
import { EpochsValidatorsMap, DataPerEpoch, Columns, ValidatorsDataPerEpochMap, PostgresDataRow } from "./types.js";

export class PostgresClient {
private readonly tableName = "validators_performance";
private readonly BLOCK_PROPOSAL_STATUS = "BLOCK_PROPOSAL_STATUS";
private readonly EXECUTION_CLIENT = "EXECUTION_CLIENT";
private readonly CONSENSUS_CLIENT = "CONSENSUS_CLIENT";
private readonly tableName = "epochs_data";
private sql: postgres.Sql;

/**
Expand Down Expand Up @@ -37,199 +33,107 @@ SELECT pg_total_relation_size('${this.tableName}');
}

/**
* Initializes the database by creating the required table if it does not exist with the required columns.
* Initialize the database table if it doesn't exist.
*/
public async initialize() {
// important: enum create types must be broken into separate conditional checks for each ENUM type before trying to create it.
// Check and create BLOCK_PROPOSAL_STATUS ENUM type if not exists
await this.sql.unsafe(`
DO $$
BEGIN
CREATE TYPE ${this.BLOCK_PROPOSAL_STATUS} AS ENUM('${BlockProposalStatus.Missed}', '${BlockProposalStatus.Proposed}', '${BlockProposalStatus.Unchosen}');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
`);
// Check and create EXECUTION_CLIENT ENUM type if not exists
await this.sql.unsafe(`
DO $$
BEGIN
CREATE TYPE ${this.EXECUTION_CLIENT} AS ENUM('${ExecutionClient.Besu}', '${ExecutionClient.Nethermind}', '${ExecutionClient.Geth}', '${ExecutionClient.Reth}', '${ExecutionClient.Erigon}', '${ExecutionClient.Unknown}');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
`);

// Check and create CONSENSUS_CLIENT ENUM type if not exists
await this.sql.unsafe(`
DO $$
BEGIN
CREATE TYPE ${this.CONSENSUS_CLIENT} AS ENUM('${ConsensusClient.Teku}', '${ConsensusClient.Lodestar}', '${ConsensusClient.Prysm}', '${ConsensusClient.Lighthouse}', '${ConsensusClient.Nimbus}', '${ConsensusClient.Unknown}');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
`);

const query = `
-- Create the table if not exists
CREATE TABLE IF NOT EXISTS ${this.tableName} (
${Columns.validatorIndex} BIGINT NOT NULL,
${Columns.validatorindex} BIGINT NOT NULL,
${Columns.epoch} BIGINT NOT NULL,
${Columns.executionClient} ${this.EXECUTION_CLIENT} NOT NULL,
${Columns.consensusClient} ${this.CONSENSUS_CLIENT} NOT NULL,
${Columns.slot} BIGINT,
${Columns.liveness} BOOLEAN,
${Columns.blockProposalStatus} ${this.BLOCK_PROPOSAL_STATUS},
${Columns.syncCommitteeRewards} BIGINT,
${Columns.attestationsTotalRewards} JSONB,
${Columns.attestationsIdealRewards} JSONB,
${Columns.clients} JSONB NOT NULL,
${Columns.attestation} JSONB,
${Columns.block} JSONB,
${Columns.synccommittee} JSONB,
${Columns.slot} BIGINT NULL,
${Columns.error} JSONB NULL,
PRIMARY KEY (${Columns.validatorIndex}, ${Columns.epoch})
PRIMARY KEY (validatorIndex, epoch)
);
`;
await this.sql.unsafe(query);
logger.info("Table created or already exists.");
}

/**
* Delete database table and its enum types.
* Delete database table.
*/
public async deleteDatabaseTableAndEnumTypes() {
public async deleteDatabaseTable() {
await this.sql.unsafe(`DROP TABLE IF EXISTS ${this.tableName}`);
await this.sql.unsafe(`
DROP TYPE IF EXISTS ${this.BLOCK_PROPOSAL_STATUS};
DROP TYPE IF EXISTS ${this.EXECUTION_CLIENT};
DROP TYPE IF EXISTS ${this.CONSENSUS_CLIENT};
`);
}

/**
* Inserts the given performance data into the database. If the data already exists for the given validator index and epoch,
*
* IMPORTANT: it must be noted that the query will update if the data already exists for the given validator index and epoch.
* If the data exists without an error and the new data has an error, the error will be updated and the other fields will remain the same.
*
* @param data - The performance data to insert.
* Inserts epoch data into the database. If the data already exists for the given validator index and epoch it will be updated.
*/
public async insertPerformanceData(data: ValidatorPerformance): Promise<void> {
public async insertValidatorDataPerEpoch(
epoch: number,
validatorsDataPerEpochMap: ValidatorsDataPerEpochMap
): Promise<void> {
const query = `
INSERT INTO ${this.tableName} (${Columns.validatorIndex}, ${Columns.epoch}, ${Columns.executionClient}, ${Columns.consensusClient}, ${Columns.slot}, ${Columns.liveness}, ${Columns.blockProposalStatus}, ${Columns.syncCommitteeRewards}, ${Columns.attestationsTotalRewards}, ${Columns.attestationsIdealRewards}, ${Columns.error})
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (${Columns.validatorIndex}, ${Columns.epoch})
INSERT INTO ${this.tableName} (${Columns.validatorindex}, ${Columns.epoch}, ${Columns.clients}, ${Columns.attestation}, ${Columns.block}, ${Columns.synccommittee}, ${Columns.slot}, ${Columns.error})
VALUES ${Array.from(validatorsDataPerEpochMap.entries())
.map(([validatorIndex, data]) => {
return `(${validatorIndex}, ${epoch}, '${JSON.stringify(data.clients)}', '${JSON.stringify(data.attestation) || null}', '${JSON.stringify(data.block) || null}', '${JSON.stringify(data.synccommittee) || null}', ${data.slot || null}, '${JSON.stringify(data.error) || null}')`;
})
.join(", ")}
ON CONFLICT (${Columns.validatorindex}, ${Columns.epoch})
DO UPDATE SET
${Columns.executionClient} = EXCLUDED.${Columns.executionClient},
${Columns.consensusClient} = EXCLUDED.${Columns.consensusClient},
${Columns.slot} = EXCLUDED.${Columns.slot},
${Columns.liveness} = EXCLUDED.${Columns.liveness},
${Columns.blockProposalStatus} = EXCLUDED.${Columns.blockProposalStatus},
${Columns.syncCommitteeRewards} = EXCLUDED.${Columns.syncCommitteeRewards},
${Columns.attestationsTotalRewards} = EXCLUDED.${Columns.attestationsTotalRewards},
${Columns.attestationsIdealRewards} = EXCLUDED.${Columns.attestationsIdealRewards},
${Columns.error} = EXCLUDED.${Columns.error};
${Columns.clients} = EXCLUDED.${Columns.clients},
${Columns.attestation} = EXCLUDED.${Columns.attestation},
${Columns.block} = EXCLUDED.${Columns.block},
${Columns.synccommittee} = EXCLUDED.${Columns.synccommittee},
${Columns.slot} = EXCLUDED.${Columns.slot},
${Columns.error} = EXCLUDED.${Columns.error}
`;

// Execute the query with the appropriate parameters
await this.sql.unsafe(query, [
data.validatorIndex,
data.epoch,
data.executionClient,
data.consensusClient,
data.slot ?? null,
data.liveness ?? null,
data.blockProposalStatus ?? null,
data.syncCommitteeRewards ?? null,
data.attestationsTotalRewards ? JSON.stringify(data.attestationsTotalRewards) : null, // JSONB expects a string or null
data.attestationsIdealRewards ? JSON.stringify(data.attestationsIdealRewards) : null, // JSONB expects a string or null
data.error ? JSON.stringify(data.error) : null // JSONB expects a string or null
]);
}

/**
* Get the validators data for the given validator indexes from all epochs. In order to improve data process
* it will return a map with the validator index as key and the performance data as value.
*
* @param validatorIndexes - The indexes of the validators to get the data for.
* @returns The performance data for the given validators.
*/
public async getValidatorsDataFromAllEpochs(validatorIndexes: string[]): Promise<ValidatorPerformance[]> {
const query = `
SELECT * FROM ${this.tableName}
WHERE ${Columns.validatorIndex} = ANY($1)
`;

const result = (await this.sql.unsafe(query, [validatorIndexes])) as ValidatorPerformancePostgres[];
return result.map((row: ValidatorPerformancePostgres) => ({
validatorIndex: row.validator_index,
epoch: row.epoch,
executionClient: row.execution_client,
consensusClient: row.consensus_client,
slot: row.slot,
liveness: row.liveness,
blockProposalStatus: row.block_proposal_status,
syncCommitteeRewards: row.sync_comittee_rewards,
attestationsTotalRewards: JSON.parse(row.attestations_total_rewards),
attestationsIdealRewards: JSON.parse(row.attestations_ideal_rewards),
error: JSON.parse(row.error)
}));
await this.sql.unsafe(query);
}

/**
* Get the validators data for the given validator indexes and an epoch start and end range. In order to improve data process
* it will return a map with the validator index as key and the performance data as value.
*
* @param validatorIndexes - The indexes of the validators to get the data for.
* @param startEpoch - The start epoch number.
* @param endEpoch - The end epoch number.
* @returns The performance data for the given validators.
* Get the epoch data for the given validator indexes and epoch range.
*/
public async getValidatorsDataMapForEpochRange({
public async getEpochsDataMapForEpochRange({
validatorIndexes,
startEpoch,
endEpoch
}: {
validatorIndexes: string[];
startEpoch: number;
endEpoch: number;
}): Promise<Map<number, ValidatorPerformance[]>> {
}): Promise<EpochsValidatorsMap> {
const query = `
SELECT * FROM ${this.tableName}
WHERE ${Columns.validatorIndex} = ANY($1)
WHERE ${Columns.validatorindex} = ANY($1)
AND ${Columns.epoch} >= $2
AND ${Columns.epoch} <= $3
`;

const result = (await this.sql.unsafe(query, [
validatorIndexes,
startEpoch,
endEpoch
])) as ValidatorPerformancePostgres[];

return result.reduce((map: Map<number, ValidatorPerformance[]>, row) => {
const key = row.validator_index;

const performanceData = {
validatorIndex: row.validator_index,
epoch: row.epoch,
executionClient: row.execution_client,
consensusClient: row.consensus_client,
slot: row.slot,
liveness: row.liveness,
blockProposalStatus: row.block_proposal_status,
syncCommitteeRewards: row.sync_comittee_rewards,
attestationsTotalRewards: JSON.parse(row.attestations_total_rewards),
attestationsIdealRewards: JSON.parse(row.attestations_ideal_rewards),
error: JSON.parse(row.error)
const result: PostgresDataRow[] = await this.sql.unsafe(query, [validatorIndexes, startEpoch, endEpoch]);

const epochsValidatorsMap: EpochsValidatorsMap = new Map();

for (const row of result) {
const { validatorindex, epoch, clients, attestation, block, synccommittee, slot, error } = row;
const validatorIndexNumber = parseInt(validatorindex);
const epochNumber = parseInt(epoch);
const data: DataPerEpoch = {
[Columns.clients]: clients,
[Columns.attestation]: attestation,
[Columns.block]: block,
[Columns.synccommittee]: synccommittee,
[Columns.slot]: slot,
[Columns.error]: error
};

if (map.has(key)) map.get(key)?.push(performanceData);
else map.set(key, [performanceData]);
if (!epochsValidatorsMap.has(epochNumber)) epochsValidatorsMap.set(epochNumber, new Map());

epochsValidatorsMap.get(epochNumber)!.set(validatorIndexNumber, data);
}

return map;
}, new Map<number, ValidatorPerformance[]>());
return epochsValidatorsMap;
}

/**
* Method to close the database connection.
* Close the database connection.
*/
public async close(): Promise<void> {
await this.sql.end();
Expand Down
Loading

0 comments on commit 725e61b

Please sign in to comment.