diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 1d796116..5a84816f 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -49,4 +49,3 @@ jobs: env: PROD_DB_URL: ${{ secrets.PROD_DB_URL }} BARN_DB_URL: ${{ secrets.BARN_DB_URL }} - WAREHOUSE_URL: ${{ secrets.WAREHOUSE_URL }} diff --git a/src/fetch/postgres.py b/src/fetch/postgres.py deleted file mode 100644 index f541fbf7..00000000 --- a/src/fetch/postgres.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Generic Postgres Adapter for executing queries on a postgres DB.""" -from dataclasses import dataclass -from typing import Optional - -import pandas as pd -from pandas import DataFrame -from sqlalchemy import create_engine -from sqlalchemy.engine import Engine - -from src.models.block_range import BlockRange -from src.utils import open_query - -REORG_THRESHOLD = 65 - - -@dataclass -class PostgresFetcher: - """ - Basic Postgres interface - """ - - engine: Engine - - def __init__(self, db_url: str): - db_string = f"postgresql+psycopg2://{db_url}" - - self.engine = create_engine(db_string) - - def _read_query( - self, query: str, data_types: Optional[dict[str, str]] = None - ) -> DataFrame: - return pd.read_sql_query(query, con=self.engine, dtype=data_types) - - def get_latest_block(self) -> int: - """ - Fetches the latest mutually synced block from orderbook databases (with REORG protection) - """ - data_types = {"latest": "int64"} - res = self._read_query(open_query("warehouse/latest_block.sql"), data_types) - assert len(res) == 1, "Expecting single record" - return int(res["latest"][0]) - REORG_THRESHOLD - - def get_internal_imbalances(self, block_range: BlockRange) -> DataFrame: - """ - Fetches and validates Internal Token Imbalances - """ - cow_reward_query = ( - open_query("warehouse/token_imbalances.sql") - .replace("{{start_block}}", str(block_range.block_from)) - .replace("{{end_block}}", str(block_range.block_to)) - ) - data_types = { - "block_number": "int64", - } - return self._read_query(cow_reward_query, data_types) diff --git a/src/main.py b/src/main.py index 834ca3c1..7b7c8cea 100644 --- a/src/main.py +++ b/src/main.py @@ -9,14 +9,12 @@ from src.fetch.dune import DuneFetcher from src.fetch.orderbook import OrderbookFetcher -from src.fetch.postgres import PostgresFetcher from src.logger import set_log from src.models.tables import SyncTable from src.post.aws import AWSClient from src.sync import sync_app_data from src.sync.config import SyncConfig, AppDataSyncConfig from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards -from src.sync.token_imbalance import sync_internal_imbalance log = set_log(__name__) @@ -84,12 +82,5 @@ def __init__(self) -> None: fetcher=OrderbookFetcher(), dry_run=args.dry_run, ) - elif args.sync_table == SyncTable.INTERNAL_IMBALANCE: - sync_internal_imbalance( - aws, - config=SyncConfig(volume_path), - fetcher=PostgresFetcher(os.environ["WAREHOUSE_URL"]), - dry_run=args.dry_run, - ) else: log.error(f"unsupported sync_table '{args.sync_table}'") diff --git a/src/models/tables.py b/src/models/tables.py index 147f217a..08068f6c 100644 --- a/src/models/tables.py +++ b/src/models/tables.py @@ -8,7 +8,6 @@ class SyncTable(Enum): APP_DATA = "app_data" ORDER_REWARDS = "order_rewards" BATCH_REWARDS = "batch_rewards" - INTERNAL_IMBALANCE = "internal_imbalance" def __str__(self) -> str: return str(self.value) diff --git a/src/models/token_imbalance_schema.py b/src/models/token_imbalance_schema.py deleted file mode 100644 index e7053cd4..00000000 --- a/src/models/token_imbalance_schema.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Model for Batch Rewards Data""" -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - -from pandas import DataFrame - - -@dataclass -class TokenImbalance: - """ - This class provides a transformation interface (to JSON) for Dataframe - """ - - @classmethod - def from_pdf_to_dune_records(cls, frame: DataFrame) -> list[dict[str, Any]]: - """Converts Pandas DataFrame into the expected stream type for Dune""" - return [ - { - "block_number": int(row["block_number"]), - "tx_hash": row["tx_hash"], - "token": row["token"], - "amount": str(row["amount"]), - } - for row in frame.to_dict(orient="records") - ] diff --git a/src/sql/warehouse/latest_block.sql b/src/sql/warehouse/latest_block.sql deleted file mode 100644 index 629195af..00000000 --- a/src/sql/warehouse/latest_block.sql +++ /dev/null @@ -1,4 +0,0 @@ -select - max(block_number) as latest -from - settlements; \ No newline at end of file diff --git a/src/sql/warehouse/token_imbalances.sql b/src/sql/warehouse/token_imbalances.sql deleted file mode 100644 index 814cf8d7..00000000 --- a/src/sql/warehouse/token_imbalances.sql +++ /dev/null @@ -1,11 +0,0 @@ -select - block_number, - concat('0x', encode(s.tx_hash, 'hex')) as tx_hash, - concat('0x', encode(token, 'hex')) as token, - amount::text as amount -from - internalized_imbalances - join settlements s on internalized_imbalances.tx_hash = s.tx_hash -where - block_number > {{start_block}} - and block_number <= {{end_block}}; \ No newline at end of file diff --git a/src/sync/token_imbalance.py b/src/sync/token_imbalance.py deleted file mode 100644 index fe0ca4f3..00000000 --- a/src/sync/token_imbalance.py +++ /dev/null @@ -1,47 +0,0 @@ -"""Main Entry point for token_imbalance sync""" -from dune_client.file.interface import FileIO - -from src.fetch.postgres import PostgresFetcher -from src.logger import set_log -from src.models.block_range import BlockRange -from src.models.tables import SyncTable -from src.models.token_imbalance_schema import TokenImbalance -from src.post.aws import AWSClient -from src.sync.common import last_sync_block -from src.sync.config import SyncConfig -from src.sync.order_rewards import OrderbookDataHandler -from src.sync.upload_handler import UploadHandler - -log = set_log(__name__) - - -def sync_internal_imbalance( - aws: AWSClient, fetcher: PostgresFetcher, config: SyncConfig, dry_run: bool -) -> None: - """Token Imbalance Sync Logic""" - sync_table = SyncTable.INTERNAL_IMBALANCE - block_range = BlockRange( - block_from=last_sync_block( - aws, - table=sync_table, - # The first block for which solver competitions - # are available in production orderbook: - # select * from solver_competitions where id = 1; - genesis_block=15173540, - ), - block_to=fetcher.get_latest_block(), - ) - # TODO - Gap Detection (find missing txHashes and ensure they are accounted for!) - record_handler = OrderbookDataHandler( - file_manager=FileIO(config.volume_path / str(sync_table)), - block_range=block_range, - config=config, - data_list=TokenImbalance.from_pdf_to_dune_records( - fetcher.get_internal_imbalances(block_range) - ), - sync_table=sync_table, - ) - UploadHandler(aws, record_handler, table=sync_table).write_and_upload_content( - dry_run - ) - log.info(f"{sync_table} sync run completed successfully") diff --git a/tests/integration/test_warehouse_fetcher.py b/tests/integration/test_warehouse_fetcher.py deleted file mode 100644 index a832e2d5..00000000 --- a/tests/integration/test_warehouse_fetcher.py +++ /dev/null @@ -1,42 +0,0 @@ -import os -import unittest - -import pandas as pd -from dotenv import load_dotenv - -from src.fetch.postgres import PostgresFetcher -from src.models.block_range import BlockRange - - -class TestPostgresWarehouseFetching(unittest.TestCase): - def setUp(self) -> None: - load_dotenv() - # TODO - deploy test DB and populate with some records... - self.fetcher = PostgresFetcher(db_url=os.environ["WAREHOUSE_URL"]) - - def test_latest_block_reasonable(self): - self.assertGreater(self.fetcher.get_latest_block(), 17273090) - - def test_get_imbalances(self): - imbalance_df = self.fetcher.get_internal_imbalances( - BlockRange(17236982, 17236983) - ) - expected = pd.DataFrame( - { - "block_number": pd.Series([17236983, 17236983], dtype="int64"), - "tx_hash": [ - "0x9dc611149c7d6a936554b4be0e4fde455c015a9d5c81650af1433df5e904c791", - "0x9dc611149c7d6a936554b4be0e4fde455c015a9d5c81650af1433df5e904c791", - ], - "token": [ - "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", - "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", - ], - "amount": ["-1438513324", "789652004205719637"], - }, - ) - self.assertIsNone(pd.testing.assert_frame_equal(expected, imbalance_df)) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/test_token_imbalance_schema.py b/tests/unit/test_token_imbalance_schema.py deleted file mode 100644 index 136e3c37..00000000 --- a/tests/unit/test_token_imbalance_schema.py +++ /dev/null @@ -1,46 +0,0 @@ -import unittest - -import pandas as pd - -from src.models.token_imbalance_schema import TokenImbalance - - -class TestModelTokenImbalance(unittest.TestCase): - def test_token_imbalance_schema(self): - max_uint = 115792089237316195423570985008687907853269984665640564039457584007913129639936 - sample_df = pd.DataFrame( - { - "block_number": pd.Series([123, 456], dtype="int64"), - "tx_hash": [ - "0x71", - "0x72", - ], - "token": [ - "0xa0", - "0xa1", - ], - "amount": [-9999, max_uint], - } - ) - - self.assertEqual( - [ - { - "amount": "-9999", - "block_number": 123, - "token": "0xa0", - "tx_hash": "0x71", - }, - { - "amount": "115792089237316195423570985008687907853269984665640564039457584007913129639936", - "block_number": 456, - "token": "0xa1", - "tx_hash": "0x72", - }, - ], - TokenImbalance.from_pdf_to_dune_records(sample_df), - ) - - -if __name__ == "__main__": - unittest.main()