Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only reindex singletons/events if service is synced #2348

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 44 additions & 20 deletions safe_transaction_service/history/services/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


@dataclass
class IndexingStatus:
class AllIndexingStatus:
current_block_number: int
current_block_timestamp: int
erc20_block_number: int
Expand All @@ -39,10 +39,10 @@ class IndexingStatus:


@dataclass
class ERC20IndexingStatus:
class SpecificIndexingStatus:
current_block_number: int
erc20_block_number: int
erc20_synced: bool
block_number: int
synced: bool


class IndexingException(Exception):
Expand Down Expand Up @@ -126,16 +126,23 @@ def get_master_copies_current_indexing_block_number(self) -> Optional[int]:
min_master_copies_block_number=Min("tx_block_number")
)["min_master_copies_block_number"]

def get_indexing_status(self) -> IndexingStatus:
current_block = self.ethereum_client.get_block("latest")
current_block_number = current_block["number"]

# Indexing points to the next block to be indexed, we need the previous ones
def get_erc20_indexing_status(
self, current_block_number: int
) -> SpecificIndexingStatus:
erc20_block_number = min(
max(self.get_erc20_721_current_indexing_block_number() - 1, 0),
current_block_number,
)
erc20_synced = (
current_block_number - erc20_block_number <= self.eth_reorg_blocks
)
return SpecificIndexingStatus(
current_block_number, erc20_block_number, erc20_synced
)

def get_master_copies_indexing_status(
self, current_block_number: int
) -> SpecificIndexingStatus:
if (
master_copies_current_indexing_block_number := self.get_master_copies_current_indexing_block_number()
) is None:
Expand All @@ -146,33 +153,50 @@ def get_indexing_status(self) -> IndexingStatus:
current_block_number,
)

erc20_synced = (
current_block_number - erc20_block_number <= self.eth_reorg_blocks
)
master_copies_synced = (
current_block_number - master_copies_block_number <= self.eth_reorg_blocks
)
return SpecificIndexingStatus(
current_block_number, master_copies_block_number, master_copies_synced
)

def get_indexing_status(self) -> AllIndexingStatus:
current_block = self.ethereum_client.get_block("latest")
current_block_number = current_block["number"]

erc20_indexing_status = self.get_erc20_indexing_status(current_block_number)
master_copies_indexing_status = self.get_master_copies_indexing_status(
current_block_number
)

if erc20_block_number == master_copies_block_number == current_block_number:
if (
erc20_indexing_status.block_number
== master_copies_indexing_status.block_number
== current_block_number
):
erc20_block, master_copies_block = [current_block, current_block]
else:
erc20_block, master_copies_block = self.ethereum_client.get_blocks(
[erc20_block_number, master_copies_block_number]
[
erc20_indexing_status.block_number,
master_copies_indexing_status.block_number,
]
)
current_block_timestamp = current_block["timestamp"]
erc20_block_timestamp = erc20_block["timestamp"]
master_copies_block_timestamp = master_copies_block["timestamp"]

return IndexingStatus(
return AllIndexingStatus(
current_block_number=current_block_number,
current_block_timestamp=current_block_timestamp,
erc20_block_number=erc20_block_number,
erc20_block_number=erc20_indexing_status.block_number,
erc20_block_timestamp=erc20_block_timestamp,
erc20_synced=erc20_synced,
master_copies_block_number=master_copies_block_number,
erc20_synced=erc20_indexing_status.synced,
master_copies_block_number=master_copies_indexing_status.block_number,
master_copies_block_timestamp=master_copies_block_timestamp,
master_copies_synced=master_copies_synced,
synced=erc20_synced and master_copies_synced,
master_copies_synced=master_copies_indexing_status.synced,
synced=erc20_indexing_status.synced
and master_copies_indexing_status.synced,
)

def is_service_synced(self) -> bool:
Expand Down
14 changes: 12 additions & 2 deletions safe_transaction_service/history/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,17 @@ def process_decoded_internal_txs_for_safe_task(

@app.shared_task(bind=True)
@task_timeout(timeout_seconds=LOCK_TIMEOUT)
def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[int]:
def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> None:
"""
Reindexes last hours for master copies to prevent indexing issues
"""
with contextlib.suppress(LockError):
with only_one_running_task(self):
indexing_status = IndexServiceProvider().get_indexing_status()
if not indexing_status.master_copies_synced:
logger.warning(
"Reindexing master copies will not be executed as service is out of sync"
)
Uxio0 marked this conversation as resolved.
Show resolved Hide resolved
if ethereum_block := EthereumBlock.objects.oldest_than(
seconds=60 * 60 * hours
).first():
Expand All @@ -337,12 +342,17 @@ def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[i

@app.shared_task(bind=True)
@task_timeout(timeout_seconds=LOCK_TIMEOUT)
def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> Optional[int]:
def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> None:
"""
Reindexes last hours for erx20 and erc721 to prevent indexing issues
"""
with contextlib.suppress(LockError):
with only_one_running_task(self):
indexing_status = IndexServiceProvider().get_indexing_status()
if not indexing_status.erc20_synced:
logger.warning(
"Reindexing erc20/721 events will not be executed as service is out of sync"
)
Uxio0 marked this conversation as resolved.
Show resolved Hide resolved
if ethereum_block := EthereumBlock.objects.oldest_than(
seconds=60 * 60 * hours
).first():
Expand Down