From 6fae391d5421b76ccec28e4c33a37b12a2126e78 Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Wed, 10 Jul 2024 11:54:54 +0000 Subject: [PATCH 1/8] Rewrite AppData sync using csv upload feat --- src/fetch/dune.py | 5 +- src/fetch/orderbook.py | 11 +++ src/main.py | 17 +--- src/sql/app_hash_latest_block.sql | 5 - src/sql/app_hashes.sql | 25 +---- src/sync/app_data.py | 153 +++--------------------------- src/sync/config.py | 16 ++-- tests/e2e/test_sync_app_data.py | 87 ++++++----------- 8 files changed, 72 insertions(+), 247 deletions(-) delete mode 100644 src/sql/app_hash_latest_block.sql diff --git a/src/fetch/dune.py b/src/fetch/dune.py index 450ac481..48df2d60 100644 --- a/src/fetch/dune.py +++ b/src/fetch/dune.py @@ -25,13 +25,12 @@ class DuneFetcher: def __init__( self, - api_key: str, + dune: DuneClient, ) -> None: """ Class constructor. - Builds DuneClient from `api_key` along with a logger and FileIO object. """ - self.dune = DuneClient(api_key) + self.dune = dune async def fetch(self, query: QueryBase) -> list[DuneRecord]: """Async Dune Fetcher with some exception handling.""" diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py index 5067c7b5..82eb4e1b 100644 --- a/src/fetch/orderbook.py +++ b/src/fetch/orderbook.py @@ -151,3 +151,14 @@ def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame: if not barn.empty: return barn.copy() return pd.DataFrame() + + @classmethod + def get_app_hashes(cls) -> DataFrame: + """ + Fetches all appData hashes and preimages from Prod and Staging DB + """ + app_data_query = open_query("app_hashes.sql") + barn, prod = cls._query_both_dbs(app_data_query, app_data_query) + + # We are only interested in unique app data + return pd.concat([prod, barn]).drop_duplicates().reset_index(drop=True) diff --git a/src/main.py b/src/main.py index 7b7c8cea..ed47e1e1 100644 --- a/src/main.py +++ b/src/main.py @@ -6,8 +6,8 @@ from pathlib import Path from dotenv import load_dotenv +from dune_client.client import DuneClient -from src.fetch.dune import DuneFetcher from src.fetch.orderbook import OrderbookFetcher from src.logger import set_log from src.models.tables import SyncTable @@ -50,21 +50,14 @@ def __init__(self) -> None: volume_path = Path(os.environ["VOLUME_PATH"]) args = ScriptArgs() aws = AWSClient.new_from_environment() + dune = DuneClient(os.environ["DUNE_API_KEY"]) if args.sync_table == SyncTable.APP_DATA: asyncio.run( sync_app_data( - aws, - dune=DuneFetcher(os.environ["DUNE_API_KEY"]), - config=AppDataSyncConfig( - volume_path=volume_path, - missing_files_name="missing_app_hashes.json", - max_retries=int(os.environ.get("APP_DATA_MAX_RETRIES", 3)), - give_up_threshold=int( - os.environ.get("APP_DATA_GIVE_UP_THRESHOLD", 100) - ), - ), - ipfs_access_key=os.environ["IPFS_ACCESS_KEY"], + orderbook=OrderbookFetcher(), + dune=dune, + config=AppDataSyncConfig(), dry_run=args.dry_run, ) ) diff --git a/src/sql/app_hash_latest_block.sql b/src/sql/app_hash_latest_block.sql deleted file mode 100644 index e6396113..00000000 --- a/src/sql/app_hash_latest_block.sql +++ /dev/null @@ -1,5 +0,0 @@ --- https://dune.com/queries/1615490 -select - max(call_block_number) as latest_block -from - gnosis_protocol_v2_ethereum.GPv2Settlement_call_settle \ No newline at end of file diff --git a/src/sql/app_hashes.sql b/src/sql/app_hashes.sql index daef7452..17bdbf7c 100644 --- a/src/sql/app_hashes.sql +++ b/src/sql/app_hashes.sql @@ -1,21 +1,6 @@ --- App Hashes: https://dune.com/queries/1610025 --- MIN(first_block_seen) = 12153263 --- Nov 16, 2022: Query takes 4 
seconds to run for on full block range -with -app_hashes as ( - select - min(call_block_number) first_seen_block, - get_json_object(trade, '$.appData') as app_hash - from gnosis_protocol_v2_ethereum.GPv2Settlement_call_settle - lateral view explode(trades) as trade - group by app_hash -) -select - app_hash, - first_seen_block -from app_hashes -where first_seen_block > '{{BlockFrom}}' -and first_seen_block <= '{{BlockTo}}' +-- Selects all known appData hashes and preimages (as string) from the backend database --- For some additional stats, --- on this data see https://dune.com/queries/1608286 \ No newline at end of file +SELECT + concat('0x',encode(contract_app_data, 'hex')) contract_app_data, + encode(full_app_data, 'escape') +FROM app_data diff --git a/src/sync/app_data.py b/src/sync/app_data.py index 8f5f143d..8953666d 100644 --- a/src/sync/app_data.py +++ b/src/sync/app_data.py @@ -1,154 +1,27 @@ """Main Entry point for app_hash sync""" -from dune_client.file.interface import FileIO -from dune_client.types import DuneRecord +from dune_client.client import DuneClient -from src.fetch.dune import DuneFetcher -from src.fetch.ipfs import Cid +from src.fetch.orderbook import OrderbookFetcher from src.logger import set_log -from src.models.app_data_content import FoundContent, NotFoundContent -from src.models.block_range import BlockRange -from src.models.tables import SyncTable -from src.post.aws import AWSClient -from src.sync.common import last_sync_block -from src.sync.config import SyncConfig, AppDataSyncConfig -from src.sync.record_handler import RecordHandler -from src.sync.upload_handler import UploadHandler +from src.sync.config import AppDataSyncConfig log = set_log(__name__) -SYNC_TABLE = SyncTable.APP_DATA - - -class AppDataHandler(RecordHandler): # pylint:disable=too-many-instance-attributes - """ - This class is responsible for consuming new dune records and missing values from previous runs - it attempts to fetch content for them and filters them into "found" and "not found" as necessary - """ - - def __init__( # pylint:disable=too-many-arguments - self, - file_manager: FileIO, - new_rows: list[DuneRecord], - block_range: BlockRange, - config: SyncConfig, - ipfs_access_key: str, - missing_file_name: str, - ): - super().__init__(block_range, SYNC_TABLE, config) - self.file_manager = file_manager - self.ipfs_access_key = ipfs_access_key - - self._found: list[FoundContent] = [] - self._not_found: list[NotFoundContent] = [] - - self.new_rows = new_rows - self.missing_file_name = missing_file_name - try: - self.missing_values = self.file_manager.load_ndjson(missing_file_name) - except FileNotFoundError: - self.missing_values = [] - - def num_records(self) -> int: - assert len(self.new_rows) == 0, ( - "this function call is not allowed until self.new_rows have been processed! 
" - "call fetch_content_and_filter first" - ) - return len(self._found) - - async def _handle_new_records(self, max_retries: int) -> None: - # Drain the dune_results into "found" and "not found" categories - self._found, self._not_found = await Cid.fetch_many( - self.new_rows, self.ipfs_access_key, max_retries - ) - - async def _handle_missing_records( - self, max_retries: int, give_up_threshold: int - ) -> None: - found, not_found = await Cid.fetch_many( - self.missing_values, self.ipfs_access_key, max_retries - ) - while found: - self._found.append(found.pop()) - while not_found: - row = not_found.pop() - app_hash, attempts = row.app_hash, row.attempts - if attempts > give_up_threshold: - log.debug( - f"No content found after {attempts} attempts for {app_hash} assuming NULL." - ) - self._found.append( - FoundContent( - app_hash=app_hash, - first_seen_block=row.first_seen_block, - content={}, - ) - ) - else: - self._not_found.append(row) - - def write_found_content(self) -> None: - assert len(self.new_rows) == 0, "Must call _handle_new_records first!" - self.file_manager.write_ndjson( - data=[x.as_dune_record() for x in self._found], name=self.content_filename - ) - # When not_found is empty, we want to overwrite the file (hence skip_empty=False) - # This happens when number of attempts exceeds GIVE_UP_THRESHOLD - self.file_manager.write_ndjson( - data=[x.as_dune_record() for x in self._not_found], - name=self.missing_file_name, - skip_empty=False, - ) - - def write_sync_data(self) -> None: - # Only write these if upload was successful. - self.file_manager.write_csv( - data=[{self.config.sync_column: str(self.block_range.block_to)}], - name=self.config.sync_file, - ) - - async def fetch_content_and_filter( - self, max_retries: int, give_up_threshold: int - ) -> None: - """ - Run loop fetching app_data for hashes, - separates into (found and not found), returning the pair. 
- """ - await self._handle_new_records(max_retries) - log.info( - f"Attempting to recover missing {len(self.missing_values)} records from previous run" - ) - await self._handle_missing_records(max_retries, give_up_threshold) - - async def sync_app_data( - aws: AWSClient, - dune: DuneFetcher, + orderbook: OrderbookFetcher, + dune: DuneClient, config: AppDataSyncConfig, - ipfs_access_key: str, dry_run: bool, ) -> None: """App Data Sync Logic""" - block_range = BlockRange( - block_from=last_sync_block( - aws, - table=SYNC_TABLE, - genesis_block=12153262, # First App Hash Block - ), - block_to=await dune.latest_app_hash_block(), - ) - - data_handler = AppDataHandler( - file_manager=FileIO(config.volume_path / str(SYNC_TABLE)), - new_rows=await dune.get_app_hashes(block_range), - block_range=block_range, - config=config, - ipfs_access_key=ipfs_access_key, - missing_file_name=config.missing_files_name, - ) - await data_handler.fetch_content_and_filter( - max_retries=config.max_retries, give_up_threshold=config.give_up_threshold - ) - UploadHandler(aws, data_handler, table=SYNC_TABLE).write_and_upload_content(dry_run) + hashes = orderbook.get_app_hashes() + if not dry_run: + dune.upload_csv( + data=hashes.to_csv(index=False), + table_name=config.table, + description=config.description, + is_private=False, + ) log.info("app_data sync run completed successfully") diff --git a/src/sync/config.py b/src/sync/config.py index bbd663c6..829b5313 100644 --- a/src/sync/config.py +++ b/src/sync/config.py @@ -19,12 +19,12 @@ class SyncConfig: @dataclass -class AppDataSyncConfig(SyncConfig): - """Additional data field for app data sync.""" +class AppDataSyncConfig: + """Configuration for app data sync.""" - # Maximum number of retries on a single run - max_retries: int = 3 - # Total number of accumulated attempts before we assume no content - give_up_threshold: int = 100 - # Persisted file where we store the missing results and number of attempts. 
- missing_files_name: str = "missing_app_hashes.json" + # The name of the table to upload to + table: str = "app_data_test" + # Description of the table (for creation) + description: str = ( + "Table containing known CoW Protocol appData hashes and their pre-images" + ) diff --git a/tests/e2e/test_sync_app_data.py b/tests/e2e/test_sync_app_data.py index 2591d171..c1bc2a4a 100644 --- a/tests/e2e/test_sync_app_data.py +++ b/tests/e2e/test_sync_app_data.py @@ -1,81 +1,50 @@ import os -import shutil import unittest -from pathlib import Path +import time + from unittest import IsolatedAsyncioTestCase from dotenv import load_dotenv -from dune_client.file.interface import FileIO +from dune_client.client import DuneClient from src.fetch.dune import DuneFetcher -from src.models.block_range import BlockRange -from src.sync.app_data import AppDataHandler, SYNC_TABLE +from src.fetch.orderbook import OrderbookFetcher +from src.sync import sync_app_data from src.sync.config import AppDataSyncConfig - class TestSyncAppData(IsolatedAsyncioTestCase): def setUp(self) -> None: load_dotenv() - self.dune = DuneFetcher(os.environ["DUNE_API_KEY"]) - self.config = AppDataSyncConfig( - volume_path=Path(os.environ["VOLUME_PATH"]), - missing_files_name="missing_app_hashes.json", - max_retries=2, - give_up_threshold=3, + self.dune = DuneClient(os.environ["DUNE_API_KEY"]) + self.fetcher = DuneFetcher(self.dune) + self.namespace = "cowprotocol" + self.config = AppDataSyncConfig(table="app_data_test") + self.query = self.dune.create_query( + name="Fetch app data hashes for test", + query_sql=f"SELECT * FROM dune.{self.namespace}.dataset_{self.config.table}", ) - self.file_manager = FileIO(self.config.volume_path / str(SYNC_TABLE)) def tearDown(self) -> None: - shutil.rmtree(self.config.volume_path) - - async def test_fetch_content_and_filter(self): - retries = self.config.max_retries - give_up = self.config.give_up_threshold - missing_files = self.config.missing_files_name - # block numbers - a, b, c = 15582187, 16082187, 16100000 - self.assertTrue(a < b < c) - - block_range_1 = BlockRange( - block_from=a, - block_to=b, - ) - data_handler = AppDataHandler( - file_manager=self.file_manager, - new_rows=await self.dune.get_app_hashes(block_range_1), - block_range=block_range_1, - config=self.config, - missing_file_name=missing_files, - ipfs_access_key=os.environ["IPFS_ACCESS_KEY"], + self.dune.delete_table( + namespace=self.namespace, + table_name="dataset_" + self.config.table ) + self.dune.archive_query(self.query.base.query_id) - print(f"Beginning Content Fetching on {len(data_handler.new_rows)} records") - await data_handler.fetch_content_and_filter(retries, give_up) - data_handler.write_found_content() - self.assertEqual(0, len(data_handler.new_rows)) - - block_range_2 = BlockRange( - block_from=b, - block_to=c, - ) - data_handler = AppDataHandler( - file_manager=self.file_manager, - new_rows=await self.dune.get_app_hashes(block_range_2), - block_range=block_range_2, + async def test_fetch_content_and_filter(self): + print("Beginning Sync") + await sync_app_data( + orderbook=OrderbookFetcher(), + dune=self.dune, config=self.config, - missing_file_name=missing_files, - ipfs_access_key=os.environ["IPFS_ACCESS_KEY"], + dry_run=False, ) - print( - f"Beginning Second Run Content Fetching on {len(data_handler.new_rows)} records" - ) - await data_handler.fetch_content_and_filter(retries, give_up) - data_handler.write_found_content() - - self.assertEqual(0, len(data_handler.new_rows)) - # Two runs with retries = 2 and give_up 
= 3 implies no more missing records.
-        self.assertEqual(0, len(data_handler._not_found))
-
+        print("Finished Sync")
+        # Wait some time for the table to be exposed
+        time.sleep(5)
+        result = self.dune.run_query(self.query.base).result.rows
+        print(f"Found {len(result)} results")
+        self.assertGreater(len(result), 0)
 
 if __name__ == "__main__":
     unittest.main()

From 632a84e52e982af5265b31d33bf47f7d1d90ee19 Mon Sep 17 00:00:00 2001
From: Felix Leupold
Date: Wed, 10 Jul 2024 12:07:00 +0000
Subject: [PATCH 2/8] derive dune table name from database

---
 src/fetch/orderbook.py | 9 +++++++++
 src/main.py            | 9 +++++----
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py
index 82eb4e1b..eea6c242 100644
--- a/src/fetch/orderbook.py
+++ b/src/fetch/orderbook.py
@@ -62,6 +62,15 @@ def _query_both_dbs(
         prod = cls._read_query_for_env(query_prod, OrderbookEnv.PROD, data_types)
         return barn, prod
 
+    @classmethod
+    def database(cls) -> str:
+        """Returns the current database name"""
+        query = "SELECT current_database()"
+        barn, prod = cls._query_both_dbs(query, query)
+        assert barn.current_database[0] == prod.current_database[0], "Expecting databases to match"
+
+        return prod.current_database[0]
+
     @classmethod
     def get_latest_block(cls) -> int:
         """
diff --git a/src/main.py b/src/main.py
index ed47e1e1..4ed3cafe 100644
--- a/src/main.py
+++ b/src/main.py
@@ -51,13 +51,14 @@ def __init__(self) -> None:
     args = ScriptArgs()
     aws = AWSClient.new_from_environment()
     dune = DuneClient(os.environ["DUNE_API_KEY"])
+    orderbook = OrderbookFetcher()
 
     if args.sync_table == SyncTable.APP_DATA:
         asyncio.run(
             sync_app_data(
-                orderbook=OrderbookFetcher(),
+                orderbook,
                 dune=dune,
-                config=AppDataSyncConfig(),
+                config=AppDataSyncConfig(table=f'app_data_{orderbook.database()}'),
                 dry_run=args.dry_run,
             )
         )
@@ -65,14 +66,14 @@ def __init__(self) -> None:
         sync_order_rewards(
             aws,
             config=SyncConfig(volume_path),
-            fetcher=OrderbookFetcher(),
+            fetcher=orderbook,
             dry_run=args.dry_run,
         )
     elif args.sync_table == SyncTable.BATCH_REWARDS:
         sync_batch_rewards(
             aws,
             config=SyncConfig(volume_path),
-            fetcher=OrderbookFetcher(),
+            fetcher=orderbook,
             dry_run=args.dry_run,
         )
     else:

From 43b79ed5e1064a4c60ebf04d548f290e0dacd1aa Mon Sep 17 00:00:00 2001
From: Felix Leupold
Date: Wed, 10 Jul 2024 12:21:07 +0000
Subject: [PATCH 3/8] more explicit approach to specify target table

---
 .env.sample            | 3 +++
 src/fetch/orderbook.py | 9 ---------
 src/main.py            | 4 +++-
 3 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/.env.sample b/.env.sample
index 84ce0dd2..bdc916f3 100644
--- a/.env.sample
+++ b/.env.sample
@@ -19,5 +19,8 @@ AWS_BUCKET=
 BARN_DB_URL={user}:{password}@{host}:{port}/{database}
 PROD_DB_URL={user}:{password}@{host}:{port}/{database}
 
+# Target table for app data sync
+APP_DATA_TARGET_TABLE=app_data_mainnet
+
 # IPFS Gateway
 IPFS_ACCESS_KEY=
diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py
index eea6c242..32b0c807 100644
--- a/src/fetch/orderbook.py
+++ b/src/fetch/orderbook.py
@@ -61,15 +61,6 @@ def _query_both_dbs(
         barn = cls._read_query_for_env(query_barn, OrderbookEnv.BARN, data_types)
         prod = cls._read_query_for_env(query_prod, OrderbookEnv.PROD, data_types)
         return barn, prod
-
-    @classmethod
-    def database(cls) -> str:
-        """Returns the current database name"""
-        query = "SELECT current_database()"
-        barn, prod = cls._query_both_dbs(query, query)
-        assert barn.current_database[0] == prod.current_database[0], "Expecting databases to match"
-
-        return 
prod.current_database[0] @classmethod def get_latest_block(cls) -> int: diff --git a/src/main.py b/src/main.py index 4ed3cafe..ab953467 100644 --- a/src/main.py +++ b/src/main.py @@ -54,11 +54,13 @@ def __init__(self) -> None: orderbook = OrderbookFetcher() if args.sync_table == SyncTable.APP_DATA: + table = os.environ["APP_DATA_TARGET_TABLE"] + assert table, "APP_DATA sync needs a DUNE_NETWORK_NAME env" asyncio.run( sync_app_data( orderbook, dune=dune, - config=AppDataSyncConfig(table=f'app_data_{orderbook.database()}'), + config=AppDataSyncConfig(table), dry_run=args.dry_run, ) ) From c95fbbc7000e74b2277d5ae6ee7c7092fc5bae29 Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Wed, 10 Jul 2024 14:23:21 +0000 Subject: [PATCH 4/8] lint --- src/fetch/orderbook.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py index 32b0c807..82eb4e1b 100644 --- a/src/fetch/orderbook.py +++ b/src/fetch/orderbook.py @@ -61,7 +61,7 @@ def _query_both_dbs( barn = cls._read_query_for_env(query_barn, OrderbookEnv.BARN, data_types) prod = cls._read_query_for_env(query_prod, OrderbookEnv.PROD, data_types) return barn, prod - + @classmethod def get_latest_block(cls) -> int: """ From 63d7b0a95de3123840304818bf7be519c9feedc1 Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Wed, 10 Jul 2024 14:36:58 +0000 Subject: [PATCH 5/8] run black --- tests/e2e/test_sync_app_data.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/e2e/test_sync_app_data.py b/tests/e2e/test_sync_app_data.py index c1bc2a4a..7a7f7cd6 100644 --- a/tests/e2e/test_sync_app_data.py +++ b/tests/e2e/test_sync_app_data.py @@ -12,6 +12,7 @@ from src.sync import sync_app_data from src.sync.config import AppDataSyncConfig + class TestSyncAppData(IsolatedAsyncioTestCase): def setUp(self) -> None: load_dotenv() @@ -26,8 +27,7 @@ def setUp(self) -> None: def tearDown(self) -> None: self.dune.delete_table( - namespace=self.namespace, - table_name="dataset_" + self.config.table + namespace=self.namespace, table_name="dataset_" + self.config.table ) self.dune.archive_query(self.query.base.query_id) @@ -46,5 +46,6 @@ async def test_fetch_content_and_filter(self): print(f"Found {len(result)} results") self.assertGreater(len(result), 0) + if __name__ == "__main__": unittest.main() From e63d961ac96b1a431203fde0eb7c93f2fffb527c Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Thu, 11 Jul 2024 13:42:08 +0000 Subject: [PATCH 6/8] remove DuneFetcher --- tests/e2e/test_sync_app_data.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/e2e/test_sync_app_data.py b/tests/e2e/test_sync_app_data.py index 7a7f7cd6..7ffdc52f 100644 --- a/tests/e2e/test_sync_app_data.py +++ b/tests/e2e/test_sync_app_data.py @@ -7,7 +7,6 @@ from dotenv import load_dotenv from dune_client.client import DuneClient -from src.fetch.dune import DuneFetcher from src.fetch.orderbook import OrderbookFetcher from src.sync import sync_app_data from src.sync.config import AppDataSyncConfig @@ -17,7 +16,6 @@ class TestSyncAppData(IsolatedAsyncioTestCase): def setUp(self) -> None: load_dotenv() self.dune = DuneClient(os.environ["DUNE_API_KEY"]) - self.fetcher = DuneFetcher(self.dune) self.namespace = "cowprotocol" self.config = AppDataSyncConfig(table="app_data_test") self.query = self.dune.create_query( From b05da484e93216e9564ef2b93dc2c6e2ef4a20be Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Thu, 11 Jul 2024 13:42:31 +0000 Subject: [PATCH 7/8] fix assert --- src/main.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-)

diff --git a/src/main.py b/src/main.py
index ab953467..0bcba008 100644
--- a/src/main.py
+++ b/src/main.py
@@ -55,7 +55,7 @@ def __init__(self) -> None:
 
     if args.sync_table == SyncTable.APP_DATA:
         table = os.environ["APP_DATA_TARGET_TABLE"]
-        assert table, "APP_DATA sync needs a DUNE_NETWORK_NAME env"
+        assert table, "APP_DATA sync needs an APP_DATA_TARGET_TABLE env"
         asyncio.run(
             sync_app_data(
                 orderbook,

From 210d1be1b3717d3362b97c7f3172b067613f3a9e Mon Sep 17 00:00:00 2001
From: Felix Leupold
Date: Thu, 11 Jul 2024 13:49:06 +0000
Subject: [PATCH 8/8] clean up env vars

---
 .env.sample | 2 --
 1 file changed, 2 deletions(-)

diff --git a/.env.sample b/.env.sample
index bdc916f3..956ec839 100644
--- a/.env.sample
+++ b/.env.sample
@@ -1,6 +1,4 @@
 VOLUME_PATH=data
-APP_DATA_MAX_RETRIES=3
-APP_DATA_GIVE_UP_THRESHOLD=100
 
 # Dune credentials
 DUNE_API_KEY=
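
A note on the resulting design: after this series, a sync run is simply
"query both orderbook databases, de-duplicate, upload the CSV to Dune";
no block ranges, IPFS fetching, or S3 bookkeeping remain. The sketch below
is a minimal, standalone version of that flow. It assumes only the
dune-client and pandas APIs already used in the patches above; the inline
query mirrors src/sql/app_hashes.sql, the env variables come from
.env.sample, and fetch_app_hashes plus the postgresql+psycopg2
connection-string prefix are illustrative stand-ins for OrderbookFetcher's
internals:

    import os

    import pandas as pd
    from dune_client.client import DuneClient
    from sqlalchemy import create_engine

    APP_HASHES_QUERY = """
    SELECT
      concat('0x', encode(contract_app_data, 'hex')) AS contract_app_data,
      encode(full_app_data, 'escape') AS full_app_data
    FROM app_data
    """

    def fetch_app_hashes() -> pd.DataFrame:
        # Query prod and barn, then keep only the unique (hash, preimage)
        # rows, as OrderbookFetcher.get_app_hashes does.
        frames = [
            pd.read_sql(
                APP_HASHES_QUERY,
                # Assumed connection-string prefix; adjust to your driver.
                create_engine(f"postgresql+psycopg2://{os.environ[env_var]}"),
            )
            for env_var in ("PROD_DB_URL", "BARN_DB_URL")
        ]
        return pd.concat(frames).drop_duplicates().reset_index(drop=True)

    if __name__ == "__main__":
        dune = DuneClient(os.environ["DUNE_API_KEY"])
        hashes = fetch_app_hashes()
        # Each run re-uploads the complete dataset, which is what makes the
        # old incremental block-range bookkeeping unnecessary.
        dune.upload_csv(
            data=hashes.to_csv(index=False),
            table_name=os.environ["APP_DATA_TARGET_TABLE"],
            description="Known CoW Protocol appData hashes and their pre-images",
            is_private=False,
        )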