Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate tasks for main functions #230

Merged
merged 7 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 25 additions & 63 deletions src/commands/start.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import asyncio
import logging
import time
import warnings
from pathlib import Path

import click
from eth_typing import ChecksumAddress
from sw_utils import EventScanner, InterruptHandler
from sw_utils.typings import ChainHead

import src
from src.common.clients import consensus_client, execution_client
from src.common.execution import check_hot_wallet_balance
from src.common.metrics import metrics, metrics_server
from src.common.consensus import get_chain_finalized_head
from src.common.execution import WalletTask
from src.common.metrics import MetricsTask, metrics_server
from src.common.startup_check import startup_checks
from src.common.utils import get_build_version, log_verbose
from src.common.validators import validate_eth_address
Expand All @@ -24,19 +22,16 @@
DEFAULT_METRICS_PORT,
settings,
)
from src.exits.tasks import update_exit_signatures
from src.harvest.tasks import harvest_vault as harvest_vault_task
from src.exits.tasks import ExitSignatureTask
from src.harvest.tasks import HarvestTask
from src.validators.database import NetworkValidatorCrud
from src.validators.execution import (
NetworkValidatorsProcessor,
update_unused_validator_keys_metric,
)
from src.validators.execution import NetworkValidatorsProcessor
from src.validators.signing.hashi_vault import (
HashiVaultConfiguration,
load_hashi_vault_keys,
)
from src.validators.signing.remote import RemoteSignerConfiguration
from src.validators.tasks import load_genesis_validators, register_validators
from src.validators.tasks import ValidatorsTask, load_genesis_validators
from src.validators.typings import Keystores
from src.validators.utils import load_deposit_data, load_keystores

Expand Down Expand Up @@ -291,59 +286,30 @@ async def main() -> None:

logger.info('Syncing network validator events...')
chain_state = await get_chain_finalized_head()

to_block = chain_state.execution_block
await network_validators_scanner.process_new_events(to_block)
await network_validators_scanner.process_new_events(chain_state.execution_block)

if settings.enable_metrics:
await metrics_server()

logger.info('Started operator service')
with InterruptHandler() as interrupt_handler:
while not interrupt_handler.exit:
start_time = time.time()
try:
chain_state = await get_chain_finalized_head()
metrics.slot_number.set(chain_state.consensus_block)

to_block = chain_state.execution_block
# process new network validators
await network_validators_scanner.process_new_events(to_block)
# check and register new validators
await update_unused_validator_keys_metric(
keystores=keystores,
remote_signer_config=remote_signer_config,
deposit_data=deposit_data,
)
await register_validators(
keystores=keystores,
remote_signer_config=remote_signer_config,
deposit_data=deposit_data,
)

# submit harvest vault transaction
if settings.harvest_vault:
await harvest_vault_task()

# process outdated exit signatures
await update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
)
# check balance
await check_hot_wallet_balance()

# update metrics
metrics.block_number.set(await execution_client.eth.get_block_number())

except Exception as exc:
log_verbose(exc)

block_processing_time = time.time() - start_time
sleep_time = max(
float(settings.network_config.SECONDS_PER_BLOCK) - block_processing_time, 0
)
await asyncio.sleep(sleep_time)
tasks = [
ValidatorsTask(
keystores=keystores,
remote_signer_config=remote_signer_config,
deposit_data=deposit_data,
).run(interrupt_handler),
ExitSignatureTask(
keystores=keystores,
remote_signer_config=remote_signer_config,
).run(interrupt_handler),
MetricsTask().run(interrupt_handler),
WalletTask().run(interrupt_handler),
]
if settings.harvest_vault:
tasks.append(HarvestTask().run(interrupt_handler))

await asyncio.gather(*tasks)


def log_start() -> None:
Expand Down Expand Up @@ -380,7 +346,3 @@ def setup_logging():

# Logging config does not affect messages issued by `warnings` module
warnings.simplefilter('ignore')


async def get_chain_finalized_head() -> ChainHead:
return await consensus_client.get_chain_finalized_head(settings.network_config.SLOTS_PER_EPOCH)
8 changes: 8 additions & 0 deletions src/common/consensus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from sw_utils.typings import ChainHead

from src.common.clients import consensus_client
from src.config.settings import settings


async def get_chain_finalized_head() -> ChainHead:
return await consensus_client.get_chain_finalized_head(settings.network_config.SLOTS_PER_EPOCH)
6 changes: 6 additions & 0 deletions src/common/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from src.common.clients import execution_client, ipfs_fetch_client
from src.common.contracts import keeper_contract, multicall_contract
from src.common.metrics import metrics
from src.common.tasks import BaseTask
from src.common.typings import Oracles, OraclesCache
from src.common.wallet import hot_wallet
from src.config.settings import settings
Expand Down Expand Up @@ -179,3 +180,8 @@ async def _calculate_median_priority_fee(block_id: BlockIdentifier = 'latest') -
return await _calculate_median_priority_fee(block['number'] - 1)

return Wei(statistics.median(priority_fees))


class WalletTask(BaseTask):
async def process_block(self) -> None:
await check_hot_wallet_balance()
10 changes: 10 additions & 0 deletions src/common/metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from prometheus_client import Gauge, Info, start_http_server

