Skip to content

Commit

Permalink
Only reindex singletons/events if service is synced (#2348)
Browse files Browse the repository at this point in the history
* Only reindex singletons/events if service is synced

* Fix PR comments

* Fix tests

---------

Co-authored-by: Uxio Fuentefria <6909403+Uxio0@users.noreply.github.com>
  • Loading branch information
Uxio0 and Uxio0 authored Dec 3, 2024
1 parent 3f80247 commit 9731735
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 31 deletions.
64 changes: 44 additions & 20 deletions safe_transaction_service/history/services/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


@dataclass
class IndexingStatus:
class AllIndexingStatus:
current_block_number: int
current_block_timestamp: int
erc20_block_number: int
Expand All @@ -39,10 +39,10 @@ class IndexingStatus:


@dataclass
class ERC20IndexingStatus:
class SpecificIndexingStatus:
current_block_number: int
erc20_block_number: int
erc20_synced: bool
block_number: int
synced: bool


class IndexingException(Exception):
Expand Down Expand Up @@ -126,16 +126,23 @@ def get_master_copies_current_indexing_block_number(self) -> Optional[int]:
min_master_copies_block_number=Min("tx_block_number")
)["min_master_copies_block_number"]

def get_indexing_status(self) -> IndexingStatus:
current_block = self.ethereum_client.get_block("latest")
current_block_number = current_block["number"]

# Indexing points to the next block to be indexed, we need the previous ones
def get_erc20_indexing_status(
self, current_block_number: int
) -> SpecificIndexingStatus:
erc20_block_number = min(
max(self.get_erc20_721_current_indexing_block_number() - 1, 0),
current_block_number,
)
erc20_synced = (
current_block_number - erc20_block_number <= self.eth_reorg_blocks
)
return SpecificIndexingStatus(
current_block_number, erc20_block_number, erc20_synced
)

def get_master_copies_indexing_status(
self, current_block_number: int
) -> SpecificIndexingStatus:
if (
master_copies_current_indexing_block_number := self.get_master_copies_current_indexing_block_number()
) is None:
Expand All @@ -146,33 +153,50 @@ def get_indexing_status(self) -> IndexingStatus:
current_block_number,
)

erc20_synced = (
current_block_number - erc20_block_number <= self.eth_reorg_blocks
)
master_copies_synced = (
current_block_number - master_copies_block_number <= self.eth_reorg_blocks
)
return SpecificIndexingStatus(
current_block_number, master_copies_block_number, master_copies_synced
)

def get_indexing_status(self) -> AllIndexingStatus:
current_block = self.ethereum_client.get_block("latest")
current_block_number = current_block["number"]

erc20_indexing_status = self.get_erc20_indexing_status(current_block_number)
master_copies_indexing_status = self.get_master_copies_indexing_status(
current_block_number
)

if erc20_block_number == master_copies_block_number == current_block_number:
if (
erc20_indexing_status.block_number
== master_copies_indexing_status.block_number
== current_block_number
):
erc20_block, master_copies_block = [current_block, current_block]
else:
erc20_block, master_copies_block = self.ethereum_client.get_blocks(
[erc20_block_number, master_copies_block_number]
[
erc20_indexing_status.block_number,
master_copies_indexing_status.block_number,
]
)
current_block_timestamp = current_block["timestamp"]
erc20_block_timestamp = erc20_block["timestamp"]
master_copies_block_timestamp = master_copies_block["timestamp"]

