diff --git a/.github/workflows/prefect-dev.yaml b/.github/workflows/prefect-dev.yaml new file mode 100644 index 0000000..95b9d6b --- /dev/null +++ b/.github/workflows/prefect-dev.yaml @@ -0,0 +1,27 @@ +name: prefect-dev +on: + push: + branches: [ dev ] +jobs: + dev-deployment: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12"] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Deploy prefect deployment + env: + PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }} + run: | + python -m pip install --upgrade pip + pip install -r requirements/prefect.txt + pip install -r requirements/prod.txt + prefect config set PREFECT_API_URL=$PREFECT_API_URL + python -m src.deploy_prefect.dev_deployment diff --git a/.github/workflows/prefect.yaml b/.github/workflows/prefect-prod.yaml similarity index 87% rename from .github/workflows/prefect.yaml rename to .github/workflows/prefect-prod.yaml index a977343..795c181 100644 --- a/.github/workflows/prefect.yaml +++ b/.github/workflows/prefect-prod.yaml @@ -1,9 +1,9 @@ -name: prefect +name: prefect-prod on: push: branches: [ main ] jobs: - deployment: + prod-deployment: runs-on: ubuntu-latest strategy: matrix: @@ -24,4 +24,4 @@ jobs: pip install -r requirements/prefect.txt pip install -r requirements/prod.txt prefect config set PREFECT_API_URL=$PREFECT_API_URL - python -m src.deploy_prefect.deployment + python -m src.deploy_prefect.prod_deployment diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 5a84816..a27bec2 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -49,3 +49,27 @@ jobs: env: PROD_DB_URL: ${{ secrets.PROD_DB_URL }} BARN_DB_URL: ${{ secrets.BARN_DB_URL }} + + prefect-pr-test-run: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12"] + + env: + PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }} + BRANCH_NAME: ${{ github.head_ref }} + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - run: | + python -m pip install --upgrade pip + pip install -r requirements/prefect.txt + pip install -r requirements/prod.txt + prefect config set PREFECT_API_URL=$PREFECT_API_URL + python -m src.deploy_prefect.pr_deployment + prefect deployment run 'dev-order-rewards/dune-sync-pr-order-rewards' diff --git a/src/deploy_prefect/dev_deployment.py b/src/deploy_prefect/dev_deployment.py new file mode 100644 index 0000000..76fd031 --- /dev/null +++ b/src/deploy_prefect/dev_deployment.py @@ -0,0 +1,23 @@ +"""Prefect Deployment for Order Rewards Data""" +# pylint: disable=import-error +from prefect import flow # type: ignore +from prefect.runner.storage import GitRepository # type: ignore + + +# pylint: disable=duplicate-code +if __name__ == "__main__": + git_source = GitRepository( + url="https://github.com/cowprotocol/dune-sync.git", + branch="dev", + ) + flow.from_source( + source=git_source, + entrypoint="src/deploy_prefect/flows.py:dev_order_rewards", + ).deploy( + name="dune-sync-dev-order-rewards", + work_pool_name="cowbarn", + cron="*/30 * * * *", # Every 30 minutes + tags=["dev", "solver", "dune-sync"], + description="Run the dune sync order_rewards query", + version="0.0.2", + ) diff --git a/src/deploy_prefect/flows.py b/src/deploy_prefect/flows.py new file mode 100644 index 0000000..62b70a6 --- /dev/null +++ b/src/deploy_prefect/flows.py @@ -0,0 +1,37 @@ +"""Prefect Deployment for Order Rewards Data""" +# pylint: disable=import-error +from prefect import flow # type: ignore + +from src.deploy_prefect.tasks import ( + get_block_range, + fetch_orderbook, + cast_orderbook_to_dune_string, + upload_data_to_dune, + update_aggregate_query, +) +from src.deploy_prefect.models import ENV, CHAIN, Config + + +@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] +def dev_order_rewards() -> None: + """Defines a flow for updating the order_rewards table""" + config = Config(CHAIN.MAINNET, ENV.DEV) + + blockrange = get_block_range() + orderbook = fetch_orderbook(blockrange) + data = cast_orderbook_to_dune_string(orderbook) + table_name = upload_data_to_dune( + data, blockrange.block_from, blockrange.block_to, config + ) + update_aggregate_query(table_name, config) + + +@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] +def prod_order_rewards() -> None: + """Defines a flow for updating the order_rewards table""" + config = Config(CHAIN.MAINNET, ENV.PROD) + blockrange = get_block_range() + orderbook = fetch_orderbook(blockrange) + data = cast_orderbook_to_dune_string(orderbook) + table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) + update_aggregate_query(table_name, config) diff --git a/src/deploy_prefect/local_deploy.py b/src/deploy_prefect/local_deploy.py index 92700b5..a4e0a21 100644 --- a/src/deploy_prefect/local_deploy.py +++ b/src/deploy_prefect/local_deploy.py @@ -5,7 +5,7 @@ # pylint: disable=import-error from prefect import flow # type: ignore from dotenv import load_dotenv -from src.deploy_prefect.deployment import ( +from src.deploy_prefect.tasks import ( get_block_range, fetch_orderbook, cast_orderbook_to_dune_string, diff --git a/src/deploy_prefect/models.py b/src/deploy_prefect/models.py new file mode 100644 index 0000000..9d71771 --- /dev/null +++ b/src/deploy_prefect/models.py @@ -0,0 +1,100 @@ +"""Dataclasses for the prefect deployments""" +import os +from enum import Enum +from dataclasses import dataclass, field + +from dotenv import load_dotenv + +load_dotenv() + + +class ENV(Enum): + """ + Enum ENV class to change environment variables for DEV And PROD + """ + + DEV = "DEV" + PR = "PR" + PROD = "PROD" + + def is_dev(self) -> bool: + """Check if the environment is DEV.""" + return self == ENV.DEV + + def is_pr(self) -> bool: + """Check if the environment is PR.""" + return self == ENV.PR + + def is_prod(self) -> bool: + """Check if the environment is PROD.""" + return self == ENV.PROD + + +class CHAIN(Enum): + """ + Enum CHAIN class to change environment variables different chains + """ + + ARBITRUM = "ARBITRUM" + GNOSIS = "GNOSIS" + MAINNET = "MAINNET" + + def is_arbitrum(self) -> bool: + """Check if the chain is Arbitrum""" + return self == CHAIN.ARBITRUM + + def is_gnosis(self) -> bool: + """Check if the chain is Gnosis""" + return self == CHAIN.GNOSIS + + def is_mainnet(self) -> bool: + """Check if the chain is mainnet""" + return self == CHAIN.MAINNET + + +@dataclass(frozen=True) +class Config: + """ + Config dataclass to setup config based on chain&env combination. + """ + + _chain: CHAIN + _env: ENV + + _dune_query_id: str = field(init=False) + _etherscan_api_key: str = field(init=False) + + def __post_init__(self) -> None: + etherscan_api_value = os.environ["ETHERSCAN_API_KEY"] + + if self._env.is_dev(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_DEV_ID"] + elif self._env.is_pr(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_PR_ID"] + elif self._env.is_prod(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_ID"] + else: + raise ValueError("ENV is neither DEV, PR, nor PROD") + + object.__setattr__(self, "_etherscan_api_key", etherscan_api_value) + object.__setattr__(self, "_dune_query_id", dune_query_id_value) + + @property + def etherscan_api_key(self) -> str: + """Etherscan API Key Getter""" + return self._etherscan_api_key + + @property + def dune_query_id(self) -> str: + """Dune Aggregate Query Getter""" + return self._dune_query_id + + @property + def env(self) -> str: + """ENV Getter""" + return self._env + + @property + def chain(self) -> str: + """Chain Getter""" + return self._chain diff --git a/src/deploy_prefect/pr_deployment.py b/src/deploy_prefect/pr_deployment.py new file mode 100644 index 0000000..ad8d3f5 --- /dev/null +++ b/src/deploy_prefect/pr_deployment.py @@ -0,0 +1,25 @@ +"""Prefect Deployment for Order Rewards Data""" +import os + +# pylint: disable=import-error +from prefect import flow # type: ignore +from prefect.runner.storage import GitRepository # type: ignore + + +# pylint: disable=duplicate-code +if __name__ == "__main__": + branch_name = os.getenv("BRANCH_NAME") + git_source = GitRepository( + url="https://github.com/cowprotocol/dune-sync.git", + branch=branch_name, + ) + flow.from_source( + source=git_source, + entrypoint="src/deploy_prefect/flows.py:dev_order_rewards", + ).deploy( + name="dune-sync-pr-order-rewards", + work_pool_name="cowbarn", + tags=["dev", "solver", "dune-sync"], + description="Run the dune sync order_rewards query", + version="0.0.2", + ) diff --git a/src/deploy_prefect/prod_deployment.py b/src/deploy_prefect/prod_deployment.py new file mode 100644 index 0000000..3453ab0 --- /dev/null +++ b/src/deploy_prefect/prod_deployment.py @@ -0,0 +1,22 @@ +"""Prefect Deployment for Order Rewards Data""" +# pylint: disable=import-error +from prefect import flow # type: ignore +from prefect.runner.storage import GitRepository # type: ignore + + +# pylint: disable=duplicate-code +if __name__ == "__main__": + git_source = GitRepository( + url="https://github.com/cowprotocol/dune-sync.git", + ) + flow.from_source( + source=git_source, + entrypoint="src/deploy_prefect/flows.py:prod_order_rewards", + ).deploy( + name="dune-sync-prod-order-rewards", + work_pool_name="cowbarn", + cron="*/30 * * * *", # Every 30 minutes + tags=["prod", "solver", "dune-sync"], + description="Run the dune sync order_rewards query", + version="0.0.2", + ) diff --git a/src/deploy_prefect/deployment.py b/src/deploy_prefect/tasks.py similarity index 62% rename from src/deploy_prefect/deployment.py rename to src/deploy_prefect/tasks.py index b13b6c4..0896b15 100644 --- a/src/deploy_prefect/deployment.py +++ b/src/deploy_prefect/tasks.py @@ -2,7 +2,8 @@ import os from io import StringIO -from datetime import datetime, timedelta, timezone +from typing import Optional +from datetime import datetime, timezone import requests import pandas as pd @@ -10,29 +11,16 @@ from dune_client.client import DuneClient # pylint: disable=import-error -from prefect import flow, task, get_run_logger # type: ignore - -from prefect.runner.storage import GitRepository # type: ignore +from prefect import task, get_run_logger # type: ignore from src.models.block_range import BlockRange from src.fetch.orderbook import OrderbookFetcher +from src.deploy_prefect.utils import get_last_monday_midnight_utc +from src.deploy_prefect.models import Config load_dotenv() -def get_last_monday_midnight_utc() -> int: - """Get the timestamp of last monday at midnight UTC""" - now = datetime.now(timezone.utc) - current_weekday = now.weekday() - days_since_last_monday = current_weekday if current_weekday != 0 else 7 - last_monday = now - timedelta(days=days_since_last_monday) - last_monday_midnight = last_monday.replace( - hour=0, minute=0, second=0, microsecond=0 - ) - timestamp = int(last_monday_midnight.timestamp()) - return timestamp - - @task # type: ignore[misc] def get_block_range() -> BlockRange: """Returns the blockrange from last monday midnight until now""" @@ -89,12 +77,17 @@ def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: @task # type: ignore[misc] -def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: +def upload_data_to_dune( + data: str, block_start: int, block_end: int, config: Optional[Config] = None +) -> str: """ Uploads the order rewards data to Dune, either creating a new query or updating an existing one """ - table_name = f"order_rewards_{block_start}" + if config: + table_name = f"order_rewards_{config.env}_{block_start}" + else: + table_name = f"order_rewards_{block_start}" dune = DuneClient.from_env() dune.upload_csv( # type: ignore[attr-defined] data=data, @@ -106,7 +99,7 @@ def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: @task # type: ignore[misc] -def update_aggregate_query(table_name: str) -> None: +def update_aggregate_query(table_name: str, config: Config) -> None: """ Query example: WITH aggregate AS ( @@ -120,8 +113,7 @@ def update_aggregate_query(table_name: str) -> None: logger = get_run_logger() dune = DuneClient.from_env() - query_id = os.environ["AGGREGATE_QUERY_ID"] - query = dune.get_query(query_id) # type: ignore[attr-defined] + query = dune.get_query(config.dune_query_id) # type: ignore[attr-defined] sql_query = query.sql if table_name not in sql_query: @@ -133,34 +125,7 @@ def update_aggregate_query(table_name: str) -> None: + sql_query[insertion_point:] ) dune.update_query( # type: ignore[attr-defined] - query_sql=updated_sql_query, query_id=query_id + query_sql=updated_sql_query, query_id=config.dune_query_id ) else: logger.info("Table already in query, not updating query") - - -@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] -def order_rewards() -> None: - """Defines a flow for updating the order_rewards table""" - blockrange = get_block_range() - orderbook = fetch_orderbook(blockrange) - data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) - update_aggregate_query(table_name) - - -if __name__ == "__main__": - git_source = GitRepository( - url="https://github.com/cowprotocol/dune-sync.git", - ) - flow.from_source( - source=git_source, - entrypoint="src/deploy_prefect/deployment.py:order_rewards", - ).deploy( - name="dune-sync-prod-order-rewards", - work_pool_name="cowbarn", - cron="*/30 * * * *", # Every 30 minutes - tags=["solver", "dune-sync"], - description="Run the dune sync order_rewards query", - version="0.0.1", - ) diff --git a/src/deploy_prefect/utils.py b/src/deploy_prefect/utils.py new file mode 100644 index 0000000..1d8ab0d --- /dev/null +++ b/src/deploy_prefect/utils.py @@ -0,0 +1,15 @@ +"""Prefect Deployment for Order Rewards Data""" +from datetime import datetime, timedelta, timezone + + +def get_last_monday_midnight_utc() -> int: + """Get the timestamp of last monday at midnight UTC""" + now = datetime.now(timezone.utc) + current_weekday = now.weekday() + days_since_last_monday = current_weekday if current_weekday != 0 else 7 + last_monday = now - timedelta(days=days_since_last_monday) + last_monday_midnight = last_monday.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + timestamp = int(last_monday_midnight.timestamp()) + return timestamp