import src
from src.common.clients import execution_client
from src.common.consensus import get_chain_finalized_head
from src.common.tasks import BaseTask
from src.config.settings import settings


Expand All @@ -27,3 +30,10 @@ def set_app_version(self):

async def metrics_server() -> None:
start_http_server(settings.metrics_port, settings.metrics_host)


class MetricsTask(BaseTask):
async def process_block(self) -> None:
chain_state = await get_chain_finalized_head()
metrics.block_number.set(await execution_client.eth.get_block_number())
metrics.slot_number.set(chain_state.consensus_block)
29 changes: 29 additions & 0 deletions src/common/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio
import logging
import time

from sw_utils import InterruptHandler

from src.common.utils import log_verbose
from src.config.settings import settings

logger = logging.getLogger(__name__)


class BaseTask:
async def process_block(self):
raise NotImplementedError

async def run(self, interrupt_handler: InterruptHandler) -> None:
while not interrupt_handler.exit:
start_time = time.time()
try:
await self.process_block()
except Exception as exc:
log_verbose(exc)

block_processing_time = time.time() - start_time
sleep_time = max(
float(settings.network_config.SECONDS_PER_BLOCK) - block_processing_time, 0
)
await asyncio.sleep(sleep_time)
50 changes: 29 additions & 21 deletions src/exits/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from src.common.exceptions import NotEnoughOracleApprovalsError
from src.common.execution import get_oracles
from src.common.metrics import metrics
from src.common.tasks import BaseTask
from src.common.typings import Oracles
from src.common.utils import get_current_timestamp, is_block_finalized
from src.config.settings import settings
Expand All @@ -30,28 +31,35 @@
logger = logging.getLogger(__name__)


async def update_exit_signatures(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
) -> None:
oracles = await get_oracles()
update_block = await _fetch_last_update_block()
if update_block and not await is_block_finalized(update_block):
logger.info('Waiting for signatures update block %d to finalize...', update_block)
return
class ExitSignatureTask(BaseTask):
keystores: Keystores
remote_signer_config: RemoteSignerConfiguration | None

if update_block and not await _check_majority_oracles_synced(oracles, update_block):
logger.info('Waiting for the majority of oracles to sync exit signatures')
return
def __init__(
self, keystores: Keystores, remote_signer_config: RemoteSignerConfiguration | None
):
self.keystores = keystores
self.remote_signer_config = remote_signer_config

outdated_indexes = await _fetch_outdated_indexes(oracles, update_block)
if outdated_indexes:
await _update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)
async def process_block(self) -> None:
oracles = await get_oracles()
update_block = await _fetch_last_update_block()
if update_block and not await is_block_finalized(update_block):
logger.info('Waiting for signatures update block %d to finalize...', update_block)
return

if update_block and not await _check_majority_oracles_synced(oracles, update_block):
logger.info('Waiting for the majority of oracles to sync exit signatures')
return

outdated_indexes = await _fetch_outdated_indexes(oracles, update_block)
if outdated_indexes:
await _update_exit_signatures(
keystores=self.keystores,
remote_signer_config=self.remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)


async def _check_majority_oracles_synced(oracles: Oracles, update_block: BlockNumber) -> bool:
Expand Down Expand Up @@ -139,7 +147,7 @@ async def _update_exit_signatures(
break
except NotEnoughOracleApprovalsError as e:
logger.error(
'Failed to fetch oracle approvals. Received %d out of %d, '
'Failed to fetch oracle exit signatures update. Received %d out of %d, '
'the oracles with endpoints %s have failed to respond.',
e.num_votes,
e.threshold,
Expand Down
53 changes: 27 additions & 26 deletions src/harvest/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,36 @@
from src.common.contracts import keeper_contract
from src.common.execution import check_gas_price
from src.common.ipfs import fetch_harvest_params
from src.common.tasks import BaseTask
from src.config.settings import settings
from src.harvest.execution import submit_harvest_transaction

logger = logging.getLogger(__name__)


async def harvest_vault() -> None:
"""Check vault state and send harvest transaction if needed."""

if not await keeper_contract.can_harvest(settings.vault):
return

# check current gas prices
if not await check_gas_price():
return

last_rewards = await keeper_contract.get_last_rewards_update()
if not last_rewards:
return
harvest_params = await fetch_harvest_params(
vault_address=settings.vault,
ipfs_hash=last_rewards.ipfs_hash,
rewards_root=last_rewards.rewards_root,
)
if not harvest_params:
return

logger.info('Starting vault harvest')
tx_hash = await submit_harvest_transaction(harvest_params)
if not tx_hash:
return
logger.info('Successfully harvested vault')
class HarvestTask(BaseTask):
async def process_block(self) -> None:
"""Check vault state and send harvest transaction if needed."""
if not await keeper_contract.can_harvest(settings.vault):
return

# check current gas prices
if not await check_gas_price():
return

last_rewards = await keeper_contract.get_last_rewards_update()
if not last_rewards:
return
harvest_params = await fetch_harvest_params(
vault_address=settings.vault,
ipfs_hash=last_rewards.ipfs_hash,
rewards_root=last_rewards.rewards_root,
)
if not harvest_params:
return

logger.info('Starting vault harvest')
tx_hash = await submit_harvest_transaction(harvest_params)
if not tx_hash:
return
logger.info('Successfully harvested vault')
Loading