return IndexingStatus(
return AllIndexingStatus(
current_block_number=current_block_number,
current_block_timestamp=current_block_timestamp,
erc20_block_number=erc20_block_number,
erc20_block_number=erc20_indexing_status.block_number,
erc20_block_timestamp=erc20_block_timestamp,
erc20_synced=erc20_synced,
master_copies_block_number=master_copies_block_number,
erc20_synced=erc20_indexing_status.synced,
master_copies_block_number=master_copies_indexing_status.block_number,
master_copies_block_timestamp=master_copies_block_timestamp,
master_copies_synced=master_copies_synced,
synced=erc20_synced and master_copies_synced,
master_copies_synced=master_copies_indexing_status.synced,
synced=erc20_indexing_status.synced
and master_copies_indexing_status.synced,
)

def is_service_synced(self) -> bool:
Expand Down
26 changes: 24 additions & 2 deletions safe_transaction_service/history/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,21 @@ def process_decoded_internal_txs_for_safe_task(

@app.shared_task(bind=True)
@task_timeout(timeout_seconds=LOCK_TIMEOUT)
def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[int]:
def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> bool:
"""
Reindexes last hours for master copies to prevent indexing issues
:param hours: Hours to reindex from now
:return: `True` if reindexing is triggered, `False` otherwise
"""
with contextlib.suppress(LockError):
with only_one_running_task(self):
indexing_status = IndexServiceProvider().get_indexing_status()
if not indexing_status.master_copies_synced:
logger.warning(
"Reindexing master copies will not be executed as service is out of sync"
)
return False
if ethereum_block := EthereumBlock.objects.oldest_than(
seconds=60 * 60 * hours
).first():
Expand All @@ -333,16 +342,27 @@ def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[i
reindex_master_copies_task.delay(
from_block_number, to_block_number=to_block_number
)
return True
return False


@app.shared_task(bind=True)
@task_timeout(timeout_seconds=LOCK_TIMEOUT)
def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> Optional[int]:
def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> bool:
"""
Reindexes last hours for erx20 and erc721 to prevent indexing issues
:param hours: Hours to reindex from now
:return: `True` if reindexing is triggered, `False` otherwise
"""
with contextlib.suppress(LockError):
with only_one_running_task(self):
indexing_status = IndexServiceProvider().get_indexing_status()
if not indexing_status.erc20_synced:
logger.warning(
"Reindexing erc20/721 events will not be executed as service is out of sync"
)
return False
if ethereum_block := EthereumBlock.objects.oldest_than(
seconds=60 * 60 * hours
).first():
Expand All @@ -361,6 +381,8 @@ def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> Optional[i
reindex_erc20_events_task.delay(
from_block_number, to_block_number=to_block_number
)
return True
return False


@app.shared_task(bind=True)
Expand Down
42 changes: 33 additions & 9 deletions safe_transaction_service/history/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

from eth_account import Account

from ...events.services import QueueService
from safe_transaction_service.events.services import QueueService

from ...utils.redis import get_redis
from ..indexers import (
Erc20EventsIndexerProvider,
Expand All @@ -30,6 +31,7 @@
ReorgService,
)
from ..services.collectibles_service import CollectibleWithMetadata
from ..services.index_service import SpecificIndexingStatus
from ..tasks import (
check_reorgs_task,
check_sync_status_task,
Expand Down Expand Up @@ -128,54 +130,76 @@ def test_index_new_proxies_task(self):
def test_index_safe_events_task(self):
self.assertEqual(index_safe_events_task.delay().result, (0, 0))

@patch.object(IndexService, "get_master_copies_indexing_status")
@patch.object(IndexService, "reindex_master_copies")
def test_reindex_mastercopies_last_hours_task(
self, reindex_master_copies_mock: MagicMock
self,
reindex_master_copies_mock: MagicMock,
get_master_copies_indexing_status_mock: MagicMock,
):
get_master_copies_indexing_status_mock.return_value = SpecificIndexingStatus(
0, 0, True
)

now = timezone.now()
one_hour_ago = now - datetime.timedelta(hours=1)
one_day_ago = now - datetime.timedelta(days=1)
one_week_ago = now - datetime.timedelta(weeks=1)

reindex_mastercopies_last_hours_task()
self.assertFalse(reindex_mastercopies_last_hours_task())
reindex_master_copies_mock.assert_not_called()

ethereum_block_0 = EthereumBlockFactory(timestamp=one_week_ago)
ethereum_block_1 = EthereumBlockFactory(timestamp=one_day_ago)
ethereum_block_2 = EthereumBlockFactory(timestamp=one_hour_ago)
ethereum_block_3 = EthereumBlockFactory(timestamp=now)

reindex_mastercopies_last_hours_task()
self.assertTrue(reindex_mastercopies_last_hours_task())
reindex_master_copies_mock.assert_called_once_with(
ethereum_block_1.number,
to_block_number=ethereum_block_3.number,
addresses=None,
)

get_master_copies_indexing_status_mock.return_value = SpecificIndexingStatus(
0, 0, False
)
self.assertFalse(reindex_mastercopies_last_hours_task())

@patch.object(IndexService, "get_erc20_indexing_status")
@patch.object(IndexService, "reindex_erc20_events")
def test_reindex_erc20_erc721_last_hours_task(
self, reindex_erc20_events: MagicMock
self,
reindex_erc20_events_mock: MagicMock,
get_erc20_indexing_status_mock: MagicMock,
):
get_erc20_indexing_status_mock.return_value = SpecificIndexingStatus(0, 0, True)

now = timezone.now()
one_hour_ago = now - datetime.timedelta(hours=1)
one_day_ago = now - datetime.timedelta(days=1)
one_week_ago = now - datetime.timedelta(weeks=1)

reindex_erc20_erc721_last_hours_task()
reindex_erc20_events.assert_not_called()
self.assertFalse(reindex_erc20_erc721_last_hours_task())
reindex_erc20_events_mock.assert_not_called()

ethereum_block_0 = EthereumBlockFactory(timestamp=one_week_ago)
ethereum_block_1 = EthereumBlockFactory(timestamp=one_day_ago)
ethereum_block_2 = EthereumBlockFactory(timestamp=one_hour_ago)
ethereum_block_3 = EthereumBlockFactory(timestamp=now)

reindex_erc20_erc721_last_hours_task()
reindex_erc20_events.assert_called_once_with(
self.assertTrue(reindex_erc20_erc721_last_hours_task())
reindex_erc20_events_mock.assert_called_once_with(
ethereum_block_1.number,
to_block_number=ethereum_block_3.number,
addresses=None,
)

get_erc20_indexing_status_mock.return_value = SpecificIndexingStatus(
0, 0, False
)
self.assertFalse(reindex_erc20_erc721_last_hours_task())

def test_process_decoded_internal_txs_task(self):
owner = Account.create().address
safe_address = Account.create().address
Expand Down

0 comments on commit 9731735

Please sign in to comment.