diff --git a/.github/workflows/prefect.yaml b/.github/workflows/prefect.yaml deleted file mode 100644 index a9773433..00000000 --- a/.github/workflows/prefect.yaml +++ /dev/null @@ -1,27 +0,0 @@ -name: prefect -on: - push: - branches: [ main ] -jobs: - deployment: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.12"] - - steps: - - uses: actions/checkout@v2 - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - - name: Deploy prefect deployment - env: - PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }} - run: | - python -m pip install --upgrade pip - pip install -r requirements/prefect.txt - pip install -r requirements/prod.txt - prefect config set PREFECT_API_URL=$PREFECT_API_URL - python -m src.deploy_prefect.deployment diff --git a/Makefile b/Makefile index 98873dc6..a22559c2 100644 --- a/Makefile +++ b/Makefile @@ -48,22 +48,3 @@ build-image: run-image: echo "using ${PWD}/data" docker run -v ${PWD}/data:/app/data --env-file .env local_dune_sync - -prefect: - @if [ -z "$(VIRTUAL_ENV)" ]; then \ - echo "Error: Not in a virtual environment. Please activate a virtual environment before running this command."; \ - exit 1; \ - else \ - pip install -r requirements/prefect.txt; \ - prefect server start; \ - fi - -deployment: - @if [ -z "$(VIRTUAL_ENV)" ]; then \ - echo "Error: Not in a virtual environment. Please activate a virtual environment before running this command."; \ - exit 1; \ - else \ - pip install -r requirements/prefect.txt; \ - pip install -r requirements/prod.txt; \ - python -m src.deploy_prefect.local_deploy; \ - fi diff --git a/requirements/prefect.txt b/requirements/prefect.txt deleted file mode 100644 index 1d36828d..00000000 --- a/requirements/prefect.txt +++ /dev/null @@ -1,5 +0,0 @@ -prefect==3.0.2 -psycopg2==2.9.9 -dune_client==1.7.7 -requests==2.32.3 -prefect-github==0.3.0 diff --git a/src/deploy_prefect/__init__.py b/src/deploy_prefect/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/deploy_prefect/deployment.py b/src/deploy_prefect/deployment.py deleted file mode 100644 index b13b6c4a..00000000 --- a/src/deploy_prefect/deployment.py +++ /dev/null @@ -1,166 +0,0 @@ -"""Prefect Deployment for Order Rewards Data""" - -import os -from io import StringIO -from datetime import datetime, timedelta, timezone - -import requests -import pandas as pd -from dotenv import load_dotenv -from dune_client.client import DuneClient - -# pylint: disable=import-error -from prefect import flow, task, get_run_logger # type: ignore - -from prefect.runner.storage import GitRepository # type: ignore - -from src.models.block_range import BlockRange -from src.fetch.orderbook import OrderbookFetcher - -load_dotenv() - - -def get_last_monday_midnight_utc() -> int: - """Get the timestamp of last monday at midnight UTC""" - now = datetime.now(timezone.utc) - current_weekday = now.weekday() - days_since_last_monday = current_weekday if current_weekday != 0 else 7 - last_monday = now - timedelta(days=days_since_last_monday) - last_monday_midnight = last_monday.replace( - hour=0, minute=0, second=0, microsecond=0 - ) - timestamp = int(last_monday_midnight.timestamp()) - return timestamp - - -@task # type: ignore[misc] -def get_block_range() -> BlockRange: - """Returns the blockrange from last monday midnight until now""" - etherscan_api = "https://api.etherscan.io/api" - api_key = os.environ["ETHERSCAN_API_KEY"] - start = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": get_last_monday_midnight_utc(), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - end = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": int(datetime.now(timezone.utc).timestamp()), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - - blockrange = BlockRange(block_from=start, block_to=end) - return blockrange - - -@task # type: ignore[misc] -def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame: - """Runs the query to get the order book for a specified blockrange""" - orderbook = OrderbookFetcher() - return orderbook.get_order_rewards(blockrange) - - -@task # type: ignore[misc] -def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: - """Casts the dataframe to a string in csv format for uploading to Dune""" - csv_buffer = StringIO() - orderbook.to_csv(csv_buffer, index=False) - return csv_buffer.getvalue() - - -@task # type: ignore[misc] -def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: - """ - Uploads the order rewards data to Dune, - either creating a new query or updating an existing one - """ - table_name = f"order_rewards_{block_start}" - dune = DuneClient.from_env() - dune.upload_csv( # type: ignore[attr-defined] - data=data, - description=f"Order rewards data for blocks {block_start}-{block_end}", - table_name=table_name, - is_private=False, - ) - return table_name - - -@task # type: ignore[misc] -def update_aggregate_query(table_name: str) -> None: - """ - Query example: - WITH aggregate AS ( - SELECT * FROM dune.cowprotocol.order_rewards_1 - UNION ALL - SELECT * FROM dune.cowprotocol.order_rewards_2 - ) - - SELECT DISTINCT * FROM aggregate; - """ - - logger = get_run_logger() - dune = DuneClient.from_env() - query_id = os.environ["AGGREGATE_QUERY_ID"] - query = dune.get_query(query_id) # type: ignore[attr-defined] - sql_query = query.sql - - if table_name not in sql_query: - logger.info(f"Table name not found, updating table with {table_name}") - insertion_point = insertion_point = sql_query.rfind(")") - updated_sql_query = ( - sql_query[:insertion_point].strip() - + f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n" - + sql_query[insertion_point:] - ) - dune.update_query( # type: ignore[attr-defined] - query_sql=updated_sql_query, query_id=query_id - ) - else: - logger.info("Table already in query, not updating query") - - -@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] -def order_rewards() -> None: - """Defines a flow for updating the order_rewards table""" - blockrange = get_block_range() - orderbook = fetch_orderbook(blockrange) - data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) - update_aggregate_query(table_name) - - -if __name__ == "__main__": - git_source = GitRepository( - url="https://github.com/cowprotocol/dune-sync.git", - ) - flow.from_source( - source=git_source, - entrypoint="src/deploy_prefect/deployment.py:order_rewards", - ).deploy( - name="dune-sync-prod-order-rewards", - work_pool_name="cowbarn", - cron="*/30 * * * *", # Every 30 minutes - tags=["solver", "dune-sync"], - description="Run the dune sync order_rewards query", - version="0.0.1", - ) diff --git a/src/deploy_prefect/local_deploy.py b/src/deploy_prefect/local_deploy.py deleted file mode 100644 index 92700b59..00000000 --- a/src/deploy_prefect/local_deploy.py +++ /dev/null @@ -1,40 +0,0 @@ -"""Code for Local Testing of Order Rewards Deployment""" - -import os - -# pylint: disable=import-error -from prefect import flow # type: ignore -from dotenv import load_dotenv -from src.deploy_prefect.deployment import ( - get_block_range, - fetch_orderbook, - cast_orderbook_to_dune_string, - upload_data_to_dune, -) - -load_dotenv() - - -@flow() # type: ignore[misc] -def order_rewards() -> None: - """Local flow for testing the order rewards deployment""" - blockrange = get_block_range() - orderbook = fetch_orderbook(blockrange) - data = cast_orderbook_to_dune_string(orderbook) - upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) - - -if __name__ == "__main__": - # Not ideal, but this script is for local testing - os.environ["PREFECT_SERVER_API_HOST"] = "0.0.0.0" - os.environ["PREFECT_SERVER__TELEMETRY__ENABLED"] = "false" - os.environ["PREFECT_API_URL"] = "http://localhost:4200/api" - os.environ["PREFECT_LOGGING_LEVEL"] = "INFO" - - order_rewards.serve( - name="dune-sync-prod-order-rewards", - cron="*/30 * * * *", # Every 30 minutes - tags=["solver", "dune-sync"], - description="Run the dune sync order_rewards query", - version="0.0.1", - )