From ffdef7945746acdf9db6cba0e4d98ed9ad12cbb2 Mon Sep 17 00:00:00 2001 From: = Date: Wed, 30 Oct 2024 14:01:28 +0100 Subject: [PATCH 1/6] Split prefect deployments into dev and prod --- .github/workflows/prefect-dev.yaml | 27 +++ .../{prefect.yaml => prefect-prod.yaml} | 6 +- src/deploy_prefect/dev_deployment.py | 166 ++++++++++++++++++ .../{deployment.py => prod_deployment.py} | 2 +- 4 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 .github/workflows/prefect-dev.yaml rename .github/workflows/{prefect.yaml => prefect-prod.yaml} (87%) create mode 100644 src/deploy_prefect/dev_deployment.py rename src/deploy_prefect/{deployment.py => prod_deployment.py} (98%) diff --git a/.github/workflows/prefect-dev.yaml b/.github/workflows/prefect-dev.yaml new file mode 100644 index 0000000..95b9d6b --- /dev/null +++ b/.github/workflows/prefect-dev.yaml @@ -0,0 +1,27 @@ +name: prefect-dev +on: + push: + branches: [ dev ] +jobs: + dev-deployment: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12"] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Deploy prefect deployment + env: + PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }} + run: | + python -m pip install --upgrade pip + pip install -r requirements/prefect.txt + pip install -r requirements/prod.txt + prefect config set PREFECT_API_URL=$PREFECT_API_URL + python -m src.deploy_prefect.dev_deployment diff --git a/.github/workflows/prefect.yaml b/.github/workflows/prefect-prod.yaml similarity index 87% rename from .github/workflows/prefect.yaml rename to .github/workflows/prefect-prod.yaml index a977343..795c181 100644 --- a/.github/workflows/prefect.yaml +++ b/.github/workflows/prefect-prod.yaml @@ -1,9 +1,9 @@ -name: prefect +name: prefect-prod on: push: branches: [ main ] jobs: - deployment: + prod-deployment: runs-on: ubuntu-latest strategy: matrix: @@ -24,4 +24,4 @@ jobs: pip install -r requirements/prefect.txt pip install -r requirements/prod.txt prefect config set PREFECT_API_URL=$PREFECT_API_URL - python -m src.deploy_prefect.deployment + python -m src.deploy_prefect.prod_deployment diff --git a/src/deploy_prefect/dev_deployment.py b/src/deploy_prefect/dev_deployment.py new file mode 100644 index 0000000..c7ba424 --- /dev/null +++ b/src/deploy_prefect/dev_deployment.py @@ -0,0 +1,166 @@ +"""Prefect Deployment for Order Rewards Data""" + +import os +from io import StringIO +from datetime import datetime, timedelta, timezone + +import requests +import pandas as pd +from dotenv import load_dotenv +from dune_client.client import DuneClient + +# pylint: disable=import-error +from prefect import flow, task, get_run_logger # type: ignore + +from prefect.runner.storage import GitRepository # type: ignore + +from src.models.block_range import BlockRange +from src.fetch.orderbook import OrderbookFetcher + +load_dotenv() + + +def get_last_monday_midnight_utc() -> int: + """Get the timestamp of last monday at midnight UTC""" + now = datetime.now(timezone.utc) + current_weekday = now.weekday() + days_since_last_monday = current_weekday if current_weekday != 0 else 7 + last_monday = now - timedelta(days=days_since_last_monday) + last_monday_midnight = last_monday.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + timestamp = int(last_monday_midnight.timestamp()) + return timestamp + + +@task # type: ignore[misc] +def get_block_range() -> BlockRange: + """Returns the blockrange from last monday midnight until now""" + etherscan_api = "https://api.etherscan.io/api" + api_key = os.environ["ETHERSCAN_API_KEY"] + start = ( + requests.get( + etherscan_api, + { # type: ignore + "module": "block", + "action": "getblocknobytime", + "timestamp": get_last_monday_midnight_utc(), + "closest": "before", + "apikey": api_key, + }, + timeout=60, + ) + .json() + .get("result") + ) + end = ( + requests.get( + etherscan_api, + { # type: ignore + "module": "block", + "action": "getblocknobytime", + "timestamp": int(datetime.now(timezone.utc).timestamp()), + "closest": "before", + "apikey": api_key, + }, + timeout=60, + ) + .json() + .get("result") + ) + + blockrange = BlockRange(block_from=start, block_to=end) + return blockrange + + +@task # type: ignore[misc] +def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame: + """Runs the query to get the order book for a specified blockrange""" + orderbook = OrderbookFetcher() + return orderbook.get_order_rewards(blockrange) + + +@task # type: ignore[misc] +def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: + """Casts the dataframe to a string in csv format for uploading to Dune""" + csv_buffer = StringIO() + orderbook.to_csv(csv_buffer, index=False) + return csv_buffer.getvalue() + + +@task # type: ignore[misc] +def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: + """ + Uploads the order rewards data to Dune, + either creating a new query or updating an existing one + """ + table_name = f"order_rewards_{block_start}" + dune = DuneClient.from_env() + dune.upload_csv( # type: ignore[attr-defined] + data=data, + description=f"Order rewards data for blocks {block_start}-{block_end}", + table_name=table_name, + is_private=False, + ) + return table_name + + +@task # type: ignore[misc] +def update_aggregate_query(table_name: str) -> None: + """ + Query example: + WITH aggregate AS ( + SELECT * FROM dune.cowprotocol.order_rewards_1 + UNION ALL + SELECT * FROM dune.cowprotocol.order_rewards_2 + ) + + SELECT DISTINCT * FROM aggregate; + """ + + logger = get_run_logger() + dune = DuneClient.from_env() + query_id = os.environ["AGGREGATE_QUERY_ID"] + query = dune.get_query(query_id) # type: ignore[attr-defined] + sql_query = query.sql + + if table_name not in sql_query: + logger.info(f"Table name not found, updating table with {table_name}") + insertion_point = insertion_point = sql_query.rfind(")") + updated_sql_query = ( + sql_query[:insertion_point].strip() + + f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n" + + sql_query[insertion_point:] + ) + dune.update_query( # type: ignore[attr-defined] + query_sql=updated_sql_query, query_id=query_id + ) + else: + logger.info("Table already in query, not updating query") + + +@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] +def order_rewards() -> None: + """Defines a flow for updating the order_rewards table""" + blockrange = get_block_range() + orderbook = fetch_orderbook(blockrange) + data = cast_orderbook_to_dune_string(orderbook) + table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) + update_aggregate_query(table_name) + + +if __name__ == "__main__": + git_source = GitRepository( + url="https://github.com/cowprotocol/dune-sync.git", + ) + flow.from_source( + source=git_source, + entrypoint="src/deploy_prefect/dev_deployment.py:order_rewards", + ).deploy( + name="dune-sync-dev-order-rewards", + work_pool_name="cowbarn", + cron="*/30 * * * *", # Every 30 minutes + tags=["solver", "dune-sync"], + description="Run the dune sync order_rewards query", + version="0.0.1", + ) diff --git a/src/deploy_prefect/deployment.py b/src/deploy_prefect/prod_deployment.py similarity index 98% rename from src/deploy_prefect/deployment.py rename to src/deploy_prefect/prod_deployment.py index b13b6c4..4ec3ab6 100644 --- a/src/deploy_prefect/deployment.py +++ b/src/deploy_prefect/prod_deployment.py @@ -155,7 +155,7 @@ def order_rewards() -> None: ) flow.from_source( source=git_source, - entrypoint="src/deploy_prefect/deployment.py:order_rewards", + entrypoint="src/deploy_prefect/prod_deployment.py:order_rewards", ).deploy( name="dune-sync-prod-order-rewards", work_pool_name="cowbarn", From b3d07e61a88219b75d79d3d449019895af04a1a8 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 4 Nov 2024 10:56:28 +0100 Subject: [PATCH 2/6] Split dev and prod deployments --- .github/workflows/pull-request.yaml | 24 ++++ src/deploy_prefect/dev_deployment.py | 155 +----------------------- src/deploy_prefect/flows.py | 35 ++++++ src/deploy_prefect/local_deploy.py | 2 +- src/deploy_prefect/models.py | 90 ++++++++++++++ src/deploy_prefect/pr_deployment.py | 25 ++++ src/deploy_prefect/prod_deployment.py | 154 +---------------------- src/deploy_prefect/tasks.py | 125 +++++++++++++++++++ src/deploy_prefect/utils.py | 15 +++ src/models/batch_rewards_schema.py | 1 - tests/unit/test_batch_rewards_schema.py | 24 ---- 11 files changed, 326 insertions(+), 324 deletions(-) create mode 100644 src/deploy_prefect/flows.py create mode 100644 src/deploy_prefect/models.py create mode 100644 src/deploy_prefect/pr_deployment.py create mode 100644 src/deploy_prefect/tasks.py create mode 100644 src/deploy_prefect/utils.py diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 5a84816..a27bec2 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -49,3 +49,27 @@ jobs: env: PROD_DB_URL: ${{ secrets.PROD_DB_URL }} BARN_DB_URL: ${{ secrets.BARN_DB_URL }} + + prefect-pr-test-run: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12"] + + env: + PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }} + BRANCH_NAME: ${{ github.head_ref }} + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - run: | + python -m pip install --upgrade pip + pip install -r requirements/prefect.txt + pip install -r requirements/prod.txt + prefect config set PREFECT_API_URL=$PREFECT_API_URL + python -m src.deploy_prefect.pr_deployment + prefect deployment run 'dev-order-rewards/dune-sync-pr-order-rewards' diff --git a/src/deploy_prefect/dev_deployment.py b/src/deploy_prefect/dev_deployment.py index c7ba424..76fd031 100644 --- a/src/deploy_prefect/dev_deployment.py +++ b/src/deploy_prefect/dev_deployment.py @@ -1,166 +1,23 @@ """Prefect Deployment for Order Rewards Data""" - -import os -from io import StringIO -from datetime import datetime, timedelta, timezone - -import requests -import pandas as pd -from dotenv import load_dotenv -from dune_client.client import DuneClient - # pylint: disable=import-error -from prefect import flow, task, get_run_logger # type: ignore - +from prefect import flow # type: ignore from prefect.runner.storage import GitRepository # type: ignore -from src.models.block_range import BlockRange -from src.fetch.orderbook import OrderbookFetcher - -load_dotenv() - - -def get_last_monday_midnight_utc() -> int: - """Get the timestamp of last monday at midnight UTC""" - now = datetime.now(timezone.utc) - current_weekday = now.weekday() - days_since_last_monday = current_weekday if current_weekday != 0 else 7 - last_monday = now - timedelta(days=days_since_last_monday) - last_monday_midnight = last_monday.replace( - hour=0, minute=0, second=0, microsecond=0 - ) - timestamp = int(last_monday_midnight.timestamp()) - return timestamp - - -@task # type: ignore[misc] -def get_block_range() -> BlockRange: - """Returns the blockrange from last monday midnight until now""" - etherscan_api = "https://api.etherscan.io/api" - api_key = os.environ["ETHERSCAN_API_KEY"] - start = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": get_last_monday_midnight_utc(), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - end = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": int(datetime.now(timezone.utc).timestamp()), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - - blockrange = BlockRange(block_from=start, block_to=end) - return blockrange - - -@task # type: ignore[misc] -def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame: - """Runs the query to get the order book for a specified blockrange""" - orderbook = OrderbookFetcher() - return orderbook.get_order_rewards(blockrange) - - -@task # type: ignore[misc] -def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: - """Casts the dataframe to a string in csv format for uploading to Dune""" - csv_buffer = StringIO() - orderbook.to_csv(csv_buffer, index=False) - return csv_buffer.getvalue() - - -@task # type: ignore[misc] -def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: - """ - Uploads the order rewards data to Dune, - either creating a new query or updating an existing one - """ - table_name = f"order_rewards_{block_start}" - dune = DuneClient.from_env() - dune.upload_csv( # type: ignore[attr-defined] - data=data, - description=f"Order rewards data for blocks {block_start}-{block_end}", - table_name=table_name, - is_private=False, - ) - return table_name - - -@task # type: ignore[misc] -def update_aggregate_query(table_name: str) -> None: - """ - Query example: - WITH aggregate AS ( - SELECT * FROM dune.cowprotocol.order_rewards_1 - UNION ALL - SELECT * FROM dune.cowprotocol.order_rewards_2 - ) - - SELECT DISTINCT * FROM aggregate; - """ - - logger = get_run_logger() - dune = DuneClient.from_env() - query_id = os.environ["AGGREGATE_QUERY_ID"] - query = dune.get_query(query_id) # type: ignore[attr-defined] - sql_query = query.sql - - if table_name not in sql_query: - logger.info(f"Table name not found, updating table with {table_name}") - insertion_point = insertion_point = sql_query.rfind(")") - updated_sql_query = ( - sql_query[:insertion_point].strip() - + f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n" - + sql_query[insertion_point:] - ) - dune.update_query( # type: ignore[attr-defined] - query_sql=updated_sql_query, query_id=query_id - ) - else: - logger.info("Table already in query, not updating query") - - -@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] -def order_rewards() -> None: - """Defines a flow for updating the order_rewards table""" - blockrange = get_block_range() - orderbook = fetch_orderbook(blockrange) - data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) - update_aggregate_query(table_name) - +# pylint: disable=duplicate-code if __name__ == "__main__": git_source = GitRepository( url="https://github.com/cowprotocol/dune-sync.git", + branch="dev", ) flow.from_source( source=git_source, - entrypoint="src/deploy_prefect/dev_deployment.py:order_rewards", + entrypoint="src/deploy_prefect/flows.py:dev_order_rewards", ).deploy( name="dune-sync-dev-order-rewards", work_pool_name="cowbarn", cron="*/30 * * * *", # Every 30 minutes - tags=["solver", "dune-sync"], + tags=["dev", "solver", "dune-sync"], description="Run the dune sync order_rewards query", - version="0.0.1", + version="0.0.2", ) diff --git a/src/deploy_prefect/flows.py b/src/deploy_prefect/flows.py new file mode 100644 index 0000000..a317c25 --- /dev/null +++ b/src/deploy_prefect/flows.py @@ -0,0 +1,35 @@ +"""Prefect Deployment for Order Rewards Data""" +# pylint: disable=import-error +from prefect import flow # type: ignore + +from src.deploy_prefect.tasks import ( + get_block_range, + fetch_orderbook, + cast_orderbook_to_dune_string, + upload_data_to_dune, + update_aggregate_query, +) +from src.deploy_prefect.models import ENV, CHAIN, Config + + +@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] +def dev_order_rewards() -> None: + """Defines a flow for updating the order_rewards table""" + config = Config(CHAIN.MAINNET, ENV.DEV) + + blockrange = get_block_range() + orderbook = fetch_orderbook(blockrange) + data = cast_orderbook_to_dune_string(orderbook) + table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) + update_aggregate_query(table_name, config) + + +@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] +def prod_order_rewards() -> None: + """Defines a flow for updating the order_rewards table""" + config = Config(CHAIN.MAINNET, ENV.PROD) + blockrange = get_block_range() + orderbook = fetch_orderbook(blockrange) + data = cast_orderbook_to_dune_string(orderbook) + table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) + update_aggregate_query(table_name, config) diff --git a/src/deploy_prefect/local_deploy.py b/src/deploy_prefect/local_deploy.py index 92700b5..a4e0a21 100644 --- a/src/deploy_prefect/local_deploy.py +++ b/src/deploy_prefect/local_deploy.py @@ -5,7 +5,7 @@ # pylint: disable=import-error from prefect import flow # type: ignore from dotenv import load_dotenv -from src.deploy_prefect.deployment import ( +from src.deploy_prefect.tasks import ( get_block_range, fetch_orderbook, cast_orderbook_to_dune_string, diff --git a/src/deploy_prefect/models.py b/src/deploy_prefect/models.py new file mode 100644 index 0000000..1c77f46 --- /dev/null +++ b/src/deploy_prefect/models.py @@ -0,0 +1,90 @@ +"""Dataclasses for the prefect deployments""" +import os +from enum import Enum +from dataclasses import dataclass, field + +from dotenv import load_dotenv + +load_dotenv() + + +class ENV(Enum): + """ + Enum ENV class to change environment variables for DEV And PROD + """ + + DEV = "DEV" + PR = "PR" + PROD = "PROD" + + def is_dev(self) -> bool: + """Check if the environment is DEV.""" + return self == ENV.DEV + + def is_pr(self) -> bool: + """Check if the environment is PR.""" + return self == ENV.PR + + def is_prod(self) -> bool: + """Check if the environment is PROD.""" + return self == ENV.PROD + + +class CHAIN(Enum): + """ + Enum CHAIN class to change environment variables different chains + """ + + ARBITRUM = "ARBITRUM" + GNOSIS = "GNOSIS" + MAINNET = "MAINNET" + + def is_arbitrum(self) -> bool: + """Check if the chain is Arbitrum""" + return self == CHAIN.ARBITRUM + + def is_gnosis(self) -> bool: + """Check if the chain is Gnosis""" + return self == CHAIN.GNOSIS + + def is_mainnet(self) -> bool: + """Check if the chain is mainnet""" + return self == CHAIN.MAINNET + + +@dataclass(frozen=True) +class Config: + """ + Config dataclass to setup config based on chain&env combination. + """ + + _chain: CHAIN + _env: ENV + + _dune_query_id: str = field(init=False) + _etherscan_api_key: str = field(init=False) + + def __post_init__(self) -> None: + etherscan_api_value = os.environ["ETHERSCAN_API_KEY"] + + if self._env.is_dev(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_DEV_ID"] + elif self._env.is_pr(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_PR_ID"] + elif self._env.is_prod(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_ID"] + else: + raise ValueError("ENV is neither DEV, PR, nor PROD") + + object.__setattr__(self, "_etherscan_api_key", etherscan_api_value) + object.__setattr__(self, "_dune_query_id", dune_query_id_value) + + @property + def etherscan_api_key(self) -> str: + """Etherscan API key getter""" + return self._etherscan_api_key + + @property + def dune_query_id(self) -> str: + """Dune Aggregate Query Getter""" + return self._dune_query_id diff --git a/src/deploy_prefect/pr_deployment.py b/src/deploy_prefect/pr_deployment.py new file mode 100644 index 0000000..ad8d3f5 --- /dev/null +++ b/src/deploy_prefect/pr_deployment.py @@ -0,0 +1,25 @@ +"""Prefect Deployment for Order Rewards Data""" +import os + +# pylint: disable=import-error +from prefect import flow # type: ignore +from prefect.runner.storage import GitRepository # type: ignore + + +# pylint: disable=duplicate-code +if __name__ == "__main__": + branch_name = os.getenv("BRANCH_NAME") + git_source = GitRepository( + url="https://github.com/cowprotocol/dune-sync.git", + branch=branch_name, + ) + flow.from_source( + source=git_source, + entrypoint="src/deploy_prefect/flows.py:dev_order_rewards", + ).deploy( + name="dune-sync-pr-order-rewards", + work_pool_name="cowbarn", + tags=["dev", "solver", "dune-sync"], + description="Run the dune sync order_rewards query", + version="0.0.2", + ) diff --git a/src/deploy_prefect/prod_deployment.py b/src/deploy_prefect/prod_deployment.py index 4ec3ab6..3453ab0 100644 --- a/src/deploy_prefect/prod_deployment.py +++ b/src/deploy_prefect/prod_deployment.py @@ -1,166 +1,22 @@ """Prefect Deployment for Order Rewards Data""" - -import os -from io import StringIO -from datetime import datetime, timedelta, timezone - -import requests -import pandas as pd -from dotenv import load_dotenv -from dune_client.client import DuneClient - # pylint: disable=import-error -from prefect import flow, task, get_run_logger # type: ignore - +from prefect import flow # type: ignore from prefect.runner.storage import GitRepository # type: ignore -from src.models.block_range import BlockRange -from src.fetch.orderbook import OrderbookFetcher - -load_dotenv() - - -def get_last_monday_midnight_utc() -> int: - """Get the timestamp of last monday at midnight UTC""" - now = datetime.now(timezone.utc) - current_weekday = now.weekday() - days_since_last_monday = current_weekday if current_weekday != 0 else 7 - last_monday = now - timedelta(days=days_since_last_monday) - last_monday_midnight = last_monday.replace( - hour=0, minute=0, second=0, microsecond=0 - ) - timestamp = int(last_monday_midnight.timestamp()) - return timestamp - - -@task # type: ignore[misc] -def get_block_range() -> BlockRange: - """Returns the blockrange from last monday midnight until now""" - etherscan_api = "https://api.etherscan.io/api" - api_key = os.environ["ETHERSCAN_API_KEY"] - start = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": get_last_monday_midnight_utc(), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - end = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": int(datetime.now(timezone.utc).timestamp()), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - - blockrange = BlockRange(block_from=start, block_to=end) - return blockrange - - -@task # type: ignore[misc] -def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame: - """Runs the query to get the order book for a specified blockrange""" - orderbook = OrderbookFetcher() - return orderbook.get_order_rewards(blockrange) - - -@task # type: ignore[misc] -def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: - """Casts the dataframe to a string in csv format for uploading to Dune""" - csv_buffer = StringIO() - orderbook.to_csv(csv_buffer, index=False) - return csv_buffer.getvalue() - - -@task # type: ignore[misc] -def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: - """ - Uploads the order rewards data to Dune, - either creating a new query or updating an existing one - """ - table_name = f"order_rewards_{block_start}" - dune = DuneClient.from_env() - dune.upload_csv( # type: ignore[attr-defined] - data=data, - description=f"Order rewards data for blocks {block_start}-{block_end}", - table_name=table_name, - is_private=False, - ) - return table_name - - -@task # type: ignore[misc] -def update_aggregate_query(table_name: str) -> None: - """ - Query example: - WITH aggregate AS ( - SELECT * FROM dune.cowprotocol.order_rewards_1 - UNION ALL - SELECT * FROM dune.cowprotocol.order_rewards_2 - ) - - SELECT DISTINCT * FROM aggregate; - """ - - logger = get_run_logger() - dune = DuneClient.from_env() - query_id = os.environ["AGGREGATE_QUERY_ID"] - query = dune.get_query(query_id) # type: ignore[attr-defined] - sql_query = query.sql - - if table_name not in sql_query: - logger.info(f"Table name not found, updating table with {table_name}") - insertion_point = insertion_point = sql_query.rfind(")") - updated_sql_query = ( - sql_query[:insertion_point].strip() - + f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n" - + sql_query[insertion_point:] - ) - dune.update_query( # type: ignore[attr-defined] - query_sql=updated_sql_query, query_id=query_id - ) - else: - logger.info("Table already in query, not updating query") - - -@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] -def order_rewards() -> None: - """Defines a flow for updating the order_rewards table""" - blockrange = get_block_range() - orderbook = fetch_orderbook(blockrange) - data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) - update_aggregate_query(table_name) - +# pylint: disable=duplicate-code if __name__ == "__main__": git_source = GitRepository( url="https://github.com/cowprotocol/dune-sync.git", ) flow.from_source( source=git_source, - entrypoint="src/deploy_prefect/prod_deployment.py:order_rewards", + entrypoint="src/deploy_prefect/flows.py:prod_order_rewards", ).deploy( name="dune-sync-prod-order-rewards", work_pool_name="cowbarn", cron="*/30 * * * *", # Every 30 minutes - tags=["solver", "dune-sync"], + tags=["prod", "solver", "dune-sync"], description="Run the dune sync order_rewards query", - version="0.0.1", + version="0.0.2", ) diff --git a/src/deploy_prefect/tasks.py b/src/deploy_prefect/tasks.py new file mode 100644 index 0000000..e1caeed --- /dev/null +++ b/src/deploy_prefect/tasks.py @@ -0,0 +1,125 @@ +"""Prefect Deployment for Order Rewards Data""" + +import os +from io import StringIO +from datetime import datetime, timezone + +import requests +import pandas as pd +from dotenv import load_dotenv +from dune_client.client import DuneClient + +# pylint: disable=import-error +from prefect import task, get_run_logger # type: ignore + +from src.models.block_range import BlockRange +from src.fetch.orderbook import OrderbookFetcher +from src.deploy_prefect.utils import get_last_monday_midnight_utc +from src.deploy_prefect.models import Config + +load_dotenv() + + +@task # type: ignore[misc] +def get_block_range() -> BlockRange: + """Returns the blockrange from last monday midnight until now""" + etherscan_api = "https://api.etherscan.io/api" + api_key = os.environ["ETHERSCAN_API_KEY"] + start = ( + requests.get( + etherscan_api, + { # type: ignore + "module": "block", + "action": "getblocknobytime", + "timestamp": get_last_monday_midnight_utc(), + "closest": "before", + "apikey": api_key, + }, + timeout=60, + ) + .json() + .get("result") + ) + end = ( + requests.get( + etherscan_api, + { # type: ignore + "module": "block", + "action": "getblocknobytime", + "timestamp": int(datetime.now(timezone.utc).timestamp()), + "closest": "before", + "apikey": api_key, + }, + timeout=60, + ) + .json() + .get("result") + ) + + blockrange = BlockRange(block_from=start, block_to=end) + return blockrange + + +@task # type: ignore[misc] +def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame: + """Runs the query to get the order book for a specified blockrange""" + orderbook = OrderbookFetcher() + return orderbook.get_order_rewards(blockrange) + + +@task # type: ignore[misc] +def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: + """Casts the dataframe to a string in csv format for uploading to Dune""" + csv_buffer = StringIO() + orderbook.to_csv(csv_buffer, index=False) + return csv_buffer.getvalue() + + +@task # type: ignore[misc] +def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: + """ + Uploads the order rewards data to Dune, + either creating a new query or updating an existing one + """ + table_name = f"order_rewards_{block_start}" + dune = DuneClient.from_env() + dune.upload_csv( # type: ignore[attr-defined] + data=data, + description=f"Order rewards data for blocks {block_start}-{block_end}", + table_name=table_name, + is_private=False, + ) + return table_name + + +@task # type: ignore[misc] +def update_aggregate_query(table_name: str, config: Config) -> None: + """ + Query example: + WITH aggregate AS ( + SELECT * FROM dune.cowprotocol.order_rewards_1 + UNION ALL + SELECT * FROM dune.cowprotocol.order_rewards_2 + ) + + SELECT DISTINCT * FROM aggregate; + """ + + logger = get_run_logger() + dune = DuneClient.from_env() + query = dune.get_query(config.dune_query_id) # type: ignore[attr-defined] + sql_query = query.sql + + if table_name not in sql_query: + logger.info(f"Table name not found, updating table with {table_name}") + insertion_point = insertion_point = sql_query.rfind(")") + updated_sql_query = ( + sql_query[:insertion_point].strip() + + f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n" + + sql_query[insertion_point:] + ) + dune.update_query( # type: ignore[attr-defined] + query_sql=updated_sql_query, query_id=config.dune_query_id + ) + else: + logger.info("Table already in query, not updating query") diff --git a/src/deploy_prefect/utils.py b/src/deploy_prefect/utils.py new file mode 100644 index 0000000..1d8ab0d --- /dev/null +++ b/src/deploy_prefect/utils.py @@ -0,0 +1,15 @@ +"""Prefect Deployment for Order Rewards Data""" +from datetime import datetime, timedelta, timezone + + +def get_last_monday_midnight_utc() -> int: + """Get the timestamp of last monday at midnight UTC""" + now = datetime.now(timezone.utc) + current_weekday = now.weekday() + days_since_last_monday = current_weekday if current_weekday != 0 else 7 + last_monday = now - timedelta(days=days_since_last_monday) + last_monday_midnight = last_monday.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + timestamp = int(last_monday_midnight.timestamp()) + return timestamp diff --git a/src/models/batch_rewards_schema.py b/src/models/batch_rewards_schema.py index 7f481c1..7001cc8 100644 --- a/src/models/batch_rewards_schema.py +++ b/src/models/batch_rewards_schema.py @@ -35,7 +35,6 @@ def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]] "fee": int(row["network_fee"]), "winning_score": int(row["winning_score"]), "reference_score": int(row["reference_score"]), - "participating_solvers": row["participating_solvers"], }, } for row in rewards_df.to_dict(orient="records") diff --git a/tests/unit/test_batch_rewards_schema.py b/tests/unit/test_batch_rewards_schema.py index e7f7a17..64db127 100644 --- a/tests/unit/test_batch_rewards_schema.py +++ b/tests/unit/test_batch_rewards_schema.py @@ -34,21 +34,6 @@ def test_order_rewards_transformation(self): "capped_payment": [-1000000000000000, -1000000000000000], "winning_score": [123456 * ONE_ETH, 6789 * ONE_ETH], "reference_score": [ONE_ETH, 2 * ONE_ETH], - "participating_solvers": [ - [ - "0x51", - "0x52", - "0x53", - ], - [ - "0x51", - "0x52", - "0x53", - "0x54", - "0x55", - "0x56", - ], - ], } ) @@ -61,7 +46,6 @@ def test_order_rewards_transformation(self): "capped_payment": -1000000000000000, "execution_cost": 9999000000000000000000, "fee": 1000000000000000, - "participating_solvers": ["0x51", "0x52", "0x53"], "protocol_fee": 2000000000000000, "reference_score": 1000000000000000000, "surplus": 2000000000000000000, @@ -78,14 +62,6 @@ def test_order_rewards_transformation(self): "capped_payment": -1000000000000000, "execution_cost": 1, "fee": max_uint, - "participating_solvers": [ - "0x51", - "0x52", - "0x53", - "0x54", - "0x55", - "0x56", - ], "protocol_fee": 0, "reference_score": 2000000000000000000, "surplus": 3000000000000000000, From 307af9456331b2a7f2f1f20ea0601ee640007c72 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 4 Nov 2024 14:52:11 +0100 Subject: [PATCH 3/6] Fix dune data upload name --- src/deploy_prefect/models.py | 12 +++++++++++- src/deploy_prefect/tasks.py | 6 ++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/deploy_prefect/models.py b/src/deploy_prefect/models.py index 1c77f46..adc91d8 100644 --- a/src/deploy_prefect/models.py +++ b/src/deploy_prefect/models.py @@ -81,10 +81,20 @@ def __post_init__(self) -> None: @property def etherscan_api_key(self) -> str: - """Etherscan API key getter""" + """Etherscan API Key Getter""" return self._etherscan_api_key @property def dune_query_id(self) -> str: """Dune Aggregate Query Getter""" return self._dune_query_id + + @property + def env(self) ->: + """ENV Getter""" + return self._env + + @property + def chain(self) ->: + """Chain Getter""" + return self._chain diff --git a/src/deploy_prefect/tasks.py b/src/deploy_prefect/tasks.py index e1caeed..17df5ef 100644 --- a/src/deploy_prefect/tasks.py +++ b/src/deploy_prefect/tasks.py @@ -76,12 +76,14 @@ def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: @task # type: ignore[misc] -def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: +def upload_data_to_dune( + data: str, block_start: int, block_end: int, config: Config +) -> str: """ Uploads the order rewards data to Dune, either creating a new query or updating an existing one """ - table_name = f"order_rewards_{block_start}" + table_name = f"order_rewards_{config.env}_{block_start}" dune = DuneClient.from_env() dune.upload_csv( # type: ignore[attr-defined] data=data, From af99bfe310b567b5625cb4fde45db9dcd807ce84 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 4 Nov 2024 14:58:57 +0100 Subject: [PATCH 4/6] Bug fixes --- src/deploy_prefect/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/deploy_prefect/models.py b/src/deploy_prefect/models.py index adc91d8..9d71771 100644 --- a/src/deploy_prefect/models.py +++ b/src/deploy_prefect/models.py @@ -90,11 +90,11 @@ def dune_query_id(self) -> str: return self._dune_query_id @property - def env(self) ->: + def env(self) -> str: """ENV Getter""" return self._env @property - def chain(self) ->: + def chain(self) -> str: """Chain Getter""" return self._chain From 88e70b0cd1a2e90644634913ea8c95b8d767d7de Mon Sep 17 00:00:00 2001 From: = Date: Mon, 4 Nov 2024 15:27:15 +0100 Subject: [PATCH 5/6] Fix flow --- src/deploy_prefect/flows.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/deploy_prefect/flows.py b/src/deploy_prefect/flows.py index a317c25..4406c97 100644 --- a/src/deploy_prefect/flows.py +++ b/src/deploy_prefect/flows.py @@ -20,7 +20,9 @@ def dev_order_rewards() -> None: blockrange = get_block_range() orderbook = fetch_orderbook(blockrange) data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) + table_name = upload_data_to_dune( + data, blockrange.block_from, blockrange.block_to, config + ) update_aggregate_query(table_name, config) @@ -31,5 +33,7 @@ def prod_order_rewards() -> None: blockrange = get_block_range() orderbook = fetch_orderbook(blockrange) data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) + table_name = upload_data_to_dune( + data, blockrange.block_from, blockrange.block_to, config + ) update_aggregate_query(table_name, config) From 54e22cf2c518820c66824ded0822c1fbddd765ff Mon Sep 17 00:00:00 2001 From: = Date: Mon, 4 Nov 2024 15:46:14 +0100 Subject: [PATCH 6/6] Update uploaded file name --- src/deploy_prefect/flows.py | 4 +--- src/deploy_prefect/tasks.py | 8 ++++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/deploy_prefect/flows.py b/src/deploy_prefect/flows.py index 4406c97..62b70a6 100644 --- a/src/deploy_prefect/flows.py +++ b/src/deploy_prefect/flows.py @@ -33,7 +33,5 @@ def prod_order_rewards() -> None: blockrange = get_block_range() orderbook = fetch_orderbook(blockrange) data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune( - data, blockrange.block_from, blockrange.block_to, config - ) + table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) update_aggregate_query(table_name, config) diff --git a/src/deploy_prefect/tasks.py b/src/deploy_prefect/tasks.py index 17df5ef..0896b15 100644 --- a/src/deploy_prefect/tasks.py +++ b/src/deploy_prefect/tasks.py @@ -2,6 +2,7 @@ import os from io import StringIO +from typing import Optional from datetime import datetime, timezone import requests @@ -77,13 +78,16 @@ def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: @task # type: ignore[misc] def upload_data_to_dune( - data: str, block_start: int, block_end: int, config: Config + data: str, block_start: int, block_end: int, config: Optional[Config] = None ) -> str: """ Uploads the order rewards data to Dune, either creating a new query or updating an existing one """ - table_name = f"order_rewards_{config.env}_{block_start}" + if config: + table_name = f"order_rewards_{config.env}_{block_start}" + else: + table_name = f"order_rewards_{block_start}" dune = DuneClient.from_env() dune.upload_csv( # type: ignore[attr-defined] data=data,