This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Merge pull request #103 from cowprotocol/rewrite_app_data_sync
Rewrite AppData sync using csv upload feat
fleupold authored Jul 11, 2024
2 parents b4553a7 + 210d1be commit b778238
Showing 9 changed files with 78 additions and 250 deletions.
5 changes: 3 additions & 2 deletions .env.sample
@@ -1,6 +1,4 @@
VOLUME_PATH=data
APP_DATA_MAX_RETRIES=3
APP_DATA_GIVE_UP_THRESHOLD=100

# Dune credentials
DUNE_API_KEY=
@@ -19,5 +17,8 @@ AWS_BUCKET=
BARN_DB_URL={user}:{password}@{host}:{port}/{database}
PROD_DB_URL={user}:{password}@{host}:{port}/{database}

#Target table for app data sync
APP_DATA_TARGET_TABLE=app_data_mainnet

# IPFS Gateway
IPFS_ACCESS_KEY=
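
With the retry and give-up knobs removed, the only new setting the app-data sync needs is the Dune target table. A minimal sketch of how it is consumed, mirroring the updated src/main.py further down (python-dotenv is already imported by main.py; the non-empty check is illustrative):

import os
from dotenv import load_dotenv

load_dotenv()  # reads APP_DATA_TARGET_TABLE (and the other settings) from .env
table = os.environ["APP_DATA_TARGET_TABLE"]
assert table, "APP_DATA sync needs a non-empty APP_DATA_TARGET_TABLE"
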
5 changes: 2 additions & 3 deletions src/fetch/dune.py
@@ -25,13 +25,12 @@ class DuneFetcher:

def __init__(
self,
api_key: str,
dune: DuneClient,
) -> None:
"""
Class constructor.
Builds DuneClient from `api_key` along with a logger and FileIO object.
"""
self.dune = DuneClient(api_key)
self.dune = dune

async def fetch(self, query: QueryBase) -> list[DuneRecord]:
"""Async Dune Fetcher with some exception handling."""
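The constructor now receives a ready-built DuneClient instead of an api_key, so the caller owns the client and can share it across fetchers. A minimal sketch of the new wiring (assumes DUNE_API_KEY is set in the environment):

import os

from dune_client.client import DuneClient

from src.fetch.dune import DuneFetcher

dune_client = DuneClient(os.environ["DUNE_API_KEY"])
fetcher = DuneFetcher(dune=dune_client)  # previously: DuneFetcher(api_key=...)
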
11 changes: 11 additions & 0 deletions src/fetch/orderbook.py
@@ -151,3 +151,14 @@ def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame:
if not barn.empty:
return barn.copy()
return pd.DataFrame()

@classmethod
def get_app_hashes(cls) -> DataFrame:
"""
Fetches all appData hashes and preimages from Prod and Staging DB
"""
app_data_query = open_query("app_hashes.sql")
barn, prod = cls._query_both_dbs(app_data_query, app_data_query)

# We are only interested in unique app data
return pd.concat([prod, barn]).drop_duplicates().reset_index(drop=True)
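
Because get_app_hashes is a classmethod, it can be called without constructing a fetcher; it runs the same app_hashes.sql against both the prod and barn databases and de-duplicates the union. A usage sketch (assumes PROD_DB_URL and BARN_DB_URL are configured as in .env.sample):

from src.fetch.orderbook import OrderbookFetcher

app_hashes = OrderbookFetcher.get_app_hashes()
# one row per unique appData hash/preimage pair, as selected by app_hashes.sql
print(f"fetched {len(app_hashes)} unique appData records")
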
24 changes: 10 additions & 14 deletions src/main.py
@@ -6,8 +6,8 @@
from pathlib import Path

from dotenv import load_dotenv
from dune_client.client import DuneClient

from src.fetch.dune import DuneFetcher
from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.models.tables import SyncTable
@@ -50,36 +50,32 @@ def __init__(self) -> None:
volume_path = Path(os.environ["VOLUME_PATH"])
args = ScriptArgs()
aws = AWSClient.new_from_environment()
dune = DuneClient(os.environ["DUNE_API_KEY"])
orderbook = OrderbookFetcher()

if args.sync_table == SyncTable.APP_DATA:
table = os.environ["APP_DATA_TARGET_TABLE"]
assert table, "APP_DATA sync needs a APP_DATA_TARGET_TABLE env"
asyncio.run(
sync_app_data(
aws,
dune=DuneFetcher(os.environ["DUNE_API_KEY"]),
config=AppDataSyncConfig(
volume_path=volume_path,
missing_files_name="missing_app_hashes.json",
max_retries=int(os.environ.get("APP_DATA_MAX_RETRIES", 3)),
give_up_threshold=int(
os.environ.get("APP_DATA_GIVE_UP_THRESHOLD", 100)
),
),
ipfs_access_key=os.environ["IPFS_ACCESS_KEY"],
orderbook,
dune=dune,
config=AppDataSyncConfig(table),
dry_run=args.dry_run,
)
)
elif args.sync_table == SyncTable.ORDER_REWARDS:
sync_order_rewards(
aws,
config=SyncConfig(volume_path),
fetcher=OrderbookFetcher(),
fetcher=orderbook,
dry_run=args.dry_run,
)
elif args.sync_table == SyncTable.BATCH_REWARDS:
sync_batch_rewards(
aws,
config=SyncConfig(volume_path),
fetcher=OrderbookFetcher(),
fetcher=orderbook,
dry_run=args.dry_run,
)
else:
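A hedged sketch of the new APP_DATA branch wired up by hand outside main.py; the import path for sync_app_data is assumed from where the function is defined in this diff (src/sync/app_data.py), and credentials are expected in the environment:

import asyncio
import os

from dune_client.client import DuneClient

from src.fetch.orderbook import OrderbookFetcher
from src.sync.app_data import sync_app_data
from src.sync.config import AppDataSyncConfig

dune = DuneClient(os.environ["DUNE_API_KEY"])
orderbook = OrderbookFetcher()

asyncio.run(
    sync_app_data(
        orderbook,
        dune=dune,
        config=AppDataSyncConfig(os.environ["APP_DATA_TARGET_TABLE"]),
        dry_run=True,  # flip to False to actually upload the CSV to Dune
    )
)
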
5 changes: 0 additions & 5 deletions src/sql/app_hash_latest_block.sql

This file was deleted.

25 changes: 5 additions & 20 deletions src/sql/app_hashes.sql
@@ -1,21 +1,6 @@
-- App Hashes: https://dune.com/queries/1610025
-- MIN(first_block_seen) = 12153263
-- Nov 16, 2022: Query takes 4 seconds to run for on full block range
with
app_hashes as (
select
min(call_block_number) first_seen_block,
get_json_object(trade, '$.appData') as app_hash
from gnosis_protocol_v2_ethereum.GPv2Settlement_call_settle
lateral view explode(trades) as trade
group by app_hash
)
select
app_hash,
first_seen_block
from app_hashes
where first_seen_block > '{{BlockFrom}}'
and first_seen_block <= '{{BlockTo}}'
-- Selects all known appData hashes and preimages (as string) from the backend database

-- For some additional stats,
-- on this data see https://dune.com/queries/1608286
SELECT
concat('0x',encode(contract_app_data, 'hex')) contract_app_data,
encode(full_app_data, 'escape')
FROM app_data
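
For context, a sketch of what this query returns when pointed directly at the backend database (the project itself routes it through OrderbookFetcher._query_both_dbs; the connection-URL prefix and SQLAlchemy/pandas usage here are assumptions). Note the second expression carries no alias, so the driver picks a default column name for the preimage:

import os

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://" + os.environ["PROD_DB_URL"])
with open("src/sql/app_hashes.sql", encoding="utf-8") as query_file:
    app_hashes = pd.read_sql(query_file.read(), con=engine)

# contract_app_data: '0x'-prefixed hex of the 32-byte appData hash
# unaliased column:  the full appData JSON preimage, decoded to text
print(app_hashes.head())
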
153 changes: 13 additions & 140 deletions src/sync/app_data.py
@@ -1,154 +1,27 @@
"""Main Entry point for app_hash sync"""

from dune_client.file.interface import FileIO
from dune_client.types import DuneRecord
from dune_client.client import DuneClient

from src.fetch.dune import DuneFetcher
from src.fetch.ipfs import Cid
from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.models.app_data_content import FoundContent, NotFoundContent
from src.models.block_range import BlockRange
from src.models.tables import SyncTable
from src.post.aws import AWSClient
from src.sync.common import last_sync_block
from src.sync.config import SyncConfig, AppDataSyncConfig
from src.sync.record_handler import RecordHandler
from src.sync.upload_handler import UploadHandler
from src.sync.config import AppDataSyncConfig

log = set_log(__name__)


SYNC_TABLE = SyncTable.APP_DATA


class AppDataHandler(RecordHandler): # pylint:disable=too-many-instance-attributes
"""
This class is responsible for consuming new dune records and missing values from previous runs
it attempts to fetch content for them and filters them into "found" and "not found" as necessary
"""

def __init__( # pylint:disable=too-many-arguments
self,
file_manager: FileIO,
new_rows: list[DuneRecord],
block_range: BlockRange,
config: SyncConfig,
ipfs_access_key: str,
missing_file_name: str,
):
super().__init__(block_range, SYNC_TABLE, config)
self.file_manager = file_manager
self.ipfs_access_key = ipfs_access_key

self._found: list[FoundContent] = []
self._not_found: list[NotFoundContent] = []

self.new_rows = new_rows
self.missing_file_name = missing_file_name
try:
self.missing_values = self.file_manager.load_ndjson(missing_file_name)
except FileNotFoundError:
self.missing_values = []

def num_records(self) -> int:
assert len(self.new_rows) == 0, (
"this function call is not allowed until self.new_rows have been processed! "
"call fetch_content_and_filter first"
)
return len(self._found)

async def _handle_new_records(self, max_retries: int) -> None:
# Drain the dune_results into "found" and "not found" categories
self._found, self._not_found = await Cid.fetch_many(
self.new_rows, self.ipfs_access_key, max_retries
)

async def _handle_missing_records(
self, max_retries: int, give_up_threshold: int
) -> None:
found, not_found = await Cid.fetch_many(
self.missing_values, self.ipfs_access_key, max_retries
)
while found:
self._found.append(found.pop())
while not_found:
row = not_found.pop()
app_hash, attempts = row.app_hash, row.attempts
if attempts > give_up_threshold:
log.debug(
f"No content found after {attempts} attempts for {app_hash} assuming NULL."
)
self._found.append(
FoundContent(
app_hash=app_hash,
first_seen_block=row.first_seen_block,
content={},
)
)
else:
self._not_found.append(row)

def write_found_content(self) -> None:
assert len(self.new_rows) == 0, "Must call _handle_new_records first!"
self.file_manager.write_ndjson(
data=[x.as_dune_record() for x in self._found], name=self.content_filename
)
# When not_found is empty, we want to overwrite the file (hence skip_empty=False)
# This happens when number of attempts exceeds GIVE_UP_THRESHOLD
self.file_manager.write_ndjson(
data=[x.as_dune_record() for x in self._not_found],
name=self.missing_file_name,
skip_empty=False,
)

def write_sync_data(self) -> None:
# Only write these if upload was successful.
self.file_manager.write_csv(
data=[{self.config.sync_column: str(self.block_range.block_to)}],
name=self.config.sync_file,
)

async def fetch_content_and_filter(
self, max_retries: int, give_up_threshold: int
) -> None:
"""
Run loop fetching app_data for hashes,
separates into (found and not found), returning the pair.
"""
await self._handle_new_records(max_retries)
log.info(
f"Attempting to recover missing {len(self.missing_values)} records from previous run"
)
await self._handle_missing_records(max_retries, give_up_threshold)


async def sync_app_data(
aws: AWSClient,
dune: DuneFetcher,
orderbook: OrderbookFetcher,
dune: DuneClient,
config: AppDataSyncConfig,
ipfs_access_key: str,
dry_run: bool,
) -> None:
"""App Data Sync Logic"""
block_range = BlockRange(
block_from=last_sync_block(
aws,
table=SYNC_TABLE,
genesis_block=12153262, # First App Hash Block
),
block_to=await dune.latest_app_hash_block(),
)

data_handler = AppDataHandler(
file_manager=FileIO(config.volume_path / str(SYNC_TABLE)),
new_rows=await dune.get_app_hashes(block_range),
block_range=block_range,
config=config,
ipfs_access_key=ipfs_access_key,
missing_file_name=config.missing_files_name,
)
await data_handler.fetch_content_and_filter(
max_retries=config.max_retries, give_up_threshold=config.give_up_threshold
)
UploadHandler(aws, data_handler, table=SYNC_TABLE).write_and_upload_content(dry_run)
hashes = orderbook.get_app_hashes()
if not dry_run:
dune.upload_csv(
data=hashes.to_csv(index=False),
table_name=config.table,
description=config.description,
is_private=False,
)
log.info("app_data sync run completed successfully")
16 changes: 8 additions & 8 deletions src/sync/config.py
@@ -19,12 +19,12 @@ class SyncConfig:


@dataclass
class AppDataSyncConfig(SyncConfig):
"""Additional data field for app data sync."""
class AppDataSyncConfig:
"""Configuration for app data sync."""

# Maximum number of retries on a single run
max_retries: int = 3
# Total number of accumulated attempts before we assume no content
give_up_threshold: int = 100
# Persisted file where we store the missing results and number of attempts.
missing_files_name: str = "missing_app_hashes.json"
# The name of the table to upload to
table: str = "app_data_test"
# Description of the table (for creation)
description: str = (
"Table containing known CoW Protocol appData hashes and their pre-images"
)
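
The slimmed-down config is now just the target table plus a fixed description. A sketch of building it from the environment, falling back to the default declared above:

import os

from src.sync.config import AppDataSyncConfig

config = AppDataSyncConfig(table=os.environ.get("APP_DATA_TARGET_TABLE", "app_data_test"))
print(config.table, "-", config.description)
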