diff --git a/scraper_service/requirements.txt b/scraper_service/requirements.txt index ac01f7a..c8c4f49 100644 --- a/scraper_service/requirements.txt +++ b/scraper_service/requirements.txt @@ -27,7 +27,6 @@ requests==2.32.3 scalecodec==1.2.10 six==1.16.0 substrate-interface==1.7.10 -tenacity==9.0.0 toolz==0.12.1 tqdm==4.66.4 typing_extensions==4.12.2 diff --git a/scraper_service/shared/exceptions.py b/scraper_service/shared/exceptions.py new file mode 100644 index 0000000..7d6d755 --- /dev/null +++ b/scraper_service/shared/exceptions.py @@ -0,0 +1,11 @@ +class ShovelException(Exception): + """Base exception for all shovel-related errors""" + pass + +class ShovelProcessingError(ShovelException): + """Fatal error that should crash the process""" + pass + +class DatabaseConnectionError(ShovelException): + """Retryable error for database connection issues""" + pass diff --git a/scraper_service/shared/shovel_base_class.py b/scraper_service/shared/shovel_base_class.py index 1d1034e..5e2a017 100644 --- a/scraper_service/shared/shovel_base_class.py +++ b/scraper_service/shared/shovel_base_class.py @@ -1,14 +1,16 @@ from shared.clickhouse.batch_insert import buffer_insert, flush_buffer, batch_insert_into_clickhouse_table -from shared.substrate import get_substrate_client +from shared.substrate import get_substrate_client, reconnect_substrate from time import sleep from shared.clickhouse.utils import ( get_clickhouse_client, table_exists, ) +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError from tqdm import tqdm import logging import threading from concurrent.futures import ThreadPoolExecutor +import sys class ShovelBaseClass: @@ -16,6 +18,8 @@ class ShovelBaseClass: last_buffer_flush_call_block_number = 0 name = None skip_interval = 1 + MAX_RETRIES = 3 + RETRY_DELAY = 5 def __init__(self, name, skip_interval=1): """ @@ -26,49 +30,80 @@ def __init__(self, name, skip_interval=1): self.starting_block = 0 # Default value, can be overridden by subclasses def start(self): - print("Initialising Substrate client") - substrate = get_substrate_client() - - print("Fetching the finalized block") - finalized_block_hash = substrate.get_chain_finalised_head() - finalized_block_number = substrate.get_block_number( - finalized_block_hash) - - # Start the clickhouse buffer - print("Starting Clickhouse buffer") - executor = ThreadPoolExecutor(max_workers=1) - threading.Thread( - target=flush_buffer, - args=(executor, self._buffer_flush_started, self._buffer_flush_done), - ).start() - - last_scraped_block_number = self.get_checkpoint() - logging.info(f"Last scraped block is {last_scraped_block_number}") - - # Create a list of block numbers to scrape + retry_count = 0 while True: - block_numbers = tqdm( - range(last_scraped_block_number + - 1, finalized_block_number + 1, self.skip_interval) - ) - - if len(block_numbers) > 0: - logging.info( - f"Catching up {len(block_numbers)} blocks") - for block_number in block_numbers: - self.process_block(block_number) - self.checkpoint_block_number = block_number - else: - logging.info( - "Already up to latest finalized block, checking again in 12s...") - - # Make sure to sleep so buffer with checkpoint update is flushed to Clickhouse - # before trying again - sleep(12) - last_scraped_block_number = self.get_checkpoint() - finalized_block_hash = substrate.get_chain_finalised_head() - finalized_block_number = substrate.get_block_number( - finalized_block_hash) + try: + print("Initialising Substrate client") + substrate = get_substrate_client() + + 
print("Fetching the finalized block") + finalized_block_hash = substrate.get_chain_finalised_head() + finalized_block_number = substrate.get_block_number(finalized_block_hash) + + # Start the clickhouse buffer + print("Starting Clickhouse buffer") + executor = ThreadPoolExecutor(max_workers=1) + buffer_thread = threading.Thread( + target=flush_buffer, + args=(executor, self._buffer_flush_started, self._buffer_flush_done), + daemon=True # Make it a daemon thread so it exits with the main thread + ) + buffer_thread.start() + + last_scraped_block_number = self.get_checkpoint() + logging.info(f"Last scraped block is {last_scraped_block_number}") + + # Create a list of block numbers to scrape + while True: + try: + block_numbers = list(range( + last_scraped_block_number + 1, + finalized_block_number + 1, + self.skip_interval + )) + + if len(block_numbers) > 0: + logging.info(f"Catching up {len(block_numbers)} blocks") + for block_number in tqdm(block_numbers): + try: + self.process_block(block_number) + self.checkpoint_block_number = block_number + except DatabaseConnectionError as e: + logging.error(f"Database connection error while processing block {block_number}: {str(e)}") + raise # Re-raise to be caught by outer try-except + except Exception as e: + logging.error(f"Fatal error while processing block {block_number}: {str(e)}") + raise ShovelProcessingError(f"Failed to process block {block_number}: {str(e)}") + else: + logging.info("Already up to latest finalized block, checking again in 12s...") + + # Reset retry count on successful iteration + retry_count = 0 + + # Make sure to sleep so buffer with checkpoint update is flushed to Clickhouse + sleep(12) + last_scraped_block_number = self.get_checkpoint() + finalized_block_hash = substrate.get_chain_finalised_head() + finalized_block_number = substrate.get_block_number(finalized_block_hash) + + except DatabaseConnectionError as e: + retry_count += 1 + if retry_count > self.MAX_RETRIES: + logging.error(f"Max retries ({self.MAX_RETRIES}) exceeded for database connection. 
Exiting.") + raise ShovelProcessingError("Max database connection retries exceeded") + + logging.warning(f"Database connection error (attempt {retry_count}/{self.MAX_RETRIES}): {str(e)}") + logging.info(f"Retrying in {self.RETRY_DELAY} seconds...") + sleep(self.RETRY_DELAY) + reconnect_substrate() # Try to reconnect to substrate + continue + + except ShovelProcessingError as e: + logging.error(f"Fatal shovel error: {str(e)}") + sys.exit(1) + except Exception as e: + logging.error(f"Unexpected error: {str(e)}") + sys.exit(1) def process_block(self, n): raise NotImplementedError( @@ -106,7 +141,18 @@ def _buffer_flush_done(self, tables, rows): def get_checkpoint(self): if not table_exists("shovel_checkpoints"): - return self.starting_block - 1 + return max(0, self.starting_block - 1) + + # First check if our shovel has any entries + query = f""" + SELECT count(*) + FROM shovel_checkpoints + WHERE shovel_name = '{self.name}' + """ + count = get_clickhouse_client().execute(query)[0][0] + if count == 0: + return max(0, self.starting_block - 1) + query = f""" SELECT block_number FROM shovel_checkpoints @@ -118,4 +164,4 @@ def get_checkpoint(self): if res: return res[0][0] else: - return self.starting_block - 1 + return max(0, self.starting_block - 1) # This case shouldn't happen due to count check above diff --git a/scraper_service/shovel_block_timestamp/main.py b/scraper_service/shovel_block_timestamp/main.py index 8b73223..ffb7881 100644 --- a/scraper_service/shovel_block_timestamp/main.py +++ b/scraper_service/shovel_block_timestamp/main.py @@ -5,6 +5,7 @@ get_clickhouse_client, table_exists, ) +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError import logging @@ -20,31 +21,50 @@ def process_block(self, n): def do_process_block(self, n): - substrate = get_substrate_client() - - # Create table if it doesn't exist - if not table_exists(self.table_name): - query = f""" - CREATE TABLE IF NOT EXISTS {self.table_name} ( - block_number UInt64 CODEC(Delta, ZSTD), - timestamp DateTime CODEC(Delta, ZSTD), - ) ENGINE = ReplacingMergeTree() - PARTITION BY toYYYYMM(timestamp) - ORDER BY block_number - """ - get_clickhouse_client().execute(query) - - block_hash = substrate.get_block_hash(n) - block_timestamp = int( - substrate.query( - "Timestamp", - "Now", - block_hash=block_hash, - ).serialize() - / 1000 - ) - - buffer_insert(self.table_name, [n, block_timestamp]) + try: + substrate = get_substrate_client() + + try: + if not table_exists(self.table_name): + query = f""" + CREATE TABLE IF NOT EXISTS {self.table_name} ( + block_number UInt64 CODEC(Delta, ZSTD), + timestamp DateTime CODEC(Delta, ZSTD), + ) ENGINE = ReplacingMergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY block_number + """ + get_clickhouse_client().execute(query) + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/check table: {str(e)}") + + try: + block_hash = substrate.get_block_hash(n) + block_timestamp = int( + substrate.query( + "Timestamp", + "Now", + block_hash=block_hash, + ).serialize() + / 1000 + ) + except Exception as e: + raise ShovelProcessingError(f"Failed to get block timestamp from substrate: {str(e)}") + + if block_timestamp == 0 and n != 0: + raise ShovelProcessingError(f"Invalid block timestamp (0) for block {n}") + + try: + buffer_insert(self.table_name, [n, block_timestamp]) + except Exception as e: + raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these 
exceptions to be handled by the base class + raise + except Exception as e: + # Convert unexpected exceptions to ShovelProcessingError + raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}") def main(): diff --git a/scraper_service/shovel_daily_balance/main.py b/scraper_service/shovel_daily_balance/main.py index 463e89f..97f54c2 100644 --- a/scraper_service/shovel_daily_balance/main.py +++ b/scraper_service/shovel_daily_balance/main.py @@ -5,7 +5,7 @@ from shared.clickhouse.utils import get_clickhouse_client, table_exists from shared.shovel_base_class import ShovelBaseClass from shared.substrate import get_substrate_client, reconnect_substrate -from tenacity import retry, stop_after_attempt, wait_fixed +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError logging.basicConfig(level=logging.INFO, format="%(asctime)s %(process)d %(message)s") @@ -17,58 +17,92 @@ def process_block(self, n): do_process_block(n, self.table_name) -@retry( - wait=wait_fixed(2), - before_sleep=lambda _: reconnect_substrate(), - stop=stop_after_attempt(15) -) def do_process_block(n, table_name): - if not table_exists(table_name): - query = f""" - CREATE TABLE IF NOT EXISTS {table_name} ( - block_number UInt64 CODEC(Delta, ZSTD), - timestamp DateTime CODEC(Delta, ZSTD), - address String CODEC(ZSTD), - free_balance UInt64 CODEC(Delta, ZSTD), - reserved_balance UInt64 CODEC(Delta, ZSTD), - frozen_balance UInt64 CODEC(Delta, ZSTD) - ) ENGINE = ReplacingMergeTree() - PARTITION BY toYYYYMM(timestamp) - ORDER BY (address, timestamp) - """ - get_clickhouse_client().execute(query) - - (block_timestamp, block_hash) = get_block_metadata(n) - - results = fetch_all_free_balances_at_block(block_hash) - print(f"Processing block {n}. Found {len(results)} balance entries") - for address, balance in results.items(): - buffer_insert( - table_name, - [n, block_timestamp, f"'{address}'", balance["free"], balance["reserved"], balance["frozen"]] - ) + try: + # Create table if it doesn't exist + try: + if not table_exists(table_name): + query = f""" + CREATE TABLE IF NOT EXISTS {table_name} ( + block_number UInt64 CODEC(Delta, ZSTD), + timestamp DateTime CODEC(Delta, ZSTD), + address String CODEC(ZSTD), + free_balance UInt64 CODEC(Delta, ZSTD), + reserved_balance UInt64 CODEC(Delta, ZSTD), + frozen_balance UInt64 CODEC(Delta, ZSTD) + ) ENGINE = ReplacingMergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (address, timestamp) + """ + get_clickhouse_client().execute(query) + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/check table: {str(e)}") + + try: + (block_timestamp, block_hash) = get_block_metadata(n) + except Exception as e: + raise ShovelProcessingError(f"Failed to get block metadata: {str(e)}") + + try: + results = fetch_all_free_balances_at_block(block_hash) + except Exception as e: + raise ShovelProcessingError(f"Failed to fetch balances from substrate: {str(e)}") + + if not results: + raise ShovelProcessingError(f"No balance data returned for block {n}") + + logging.info(f"Processing block {n}. 
Found {len(results)} balance entries") + + try: + for address, balance in results.items(): + buffer_insert( + table_name, + [n, block_timestamp, f"'{address}'", balance["free"], balance["reserved"], balance["frozen"]] + ) + except Exception as e: + raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these exceptions to be handled by the base class + raise + except Exception as e: + # Convert unexpected exceptions to ShovelProcessingError + raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}") def fetch_all_free_balances_at_block(block_hash): - substrate = get_substrate_client() - raw_balances = substrate.query_map( - module='System', - storage_function='Account', - block_hash=block_hash, - page_size=1000 - ) - - balances = {} - for address in raw_balances: - address_id = address[0].value - address_info = address[1] - balances[address_id] = { - 'free': address_info['data']['free'], - 'reserved': address_info['data']['reserved'], - 'frozen': address_info['data']['frozen'] if 'frozen' in address_info['data'] else int(address_info['data']['misc_frozen'].value) + int(address_info['data']['fee_frozen'].value), - } - - return balances + """Fetch all balances at a given block hash.""" + try: + substrate = get_substrate_client() + raw_balances = substrate.query_map( + module='System', + storage_function='Account', + block_hash=block_hash, + page_size=1000 + ) + + if not raw_balances: + raise ShovelProcessingError("No balance data returned from substrate") + + balances = {} + for address in raw_balances: + try: + address_id = address[0].value + address_info = address[1] + balances[address_id] = { + 'free': address_info['data']['free'], + 'reserved': address_info['data']['reserved'], + 'frozen': address_info['data']['frozen'] if 'frozen' in address_info['data'] else int(address_info['data']['misc_frozen'].value) + int(address_info['data']['fee_frozen'].value), + } + except (KeyError, ValueError) as e: + logging.warning(f"Skipping malformed account data for {address}: {str(e)}") + continue + + return balances + + except Exception as e: + raise ShovelProcessingError(f"Error fetching balances: {str(e)}") + def main(): BalanceDailyMapShovel(name="balance_daily_map", skip_interval=7200).start() diff --git a/scraper_service/shovel_daily_stake/main.py b/scraper_service/shovel_daily_stake/main.py index bb113fd..4bcddd1 100644 --- a/scraper_service/shovel_daily_stake/main.py +++ b/scraper_service/shovel_daily_stake/main.py @@ -6,7 +6,7 @@ from shared.clickhouse.utils import get_clickhouse_client, table_exists from shared.shovel_base_class import ShovelBaseClass from shared.substrate import reconnect_substrate -from tenacity import retry, stop_after_attempt, wait_fixed +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError logging.basicConfig(level=logging.INFO, format="%(asctime)s %(process)d %(message)s") @@ -17,36 +17,59 @@ class StakeDailyMapShovel(ShovelBaseClass): def process_block(self, n): do_process_block(n, self.table_name) -@retry( - wait=wait_fixed(2), - before_sleep=lambda _: reconnect_substrate(), - stop=stop_after_attempt(15) -) + def do_process_block(n, table_name): - if not table_exists(table_name): - query = f""" - CREATE TABLE IF NOT EXISTS {table_name} ( - block_number UInt64 CODEC(Delta, ZSTD), - timestamp DateTime CODEC(Delta, ZSTD), - coldkey String CODEC(ZSTD), - hotkey String CODEC(ZSTD), - stake UInt64 CODEC(Delta, ZSTD) - ) ENGINE = 
ReplacingMergeTree() - PARTITION BY toYYYYMM(timestamp) - ORDER BY (coldkey, hotkey, timestamp) - """ - get_clickhouse_client().execute(query) - - (block_timestamp, block_hash) = get_block_metadata(n) - results = rust_bindings.query_block_stakes(block_hash) - print(f"Processing block {n}. Found {len(results)} stake entries") - for result in results: - coldkey, stake = result[1][0] - hotkey = result[0] - buffer_insert( - table_name, - [n, block_timestamp, f"'{coldkey}'", f"'{hotkey}'", stake] - ) + try: + # Create table if it doesn't exist + try: + if not table_exists(table_name): + query = f""" + CREATE TABLE IF NOT EXISTS {table_name} ( + block_number UInt64 CODEC(Delta, ZSTD), + timestamp DateTime CODEC(Delta, ZSTD), + coldkey String CODEC(ZSTD), + hotkey String CODEC(ZSTD), + stake UInt64 CODEC(Delta, ZSTD) + ) ENGINE = ReplacingMergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (coldkey, hotkey, timestamp) + """ + get_clickhouse_client().execute(query) + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/check table: {str(e)}") + + try: + (block_timestamp, block_hash) = get_block_metadata(n) + except Exception as e: + raise ShovelProcessingError(f"Failed to get block metadata: {str(e)}") + + try: + results = rust_bindings.query_block_stakes(block_hash) + except Exception as e: + raise ShovelProcessingError(f"Failed to fetch stakes from substrate: {str(e)}") + + if not results: + raise ShovelProcessingError(f"No stake data returned for block {n}") + + logging.info(f"Processing block {n}. Found {len(results)} stake entries") + + try: + for result in results: + coldkey, stake = result[1][0] # First element is hotkey, second is list of (coldkey, stake) pairs + hotkey = result[0] + buffer_insert( + table_name, + [n, block_timestamp, f"'{coldkey}'", f"'{hotkey}'", stake] + ) + except Exception as e: + raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these exceptions to be handled by the base class + raise + except Exception as e: + # Convert unexpected exceptions to ShovelProcessingError + raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}") def main(): diff --git a/scraper_service/shovel_events/main.py b/scraper_service/shovel_events/main.py index c410db1..4e2035a 100644 --- a/scraper_service/shovel_events/main.py +++ b/scraper_service/shovel_events/main.py @@ -1,5 +1,3 @@ -from tenacity import retry, stop_after_attempt, wait_fixed - from shared.block_metadata import get_block_metadata from shared.clickhouse.batch_insert import buffer_insert from shared.shovel_base_class import ShovelBaseClass @@ -7,6 +5,7 @@ from shared.clickhouse.utils import ( table_exists, ) +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError import logging from shovel_events.utils import ( @@ -29,47 +28,72 @@ def main(): EventsShovel(name="events").start() -@retry( - wait=wait_fixed(2), - before_sleep=lambda _: reconnect_substrate(), - stop=stop_after_attempt(15) -) def do_process_block(n): - substrate = get_substrate_client() - - (block_timestamp, block_hash) = get_block_metadata(n) - - events = substrate.query( - "System", - "Events", - block_hash=block_hash, - ) - - # Needed to handle edge case of duplicate events in the same block - event_id = 0 - for e in events: - event = e.value["event"] - (column_names, column_types, values) = generate_column_definitions( - event["attributes"] - ) - - table_name = get_table_name( - 
event["module_id"], event["event_id"], tuple(column_names) - ) - - # Dynamically create table if not exists - if not table_exists(table_name): - create_clickhouse_table( - table_name, column_names, column_types, values) - - # Insert event data into table - all_values = [ - n, - block_timestamp, - event_id, - ] + values - buffer_insert(table_name, all_values) - event_id += 1 + try: + try: + substrate = get_substrate_client() + (block_timestamp, block_hash) = get_block_metadata(n) + except Exception as e: + raise ShovelProcessingError(f"Failed to initialize block processing: {str(e)}") + + try: + events = substrate.query( + "System", + "Events", + block_hash=block_hash, + ) + if not events and n != 0: + raise ShovelProcessingError(f"No events returned for block {n}") + except Exception as e: + raise ShovelProcessingError(f"Failed to fetch events from substrate: {str(e)}") + + # Needed to handle edge case of duplicate events in the same block + event_id = 0 + for e in events: + try: + event = e.value["event"] + # Let column generation errors propagate up - we want to fail on new event types + (column_names, column_types, values) = generate_column_definitions( + event["attributes"] + ) + + try: + table_name = get_table_name( + event["module_id"], event["event_id"], tuple(column_names) + ) + + # Dynamically create table if not exists + if not table_exists(table_name): + create_clickhouse_table( + table_name, column_names, column_types, values) + + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/check table {table_name}: {str(e)}") + + try: + # Insert event data into table + all_values = [ + n, + block_timestamp, + event_id, + ] + values + buffer_insert(table_name, all_values) + event_id += 1 + except Exception as e: + raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}") + + except DatabaseConnectionError: + raise + except Exception as e: + # Convert any other errors to ShovelProcessingError to fail the shovel + raise ShovelProcessingError(f"Failed to process event in block {n}: {str(e)}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these exceptions to be handled by the base class + raise + except Exception as e: + # Convert unexpected exceptions to ShovelProcessingError + raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}") if __name__ == "__main__": diff --git a/scraper_service/shovel_extrinsics/main.py b/scraper_service/shovel_extrinsics/main.py index 19e03d4..1ba5782 100644 --- a/scraper_service/shovel_extrinsics/main.py +++ b/scraper_service/shovel_extrinsics/main.py @@ -1,5 +1,3 @@ -from tenacity import retry, stop_after_attempt, wait_fixed - from shared.block_metadata import get_block_metadata from shared.clickhouse.batch_insert import buffer_insert from shared.clickhouse.utils import ( @@ -8,6 +6,7 @@ ) from shared.shovel_base_class import ShovelBaseClass from shared.substrate import get_substrate_client, reconnect_substrate +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError import logging from shovel_extrinsics.utils import ( @@ -31,84 +30,110 @@ def main(): ExtrinsicsShovel(name="extrinsics").start() -@retry( - wait=wait_fixed(2), - before_sleep=lambda _: reconnect_substrate(), - stop=stop_after_attempt(15) -) def do_process_block(n): - substrate = get_substrate_client() - - (block_timestamp, block_hash) = get_block_metadata(n) - extrinsics = substrate.get_extrinsics(block_number=n) - - events = substrate.query( - "System", - "Events", - 
block_hash=block_hash, - ) - - extrinsics_success_map = {} - - for e in events: - event = e.value - # Skip irrelevant events - if event["event"]["module_id"] != "System" or (event["event"]["event_id"] != "ExtrinsicSuccess" and event["event"]["event_id"] != "ExtrinsicFailed"): - continue - - extrinsics_success_map[int(event["extrinsic_idx"]) - ] = event["event"]["event_id"] == "ExtrinsicSuccess" - - # Needed to handle edge case of duplicate events in the same block - extrinsic_id = 0 - for e in extrinsics: - extrinsic = e.value - address = extrinsic.get("address", None) - nonce = extrinsic.get("nonce", None) - tip = extrinsic.get("tip", None) - call_function = extrinsic["call"]["call_function"] - call_module = extrinsic["call"]["call_module"] - - base_column_names = ["block_number", "timestamp", "extrinsic_index", - "call_function", "call_module", "success", "address", "nonce", "tip"] - base_column_types = ["UInt64", "DateTime", "UInt64", "String", - "String", "Bool", "Nullable(String)", "Nullable(UInt64)", "Nullable(UInt64)"] - - base_column_values = [format_value(value) for value in [ - n, block_timestamp, extrinsic_id, call_function, call_module, extrinsics_success_map[extrinsic_id], address, nonce, tip]] - - arg_column_names = [] - arg_column_types = [] - arg_values = [] - for arg in extrinsic["call"]["call_args"]: - (_arg_column_names, _arg_column_types, _arg_values) = generate_column_definitions( - arg["value"], arg["name"], arg["type"] - ) - arg_column_names.extend(_arg_column_names) - arg_column_types.extend(_arg_column_types) - arg_values.extend(_arg_values) - - column_names = base_column_names + arg_column_names - column_types = base_column_types + arg_column_types - values = base_column_values + arg_values - - table_name = get_table_name( - call_module, call_function, tuple(column_names) - ) - - # Dynamically create table if not exists - if not table_exists(table_name): - create_clickhouse_table( - table_name, column_names, column_types + try: + try: + substrate = get_substrate_client() + (block_timestamp, block_hash) = get_block_metadata(n) + except Exception as e: + raise ShovelProcessingError(f"Failed to initialize block processing: {str(e)}") + + try: + extrinsics = substrate.get_extrinsics(block_number=n) + if not extrinsics and n != 0: + raise ShovelProcessingError(f"No extrinsics returned for block {n}") + + events = substrate.query( + "System", + "Events", + block_hash=block_hash, ) - - buffer_insert(table_name, values) - extrinsic_id += 1 - - if len(extrinsics_success_map) != extrinsic_id: - logging.error( - f"Expected {len(extrinsics_success_map)} extrinsics, but only found {extrinsic_id}") - exit(1) + if not events and n != 0: + raise ShovelProcessingError(f"No events returned for block {n}") + except Exception as e: + raise ShovelProcessingError(f"Failed to fetch extrinsics or events from substrate: {str(e)}") + + # Map extrinsic success/failure status + extrinsics_success_map = {} + for e in events: + event = e.value + # Skip irrelevant events + if event["event"]["module_id"] != "System" or (event["event"]["event_id"] != "ExtrinsicSuccess" and event["event"]["event_id"] != "ExtrinsicFailed"): + continue + + extrinsics_success_map[int(event["extrinsic_idx"])] = event["event"]["event_id"] == "ExtrinsicSuccess" + + # Needed to handle edge case of duplicate events in the same block + extrinsic_id = 0 + for e in extrinsics: + try: + extrinsic = e.value + address = extrinsic.get("address", None) + nonce = extrinsic.get("nonce", None) + tip = extrinsic.get("tip", None) + 
call_function = extrinsic["call"]["call_function"] + call_module = extrinsic["call"]["call_module"] + + base_column_names = ["block_number", "timestamp", "extrinsic_index", + "call_function", "call_module", "success", "address", "nonce", "tip"] + base_column_types = ["UInt64", "DateTime", "UInt64", "String", + "String", "Bool", "Nullable(String)", "Nullable(UInt64)", "Nullable(UInt64)"] + + base_column_values = [format_value(value) for value in [ + n, block_timestamp, extrinsic_id, call_function, call_module, extrinsics_success_map[extrinsic_id], address, nonce, tip]] + + # Let column generation errors propagate up - we want to fail on new extrinsic types + arg_column_names = [] + arg_column_types = [] + arg_values = [] + for arg in extrinsic["call"]["call_args"]: + (_arg_column_names, _arg_column_types, _arg_values) = generate_column_definitions( + arg["value"], arg["name"], arg["type"] + ) + arg_column_names.extend(_arg_column_names) + arg_column_types.extend(_arg_column_types) + arg_values.extend(_arg_values) + + column_names = base_column_names + arg_column_names + column_types = base_column_types + arg_column_types + values = base_column_values + arg_values + + try: + table_name = get_table_name( + call_module, call_function, tuple(column_names) + ) + + # Dynamically create table if not exists + if not table_exists(table_name): + create_clickhouse_table( + table_name, column_names, column_types + ) + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/check table {table_name}: {str(e)}") + + try: + buffer_insert(table_name, values) + extrinsic_id += 1 + except Exception as e: + raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}") + + except DatabaseConnectionError: + raise + except Exception as e: + # Convert any other errors to ShovelProcessingError to fail the shovel + raise ShovelProcessingError(f"Failed to process extrinsic in block {n}: {str(e)}") + + # Verify we processed all extrinsics + if len(extrinsics_success_map) != extrinsic_id: + raise ShovelProcessingError( + f"Expected {len(extrinsics_success_map)} extrinsics, but only found {extrinsic_id}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these exceptions to be handled by the base class + raise + except Exception as e: + # Convert unexpected exceptions to ShovelProcessingError + raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}") if __name__ == "__main__": diff --git a/scraper_service/shovel_hotkey_owner_map/main.py b/scraper_service/shovel_hotkey_owner_map/main.py index 221b48b..c06fff9 100644 --- a/scraper_service/shovel_hotkey_owner_map/main.py +++ b/scraper_service/shovel_hotkey_owner_map/main.py @@ -6,6 +6,7 @@ get_clickhouse_client, table_exists, ) +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError import logging @@ -20,20 +21,24 @@ def check_root_read_proof(block_hash): + """Check if the owner map has changed using storage proof.""" global last_proof - substrate = get_substrate_client() - r = substrate.rpc_request( - "state_getReadProof", - params=[[OWNERS_PREFIX], block_hash] - ) - this_stakes_proof = set(r["result"]["proof"]) - map_changed = last_proof is None or last_proof.isdisjoint( - this_stakes_proof - ) - last_proof = this_stakes_proof + try: + substrate = get_substrate_client() + r = substrate.rpc_request( + "state_getReadProof", + params=[[OWNERS_PREFIX], block_hash] + ) + this_stakes_proof = set(r["result"]["proof"]) + map_changed = last_proof is None or 
last_proof.isdisjoint( + this_stakes_proof + ) + last_proof = this_stakes_proof - return map_changed + return map_changed + except Exception as e: + raise ShovelProcessingError(f"Failed to check root read proof: {str(e)}") class HotkeyOwnerMapShovel(ShovelBaseClass): @@ -45,41 +50,79 @@ def process_block(self, n): def do_process_block(self, n): global last_owners - substrate = get_substrate_client() - - # Create table if it doesn't exist - if not table_exists(self.table_name): - query = f""" - CREATE TABLE IF NOT EXISTS {self.table_name} ( - block_number UInt64 CODEC(Delta, ZSTD), - timestamp DateTime CODEC(Delta, ZSTD), - hotkey String CODEC(ZSTD), - coldkey String CODEC(ZSTD), - ) ENGINE = ReplacingMergeTree() - PARTITION BY toYYYYMM(timestamp) - ORDER BY (hotkey, coldkey, timestamp) - """ - get_clickhouse_client().execute(query) - - (block_timestamp, block_hash) = get_block_metadata(n) - map_changed = check_root_read_proof(block_hash) - - # Store owners for every block for fast queries - owners = last_owners if map_changed is False else substrate.query_map( - "SubtensorModule", - "Owner", - block_hash=block_hash, - page_size=1000 - ) - - if map_changed: - last_owners = owners - - for (hotkey, coldkey) in owners: - buffer_insert( - self.table_name, - [n, block_timestamp, f"'{hotkey}'", f"'{coldkey}'"] - ) + + try: + try: + substrate = get_substrate_client() + except Exception as e: + raise ShovelProcessingError(f"Failed to initialize substrate client: {str(e)}") + + # Create table if it doesn't exist + try: + if not table_exists(self.table_name): + query = f""" + CREATE TABLE IF NOT EXISTS {self.table_name} ( + block_number UInt64 CODEC(Delta, ZSTD), + timestamp DateTime CODEC(Delta, ZSTD), + hotkey String CODEC(ZSTD), + coldkey String CODEC(ZSTD), + ) ENGINE = ReplacingMergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (hotkey, coldkey, timestamp) + """ + get_clickhouse_client().execute(query) + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/check table: {str(e)}") + + try: + (block_timestamp, block_hash) = get_block_metadata(n) + except Exception as e: + raise ShovelProcessingError(f"Failed to get block metadata: {str(e)}") + + try: + map_changed = check_root_read_proof(block_hash) + except ShovelProcessingError: + raise + except Exception as e: + raise ShovelProcessingError(f"Failed to check if owner map changed: {str(e)}") + + try: + # Store owners for every block for fast queries + owners = last_owners if map_changed is False else substrate.query_map( + "SubtensorModule", + "Owner", + block_hash=block_hash, + page_size=1000 + ) + + if not owners and n != 0: + raise ShovelProcessingError(f"No owner data returned for block {n}") + + if map_changed: + last_owners = owners + + try: + for (hotkey, coldkey) in owners: + buffer_insert( + self.table_name, + [n, block_timestamp, f"'{hotkey}'", f"'{coldkey}'"] + ) + except Exception as e: + raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}") + + except DatabaseConnectionError: + raise + except ShovelProcessingError: + raise + except Exception as e: + raise ShovelProcessingError(f"Failed to process owner data: {str(e)}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these exceptions to be handled by the base class + raise + except Exception as e: + # Convert unexpected exceptions to ShovelProcessingError + raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}") def main(): diff --git a/scraper_service/shovel_stake_map/main.py 
b/scraper_service/shovel_stake_map/main.py index 7a1b589..3e87a6f 100644 --- a/scraper_service/shovel_stake_map/main.py +++ b/scraper_service/shovel_stake_map/main.py @@ -8,6 +8,7 @@ from shared.shovel_base_class import ShovelBaseClass from shared.clickhouse.batch_insert import buffer_insert from shared.block_metadata import get_block_metadata +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError from datetime import datetime import time import rust_bindings @@ -19,25 +20,31 @@ format="%(asctime)s %(process)d %(message)s") last_stakes_proof = None +prev_pending_emissions = {} +stake_map = dict() STAKES_PREFIX = "0x658faa385070e074c85bf6b568cf055522fbe0bd0cb77b6b6f365f641b0de381" def check_root_read_proof(block_hash): + """Check if the stake map has changed using storage proof.""" global last_stakes_proof - substrate = get_substrate_client() - r = substrate.rpc_request( - "state_getReadProof", - params=[[STAKES_PREFIX], block_hash] - ) - this_stakes_proof = set(r["result"]["proof"]) - stake_map_changed = last_stakes_proof is None or last_stakes_proof.isdisjoint( - this_stakes_proof - ) - last_stakes_proof = this_stakes_proof + try: + substrate = get_substrate_client() + r = substrate.rpc_request( + "state_getReadProof", + params=[[STAKES_PREFIX], block_hash] + ) + this_stakes_proof = set(r["result"]["proof"]) + stake_map_changed = last_stakes_proof is None or last_stakes_proof.isdisjoint( + this_stakes_proof + ) + last_stakes_proof = this_stakes_proof - return stake_map_changed + return stake_map_changed + except Exception as e: + raise ShovelProcessingError(f"Failed to check root read proof: {str(e)}") class StakeDoubleMapShovel(ShovelBaseClass): @@ -47,126 +54,158 @@ def process_block(self, n): do_process_block(n, self.table_name) -prev_pending_emissions = {} - -stake_map = dict() - - def do_process_block(n, table_name): - # Create table if it doesn't exist - if not table_exists(table_name): - query = f""" - CREATE TABLE IF NOT EXISTS {table_name} ( - block_number UInt64 CODEC(Delta, ZSTD), - timestamp DateTime CODEC(Delta, ZSTD), - hotkey String CODEC(ZSTD), - coldkey String CODEC(ZSTD), - stake UInt64 CODEC(Delta, ZSTD) - ) ENGINE = ReplacingMergeTree() - PARTITION BY toYYYYMM(timestamp) - ORDER BY (coldkey, hotkey, timestamp) - """ - get_clickhouse_client().execute(query) - - if not table_exists("agg_stake_events"): - query = """ - CREATE VIEW agg_stake_events - ( - `block_number` UInt64, - `timestamp` DateTime, - `hotkey` String, - `coldkey` String, - `amount` Int64, - `operation` String - ) - AS SELECT - l.block_number AS block_number, - l.timestamp AS timestamp, - l.tuple_0 AS hotkey, - r.coldkey AS coldkey, - l.tuple_1 AS amount, - 'remove' AS operation - FROM shovel_hotkey_owner_map AS r - INNER JOIN shovel_events_SubtensorModule_StakeRemoved_v0 AS l ON (l.tuple_0 = r.hotkey) AND (l.timestamp = r.timestamp) - UNION ALL - SELECT - sa.block_number, - sa.timestamp, - sa.tuple_0 AS hotkey, - r.coldkey, - sa.tuple_1 AS amount, - 'add' AS operation - FROM shovel_hotkey_owner_map AS r - INNER JOIN shovel_events_SubtensorModule_StakeAdded_v0 AS sa ON (sa.tuple_0 = r.hotkey) AND (sa.timestamp = r.timestamp); - """ - get_clickhouse_client().execute(query) - - (block_timestamp, block_hash) = get_block_metadata(n) - - hotkeys_needing_update = set() - - # Get pending emission amount for every subnet - result = rust_bindings.query_map_pending_emission(block_hash) - for subnet_id, pending_emission in result: - if (subnet_id not in prev_pending_emissions) or 
pending_emission == 0 and prev_pending_emissions[subnet_id] != 0: - print( - f"Refreshing all hotkeys from subnet { - subnet_id} due to update..." + try: + # Create table if it doesn't exist + try: + if not table_exists(table_name): + query = f""" + CREATE TABLE IF NOT EXISTS {table_name} ( + block_number UInt64 CODEC(Delta, ZSTD), + timestamp DateTime CODEC(Delta, ZSTD), + hotkey String CODEC(ZSTD), + coldkey String CODEC(ZSTD), + stake UInt64 CODEC(Delta, ZSTD) + ) ENGINE = ReplacingMergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (coldkey, hotkey, timestamp) + """ + get_clickhouse_client().execute(query) + + if not table_exists("agg_stake_events"): + query = """ + CREATE VIEW agg_stake_events + ( + `block_number` UInt64, + `timestamp` DateTime, + `hotkey` String, + `coldkey` String, + `amount` Int64, + `operation` String + ) + AS SELECT + l.block_number AS block_number, + l.timestamp AS timestamp, + l.tuple_0 AS hotkey, + r.coldkey AS coldkey, + l.tuple_1 AS amount, + 'remove' AS operation + FROM shovel_hotkey_owner_map AS r + INNER JOIN shovel_events_SubtensorModule_StakeRemoved_v0 AS l ON (l.tuple_0 = r.hotkey) AND (l.timestamp = r.timestamp) + UNION ALL + SELECT + sa.block_number, + sa.timestamp, + sa.tuple_0 AS hotkey, + r.coldkey, + sa.tuple_1 AS amount, + 'add' AS operation + FROM shovel_hotkey_owner_map AS r + INNER JOIN shovel_events_SubtensorModule_StakeAdded_v0 AS sa ON (sa.tuple_0 = r.hotkey) AND (sa.timestamp = r.timestamp); + """ + get_clickhouse_client().execute(query) + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/check tables: {str(e)}") + + try: + (block_timestamp, block_hash) = get_block_metadata(n) + except Exception as e: + raise ShovelProcessingError(f"Failed to get block metadata: {str(e)}") + + hotkeys_needing_update = set() + + try: + # Get pending emission amount for every subnet + result = rust_bindings.query_map_pending_emission(block_hash) + if result is None: + raise ShovelProcessingError(f"Failed to get pending emissions for block {n}") + + for subnet_id, pending_emission in result: + if (subnet_id not in prev_pending_emissions) or pending_emission == 0 and prev_pending_emissions[subnet_id] != 0: + logging.info(f"Refreshing all hotkeys from subnet {subnet_id} due to update...") + + subnet_hotkeys = rust_bindings.query_subnet_hotkeys( + block_hash, subnet_id + ) + if subnet_hotkeys is None: + raise ShovelProcessingError(f"Failed to get subnet hotkeys for subnet {subnet_id}") + + count = 0 + for (neuron_id, hotkey) in subnet_hotkeys: + hotkeys_needing_update.add(hotkey) + count += 1 + logging.info(f"Found {count} hotkeys for {subnet_id}") + + prev_pending_emissions[subnet_id] = pending_emission + + except ShovelProcessingError: + raise + except Exception as e: + raise ShovelProcessingError(f"Failed to process subnet data: {str(e)}") + + try: + # Check if we're up to date with dependencies + events_synced_block_query = "SELECT block_number FROM shovel_checkpoints FINAL WHERE shovel_name = 'events';" + events_synced_block = get_clickhouse_client().execute( + events_synced_block_query)[0][0] + hotkey_owner_map_synced_block_query = "SELECT block_number FROM shovel_checkpoints FINAL WHERE shovel_name = 'hotkey_owner_map';" + hotkey_owner_map_synced_block = get_clickhouse_client().execute( + hotkey_owner_map_synced_block_query)[0][0] + + while (events_synced_block < n or hotkey_owner_map_synced_block < n): + logging.info("Waiting for events and hotkey_owner_map tables to sync...") + time.sleep(60) + events_synced_block = 
get_clickhouse_client().execute( + events_synced_block_query)[0][0] + hotkey_owner_map_synced_block = get_clickhouse_client().execute( + hotkey_owner_map_synced_block_query)[0][0] + + # Get hotkeys with stake events this block + dt_object = datetime.fromtimestamp(block_timestamp) + formatted_date = dt_object.strftime("%Y-%m-%d %H:%M:%S") + distinct_hotkeys_query = f""" + SELECT DISTINCT(hotkey) from agg_stake_events WHERE timestamp = '{formatted_date}' + """ + distinct_hotkeys = get_clickhouse_client().execute(distinct_hotkeys_query) + + for r in distinct_hotkeys: + hotkeys_needing_update.add(r[0]) + + except Exception as e: + raise DatabaseConnectionError(f"Failed to check dependency sync status: {str(e)}") + + try: + # Get agg stake events for this block + r = rust_bindings.query_hotkeys_stakes( + block_hash, list(hotkeys_needing_update) ) - subnet_hotkeys = rust_bindings.query_subnet_hotkeys( - block_hash, subnet_id - ) - - count = 0 - for (neuron_id, hotkey) in subnet_hotkeys: - hotkeys_needing_update.add(hotkey) - count += 1 - print( - f"Found {count} hotkeys for {subnet_id}" - ) - - prev_pending_emissions[subnet_id] = pending_emission - - # Check if we're up to date - events_synced_block_query = "SELECT block_number FROM shovel_checkpoints FINAL WHERE shovel_name = 'events';" - events_synced_block = get_clickhouse_client().execute( - events_synced_block_query)[0][0] - hotkey_owner_map_synced_block_query = "SELECT block_number FROM shovel_checkpoints FINAL WHERE shovel_name = 'hotkey_owner_map';" - hotkey_owner_map_synced_block = get_clickhouse_client().execute( - hotkey_owner_map_synced_block_query)[0][0] - - while (events_synced_block < n or hotkey_owner_map_synced_block < n): - print("Waiting for events and hotkey_owner_map tables to sync...") - time.sleep(60) - events_synced_block = get_clickhouse_client().execute( - events_synced_block_query)[0][0] - hotkey_owner_map_synced_block = get_clickhouse_client().execute( - hotkey_owner_map_synced_block_query)[0][0] - - # Get hotkeys with stake events this block - dt_object = datetime.fromtimestamp(block_timestamp) - formatted_date = dt_object.strftime("%Y-%m-%d %H:%M:%S") - distinct_hotkeys_query = f""" - SELECT DISTINCT(hotkey) from agg_stake_events WHERE timestamp = '{formatted_date}' - """ - distinct_hotkeys = get_clickhouse_client().execute(distinct_hotkeys_query) - - for r in distinct_hotkeys: - hotkeys_needing_update.add(r[0]) - - # Get agg stake events for this block - r = rust_bindings.query_hotkeys_stakes( - block_hash, list(hotkeys_needing_update) - ) - for (hotkey, coldkey_stakes) in r: - for (coldkey, stake) in coldkey_stakes: - stake_map[(hotkey, coldkey)] = stake - - for ((hotkey, coldkey), stake) in stake_map.items(): - buffer_insert( - table_name, - [n, block_timestamp, f"'{hotkey}'", f"'{coldkey}'", stake] - ) + if r is None: + raise ShovelProcessingError(f"Failed to get stakes for hotkeys in block {n}") + + for (hotkey, coldkey_stakes) in r: + for (coldkey, stake) in coldkey_stakes: + stake_map[(hotkey, coldkey)] = stake + + try: + for ((hotkey, coldkey), stake) in stake_map.items(): + buffer_insert( + table_name, + [n, block_timestamp, f"'{hotkey}'", f"'{coldkey}'", stake] + ) + except Exception as e: + raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}") + + except DatabaseConnectionError: + raise + except Exception as e: + raise ShovelProcessingError(f"Failed to process stake data: {str(e)}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these exceptions to be 
handled by the base class + raise + except Exception as e: + # Convert unexpected exceptions to ShovelProcessingError + raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}") @lru_cache diff --git a/scraper_service/shovel_subnets/main.py b/scraper_service/shovel_subnets/main.py index 318f9ae..91ac3ff 100644 --- a/scraper_service/shovel_subnets/main.py +++ b/scraper_service/shovel_subnets/main.py @@ -1,94 +1,122 @@ from shared.substrate import reconnect_substrate -from tenacity import retry, stop_after_attempt, wait_fixed from shared.block_metadata import get_block_metadata from shared.clickhouse.batch_insert import buffer_insert from shared.shovel_base_class import ShovelBaseClass import logging import rust_bindings -import time from shovel_subnets.utils import create_table, get_axon_cache, get_coldkeys_and_stakes, refresh_axon_cache, default_axon +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError logging.basicConfig(level=logging.INFO, format="%(asctime)s %(process)d %(message)s") -# temporary hack around memory leak: restart shovel every 10mins -# TODO: find and fix the actual issue causing the leak! -start_time = time.time() - class SubnetsShovel(ShovelBaseClass): def process_block(self, n): - cur_time = time.time() - if cur_time - start_time > 600: - logging.info("Restarting shovel to avoid memory leak.") - exit(0) - do_process_block(n) - - -@retry( - wait=wait_fixed(2), - before_sleep=lambda _: reconnect_substrate(), - stop=stop_after_attempt(15) -) + try: + do_process_block(n) + except Exception as e: + if isinstance(e, (DatabaseConnectionError, ShovelProcessingError)): + raise + raise ShovelProcessingError(f"Failed to process block {n}: {str(e)}") + + def do_process_block(n): - # Create table if it doesn't exist - create_table() - - (block_timestamp, block_hash) = get_block_metadata(n) - - (neurons, hotkeys) = rust_bindings.query_neuron_info(block_hash) - - coldkeys_and_stakes = get_coldkeys_and_stakes( - hotkeys, block_timestamp, block_hash, n - ) - - refresh_axon_cache(block_timestamp, block_hash, n) - - axon_cache = get_axon_cache() - for neuron in neurons: - subnet_id = neuron.subnet_id - hotkey = neuron.hotkey - axon = axon_cache.get((subnet_id, hotkey)) or default_axon - coldkey_and_stake = coldkeys_and_stakes.get(hotkey) - if coldkey_and_stake is None: - print(f"{hotkey} has no coldkey and stake!") - exit(1) - - buffer_insert("shovel_subnets", [ - n, # block_number UInt64 CODEC(Delta, ZSTD), - block_timestamp, # timestamp DateTime CODEC(Delta, ZSTD), - neuron.subnet_id, # subnet_id UInt16 CODEC(Delta, ZSTD), - neuron.neuron_id, # neuron_id UInt16 CODEC(Delta, ZSTD), - - f"'{neuron.hotkey}'", # hotkey String CODEC(ZSTD), - f"'{coldkey_and_stake[0]}'", # coldkey String CODEC(ZSTD), - neuron.active, # active Bool CODEC(ZSTD), - - axon.block, # axon_block UInt64 CODEC(Delta, ZSTD), - axon.version, # axon_version UInt32 CODEC(Delta, ZSTD), - f'{axon.ip}', # axon_ip String CODEC(ZSTD), - axon.port, # axon_port UInt16 CODEC(Delta, ZSTD), - axon.ip_type, # axon_ip_type UInt8 CODEC(Delta, ZSTD), - axon.protocol, # axon_protocol UInt8 CODEC(Delta, ZSTD), - axon.placeholder1, # axon_placeholder1 UInt8 - axon.placeholder2, # axon_placeholder2 UInt8 - - neuron.rank, # rank UInt16 CODEC(Delta, ZSTD), - neuron.emission, # emission UInt64 CODEC(Delta, ZSTD), - neuron.incentive, # incentive UInt16 CODEC(Delta, ZSTD), - neuron.consensus, # consensus UInt16 CODEC(Delta, ZSTD), - neuron.trust, # trust UInt16 CODEC(Delta, ZSTD), - 
neuron.validator_trust, # validator_trust UInt16 - neuron.dividends, # dividends UInt16 CODEC(Delta, ZSTD), - coldkey_and_stake[1], # stake UInt64 CODEC(Delta, ZSTD), - neuron.weights, # weights Array(Tuple(UInt16, UInt16)), - neuron.bonds, # bonds Array(Tuple(UInt16, UInt16)) CODEC(ZSTD), - neuron.last_update, # last_update UInt64 CODEC(Delta, ZSTD), - - neuron.validator_permit, # validator_permit Bool - neuron.pruning_scores # pruning_score UInt16 CODEC(Delta, ZSTD) - ]) + try: + # Create table if it doesn't exist + try: + create_table() + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/verify table: {str(e)}") + + try: + (block_timestamp, block_hash) = get_block_metadata(n) + except Exception as e: + raise ShovelProcessingError(f"Failed to get block metadata: {str(e)}") + + try: + (neurons, hotkeys) = rust_bindings.query_neuron_info(block_hash) + if neurons is None or hotkeys is None: + raise ShovelProcessingError("Received None response from query_neuron_info") + except Exception as e: + raise ShovelProcessingError(f"Failed to query neuron info: {str(e)}") + + try: + coldkeys_and_stakes = get_coldkeys_and_stakes( + hotkeys, block_timestamp, block_hash, n + ) + if coldkeys_and_stakes is None: + raise ShovelProcessingError("Received None response from get_coldkeys_and_stakes") + except Exception as e: + if isinstance(e, DatabaseConnectionError): + raise + raise ShovelProcessingError(f"Failed to get coldkeys and stakes: {str(e)}") + + try: + refresh_axon_cache(block_timestamp, block_hash, n) + except Exception as e: + if isinstance(e, DatabaseConnectionError): + raise + raise ShovelProcessingError(f"Failed to refresh axon cache: {str(e)}") + + axon_cache = get_axon_cache() + + try: + for neuron in neurons: + subnet_id = neuron.subnet_id + hotkey = neuron.hotkey + axon = axon_cache.get((subnet_id, hotkey)) or default_axon + coldkey_and_stake = coldkeys_and_stakes.get(hotkey) + if coldkey_and_stake is None: + logging.error(f"{hotkey} has no coldkey and stake!") + raise ShovelProcessingError(f"Neuron {hotkey} has no coldkey and stake data") + + buffer_insert("shovel_subnets", [ + n, # block_number UInt64 CODEC(Delta, ZSTD), + block_timestamp, # timestamp DateTime CODEC(Delta, ZSTD), + neuron.subnet_id, # subnet_id UInt16 CODEC(Delta, ZSTD), + neuron.neuron_id, # neuron_id UInt16 CODEC(Delta, ZSTD), + + f"'{neuron.hotkey}'", # hotkey String CODEC(ZSTD), + f"'{coldkey_and_stake[0]}'", # coldkey String CODEC(ZSTD), + neuron.active, # active Bool CODEC(ZSTD), + + axon.block, # axon_block UInt64 CODEC(Delta, ZSTD), + axon.version, # axon_version UInt32 CODEC(Delta, ZSTD), + f'{axon.ip}', # axon_ip String CODEC(ZSTD), + axon.port, # axon_port UInt16 CODEC(Delta, ZSTD), + axon.ip_type, # axon_ip_type UInt8 CODEC(Delta, ZSTD), + axon.protocol, # axon_protocol UInt8 CODEC(Delta, ZSTD), + axon.placeholder1, # axon_placeholder1 UInt8 + axon.placeholder2, # axon_placeholder2 UInt8 + + neuron.rank, # rank UInt16 CODEC(Delta, ZSTD), + neuron.emission, # emission UInt64 CODEC(Delta, ZSTD), + neuron.incentive, # incentive UInt16 CODEC(Delta, ZSTD), + neuron.consensus, # consensus UInt16 CODEC(Delta, ZSTD), + neuron.trust, # trust UInt16 CODEC(Delta, ZSTD), + neuron.validator_trust, # validator_trust UInt16 + neuron.dividends, # dividends UInt16 CODEC(Delta, ZSTD), + coldkey_and_stake[1], # stake UInt64 CODEC(Delta, ZSTD), + neuron.weights, # weights Array(Tuple(UInt16, UInt16)), + neuron.bonds, # bonds Array(Tuple(UInt16, UInt16)) CODEC(ZSTD), + neuron.last_update, # last_update 
UInt64 CODEC(Delta, ZSTD), + + neuron.validator_permit, + neuron.pruning_scores # pruning_score UInt16 CODEC(Delta, ZSTD) + ]) + except Exception as e: + if isinstance(e, DatabaseConnectionError): + raise + raise ShovelProcessingError(f"Failed to process neuron data: {str(e)}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these exceptions to be handled by the base class + raise + except Exception as e: + # Catch any other unexpected errors and wrap them + raise ShovelProcessingError(f"Unexpected error in do_process_block: {str(e)}") def main(): diff --git a/scraper_service/shovel_subnets/utils.py b/scraper_service/shovel_subnets/utils.py index 133862a..60f0bd9 100644 --- a/scraper_service/shovel_subnets/utils.py +++ b/scraper_service/shovel_subnets/utils.py @@ -9,6 +9,8 @@ table_exists, ) from collections import namedtuple +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError +import logging Axon = namedtuple('Axon', ['block', 'version', 'ip', 'port', 'ip_type', 'protocol', 'placeholder1', 'placeholder2']) @@ -17,46 +19,54 @@ def create_table(): - if not table_exists("shovel_subnets"): - query = """ - CREATE TABLE IF NOT EXISTS shovel_subnets ( - block_number UInt64 CODEC(Delta, ZSTD), - timestamp DateTime CODEC(Delta, ZSTD), - subnet_id UInt16 CODEC(Delta, ZSTD), - neuron_id UInt16 CODEC(Delta, ZSTD), - - hotkey String CODEC(ZSTD), - coldkey String CODEC(ZSTD), - active Bool CODEC(ZSTD), - - axon_block UInt64 CODEC(Delta, ZSTD), - axon_version UInt32 CODEC(Delta, ZSTD), - axon_ip String CODEC(ZSTD), - axon_port UInt16 CODEC(Delta, ZSTD), - axon_ip_type UInt8 CODEC(Delta, ZSTD), - axon_protocol UInt8 CODEC(Delta, ZSTD), - axon_placeholder1 UInt8 CODEC(Delta, ZSTD), - axon_placeholder2 UInt8 CODEC(Delta, ZSTD), - - rank UInt16 CODEC(Delta, ZSTD), - emission UInt64 CODEC(Delta, ZSTD), - incentive UInt16 CODEC(Delta, ZSTD), - consensus UInt16 CODEC(Delta, ZSTD), - trust UInt16 CODEC(Delta, ZSTD), - validator_trust UInt16 CODEC(Delta, ZSTD), - dividends UInt16 CODEC(Delta, ZSTD), - stake UInt64 CODEC(Delta, ZSTD), - weights Array(Tuple(UInt16, UInt16)) CODEC(ZSTD), - bonds Array(Tuple(UInt16, UInt16)) CODEC(ZSTD), - last_update UInt64 CODEC(Delta, ZSTD), - - validator_permit Bool CODEC(Delta, ZSTD), - pruning_scores UInt16 CODEC(Delta, ZSTD) - ) ENGINE = ReplacingMergeTree() - PARTITION BY toYYYYMM(timestamp) - ORDER BY (subnet_id, neuron_id, timestamp) - """ - get_clickhouse_client().execute(query) + try: + if not table_exists("shovel_subnets"): + query = """ + CREATE TABLE IF NOT EXISTS shovel_subnets ( + block_number UInt64 CODEC(Delta, ZSTD), + timestamp DateTime CODEC(Delta, ZSTD), + subnet_id UInt16 CODEC(Delta, ZSTD), + neuron_id UInt16 CODEC(Delta, ZSTD), + + hotkey String CODEC(ZSTD), + coldkey String CODEC(ZSTD), + active Bool CODEC(ZSTD), + + axon_block UInt64 CODEC(Delta, ZSTD), + axon_version UInt32 CODEC(Delta, ZSTD), + axon_ip String CODEC(ZSTD), + axon_port UInt16 CODEC(Delta, ZSTD), + axon_ip_type UInt8 CODEC(Delta, ZSTD), + axon_protocol UInt8 CODEC(Delta, ZSTD), + axon_placeholder1 UInt8 CODEC(Delta, ZSTD), + axon_placeholder2 UInt8 CODEC(Delta, ZSTD), + + rank UInt16 CODEC(Delta, ZSTD), + emission UInt64 CODEC(Delta, ZSTD), + incentive UInt16 CODEC(Delta, ZSTD), + consensus UInt16 CODEC(Delta, ZSTD), + trust UInt16 CODEC(Delta, ZSTD), + validator_trust UInt16 CODEC(Delta, ZSTD), + dividends UInt16 CODEC(Delta, ZSTD), + stake UInt64 CODEC(Delta, ZSTD), + weights Array(Tuple(UInt16, UInt16)) CODEC(ZSTD), + bonds 
Array(Tuple(UInt16, UInt16)) CODEC(ZSTD), + last_update UInt64 CODEC(Delta, ZSTD), + + validator_permit Bool CODEC(Delta, ZSTD), + pruning_scores UInt16 CODEC(Delta, ZSTD) + ) ENGINE = ReplacingMergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (subnet_id, neuron_id, timestamp) + """ + try: + get_clickhouse_client().execute(query) + except Exception as e: + raise DatabaseConnectionError(f"Failed to execute table creation query: {str(e)}") + except Exception as e: + if isinstance(e, DatabaseConnectionError): + raise + raise DatabaseConnectionError(f"Failed to create table: {str(e)}") axon_extrinsics_cache = {} @@ -72,57 +82,101 @@ def get_axon_cache(): def refresh_axon_cache(block_timestamp, block_hash, block_number): global axon_cache - extrinsics_synced_block_query = f"SELECT block_number FROM shovel_checkpoints FINAL WHERE shovel_name = 'extrinsics';" - extrinsics_synced_block = get_clickhouse_client().execute( - extrinsics_synced_block_query)[0][0] - - while (extrinsics_synced_block < block_number): - print("Waiting for extrinsics table to sync...") - time.sleep(60) - extrinsics_synced_block = get_clickhouse_client().execute( - extrinsics_synced_block_query)[0][0] + try: + extrinsics_synced_block_query = f"SELECT block_number FROM shovel_checkpoints FINAL WHERE shovel_name = 'extrinsics';" + try: + result = get_clickhouse_client().execute(extrinsics_synced_block_query) + if not result: + raise DatabaseConnectionError("No checkpoint found for extrinsics shovel") + extrinsics_synced_block = result[0][0] + except Exception as e: + raise DatabaseConnectionError(f"Failed to query extrinsics sync status: {str(e)}") + + while (extrinsics_synced_block < block_number): + logging.info("Waiting for extrinsics table to sync...") + time.sleep(60) + try: + result = get_clickhouse_client().execute(extrinsics_synced_block_query) + if not result: + raise DatabaseConnectionError("No checkpoint found for extrinsics shovel") + extrinsics_synced_block = result[0][0] + except Exception as e: + raise DatabaseConnectionError(f"Failed to query extrinsics sync status during wait: {str(e)}") + except Exception as e: + if isinstance(e, DatabaseConnectionError): + raise + raise DatabaseConnectionError(f"Failed to check extrinsics sync status: {str(e)}") # Init the axon cache on first run if len(axon_cache) == 0: - print("Initializing axon cache...") - axon_cache = rust_bindings.query_axons(block_hash) - print("Initialized axon cache") + try: + logging.info("Initializing axon cache...") + axon_cache = rust_bindings.query_axons(block_hash) + if axon_cache is None: + raise ShovelProcessingError("Received None response from query_axons") + logging.info("Initialized axon cache") + except Exception as e: + if "DatabaseConnectionError" in str(type(e)): + raise + raise ShovelProcessingError(f"Failed to initialize axon cache: {str(e)}") # Get events this block if block_timestamp not in axon_extrinsics_cache: - # get any axon extrinsics this block - dt_object = datetime.fromtimestamp(block_timestamp) - formatted_date = dt_object.strftime("%Y-%m-%d %H:%M:%S") - query = f""" - SELECT address, arg_netuid, arg_version, arg_ip, arg_port, arg_ip_type, arg_protocol, arg_placeholder1, arg_placeholder2 - FROM shovel_extrinsics_SubtensorModule_serve_axon_v0 - WHERE timestamp = '{formatted_date}' AND success = True - """ - axon_events = get_clickhouse_client().execute(query) - axon_extrinsics_cache[block_timestamp] = axon_events + try: + # get any axon extrinsics this block + dt_object = datetime.fromtimestamp(block_timestamp) + 
formatted_date = dt_object.strftime("%Y-%m-%d %H:%M:%S") + query = f""" + SELECT address, arg_netuid, arg_version, arg_ip, arg_port, arg_ip_type, arg_protocol, arg_placeholder1, arg_placeholder2 + FROM shovel_extrinsics_SubtensorModule_serve_axon_v0 + WHERE timestamp = '{formatted_date}' AND success = True + """ + try: + axon_events = get_clickhouse_client().execute(query) + except Exception as e: + raise DatabaseConnectionError(f"Failed to execute axon events query: {str(e)}") + + if axon_events is None: + raise ShovelProcessingError("Received None response from axon events query") + axon_extrinsics_cache[block_timestamp] = axon_events + except Exception as e: + if isinstance(e, (DatabaseConnectionError, ShovelProcessingError)): + raise + raise DatabaseConnectionError(f"Failed to fetch axon events: {str(e)}") for e in axon_events: - hotkey = e[0] - subnet_id = e[1] - version = e[2] - ip = e[3] - port = e[4] - ip_type = e[5] - protocol = e[6] - placeholder1 = e[7] - placeholder2 = e[8] - - print(f"Updating axon cache for {subnet_id}:{hotkey}") - axon_cache[(subnet_id, hotkey)] = Axon( - block=block_number, - version=version, - ip=ip, - port=port, - ip_type=ip_type, - protocol=protocol, - placeholder1=placeholder1, - placeholder2=placeholder2 - ) + try: + if len(e) < 9: + raise ShovelProcessingError(f"Invalid axon event data: expected 9 fields, got {len(e)}") + + hotkey = e[0] + subnet_id = e[1] + version = e[2] + ip = e[3] + port = e[4] + ip_type = e[5] + protocol = e[6] + placeholder1 = e[7] + placeholder2 = e[8] + + if not all(x is not None for x in [hotkey, subnet_id, version, ip, port, ip_type, protocol, placeholder1, placeholder2]): + raise ShovelProcessingError(f"Invalid axon event data: contains None values") + + logging.info(f"Updating axon cache for {subnet_id}:{hotkey}") + axon_cache[(subnet_id, hotkey)] = Axon( + block=block_number, + version=version, + ip=ip, + port=port, + ip_type=ip_type, + protocol=protocol, + placeholder1=placeholder1, + placeholder2=placeholder2 + ) + except Exception as e: + if isinstance(e, ShovelProcessingError): + raise + raise ShovelProcessingError(f"Failed to process axon event: {str(e)}") coldkey_stake_cache = {} @@ -136,24 +190,41 @@ def refresh_axon_cache(block_timestamp, block_hash, block_number): def batch(iterable, n=1): + if not iterable: + return [] length = len(iterable) for i in range(0, length, n): yield iterable[i:i + n] def get_coldkeys_and_stakes(hotkeys, block_timestamp, block_hash, block_number): + if not hotkeys: + raise ShovelProcessingError("Empty hotkeys list provided") + global coldkey_stake_cache global stake_map_synced_block global hotkey_owner_map_synced_block - while (hotkey_owner_map_synced_block < block_number or stake_map_synced_block < block_number): - if hotkey_owner_map_synced_block > -1 and stake_map_synced_block > -1: - print("Waiting for stake_double_map and hotkey_owner_map tables to sync...") - time.sleep(1) - stake_map_synced_block = get_clickhouse_client().execute( - stake_map_synced_block_query)[0][0] - hotkey_owner_map_synced_block = get_clickhouse_client().execute( - hotkey_owner_map_synced_block_query)[0][0] + try: + while (hotkey_owner_map_synced_block < block_number or stake_map_synced_block < block_number): + if hotkey_owner_map_synced_block > -1 and stake_map_synced_block > -1: + logging.info("Waiting for stake_double_map and hotkey_owner_map tables to sync...") + time.sleep(1) + try: + stake_result = get_clickhouse_client().execute(stake_map_synced_block_query) + hotkey_result = 
get_clickhouse_client().execute(hotkey_owner_map_synced_block_query) + + if not stake_result or not hotkey_result: + raise DatabaseConnectionError("No checkpoint found for stake_map or hotkey_owner_map") + + stake_map_synced_block = stake_result[0][0] + hotkey_owner_map_synced_block = hotkey_result[0][0] + except Exception as e: + raise DatabaseConnectionError(f"Failed to query dependency sync status: {str(e)}") + except Exception as e: + if isinstance(e, DatabaseConnectionError): + raise + raise DatabaseConnectionError(f"Failed to check dependency sync status: {str(e)}") need_to_query = [] for hotkey in hotkeys: @@ -165,71 +236,114 @@ def get_coldkeys_and_stakes(hotkeys, block_timestamp, block_hash, block_number): block_timestamps = set() block_timestamps.add(block_timestamp) if len(need_to_query) > 0: - clickhouse = get_clickhouse_client() - batch_size = 1000 - responses = [] - dt_object = datetime.fromtimestamp(block_timestamp) - formatted_date = dt_object.strftime("%Y-%m-%d %H:%M:%S") - print( - f"Quering {len(need_to_query) / - batch_size} batches of coldkeys and stakes" - ) - for hotkey_batch in tqdm(batch(need_to_query, batch_size)): - hotkeys_list = "', '".join(hotkey_batch) - hotkeys_str = f"('{hotkeys_list}')" - query = f""" - SELECT - timestamp, - hotkey, - coldkey, - stake - FROM shovel_hotkey_owner_map AS o - INNER JOIN shovel_stake_double_map AS s - ON o.timestamp = s.timestamp AND o.coldkey = s.coldkey AND o.hotkey = s.hotkey - WHERE hotkey IN {hotkeys_str} - AND timestamp >= '{formatted_date}' AND timestamp < addMinutes('{formatted_date}', 30) - """ - r = clickhouse.execute(query) - responses.extend(r) - for response in responses: - timestamp = int(response[0].timestamp()) - block_timestamps.add(timestamp) - coldkey_stake_cache[(timestamp, response[1])] = ( - response[2], response[3] + try: + clickhouse = get_clickhouse_client() + batch_size = 1000 + responses = [] + dt_object = datetime.fromtimestamp(block_timestamp) + formatted_date = dt_object.strftime("%Y-%m-%d %H:%M:%S") + logging.info( + f"Querying {(len(need_to_query) + batch_size - 1) // batch_size} batches of coldkeys and stakes" ) + for hotkey_batch in tqdm(batch(need_to_query, batch_size)): + if not hotkey_batch: + continue + + hotkeys_list = "', '".join(hotkey_batch) + hotkeys_str = f"('{hotkeys_list}')" + query = f""" + SELECT + timestamp, + hotkey, + coldkey, + stake + FROM shovel_hotkey_owner_map AS o + INNER JOIN shovel_stake_double_map AS s + ON o.timestamp = s.timestamp AND o.coldkey = s.coldkey AND o.hotkey = s.hotkey + WHERE hotkey IN {hotkeys_str} + AND timestamp >= '{formatted_date}' AND timestamp < addMinutes('{formatted_date}', 30) + """ + try: + r = clickhouse.execute(query) + if r is None: + raise DatabaseConnectionError("Received None response from coldkeys and stakes query") + responses.extend(r) + except Exception as e: + raise DatabaseConnectionError(f"Failed to execute coldkeys and stakes query: {str(e)}") + except Exception as e: + if isinstance(e, DatabaseConnectionError): + raise + raise DatabaseConnectionError(f"Failed to query coldkeys and stakes: {str(e)}") + + for response in responses: + try: + if len(response) < 4: + raise ShovelProcessingError(f"Invalid response data: expected 4 fields, got {len(response)}") + + timestamp = int(response[0].timestamp()) + hotkey = response[1] + coldkey = response[2] + stake = response[3] + + if not all(x is not None for x in [timestamp, hotkey, coldkey, stake]): + raise ShovelProcessingError(f"Invalid response data: contains None values") + +
block_timestamps.add(timestamp) + coldkey_stake_cache[(timestamp, hotkey)] = (coldkey, stake) + except Exception as e: + raise ShovelProcessingError(f"Failed to process response data: {str(e)}") coldkeys_and_stakes = dict() for hotkey in hotkeys: - # handle a hotkey without a stake - if (block_timestamp, hotkey) not in coldkey_stake_cache: - coldkey = get_substrate_client().query( - "SubtensorModule", - "Owner", - block_hash=block_hash, - params=[hotkey], - ).value - stake = get_substrate_client().query( - "SubtensorModule", - "Stake", - block_hash=block_hash, - params=[hotkey, coldkey], - ) - if stake != 0: - print( - f"ERROR: Hotkey {hotkey} has stake { - stake} but not in Clickhouse" - ) - exit(0) - for t in block_timestamps: - coldkey_stake_cache[(t, hotkey)] = (coldkey, stake) - - # we should have stakes for all hotkeys now! - coldkey, stake = coldkey_stake_cache[(block_timestamp, hotkey)] - coldkeys_and_stakes[hotkey] = (coldkey, stake) + try: + # handle a hotkey without a stake + if (block_timestamp, hotkey) not in coldkey_stake_cache: + try: + coldkey_result = get_substrate_client().query( + "SubtensorModule", + "Owner", + block_hash=block_hash, + params=[hotkey], + ) + if coldkey_result is None or coldkey_result.value is None: + raise ShovelProcessingError(f"Failed to get coldkey for hotkey {hotkey}") + coldkey = coldkey_result.value + + stake = get_substrate_client().query( + "SubtensorModule", + "Stake", + block_hash=block_hash, + params=[hotkey, coldkey], + ) + if stake is None: + raise ShovelProcessingError(f"Failed to get stake for hotkey {hotkey}") + except Exception as e: + raise ShovelProcessingError(f"Failed to query substrate for hotkey {hotkey}: {str(e)}") + + if stake != 0: + logging.error( + f"ERROR: Hotkey {hotkey} has stake {stake} but not in Clickhouse" + ) + raise ShovelProcessingError(f"Inconsistent stake data for hotkey {hotkey}") + for t in block_timestamps: + coldkey_stake_cache[(t, hotkey)] = (coldkey, stake) + + # we should have stakes for all hotkeys now! 
+ try: + coldkey, stake = coldkey_stake_cache[(block_timestamp, hotkey)] + if coldkey is None or stake is None: + raise ShovelProcessingError(f"Invalid cache data for hotkey {hotkey}") + coldkeys_and_stakes[hotkey] = (coldkey, stake) + except KeyError: + raise ShovelProcessingError(f"Missing cache data for hotkey {hotkey}") + except Exception as e: + if isinstance(e, (DatabaseConnectionError, ShovelProcessingError)): + raise + raise ShovelProcessingError(f"Failed to process hotkey {hotkey}: {str(e)}") # periodically clear out historical keys if len(coldkey_stake_cache) > 1_000_000: - print("Clearing historical coldkey_stake_cache") + logging.info("Clearing historical coldkey_stake_cache") deleted = 0 keys_to_delete = [] for (timestamp, hotkey) in coldkey_stake_cache.keys(): @@ -238,6 +352,6 @@ def get_coldkeys_and_stakes(hotkeys, block_timestamp, block_hash, block_number): for key in keys_to_delete: del coldkey_stake_cache[key] deleted += 1 - print(f"Deleted {deleted} keys, {len(coldkey_stake_cache)} remain") + logging.info(f"Deleted {deleted} keys, {len(coldkey_stake_cache)} remain") return coldkeys_and_stakes diff --git a/scraper_service/shovel_tao_price/cmc_client.py b/scraper_service/shovel_tao_price/cmc_client.py index b2c0b56..93f5ca9 100644 --- a/scraper_service/shovel_tao_price/cmc_client.py +++ b/scraper_service/shovel_tao_price/cmc_client.py @@ -2,86 +2,170 @@ import logging import os from datetime import datetime, timedelta +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError CMC_TAO_ID = 22974 CMC_TOKEN = os.getenv("CMC_TOKEN") FIRST_TAO_LISTING_DAY = datetime(2023, 3, 6) def fetch_cmc_data(params, endpoint): - url = f"https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/{endpoint}" - headers = { - 'Accepts': 'application/json', - 'X-CMC_PRO_API_KEY': CMC_TOKEN - } + try: + if not CMC_TOKEN: + raise ShovelProcessingError("CMC_TOKEN is not set") + + url = f"https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/{endpoint}" + headers = { + 'Accepts': 'application/json', + 'X-CMC_PRO_API_KEY': CMC_TOKEN + } + + try: + response = requests.get(url, headers=headers, params=params, timeout=30) # Add timeout + except requests.Timeout: + raise ShovelProcessingError("CMC API request timed out") + except requests.ConnectionError: + raise ShovelProcessingError("Failed to connect to CMC API") + + # Handle rate limiting explicitly + if response.status_code == 429: + raise ShovelProcessingError("CMC API rate limit exceeded") - response = requests.get(url, headers=headers, params=params) - return response.json(), response.status_code + # Handle other common error codes + if response.status_code == 401: + raise ShovelProcessingError("Invalid CMC API key") + elif response.status_code == 403: + raise ShovelProcessingError("CMC API access forbidden") + elif response.status_code >= 500: + raise ShovelProcessingError(f"CMC API server error: {response.status_code}") + elif response.status_code != 200: + raise ShovelProcessingError(f"CMC API request failed with status code: {response.status_code}") + + try: + data = response.json() + except ValueError: + raise ShovelProcessingError("Failed to parse CMC API response as JSON") + + # Check for API-level errors + if 'status' in data and 'error_code' in data['status'] and data['status']['error_code'] != 0: + error_message = data['status'].get('error_message', 'Unknown API error') + raise ShovelProcessingError(f"CMC API error: {error_message}") + + return data, response.status_code + except 
requests.exceptions.RequestException as e: + raise ShovelProcessingError(f"Failed to make CMC API request: {str(e)}") + except Exception as e: + raise ShovelProcessingError(f"Unexpected error in CMC API request: {str(e)}") def get_price_by_time(timestamp): - logging.info(f"Getting price for timestamp {timestamp}") - - # Calculate the time 48 hours ago from now - time_48_hours_ago = datetime.now() - timedelta(hours=48) - logging.info(f"48 hours ago: {time_48_hours_ago}") - - # Determine the interval based on the timestamp - timestamp_dt = datetime.fromtimestamp(timestamp) - logging.info(f"Timestamp as datetime: {timestamp_dt}") - - if timestamp_dt > time_48_hours_ago: - interval = '5m' - logging.info("Using 5m interval (within last 48 hours)") - else: - interval = '24h' - logging.info("Using 24h interval (older than 48 hours)") - - parameters = { - 'id': CMC_TAO_ID, - 'convert': 'USD', - 'interval': interval, - 'time_start': timestamp, - 'count': 1 - } - logging.info(f"Request parameters: {parameters}") + if timestamp is None or timestamp <= 0: + raise ShovelProcessingError("Invalid timestamp provided") try: - logging.info("Fetching data from CMC API...") + # Calculate the time 48 hours ago from now + time_48_hours_ago = datetime.now() - timedelta(hours=48) + logging.debug(f"48 hours ago: {time_48_hours_ago}") + + # Determine the interval based on the timestamp + timestamp_dt = datetime.fromtimestamp(timestamp) + logging.debug(f"Timestamp as datetime: {timestamp_dt}") + + # Validate timestamp is not before TAO listing + if timestamp_dt < FIRST_TAO_LISTING_DAY: + raise ShovelProcessingError(f"Timestamp {timestamp_dt} is before TAO listing date {FIRST_TAO_LISTING_DAY}") + + if timestamp_dt > time_48_hours_ago: + interval = '5m' + logging.debug("Using 5m interval (within last 48 hours)") + else: + interval = '24h' + logging.debug("Using 24h interval (older than 48 hours)") + + parameters = { + 'id': CMC_TAO_ID, + 'convert': 'USD', + 'interval': interval, + 'time_start': timestamp, + 'count': 1 + } + logging.debug(f"Request parameters: {parameters}") + data, status_code = fetch_cmc_data(parameters, 'historical') - logging.info(f"Got response with status code: {status_code}") - except Exception as e: - logging.error("Error fetching CMC data: %s", str(e)) - logging.error("Full exception:", exc_info=True) - return None + logging.debug(f"Got response with status code: {status_code}") + + if 'data' not in data: + raise ShovelProcessingError(f"Invalid CMC API response: missing 'data' field") + if 'quotes' not in data['data']: + raise ShovelProcessingError(f"Invalid CMC API response: missing 'quotes' field") + if not data['data']['quotes']: + raise ShovelProcessingError(f"No price data available for timestamp {timestamp}") - if status_code == 200 and 'data' in data and 'quotes' in data['data']: - logging.info("Successfully parsed response data") quote = data['data']['quotes'][0] + if 'quote' not in quote or 'USD' not in quote['quote']: + raise ShovelProcessingError(f"Invalid CMC API response: missing USD quote data") + usd_quote = quote['quote']['USD'] + required_fields = ['price', 'market_cap', 'volume_24h'] + for field in required_fields: + if field not in usd_quote: + raise ShovelProcessingError(f"Invalid CMC API response: missing {field} field") + if usd_quote[field] is None: + raise ShovelProcessingError(f"Invalid CMC API response: {field} is None") + price = usd_quote['price'] market_cap = usd_quote['market_cap'] volume = usd_quote['volume_24h'] - logging.info(f"Returning price={price}, 
market_cap={market_cap}, volume={volume}") + + # Validate values + if price < 0 or market_cap < 0 or volume < 0: + raise ShovelProcessingError(f"Invalid negative values in price data: price={price}, market_cap={market_cap}, volume={volume}") + + logging.debug(f"Returning price={price}, market_cap={market_cap}, volume={volume}") return price, market_cap, volume - else: - logging.error("Failed to fetch TAO price with parameters %s: %s", parameters, data.get('status', {}).get('error_message', 'Unknown error')) - logging.error(f"Full response data: {data}") - return None + + except ShovelProcessingError: + raise + except Exception as e: + raise ShovelProcessingError(f"Failed to get price data: {str(e)}") def get_latest_price(): - parameters = { - 'id': CMC_TAO_ID, - 'convert': 'USD' - } + try: + parameters = { + 'id': CMC_TAO_ID, + 'convert': 'USD' + } + + data, status_code = fetch_cmc_data(parameters, 'latest') - data, status_code = fetch_cmc_data(parameters, 'latest') + if 'data' not in data: + raise ShovelProcessingError(f"Invalid CMC API response: missing 'data' field") + + tao_id_str = str(CMC_TAO_ID) + if tao_id_str not in data['data']: + raise ShovelProcessingError(f"No data available for TAO (ID: {CMC_TAO_ID})") + + tao_data = data['data'][tao_id_str] + if 'quote' not in tao_data or 'USD' not in tao_data['quote']: + raise ShovelProcessingError(f"Invalid CMC API response: missing USD quote data") - if status_code == 200 and 'data' in data and str(CMC_TAO_ID) in data['data']: - tao_data = data['data'][str(CMC_TAO_ID)] usd_quote = tao_data['quote']['USD'] + required_fields = ['price', 'market_cap', 'volume_24h'] + for field in required_fields: + if field not in usd_quote: + raise ShovelProcessingError(f"Invalid CMC API response: missing {field} field") + if usd_quote[field] is None: + raise ShovelProcessingError(f"Invalid CMC API response: {field} is None") + price = usd_quote['price'] market_cap = usd_quote['market_cap'] volume = usd_quote['volume_24h'] + + # Validate values + if price < 0 or market_cap < 0 or volume < 0: + raise ShovelProcessingError(f"Invalid negative values in price data: price={price}, market_cap={market_cap}, volume={volume}") + return price, market_cap, volume - else: - logging.error("Failed to fetch latest TAO price: %s", data.get('status', {}).get('error_message', 'Unknown error')) - return None, None, None + + except ShovelProcessingError: + raise + except Exception as e: + raise ShovelProcessingError(f"Failed to get latest price data: {str(e)}") diff --git a/scraper_service/shovel_tao_price/main.py b/scraper_service/shovel_tao_price/main.py index ff17d97..9507ea5 100644 --- a/scraper_service/shovel_tao_price/main.py +++ b/scraper_service/shovel_tao_price/main.py @@ -3,15 +3,16 @@ from cmc_client import get_price_by_time, CMC_TOKEN from shared.clickhouse.batch_insert import buffer_insert from shared.shovel_base_class import ShovelBaseClass -from shared.substrate import get_substrate_client +from shared.substrate import get_substrate_client, reconnect_substrate from shared.clickhouse.utils import ( get_clickhouse_client, table_exists, ) -from tenacity import retry, wait_fixed +from shared.exceptions import DatabaseConnectionError, ShovelProcessingError +from shared.block_metadata import get_block_metadata BLOCKS_A_DAY = (24 * 60 * 60) / 12 -FETCH_EVERY_N_BLOCKS = (60 * 5) / 12; +FETCH_EVERY_N_BLOCKS = (60 * 5) / 12 # After this block change the interval from daily to every 5 mins THRESHOLD_BLOCK = 4249779 @@ -22,93 +23,87 @@ class TaoPriceShovel(ShovelBaseClass): 
table_name = "shovel_tao_price" - starting_block=2137 + starting_block = 2137 def process_block(self, n): - logging.info("0. processing block...") - # `skip_interval` has a hiccup sometimes - # for unknown reasons and its not elastic - # enough to handle conditions - if n > THRESHOLD_BLOCK: - logging.info("1") - if n % FETCH_EVERY_N_BLOCKS != 0: - logging.info("2") - return - else: - logging.info("3") - if n % BLOCKS_A_DAY != 0: - logging.info("4") - return - logging.info("5. processing.") - do_process_block(n, self.table_name) - -@retry( - wait=wait_fixed(30), -) -def do_process_block(n, table_name): - logging.info(f"starting block {n}") - substrate = get_substrate_client() - - logging.info("got substrate client") - if not table_exists(table_name): - logging.info("no table....") - first_run(table_name) - - logging.info("got table") - block_hash = substrate.get_block_hash(n) - logging.info(f"getting block hash {block_hash}") - block_timestamp = int( - substrate.query( - "Timestamp", - "Now", - block_hash=block_hash, - ).serialize() - / 1000 - ) - - logging.info(f"block timestamp {block_timestamp}") - - if block_timestamp == 0: - logging.info(f"timestamp is 0") - return - - logging.info("getting price") - latest_price_data = get_price_by_time(block_timestamp) - logging.info("got price data") - - if latest_price_data: - logging.info("and it is here") - buffer_insert(table_name, [block_timestamp, *latest_price_data]) - else: - raise Exception("Rate limit error encountered. Waiting before retrying...") + try: + # `skip_interval` has a hiccup sometimes + # for unknown reasons and its not elastic + # enough to handle conditions + if n > THRESHOLD_BLOCK: + if n % FETCH_EVERY_N_BLOCKS != 0: + return + else: + if n % BLOCKS_A_DAY != 0: + return + do_process_block(n, self.table_name) + except Exception as e: + if isinstance(e, (DatabaseConnectionError, ShovelProcessingError)): + raise + raise ShovelProcessingError(f"Failed to process block {n}: {str(e)}") +def do_process_block(n, table_name): + try: + # Create table if it doesn't exist + try: + if not table_exists(table_name): + first_run(table_name) + except Exception as e: + raise DatabaseConnectionError(f"Failed to create/verify table: {str(e)}") + + try: + (block_timestamp, block_hash) = get_block_metadata(n) + if block_timestamp == 0: + raise ShovelProcessingError(f"Invalid block timestamp 0 for block {n}") + except Exception as e: + raise ShovelProcessingError(f"Failed to get block metadata: {str(e)}") + + try: + latest_price_data = get_price_by_time(block_timestamp) + if latest_price_data is None: + raise ShovelProcessingError("Failed to fetch price data from CMC") + except Exception as e: + if isinstance(e, ShovelProcessingError): + raise + raise ShovelProcessingError(f"Failed to get price data: {str(e)}") + + try: + buffer_insert(table_name, [block_timestamp, *latest_price_data]) + except Exception as e: + raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}") + + except (DatabaseConnectionError, ShovelProcessingError): + # Re-raise these exceptions to be handled by the base class + raise + except Exception as e: + # Catch any other unexpected errors and wrap them + raise ShovelProcessingError(f"Unexpected error in do_process_block: {str(e)}") def first_run(table_name): - query = f""" - CREATE TABLE IF NOT EXISTS {table_name} ( - timestamp DateTime CODEC(Delta, ZSTD), - price Float64 CODEC(ZSTD), - market_cap Float64 CODEC(ZSTD), - volume Float64 CODEC(ZSTD) - ) ENGINE = ReplacingMergeTree() - PARTITION BY 
toYYYYMM(timestamp) - ORDER BY timestamp - """ - get_clickhouse_client().execute(query) + try: + query = f""" + CREATE TABLE IF NOT EXISTS {table_name} ( + timestamp DateTime CODEC(Delta, ZSTD), + price Float64 CODEC(ZSTD), + market_cap Float64 CODEC(ZSTD), + volume Float64 CODEC(ZSTD) + ) ENGINE = ReplacingMergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY timestamp + """ + get_clickhouse_client().execute(query) + except Exception as e: + raise DatabaseConnectionError(f"Failed to create table: {str(e)}") def main(): if not CMC_TOKEN: - logging.error("CMC_TOKEN is not set. Doing nothing...") - else: - TaoPriceShovel(name="tao_price").start() + logging.error("CMC_TOKEN is not set") + return + TaoPriceShovel(name="tao_price").start() if __name__ == "__main__": - try: - main() - except Exception as e: - logging.error("An error occurred: %s", e) - exit(1) + main()
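A minimal sketch of how the two error classes used throughout this diff are intended to be consumed by a caller: DatabaseConnectionError marks retryable conditions (ClickHouse connectivity, missing checkpoints), while ShovelProcessingError marks fatal conditions that should stop the shovel. The handle_block helper and its max_retries/retry_delay parameters are hypothetical names used only for illustration; they are not part of this change.

# Illustrative sketch only -- handle_block is a hypothetical helper, not part of this diff.
import logging
import sys
from time import sleep

from shared.exceptions import DatabaseConnectionError, ShovelProcessingError


def handle_block(shovel, block_number, max_retries=3, retry_delay=5):
    """Process one block, retrying transient errors and failing fast on fatal ones."""
    for attempt in range(1, max_retries + 1):
        try:
            shovel.process_block(block_number)
            return
        except DatabaseConnectionError as e:
            # Retryable: back off and try the same block again.
            logging.warning("Retryable error on block %s (attempt %s/%s): %s",
                            block_number, attempt, max_retries, e)
            sleep(retry_delay)
        except ShovelProcessingError as e:
            # Fatal: surface immediately so the process can exit and be restarted.
            logging.error("Fatal error on block %s: %s", block_number, e)
            sys.exit(1)
    logging.error("Gave up on block %s after %s retries", block_number, max_retries)
    sys.exit(1)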