Skip to content

Commit

Permalink
Refactored exit signature rotation (#192)
Browse files Browse the repository at this point in the history
* Refactored exit signature rotation

Signed-off-by: cyc60 <avsysoev60@gmail.com>

* Fetch indexes from synced oracles

Signed-off-by: cyc60 <avsysoev60@gmail.com>

* Refactored exit signature fetch

Signed-off-by: cyc60 <avsysoev60@gmail.com>

* Fix is_block_synced func

Signed-off-by: cyc60 <avsysoev60@gmail.com>

* Review fix

Signed-off-by: cyc60 <avsysoev60@gmail.com>

* Review fixes

Signed-off-by: cyc60 <avsysoev60@gmail.com>

---------

Signed-off-by: cyc60 <avsysoev60@gmail.com>
  • Loading branch information
cyc60 authored Oct 9, 2023
1 parent 82a204b commit f31e576
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 369 deletions.
436 changes: 228 additions & 208 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 5 additions & 8 deletions src/commands/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,6 @@ async def main() -> None:
if settings.enable_metrics:
await metrics_server()

# process outdated exit signatures
asyncio.create_task(
update_exit_signatures_periodically(
keystores=keystores,
remote_signer_config=remote_signer_config,
)
)

logger.info('Started operator service')
with InterruptHandler() as interrupt_handler:
while not interrupt_handler.exit:
Expand Down Expand Up @@ -333,6 +325,11 @@ async def main() -> None:
if settings.harvest_vault:
await harvest_vault_task()

# process outdated exit signatures
await update_exit_signatures_periodically(
keystores=keystores,
remote_signer_config=remote_signer_config,
)
# check balance
await check_hot_wallet_balance()

Expand Down
24 changes: 6 additions & 18 deletions src/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
Expand All @@ -10,7 +8,7 @@
from web3 import Web3
from web3.types import Timestamp, Wei

from src.common.clients import consensus_client, execution_client
from src.common.clients import consensus_client
from src.common.exceptions import (
InvalidOraclesRequestError,
NotEnoughOracleApprovalsError,
Expand Down Expand Up @@ -58,21 +56,11 @@ def format_error(e: Exception) -> str:
return repr(e)


async def wait_block_finalization(block_number: BlockNumber | None = None):
block_number = block_number or await execution_client.eth.get_block_number()
chain_head = None
sleep_time = 0.0

while not chain_head or chain_head.execution_block < block_number:
await asyncio.sleep(sleep_time)
start = time.time()

chain_head = await consensus_client.get_chain_finalized_head(
settings.network_config.SLOTS_PER_EPOCH
)

elapsed = time.time() - start
sleep_time = float(settings.network_config.SECONDS_PER_BLOCK) - elapsed
async def is_block_finalized(block_number: BlockNumber) -> bool:
chain_head = await consensus_client.get_chain_finalized_head(
settings.network_config.SLOTS_PER_EPOCH
)
return chain_head.execution_block >= block_number


def get_current_timestamp() -> Timestamp:
Expand Down
5 changes: 0 additions & 5 deletions src/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,3 @@ def network_config(self) -> NetworkConfig:

# Hashi vault timeout
HASHI_VAULT_TIMEOUT = 10

# Oracles signature update sync (10 minutes)
ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT = 600
# How often to pull update for oracle signature update (every minute)
ORACLE_SIGNATURE_UPDATE_SYNC_DELAY = 60
128 changes: 31 additions & 97 deletions src/exits/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import asyncio
import logging
import random
import time
from random import shuffle

from eth_typing import BlockNumber, BLSPubkey
from web3 import Web3
Expand All @@ -12,12 +10,8 @@
from src.common.execution import get_oracles
from src.common.metrics import metrics
from src.common.typings import Oracles
from src.common.utils import get_current_timestamp, wait_block_finalization
from src.config.settings import (
ORACLE_SIGNATURE_UPDATE_SYNC_DELAY,
ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
settings,
)
from src.common.utils import get_current_timestamp, is_block_finalized
from src.config.settings import settings
from src.exits.consensus import get_validator_public_keys
from src.exits.execution import submit_exit_signatures
from src.exits.typings import SignatureRotationRequest
Expand All @@ -39,99 +33,39 @@ async def update_exit_signatures_periodically(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
):
# Oracle may have lag if operator was stopped
# during `update_exit_signatures_periodically` process.
# Wait oracles sync.
oracles = await get_oracles()
await _wait_oracles_signature_update(oracles)

while True:
timer_start = time.time()

try:
oracles = await get_oracles()

oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await _fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
await _update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)

# Wait oracles sync.
await _wait_oracles_signature_update(oracles)
except Exception as e:
logger.exception(e)

elapsed = time.time() - timer_start
await asyncio.sleep(float(settings.network_config.SECONDS_PER_BLOCK) - elapsed)


async def _fetch_outdated_indexes(oracle_endpoint) -> list[int]:
response = await get_oracle_outdated_signatures_response(oracle_endpoint)
outdated_indexes = [val['index'] for val in response['validators']]

metrics.outdated_signatures.set(len(outdated_indexes))
return outdated_indexes


async def _wait_oracles_signature_update(oracles: Oracles) -> None:
last_event = await keeper_contract.get_exit_signatures_updated_event(vault=settings.vault)
if not last_event:
update_block = await _fetch_last_update_block()
if update_block and not await is_block_finalized(update_block):
logger.info('Signatures update block %d has not finalized yet', update_block)
return
update_block = BlockNumber(last_event['blockNumber'])

logger.info('Waiting for block %d finalization...', update_block)
await wait_block_finalization(update_block)

oracle_tasks = {
asyncio.create_task(
_wait_oracle_signature_update(
exit_signature_update_block=update_block,
oracle_endpoint=endpoint,
max_time=ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
)
outdated_indexes = await _fetch_outdated_indexes(oracles, update_block)
if outdated_indexes:
await _update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)
for replicas in oracles.endpoints
for endpoint in replicas
}
while oracle_tasks:
done, oracle_tasks = await asyncio.wait(oracle_tasks, return_when=asyncio.FIRST_COMPLETED)
if done:
for pending_task in oracle_tasks:
pending_task.cancel()
logger.info('Oracles have fetched exit signatures update')


async def _wait_oracle_signature_update(
exit_signature_update_block: BlockNumber, oracle_endpoint: str, max_time: int | float = 0
) -> None:
"""
Wait the oracle `oracle_endpoint` reads and processes `ExitSignatureUpdate` event
in the block `exit_signature_update_block`.
"""
elapsed = 0.0
start_time = time.time()

while elapsed <= max_time:
oracle_block = await _fetch_exit_signature_block(oracle_endpoint)
if oracle_block and oracle_block >= exit_signature_update_block:
return

logger.info(
'Waiting for %s to sync block %d...', oracle_endpoint, exit_signature_update_block
)
await asyncio.sleep(ORACLE_SIGNATURE_UPDATE_SYNC_DELAY)
elapsed = time.time() - start_time

raise asyncio.TimeoutError(
f'Timeout exceeded for wait_oracle_signature_block_update for {oracle_endpoint}'
)
async def _fetch_last_update_block() -> BlockNumber | None:
last_event = await keeper_contract.get_exit_signatures_updated_event(vault=settings.vault)
if last_event:
return BlockNumber(last_event['blockNumber'])
return None


async def _fetch_outdated_indexes(oracles: Oracles, update_block: BlockNumber | None) -> list[int]:
endpoints = [endpoint for replicas in oracles.endpoints for endpoint in replicas]
shuffle(endpoints)

for oracle_endpoint in endpoints:
response = await get_oracle_outdated_signatures_response(oracle_endpoint)
if not update_block or response['exit_signature_block_number'] >= update_block:
outdated_indexes = [val['index'] for val in response['validators']]
metrics.outdated_signatures.set(len(outdated_indexes))
return outdated_indexes
raise RuntimeError('Oracles have not synced exit signatures yet')


async def _update_exit_signatures(
Expand Down
35 changes: 2 additions & 33 deletions src/exits/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,20 @@
import asyncio
from pathlib import Path
from random import randint
from typing import Callable
from unittest import mock

import pytest
from eth_typing import BlockNumber, ChecksumAddress
from eth_typing import ChecksumAddress
from sw_utils.typings import ConsensusFork

from src.common.typings import Oracles
from src.common.utils import get_current_timestamp
from src.config.settings import settings
from src.exits.tasks import _get_oracles_request, _wait_oracle_signature_update
from src.exits.tasks import _get_oracles_request
from src.validators.signing.remote import RemoteSignerConfiguration
from src.validators.typings import ExitSignatureShards, Keystores


@pytest.mark.usefixtures('fake_settings')
class TestWaitOracleSignatureUpdate:
async def test_normal(self):
update_block = BlockNumber(3)
with (
mock.patch('asyncio.sleep'),
mock.patch('src.exits.tasks.time.time', return_value=100),
mock.patch(
'src.exits.tasks._fetch_exit_signature_block', side_effect=[None, 1, 2, 3]
) as fetch_mock,
):
await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)

assert fetch_mock.call_count == 4

async def test_timeout(self):
update_block = BlockNumber(3)
with (
mock.patch('asyncio.sleep'),
mock.patch('src.exits.tasks.time.time', side_effect=[100, 103, 106]),
mock.patch(
'src.exits.tasks._fetch_exit_signature_block', return_value=None
) as fetch_mock,
pytest.raises(asyncio.TimeoutError),
):
await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)

assert fetch_mock.call_count == 2


@pytest.mark.usefixtures('fake_settings')
class TestGetOraclesRequest:
async def test_local_keystores(
Expand Down

0 comments on commit f31e576

Please sign in to comment.