diff --git a/.coveragerc b/.coveragerc index 7b815a0..4936a25 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,4 +1,4 @@ [run] branch = True data_file = .coverage -source=edx_prefectutils +source=edx_argoutils diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md index 480e412..eb8a4fd 100644 --- a/.github/ISSUE_TEMPLATE.md +++ b/.github/ISSUE_TEMPLATE.md @@ -1,4 +1,4 @@ -* edx-prefectutils version: +* edx-argoutils version: * Python version: * Operating System: diff --git a/AUTHORS.rst b/AUTHORS.rst index 27c46b6..e41b875 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -5,7 +5,7 @@ Credits Development Lead ---------------- -* Julia Eskew +* Abdul Rafey Contributors ------------ diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 6e052f9..0555c75 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -15,7 +15,7 @@ Types of Contributions Report Bugs ~~~~~~~~~~~ -Report bugs at https://github.com/edx/edx-prefectutils/issues. +Report bugs at https://github.com/2uinc/edx-argoutils/issues. If you are reporting a bug, please include: @@ -38,14 +38,14 @@ and "help wanted" is open to whoever wants to implement it. Write Documentation ~~~~~~~~~~~~~~~~~~~ -edx-prefectutils could always use more documentation, whether as part of the -official edx-prefectutils docs, in docstrings, or even on the web in blog posts, +edx-argoutils could always use more documentation, whether as part of the +official edx-argoutils docs, in docstrings, or even on the web in blog posts, articles, and such. Submit Feedback ~~~~~~~~~~~~~~~ -The best way to send feedback is to file an issue at https://github.com/edx/edx-prefectutils/issues. +The best way to send feedback is to file an issue at https://github.com/2uinc/edx-argoutils/issues. If you are proposing a feature: @@ -57,17 +57,17 @@ If you are proposing a feature: Get Started! ------------ -Ready to contribute? Here's how to set up `edx-prefectutils` for local development. +Ready to contribute? Here's how to set up `edx-argoutils` for local development. -1. Fork the `edx-prefectutils` repo on GitHub. +1. Fork the `edx-argoutils` repo on GitHub. 2. Clone your fork locally:: - $ git clone git@github.com:your_name_here/edx-prefectutils.git + $ git clone git@github.com:your_name_here/edx-argoutils.git 3. Install your local copy into a virtualenv. Assuming you have virtualenvwrapper installed, this is how you set up your fork for local development:: - $ mkvirtualenv edx-prefectutils - $ cd edx-prefectutils/ + $ mkvirtualenv edx-argoutils + $ cd edx-argoutils/ $ python setup.py develop 4. Create a branch for local development:: @@ -79,7 +79,7 @@ Ready to contribute? Here's how to set up `edx-prefectutils` for local developme 5. When you're done making changes, check that your changes pass flake8 and the tests, including testing other Python versions with tox:: - $ flake8 edx_prefectutils tests + $ flake8 edx_argoutils tests $ python setup.py test or pytest $ tox @@ -103,7 +103,7 @@ Before you submit a pull request, check that it meets these guidelines: your new functionality into a function with a docstring, and add the feature to the list in README.rst. 3. The pull request should work for Python 3.5, 3.6, 3.7 and 3.8, and for PyPy. Check - https://github.com/edx/edx-prefectutils/actions?query=workflow%3A%22Python+CI%22 + https://github.com/2uinc/edx-argoutils/actions?query=workflow%3A%22Python+CI%22 and make sure that the tests pass for all supported Python versions. Tips @@ -111,7 +111,7 @@ Tips To run a subset of tests:: -$ pytest tests.test_edx_prefectutils +$ pytest tests.test_edx_argoutils Deploying diff --git a/HISTORY.rst b/HISTORY.rst index aa2ba1f..56b29fd 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -2,7 +2,7 @@ History ======= -0.1.0 (2020-05-11) +0.1.0 (2024-11-07) ------------------ * First release on PyPI. diff --git a/Makefile b/Makefile index 145c3c0..3911d20 100644 --- a/Makefile +++ b/Makefile @@ -48,7 +48,7 @@ clean-test: ## remove test and coverage artifacts rm -fr .pytest_cache lint: ## check style with flake8 - flake8 edx_prefectutils tests + flake8 edx_argoutils tests test: ## run tests quickly with the default Python pytest @@ -57,15 +57,15 @@ test-all: ## run tests on every Python version with tox tox coverage: ## check code coverage quickly with the default Python - coverage run --source edx_prefectutils -m pytest + coverage run --source edx_argoutils -m pytest coverage report -m coverage html $(BROWSER) htmlcov/index.html docs: ## generate Sphinx HTML documentation, including API docs - rm -f docs/edx_prefectutils.rst + rm -f docs/edx_argoutils.rst rm -f docs/modules.rst - sphinx-apidoc -o docs/ edx_prefectutils + sphinx-apidoc -o docs/ edx_argoutils $(MAKE) -C docs clean $(MAKE) -C docs html $(BROWSER) docs/_build/html/index.html diff --git a/README.rst b/README.rst index 6ad6e8e..8d89790 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,5 @@ ================ -edx-prefectutils +edx-argoutils ================ @@ -7,7 +7,7 @@ edx-prefectutils -A utility code written by edX specifically for edX Prefect Flows. It assists writing in edX Prefect Flows. +A utility code written by edX specifically for edX Argo Flows. It assists writing in edX Argo Flows. @@ -20,14 +20,11 @@ PyPI Package Release -------------------- - This repository publishes packages on PyPI, when you update this repository make sure to create a new release so latest changes would be published on PyPI and become available for use. - Bump the `version`_ to match the version that will be released. Once the pull request with the updated version is merged into master, create a new release from GitHub Web UI, using the same version number. You can refer to `How to create GitHub release`_ for more information on this process. -- `edx-prefectutils PyPI`_ package is used in `prefect-flows`_ repository, you will have to update edx-prefectutils package version in `requirements`_ so it will pick latest PyPI package version. -.. _`version`: https://github.com/edx/edx-prefectutils/blob/master/edx_prefectutils/__init__.py#L5 -.. _`How to create GitHub release`: https://docs.github.com/en/github/administering-a-repository/releasing-projects-on-github/managing-releases-in-a-repository -.. _`edx-prefectutils PyPI`: https://pypi.org/project/edx-prefectutils/ -.. _`prefect-flows`: https://github.com/edx/prefect-flows -.. _`requirements`: https://github.com/edx/prefect-flows/blob/master/requirements.txt#L7 +.. _`version`: https://github.com/2uinc/edx-argoutils/blob/master/edx_argoutils/__init__.py#L5 +.. _`How to create GitHub release`: https://docs.github.com/en/github/administering-a-repository/releasing-projects-on-github/managing-releases-in-a-repository +.. _`edx-argoutils PyPI`: https://pypi.org/project/edx-argoutils/ Credits diff --git a/docs/Makefile b/docs/Makefile index bebcfed..46b8533 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -4,7 +4,7 @@ # You can set these variables from the command line. SPHINXOPTS = SPHINXBUILD = python -msphinx -SPHINXPROJ = edx_prefectutils +SPHINXPROJ = edx_argoutils SOURCEDIR = . BUILDDIR = _build diff --git a/docs/conf.py b/docs/conf.py index a904ff7..eea4b4d 100755 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# edx_prefectutils documentation build configuration file, created by +# edx_argoutils documentation build configuration file, created by # sphinx-quickstart on Fri Jun 9 13:47:02 2017. # # This file is execfile()d with the current directory set to its @@ -21,7 +21,7 @@ import sys sys.path.insert(0, os.path.abspath('..')) -import edx_prefectutils +import edx_argoutils # -- General configuration --------------------------------------------- @@ -46,18 +46,18 @@ top_level_doc = 'index' # General information about the project. -project = 'edx-prefectutils' -copyright = "2020, Julia Eskew" -author = "Julia Eskew" +project = 'edx-argoutils' +copyright = "2024, Abdul Rafey" +author = "Abdul Rafey" # The version info for the project you're documenting, acts as replacement # for |version| and |release|, also used in various other places throughout # the built documents. # # The short X.Y version. -version = edx_prefectutils.__version__ +version = edx_argoutils.__version__ # The full version, including alpha/beta/rc tags. -release = edx_prefectutils.__version__ +release = edx_argoutils.__version__ # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -100,7 +100,7 @@ # -- Options for HTMLHelp output --------------------------------------- # Output file base name for HTML help builder. -htmlhelp_basename = 'edx_prefectutilsdoc' +htmlhelp_basename = 'edx_argoutilsdoc' # -- Options for LaTeX output ------------------------------------------ @@ -127,9 +127,9 @@ # (source start file, target name, title, author, documentclass # [howto, manual, or own class]). latex_documents = [ - (top_level_doc, 'edx_prefectutils.tex', - 'edx-prefectutils Documentation', - 'Julia Eskew', 'manual'), + (top_level_doc, 'edx_argoutils.tex', + 'edx-argoutils Documentation', + 'Abdul Rafey', 'manual'), ] @@ -138,8 +138,8 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - (top_level_doc, 'edx_prefectutils', - 'edx-prefectutils Documentation', + (top_level_doc, 'edx_argoutils', + 'edx-argoutils Documentation', [author], 1) ] @@ -150,10 +150,10 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (top_level_doc, 'edx_prefectutils', - 'edx-prefectutils Documentation', + (top_level_doc, 'edx_argoutils', + 'edx-argoutils Documentation', author, - 'edx_prefectutils', + 'edx_argoutils', 'One line description of project.', 'Miscellaneous'), ] diff --git a/docs/index.rst b/docs/index.rst index 6ee957c..657cb66 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,4 +1,4 @@ -Welcome to edx-prefectutils's documentation! +Welcome to edx-argoutils's documentation! ========================================= .. toctree:: diff --git a/docs/installation.rst b/docs/installation.rst index 85b2a40..2dcd72f 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -8,13 +8,13 @@ Installation Stable release -------------- -To install edx-prefectutils, run this command in your terminal: +To install edx-argoutils, run this command in your terminal: .. code-block:: console - $ pip install edx_prefectutils + $ pip install edx_argoutils -This is the preferred method to install edx-prefectutils, as it will always install the most recent stable release. +This is the preferred method to install edx-argoutils, as it will always install the most recent stable release. If you don't have `pip`_ installed, this `Python installation guide`_ can guide you through the process. @@ -26,19 +26,19 @@ you through the process. From sources ------------ -The sources for edx-prefectutils can be downloaded from the `Github repo`_. +The sources for edx-argoutils can be downloaded from the `Github repo`_. You can either clone the public repository: .. code-block:: console - $ git clone git://github.com/edx/edx-prefectutils + $ git clone git://github.com/2uinc/edx-argoutils Or download the `tarball`_: .. code-block:: console - $ curl -OJL https://github.com/edx/edx-prefectutils/tarball/master + $ curl -OJL https://github.com/2uinc/edx-argoutils/tarball/master Once you have a copy of the source, you can install it with: @@ -47,5 +47,5 @@ Once you have a copy of the source, you can install it with: $ python setup.py install -.. _Github repo: https://github.com/edx/edx-prefectutils -.. _tarball: https://github.com/edx/edx-prefectutils/tarball/master +.. _Github repo: https://github.com/2uinc/edx-argoutils +.. _tarball: https://github.com/2uinc/edx-argoutils/tarball/master diff --git a/docs/make.bat b/docs/make.bat index 5b7a6bd..c5c12af 100644 --- a/docs/make.bat +++ b/docs/make.bat @@ -9,7 +9,7 @@ if "%SPHINXBUILD%" == "" ( ) set SOURCEDIR=. set BUILDDIR=_build -set SPHINXPROJ=edx_prefectutils +set SPHINXPROJ=edx_argoutils if "%1" == "" goto help diff --git a/docs/usage.rst b/docs/usage.rst index 47d996b..be4f3cc 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -2,6 +2,6 @@ Usage ===== -To use edx-prefectutils in a project:: +To use edx-argoutils in a project:: - import edx_prefectutils + import edx_argoutils diff --git a/edx_argoutils/__init__.py b/edx_argoutils/__init__.py new file mode 100644 index 0000000..92dc86a --- /dev/null +++ b/edx_argoutils/__init__.py @@ -0,0 +1,5 @@ +""" +Top-level package for edx-argoutils. +""" + +__version__ = '1.0.19' diff --git a/edx_prefectutils/bigquery.py b/edx_argoutils/bigquery.py similarity index 100% rename from edx_prefectutils/bigquery.py rename to edx_argoutils/bigquery.py diff --git a/edx_argoutils/common.py b/edx_argoutils/common.py new file mode 100644 index 0000000..13f2b00 --- /dev/null +++ b/edx_argoutils/common.py @@ -0,0 +1,151 @@ +""" +Utility functions for use in Argo flows. +""" + +import itertools +import re +import six +from opaque_keys import InvalidKeyError +from opaque_keys.edx.keys import CourseKey +from datetime import datetime, timedelta, date + + + +def get_date(date: str): + """ + Return today's date string if date is None. Otherwise return the passed parameter value. + """ + if date is None: + return datetime.today().strftime('%Y-%m-%d') + else: + return date + + +def generate_dates(start_date: str, end_date: str, date_format: str = "%Y%m%d"): + """ + Generates a list of date strings in the format specified by `date_format` from + start_date up to but excluding end_date. + """ + if not start_date: + start_date = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d") + if not end_date: + end_date = datetime.today().strftime("%Y-%m-%d") + + parsed_start_date = datetime.strptime(start_date, "%Y-%m-%d") + parsed_end_date = datetime.strptime(end_date, "%Y-%m-%d") + dates = [] + while parsed_start_date < parsed_end_date: + dates.append(parsed_start_date) + parsed_start_date = parsed_start_date + timedelta(days=1) + + return [date.strftime(date_format) for date in dates] + + +def generate_month_start_dates(start_date: str, end_date: str, date_format: str = "%Y-%m-%d"): + """ + Return a list of first days of months within the specified date range. + If start_date or end_date is not provided, defaults to yesterday or today respectively. + """ + if not start_date: + start_date = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d") + if not end_date: + end_date = datetime.today().strftime("%Y-%m-%d") + + # Since our intention is to extract first day of months, we will start by modifying the start and end date + # to represent the first day of month. + parsed_start_date = datetime.strptime(start_date, "%Y-%m-%d").replace(day=1) + parsed_end_date = datetime.strptime(end_date, "%Y-%m-%d").replace(day=1) + dates = [] + current_date = parsed_start_date + while current_date <= parsed_end_date: + dates.append(current_date) + # The addition of 32 days to current_date and then setting the day to 1 is a way to ensure that we move to + # the beginning of the next month, even if the month doesn't have exactly 32 days. + current_date += timedelta(days=32) + current_date = current_date.replace(day=1) + + return [date.strftime(date_format) for date in dates] + + +def get_unzipped_cartesian_product(input_lists: list): + """ + Generate an unzipped cartesian product of the given list of lists. + + For example, get_unzipped_cartesian_product([[1, 2, 3], ["a", "b", "c"]]) would return: + + [ + [1, 1, 1, 2, 2, 3, 3, 3], + ["a", "b", "c", "a", "b", "c", "a", "b", "c"] + ] + + Args: + input_lists (list): A list of two or more lists. + """ + return list(zip(*itertools.product(*input_lists))) + + +def get_filename_safe_course_id(course_id, replacement_char='_'): + """ + Create a representation of a course_id that can be used safely in a filepath. + """ + try: + course_key = CourseKey.from_string(course_id) + # Ignore the namespace of the course_id altogether, for backwards compatibility. + filename = course_key._to_string() # pylint: disable=protected-access + except InvalidKeyError: + # If the course_id doesn't parse, we will still return a value here. + filename = course_id + + # The safest characters are A-Z, a-z, 0-9, , and . + # We represent the first four with \w. + # TODO: Once we support courses with unicode characters, we will need to revisit this. + return re.sub(r'[^\w\.\-]', six.text_type(replacement_char), filename) + +def generate_date_range(start_date= None, end_date= None, is_daily: bool = None): + """ + Generate a list of dates depending on parameters passed. Dates are inclusive. + Custom dates is top priority: start_date & end_date are set, is_daily = False + Daily run: start_date & end_date are both None, is_daily = True + True-up run: start_date & end_date are both None, is_daily = False + + Args: + start_date (str): The start date in YYYY-MM-DD format + end_date (str): The end date in YYYY-MM-DD format + is_daily (bool): Designates if last completed day to run + """ + + if start_date is not None and end_date is not None and is_daily is False: + # Manual run: user entered parameters for custom dates + #logger.info("Setting dates for manual run...") + #start_date= start_date.strftime('%Y-%m-%d') + start_date = datetime.strptime(start_date, "%Y-%m-%d").date() + end_date = datetime.strptime(end_date, "%Y-%m-%d").date() + #end_date= end_date.strftime('%Y-%m-%d') + + elif start_date is None and end_date is None and is_daily is True: + # Daily run: minus 2 lag completed day, eg. if today is 9/14, output is 9/12 + # this is due to timing issues with data freshness for STRIPE_RAW <> Snowflake share + minus_two_lag_date = datetime.strptime(str(date.today()), "%Y-%m-%d").date() - timedelta(days=2) + start_date, end_date = minus_two_lag_date, minus_two_lag_date + + elif start_date is None and end_date is None and is_daily is False: + # True-up: Calculate the last completed month + today = datetime.now().date() + current_month_start = datetime(today.year, today.month, 1).date() + last_month_end = current_month_start - timedelta(days=1) + last_month_start = datetime(last_month_end.year, last_month_end.month, 1).date() + + start_date = last_month_start + end_date = last_month_end + + else: + raise Exception("Incorrect parameters passed!") + + current_date = start_date + date_list = [] + + while current_date <= end_date: + date_list.append(current_date) + current_date += timedelta(days=1) + + return date_list diff --git a/edx_prefectutils/edx_api_client.py b/edx_argoutils/edx_api_client.py similarity index 98% rename from edx_prefectutils/edx_api_client.py rename to edx_argoutils/edx_api_client.py index d365b0c..8fb6a15 100644 --- a/edx_prefectutils/edx_api_client.py +++ b/edx_argoutils/edx_api_client.py @@ -4,7 +4,7 @@ import backoff import requests -from prefect.utilities.logging import get_logger +import logging from requests.auth import AuthBase DEFAULT_RETRY_STATUS_CODES = ( @@ -16,6 +16,7 @@ 520, # This is a custom Cloudwatch code for "Unknown error". ) DEFAULT_TIMEOUT_SECONDS = 7200 +logger = logging.getLogger("edx_api_client") class EdxApiClient(object): @@ -57,7 +58,6 @@ def authenticated_session(self): def ensure_oauth_access_token(self): """Retrieves OAuth 2.0 access token using the client credentials grant and stores it in the request session.""" - logger = get_logger() now = datetime.utcnow() if self._expires_at is None or now >= self._expires_at: logger.info('Token is expired or missing, requesting a new one.') @@ -203,7 +203,6 @@ def __call__(self, r): def log_response_hook(response, *args, **kwargs): # pylint: disable=unused-argument """Log summary information about every request made.""" - logger = get_logger() logger.info( "[{}] [{}] [{}] {}".format( response.request.method, response.status_code, response.elapsed.total_seconds(), response.url diff --git a/edx_prefectutils/email_unsubscribes.py b/edx_argoutils/email_unsubscribes.py similarity index 100% rename from edx_prefectutils/email_unsubscribes.py rename to edx_argoutils/email_unsubscribes.py diff --git a/edx_prefectutils/hubspot_leads.py b/edx_argoutils/hubspot_leads.py similarity index 100% rename from edx_prefectutils/hubspot_leads.py rename to edx_argoutils/hubspot_leads.py diff --git a/edx_prefectutils/mysql.py b/edx_argoutils/mysql.py similarity index 92% rename from edx_prefectutils/mysql.py rename to edx_argoutils/mysql.py index c4db16c..dc4e513 100644 --- a/edx_prefectutils/mysql.py +++ b/edx_argoutils/mysql.py @@ -1,19 +1,20 @@ """ Tasks for interacting with Aurora MySQL. """ +import logging import os - import mysql.connector -from prefect import task -from prefect.engine import signals -from prefect.utilities.logging import get_logger -from edx_prefectutils.snowflake import MANIFEST_FILE_NAME +from edx_argoutils.snowflake import MANIFEST_FILE_NAME +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', +) def create_mysql_connection(credentials: dict, database: str, autocommit: bool = False): - user = credentials['user'] + user = credentials['username'] password = credentials['password'] host = credentials['host'] @@ -44,7 +45,6 @@ def create_mysql_connection(credentials: dict, database: str, autocommit: bool = return connection -@task def load_s3_data_to_mysql( aurora_credentials: dict, database: str, @@ -98,7 +98,7 @@ def _drop_temp_tables(table, connection): query = "DROP TABLE IF EXISTS {table}".format(table=table) connection.cursor().execute(query) - logger = get_logger() + logger = logging.getLogger("load_s3_data_to_mysql") connection = create_mysql_connection(aurora_credentials, database) @@ -120,7 +120,7 @@ def _drop_temp_tables(table, connection): table=table, table_schema=table_schema ) - logger.debug(query) + logger.info(query) connection.cursor().execute(query) # Check for existing data @@ -130,7 +130,8 @@ def _drop_temp_tables(table, connection): row = cursor.fetchone() if row and not overwrite: - raise signals.SKIP('Skipping task as data already exists in the dest. table and no overwrite was provided.') + logger.info('Skipping task as data already exists in the destination table and no overwrite was provided.') + return # Create a temp table for loading data if overwrite and overwrite_with_temp_table: @@ -141,7 +142,7 @@ def _drop_temp_tables(table, connection): try: if row and overwrite and not overwrite_with_temp_table: query = "DELETE FROM {table} {record_filter}".format(table=table, record_filter=record_filter) - logger.debug("Deleting existing data for {table}".format(table=table)) + logger.info("Deleting existing data for {table}".format(table=table)) connection.cursor().execute(query) if use_manifest: @@ -177,7 +178,7 @@ def _drop_temp_tables(table, connection): # Commit if we're not getting an implicit commit from RENAME. connection.commit() except Exception as e: - logger.error(str(e)) + logger.info(str(e)) connection.rollback() raise finally: diff --git a/edx_prefectutils/paypal.py b/edx_argoutils/paypal.py similarity index 73% rename from edx_prefectutils/paypal.py rename to edx_argoutils/paypal.py index 9c5c2ca..fc01b53 100644 --- a/edx_prefectutils/paypal.py +++ b/edx_argoutils/paypal.py @@ -5,13 +5,16 @@ import datetime import fnmatch import json +import logging -import prefect from paramiko import SFTPClient, Transport -from prefect import config, task -from prefect.engine import signals +from edx_argoutils.s3 import get_s3_path_for_date, list_object_keys_from_s3 -from edx_prefectutils.s3 import get_s3_path_for_date, list_object_keys_from_s3 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', +) +log = logging.getLogger("load_paypal_to_snowflake") def check_paypal_report(sftp_connection, remote_filename, check_column_name): @@ -71,8 +74,7 @@ def get_paypal_filename(date, prefix, connection, remote_path): """ Get remote filename. Sample remote filename: DDR-20190822.01.008.CSV """ - logger = prefect.context.get("logger") - logger.info(connection) + log.info(connection) date_string = date.strftime('%Y%m%d') pattern = (prefix, date_string, 'CSV') remote_filepattern = "*".join(pattern) @@ -87,13 +89,6 @@ class RemoteFileNotFoundError(Exception): pass -# Retry every 10 minutes for 5 hours! This should hopefully handle situations when the report is abnormally late. -@task( - max_retries=30, - retry_delay=datetime.timedelta(minutes=10), - # Skip this retry filter until we upgrade to prefect 1.2.x since it is a new feature. - retry_on=RemoteFileNotFoundError, -) def fetch_paypal_report( date: str, paypal_credentials: dict, @@ -106,38 +101,25 @@ def fetch_paypal_report( port: str = None, remote_path: str = None, aws_credentials: dict = None, - argo: bool = False, ): - paypal_config = getattr(config, 'paypal', None) - - host = getattr(paypal_config, 'host', None) or host - port = getattr(paypal_config, 'port', None) or port - remote_path = getattr(paypal_config, 'remote_path', None) or remote_path - - logger = prefect.context.get("logger") - logger.info("Pulling Paypal report for {}".format(date)) + log.info("Pulling Paypal report for {}".format(date)) if not overwrite: # If we're not overwriting and the file already exists, raise a skip date_path = get_s3_path_for_date(date) s3_key = s3_path + date_path - logger.info("Checking for existence of: {}".format(s3_key)) + log.info("Checking for existence of: {}".format(s3_key)) - existing_file = list_object_keys_from_s3.run(s3_bucket, s3_key, aws_credentials) + existing_file = list_object_keys_from_s3(s3_bucket, s3_key, aws_credentials) if existing_file: - if not argo: - raise signals.SKIP( - 'File {} already exists and we are not overwriting. Skipping.'.format(s3_key) - ) - else: - logger.info( - 'File {} already exists and we are not overwriting. Skipping.'.format(s3_key) - ) - return + log.info( + 'File {} already exists and we are not overwriting. Skipping.'.format(s3_key) + ) + return else: - logger.info("File not found, continuing download for {}.".format(date)) + log.info("File not found, continuing download for {}.".format(date)) transport = Transport(host, port) transport.connect( diff --git a/edx_prefectutils/paypal_xml.py b/edx_argoutils/paypal_xml.py similarity index 100% rename from edx_prefectutils/paypal_xml.py rename to edx_argoutils/paypal_xml.py diff --git a/edx_prefectutils/record.py b/edx_argoutils/record.py similarity index 100% rename from edx_prefectutils/record.py rename to edx_argoutils/record.py diff --git a/edx_argoutils/s3.py b/edx_argoutils/s3.py new file mode 100644 index 0000000..018f5f2 --- /dev/null +++ b/edx_argoutils/s3.py @@ -0,0 +1,117 @@ +""" +S3 related common methods +""" + +import boto3 +import logging + +logger = logging.getLogger("s3") + +def get_s3_client(credentials: dict = None): + s3_client = None + if credentials: + s3_client = boto3.client( + 's3', + aws_access_key_id=credentials.get('AccessKeyId'), + aws_secret_access_key=credentials.get('SecretAccessKey'), + aws_session_token=credentials.get('SessionToken') + ) + else: + s3_client = boto3.client('s3') + + return s3_client + +def delete_s3_directory(bucket: str = None, prefix: str = None, credentials: dict = None): + """ + Deletes all objects with the given S3 directory (prefix) from the given bucket. + + Args: + bucket (str): The S3 bucket to delete the objects from. + prefix (str): The S3 prefix to delete the objects from. + credentials (dict): The AWS credentials to use. + """ + s3_keys = list_object_keys_from_s3(bucket, prefix, credentials) + if s3_keys: + s3_client = get_s3_client(credentials) + logger.info("Deleting S3 keys: {}".format(s3_keys)) + s3_client.delete_objects( + Bucket=bucket, + Delete={ + 'Objects': [{'Key': key} for key in s3_keys] + } + ) + + +def delete_object_from_s3(key: str = None, bucket: str = None, credentials: dict = None, ): + """ + Delete an object from S3. + + key (str): Name of the object within the S3 bucket (/foo/bar/baz.json) + bucket (str): Name of the S3 bucket to delete from. + credentials (dict): AWS credentials, if None boto will fall back the usual methods of resolution. + """ + s3_client = get_s3_client(credentials) + s3_client.delete_object(Bucket=bucket, Key=key) + + +def list_object_keys_from_s3(bucket: str = None, prefix: str = '', credentials: dict = None, ): + """ + List objects key names from an S3 bucket that match the given prefix. + + prefix (str): Prefix path to search (ex: /foo/bar will match /foo/bar/baz and /foo/bar/baz/bing ...) + bucket (str): Name of the S3 bucket to search from. + credentials (dict): AWS credentials, if None boto will fall back the usual methods of resolution. + """ + s3_client = get_s3_client(credentials) + + all_object_keys = [] + + response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix) + + if 'Contents' in response: + all_object_keys.extend([o['Key'] for o in response['Contents']]) + + while response.get('IsTruncated'): # Check if there are more objects to fetch + # Use the NextContinuationToken to get the next batch of results + response = s3_client.list_objects_v2( + Bucket=bucket, + Prefix=prefix, + ContinuationToken=response['NextContinuationToken'] + ) + + # Add the new batch of results to the list + if 'Contents' in response: + all_object_keys.extend([o['Key'] for o in response['Contents']]) + + # Log the total number of found objects + logger.info(f"Total objects found: {len(all_object_keys)}") + logger.info(f"Found objects: {all_object_keys}") + # edx_legacy/segment-config/dev/load_segment_config_to_snowflake/2024-11-08/2024-11-08.json + + return all_object_keys + + +def get_s3_path_for_date(filename): + # The path and file name inside our given bucket and S3 prefix to write the file to + return '{filename}/{filename}.json'.format(filename=filename) + + +def write_report_to_s3(download_results: tuple, s3_bucket: str, s3_path: str, credentials: dict = None): + filename, report_str = download_results + file_path = get_s3_path_for_date(filename) + s3_key = s3_path + file_path + logger.info("Writing report to S3 for {} to {}".format(filename, s3_key)) + + s3_client = get_s3_client(credentials) + + s3_client.put_object( + Bucket=s3_bucket, + Key=s3_key, + Body=report_str, + ContentType='application/json' + ) + return file_path + + +def get_s3_url(s3_bucket, s3_path): + return 's3://{bucket}/{path}'.format(bucket=s3_bucket, path=s3_path) diff --git a/edx_prefectutils/sitemap.py b/edx_argoutils/sitemap.py similarity index 66% rename from edx_prefectutils/sitemap.py rename to edx_argoutils/sitemap.py index 8a32f9c..e7aca2c 100644 --- a/edx_prefectutils/sitemap.py +++ b/edx_argoutils/sitemap.py @@ -1,19 +1,16 @@ """ -Tasks for pulling sitemap data. +Functions for pulling sitemap data. """ import json import xml.etree.ElementTree as ET from os.path import basename, join, splitext from urllib.parse import urlparse - -import prefect +import boto3 import requests -from prefect import task -from prefect.tasks.aws import s3 +from .common import get_date -@task def fetch_sitemap_urls(sitemap_index_url: str) -> str: """ Fetches a list of sitemap urls by parsing sitemap index file. @@ -25,12 +22,11 @@ def fetch_sitemap_urls(sitemap_index_url: str) -> str: return sitemap_urls -@task def fetch_sitemap(sitemap_url: str): """ Fetches sitemap data from a given sitemap URL. """ - scraped_at = str(prefect.context.date) + scraped_at = get_date(None) r = requests.get(sitemap_url) sitemap_xml = r.text tree = ET.fromstring(sitemap_xml) @@ -44,17 +40,25 @@ def fetch_sitemap(sitemap_url: str): return sitemap_filename, json.dumps(sitemap_json) -@task -def write_sitemap_to_s3(sitemap_data: str, s3_bucket: str, s3_path: str): +def write_sitemap_to_s3(sitemap_data: str, s3_bucket: str, s3_path: str, credentials: dict = {}): """ Writes sitemap data in JSON format to S3. """ filename, sitemap_json = sitemap_data - date_path = f'{prefect.context.today}/{filename}.json' - s3_key = join(s3_path, date_path) - s3.S3Upload(bucket=s3_bucket).run( - sitemap_json, - key=s3_key, + today = get_date(None) + date_path = f'{today}/{filename}.json' + s3_key = f'{s3_path}{date_path}' + s3_client = boto3.client( + 's3', + aws_access_key_id=credentials.get('AccessKeyId'), + aws_secret_access_key=credentials.get('SecretAccessKey'), + aws_session_token=credentials.get('SessionToken') + ) + s3_client.put_object( + Bucket=s3_bucket, + Key=s3_key, + Body=sitemap_json, + ContentType='application/json' ) return date_path diff --git a/edx_prefectutils/snowflake.py b/edx_argoutils/snowflake.py similarity index 94% rename from edx_prefectutils/snowflake.py rename to edx_argoutils/snowflake.py index 0e4b379..d7ae6d0 100644 --- a/edx_prefectutils/snowflake.py +++ b/edx_argoutils/snowflake.py @@ -1,9 +1,10 @@ """ -Utility methods and tasks for working with Snowflake from a Prefect flow. +Utility methods and tasks for working with Snowflake from a Argo flow. """ import json import os from collections import namedtuple +import logging from typing import List, TypedDict from urllib.parse import urlparse @@ -11,15 +12,15 @@ import snowflake.connector from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization -from prefect import task -from prefect.engine import signals -from prefect.tasks.aws import s3 -from prefect.utilities.logging import get_logger -from edx_prefectutils import s3 as s3_utils +from edx_argoutils import s3 as s3_utils MANIFEST_FILE_NAME = 'manifest.json' EXPORT_MAX_FILESIZE = 104857600 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', +) class SFCredentials(TypedDict, total=False): @@ -97,10 +98,6 @@ def qualified_stage_name(database, schema, table) -> str: ) -@task -@backoff.on_exception(backoff.expo, - snowflake.connector.ProgrammingError, - max_tries=3) def load_ga_data_to_snowflake( sf_credentials: SFCredentials, sf_database: str, @@ -224,7 +221,6 @@ def load_ga_data_to_snowflake( sf_connection.close() -@task def load_s3_data_to_snowflake( date: str, date_property: str, @@ -282,9 +278,9 @@ def load_s3_data_to_snowflake( disable_existence_check (bool, optional): Whether to disable check for existing data, useful when always appending to the table regardless of any existing data for that provided `date` """ - logger = get_logger() + logger = logging.getLogger("Snowflake Utility") if not file and not pattern: - raise signals.FAIL('Either `file` or `pattern` must be specified to run this task.') + raise ValueError("Either `file` or `pattern` must be specified to run this task.") sf_connection = create_snowflake_connection(sf_credentials, sf_role, warehouse=sf_warehouse) @@ -320,7 +316,8 @@ def load_s3_data_to_snowflake( raise if row and not overwrite: - raise signals.SKIP('Skipping task as data for the date exists and no overwrite was provided.') + logger.info("Skipping task as data for the date exists and no overwrite was provided.") + return else: logger.info("Continuing with S3 load for {}".format(date)) @@ -374,7 +371,9 @@ def load_s3_data_to_snowflake( if file: logger.info("Loading file {}".format(file)) - files_paramater = "FILES = ( '{}' )".format(file) + files = file.split(',') + # files_paramater = "FILES = ( '{}' )".format(file) + files_paramater = "FILES = ({})".format(", ".join(["'{}'".format(f) for f in files])) if pattern: logger.info("Loading pattern {}".format(pattern)) @@ -415,7 +414,6 @@ def load_s3_data_to_snowflake( sf_connection.close() -@task def export_snowflake_table_to_s3( sf_credentials: SFCredentials, sf_database: str, @@ -433,6 +431,7 @@ def export_snowflake_table_to_s3( overwrite: bool = True, single: bool = False, generate_manifest: bool = False, + aws_credentials: dict = None, ): """ @@ -465,7 +464,7 @@ def export_snowflake_table_to_s3( copy option. generate_manifest (bool, optional): Whether to generate a manifest file in S3. Defaults to `FALSE`. """ - logger = get_logger() + logger = logging.getLogger("Export Snowflake Table to S3 Utility") sf_connection = create_snowflake_connection( credentials=sf_credentials, @@ -484,7 +483,7 @@ def export_snowflake_table_to_s3( if overwrite: logger.info("Deleting existing data in S3 bucket: {bucket} with prefix: {prefix}".format( bucket=export_bucket, prefix=export_prefix)) - s3_utils.delete_s3_directory.run(export_bucket, export_prefix) + s3_utils.delete_s3_directory(export_bucket, export_prefix, aws_credentials) escape_clause = '' if escape_unenclosed_field is None \ else "ESCAPE_UNENCLOSED_FIELD = NONE" if escape_unenclosed_field == 'NONE' \ @@ -552,14 +551,16 @@ def export_snowflake_table_to_s3( "urls": s3_file_paths, } ) - s3.S3Upload(bucket=export_bucket).run( - json.dumps(manifest_file_content), - key=s3_manifest_file_prefix + s3_client = s3_utils.get_s3_client(aws_credentials) + s3_client.put_object( + Bucket=export_bucket, + Key=s3_manifest_file_prefix, + Body=json.dumps(manifest_file_content) ) except snowflake.connector.ProgrammingError as e: if 'Files already existing at the unload destination' in e.msg: logger.error("Files already exist at {destination}".format(destination=export_path)) - raise signals.FAIL('Files already exist. Use overwrite option to force unloading.') + raise Exception('Files already exist. Use overwrite option to force unloading.') else: raise finally: diff --git a/edx_prefectutils/vault_secrets.py b/edx_argoutils/vault_secrets.py similarity index 100% rename from edx_prefectutils/vault_secrets.py rename to edx_argoutils/vault_secrets.py diff --git a/edx_prefectutils/__init__.py b/edx_prefectutils/__init__.py deleted file mode 100644 index 7dfb5e8..0000000 --- a/edx_prefectutils/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -Top-level package for edx-prefectutils. -""" - -__version__ = '2.4.4' diff --git a/edx_prefectutils/common.py b/edx_prefectutils/common.py deleted file mode 100644 index c0d8f04..0000000 --- a/edx_prefectutils/common.py +++ /dev/null @@ -1,112 +0,0 @@ -""" -Utility methods and tasks for use from a Prefect flow. -""" - -import datetime -import itertools -import re - -import prefect -import six -from opaque_keys import InvalidKeyError -from opaque_keys.edx.keys import CourseKey -from prefect import task -from prefect.engine.results import PrefectResult - - -@task -def get_date(date: str): - """ - Return today's date string if date is None. Otherwise return the passed parameter value. - prefect.context.today is only available at task level, so we cannot use it as a default parameter value. - """ - if date is None: - return prefect.context.today - else: - return date - - -@task(result=PrefectResult()) -def generate_dates(start_date: str, end_date: str, date_format: str = "%Y%m%d"): - """ - Generates a list of date strings in the format specified by `date_format` from - start_date up to but excluding end_date. - """ - if not start_date: - start_date = prefect.context.yesterday - if not end_date: - end_date = prefect.context.today - - parsed_start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d") - parsed_end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d") - dates = [] - while parsed_start_date < parsed_end_date: - dates.append(parsed_start_date) - parsed_start_date = parsed_start_date + datetime.timedelta(days=1) - - return [date.strftime(date_format) for date in dates] - - -@task -def generate_month_start_dates(start_date: str, end_date: str, date_format: str = "%Y-%m-%d"): - """ - Return a list of first days of months within the specified date range. - If start_date or end_date is not provided, defaults to yesterday or today respectively. - prefect.context.today is only available at task level, so we cannot use it as a default parameter value. - """ - if not start_date: - start_date = prefect.context.yesterday - if not end_date: - end_date = prefect.context.today - - # Since our intention is to extract first day of months, we will start by modifying the start and end date - # to represent the first day of month. - parsed_start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").replace(day=1) - parsed_end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d").replace(day=1) - dates = [] - current_date = parsed_start_date - while current_date <= parsed_end_date: - dates.append(current_date) - # The addition of 32 days to current_date and then setting the day to 1 is a way to ensure that we move to - # the beginning of the next month, even if the month doesn't have exactly 32 days. - current_date += datetime.timedelta(days=32) - current_date = current_date.replace(day=1) - - return [date.strftime(date_format) for date in dates] - - -@task -def get_unzipped_cartesian_product(input_lists: list): - """ - Generate an unzipped cartesian product of the given list of lists, useful for - generating task parameters for mapping. - - For example, get_unzipped_cartesian_product([[1, 2, 3], ["a", "b", "c"]]) would return: - - [ - [1, 1, 1, 2, 2, 3, 3, 3], - ["a", "b", "c", "a", "b", "c", "a", "b", "c"] - ] - - Args: - input_lists (list): A list of two or more lists. - """ - return list(zip(*itertools.product(*input_lists))) - - -def get_filename_safe_course_id(course_id, replacement_char='_'): - """ - Create a representation of a course_id that can be used safely in a filepath. - """ - try: - course_key = CourseKey.from_string(course_id) - # Ignore the namespace of the course_id altogether, for backwards compatibility. - filename = course_key._to_string() # pylint: disable=protected-access - except InvalidKeyError: - # If the course_id doesn't parse, we will still return a value here. - filename = course_id - - # The safest characters are A-Z, a-z, 0-9, , and . - # We represent the first four with \w. - # TODO: Once we support courses with unicode characters, we will need to revisit this. - return re.sub(r'[^\w\.\-]', six.text_type(replacement_char), filename) diff --git a/edx_prefectutils/s3.py b/edx_prefectutils/s3.py deleted file mode 100644 index 36f561a..0000000 --- a/edx_prefectutils/s3.py +++ /dev/null @@ -1,95 +0,0 @@ -""" -S3 related common methods and tasks for Prefect -""" - -import prefect -from prefect import task -from prefect.tasks.aws import s3 -from prefect.utilities.aws import get_boto_client - - -@task -def delete_s3_directory(bucket: str = None, prefix: str = None, credentials: dict = None): - """ - Deletes all objects with the given S3 directory (prefix) from the given bucket. - - Args: - bucket (str): The S3 bucket to delete the objects from. - prefix (str): The S3 prefix to delete the objects from. - credentials (dict): The AWS credentials to use. - """ - s3_keys = list_object_keys_from_s3.run(bucket, prefix, credentials) - if s3_keys: - s3_client = get_boto_client('s3', credentials=credentials) - logger = prefect.context.get("logger") - logger.info("Deleting S3 keys: {}".format(s3_keys)) - s3_client.delete_objects( - Bucket=bucket, - Delete={ - 'Objects': [{'Key': key} for key in s3_keys] - } - ) - - -@task -def delete_object_from_s3(key: str = None, bucket: str = None, credentials: dict = None, ): - """ - Delete an object from S3. - - key (str): Name of the object within the S3 bucket (/foo/bar/baz.json) - bucket (str): Name of the S3 bucket to delete from. - credentials (dict): AWS credentials, if None boto will fall back the usual methods of resolution. - """ - s3_client = get_boto_client("s3", credentials=credentials) - s3_client.delete_object(Bucket=bucket, Key=key) - - -@task -def list_object_keys_from_s3(bucket: str = None, prefix: str = '', credentials: dict = None, ): - """ - List objects key names from an S3 bucket that match the given prefix. - - prefix (str): Prefix path to search (ex: /foo/bar will match /foo/bar/baz and /foo/bar/baz/bing ...) - bucket (str): Name of the S3 bucket to search from. - credentials (dict): AWS credentials, if None boto will fall back the usual methods of resolution. - """ - s3_client = get_boto_client("s3", credentials=credentials) - found_objects = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix) - - logger = prefect.context.get("logger") - logger.info(found_objects) - - if found_objects['KeyCount']: - return [ - o['Key'] for o in found_objects['Contents'] - ] - else: - return [] - - -def get_s3_path_for_date(date): - # The path and file name inside our given bucket and S3 prefix to write the file to - return '{date}/{date}.json'.format(date=date) - - -@task -def write_report_to_s3(download_results: tuple, s3_bucket: str, s3_path: str, credentials: dict = None): - logger = prefect.context.get("logger") - - date, report_str = download_results - date_path = get_s3_path_for_date(date) - s3_key = s3_path + date_path - logger.info("Writing report to S3 for {} to {}".format(date, s3_key)) - - s3.S3Upload(bucket=s3_bucket).run( - report_str, - key=s3_key, - credentials=credentials - ) - - return date_path - - -@task -def get_s3_url(s3_bucket, s3_path): - return 's3://{bucket}/{path}'.format(bucket=s3_bucket, path=s3_path) diff --git a/setup.cfg b/setup.cfg index eb35ff5..3fa9d0a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,7 +7,7 @@ tag = True search = version='{current_version}' replace = version='{new_version}' -[bumpversion:file:edx_prefectutils/__init__.py] +[bumpversion:file:edx_argoutils/__init__.py] search = __version__ = '{current_version}' replace = __version__ = '{new_version}' diff --git a/setup.py b/setup.py index d1ac6b9..ec05f30 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,7 @@ def is_requirement(line): line.startswith('-c') ) -VERSION = get_version('edx_prefectutils', '__init__.py') +VERSION = get_version('edx_argoutils', '__init__.py') if sys.argv[-1] == 'tag': print("Tagging the version on github:") @@ -71,8 +71,8 @@ def is_requirement(line): test_requirements = ['pytest>=3', ] setup( - author="Julia Eskew", - author_email='jeskew@edx.org', + author="Abdul Rafey", + author_email='arafey@2u.com', python_requires='>=3.8', license='AGPL', classifiers=[ @@ -83,22 +83,22 @@ def is_requirement(line): 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.8', ], - description="Utility code to assist in writing Prefect Flows.", + description="Utility code to assist in writing Argo Flows.", entry_points={ 'console_scripts': [ - 'edx_prefectutils=edx_prefectutils.cli:main', + 'edx_argoutils=edx_argoutils.cli:main', ], }, install_requires=load_requirements('requirements/base.in'), long_description=README, include_package_data=True, - keywords='edx_prefectutils', - name='edx-prefectutils', - packages=find_packages(include=['edx_prefectutils', 'edx_prefectutils.*']), + keywords='edx_argoutils', + name='edx-argoutils', + packages=find_packages(include=['edx_argoutils', 'edx_argoutils.*']), setup_requires=setup_requirements, test_suite='tests', tests_require=test_requirements, - url='https://github.com/edx/edx-prefectutils', + url='https://github.com/2uinc/edx-argoutils', version=VERSION, zip_safe=False, ) diff --git a/tests/test_bigquery.py b/tests/test_bigquery.py index e9283bb..6cf9427 100644 --- a/tests/test_bigquery.py +++ b/tests/test_bigquery.py @@ -1,13 +1,13 @@ #!/usr/bin/env python """ -Tests for BigQuery utils in the `edx_prefectutils` package. +Tests for BigQuery utils in the `edx_argoutils` package. """ from prefect.core import Flow from pytest_mock import mocker # noqa: F401 -from edx_prefectutils import bigquery +from edx_argoutils import bigquery def test_cleanup_gcs_files(mocker): # noqa: F811 diff --git a/tests/test_common.py b/tests/test_common.py index ddb597f..63ed115 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -1,44 +1,91 @@ #!/usr/bin/env python """ -Tests for BigQuery utils in the `edx_prefectutils` package. +Tests for Common utils in the `edx_argoutils`. """ -from prefect.core import Flow - -from edx_prefectutils import common - +from edx_argoutils import common +from datetime import datetime, date +from unittest.mock import patch +from opaque_keys.edx.keys import CourseKey def test_generate_dates(): - with Flow("test") as f: - task = common.generate_dates( - start_date='2020-01-01', - end_date='2020-01-05' - ) - state = f.run() - assert state.is_successful() - assert state.result[task].result == ['20200101', '20200102', '20200103', '20200104'] + result = common.generate_dates( + start_date='2020-01-01', + end_date='2020-01-05' + ) + assert result == ['20200101', '20200102', '20200103', '20200104'] def test_generate_month_start_dates(): - with Flow("test") as f: - task = common.generate_month_start_dates( - start_date='2023-01-31', - end_date='2023-05-05' - ) - state = f.run() - assert state.is_successful() - assert state.result[task].result == ['2023-01-01', '2023-02-01', '2023-03-01', '2023-04-01', '2023-05-01'] + result = common.generate_month_start_dates( + start_date='2023-01-31', + end_date='2023-05-05' + ) + assert result == ['2023-01-01', '2023-02-01', '2023-03-01', '2023-04-01', '2023-05-01'] def test_get_unzipped_cartesian_product(): - with Flow("test") as f: - task = common.get_unzipped_cartesian_product( - input_lists=[[1, 2, 3], ["a", "b", "c"]] - ) - state = f.run() - assert state.is_successful() - assert state.result[task].result == [ - (1, 1, 1, 2, 2, 2, 3, 3, 3), - ("a", "b", "c", "a", "b", "c", "a", "b", "c") + result = common.get_unzipped_cartesian_product( + input_lists=[[1, 2, 3], ["a", "b", "c"]] + ) + assert result == [ + (1, 1, 1, 2, 2, 2, 3, 3, 3), + ("a", "b", "c", "a", "b", "c", "a", "b", "c") ] + + +def test_valid_course_id(): + course_id = "course-v1:BerkeleyX+CS198.SDC.1+1T2021" + result = "BerkeleyX_CS198.SDC.1_1T2021" + with patch.object(CourseKey, 'from_string') as mock_from_string: + mock_from_string.return_value._to_string.return_value = result + assert result == common.get_filename_safe_course_id(course_id) + +def test_invalid_course_id(): + course_id = "BerkeleyX!CS198.SDC.1!1T2021" + result = "BerkeleyX_CS198.SDC.1_1T2021" + assert result == common.get_filename_safe_course_id(course_id) + +def test_generate_date_range(): + # Test Case 1: Custom date range (is_daily=False) + result = common.generate_date_range( + start_date='2025-01-01', + end_date='2025-01-05', + is_daily=False + ) + assert result == [ + datetime.strptime(date, '%Y-%m-%d').date() + for date in ['2025-01-01', '2025-01-02', '2025-01-03', '2025-01-04', '2025-01-05'] + ] + + # Test Case 2: Daily run (is_daily=True) with mock + fixed_today = date(2025, 1, 28) # Assume today's date is 2025-01-28 + with patch('edx_argoutils.common.date') as mock_date: + mock_date.today.return_value = fixed_today + mock_date.side_effect = lambda *args, **kwargs: date(*args, **kwargs) + + result = common.generate_date_range(is_daily=True) + expected = [date(2025, 1, 26)] # Two days before fixed_today + assert result == expected, f"Expected {expected}, but got {result}" + + #Test Case 3: True-up scenario (is_daily=False, no start_date and end_date) + with patch('edx_argoutils.common.date') as mock_date: + mock_date.today.return_value = fixed_today + mock_date.side_effect = lambda *args, **kwargs: date(*args, **kwargs) + + result = common.generate_date_range(is_daily=False) + expected = [date(2024, 12, d) for d in range(1, 32)] #Last completed month + assert result == expected, f"Expected {expected}, but got {result}" + + # Test Case 4: Invalid parameters + try: + common.generate_date_range( + start_date="2025-01-01", + end_date=None, + is_daily=False + ) + except Exception as e: + assert str(e) == "Incorrect parameters passed!" + else: + assert False, "Expected an exception but none was raised!" \ No newline at end of file diff --git a/tests/test_edx_api_client.py b/tests/test_edx_api_client.py index e3214d9..46cb98e 100644 --- a/tests/test_edx_api_client.py +++ b/tests/test_edx_api_client.py @@ -9,7 +9,7 @@ from ddt import data, ddt, unpack from mock import patch -from edx_prefectutils.edx_api_client import EdxApiClient +from edx_argoutils.edx_api_client import EdxApiClient FAKE_AUTH_URL = 'http://example.com/oauth2/access_token' FAKE_CLIENT_ID = 'aclientid' @@ -39,7 +39,7 @@ def utcnow(cls): """Return the time specified by the time offset""" return self.current_time + timedelta(seconds=self.time_offset) - datetime_patcher = patch('edx_prefectutils.edx_api_client.datetime', MockDateTime) + datetime_patcher = patch('edx_argoutils.edx_api_client.datetime', MockDateTime) datetime_patcher.start() self.addCleanup(datetime_patcher.stop) diff --git a/tests/test_mysql.py b/tests/test_mysql.py index bf76375..be1dc58 100644 --- a/tests/test_mysql.py +++ b/tests/test_mysql.py @@ -4,7 +4,7 @@ from prefect.engine import signals from pytest_mock import mocker # noqa: F401 -from edx_prefectutils import mysql as utils_mysql +from edx_argoutils import mysql as utils_mysql @pytest.fixture diff --git a/tests/test_paypal.py b/tests/test_paypal.py index 7c109f7..7870747 100644 --- a/tests/test_paypal.py +++ b/tests/test_paypal.py @@ -6,7 +6,7 @@ from mock import Mock -from edx_prefectutils.paypal import get_paypal_filename +from edx_argoutils.paypal import get_paypal_filename def test_get_paypal_filename(): diff --git a/tests/test_paypal_xml.py b/tests/test_paypal_xml.py index 67d5258..0e608c7 100644 --- a/tests/test_paypal_xml.py +++ b/tests/test_paypal_xml.py @@ -6,7 +6,7 @@ import httpretty from ddt import data, ddt, unpack -from edx_prefectutils.paypal_xml import (ColumnMetadata, +from edx_argoutils.paypal_xml import (ColumnMetadata, PaypalApiRequestFailedError, PaypalMalformedResponseError, PaypalReportDataRequest, diff --git a/tests/test_s3.py b/tests/test_s3.py index 3fe95cb..9d47bca 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -5,11 +5,11 @@ from mock import patch from mock.mock import MagicMock -from edx_prefectutils.s3 import delete_s3_directory +from edx_argoutils.s3 import delete_s3_directory -@patch("edx_prefectutils.s3.list_object_keys_from_s3.run") -@patch("edx_prefectutils.s3.get_boto_client") +@patch("edx_argoutils.s3.list_object_keys_from_s3.run") +@patch("edx_argoutils.s3.get_boto_client") def test_delete_s3_directory(boto_client_mock, list_object_keys_from_s3_mock): """ Test the delete_s3_directory task diff --git a/tests/test_sitemap.py b/tests/test_sitemap.py index 79b140e..41741a7 100644 --- a/tests/test_sitemap.py +++ b/tests/test_sitemap.py @@ -1,67 +1,84 @@ """ -Tests for sitemap tasks. +Tests for sitemap functions. +pytest test_sitemap.py """ import json - +import unittest +from unittest.mock import Mock, patch import requests -from mock import Mock, patch -from prefect import context - -from edx_prefectutils.sitemap import fetch_sitemap, fetch_sitemap_urls +from edx_argoutils.sitemap import fetch_sitemap, fetch_sitemap_urls SCRAPED_AT = '2021-10-22T15:14:16.683985+00:00' -@patch.object(requests, 'get') -def test_fetch_sitemap_urls(mockget): - mockresponse = Mock() - mockget.return_value = mockresponse - mockresponse.text = """ - - - https://www.foo.com/sitemap-0.xml - - - https://www.foo.com/sitemap-1.xml - - - """ - expected_output = ['https://www.foo.com/sitemap-0.xml', 'https://www.foo.com/sitemap-1.xml'] - task = fetch_sitemap_urls - assert task.run(sitemap_index_url='dummy_url') == expected_output +class TestSitemapTasks(unittest.TestCase): + + @patch.object(requests, 'get') + def test_fetch_sitemap_urls(self, mockget): + # Mock the response from requests.get + mockresponse = Mock() + mockget.return_value = mockresponse + mockresponse.text = """ + + + https://www.foo.com/sitemap-0.xml + + + https://www.foo.com/sitemap-1.xml + + + """ + + # Expected output for the mock sitemap index response + expected_output = ['https://www.foo.com/sitemap-0.xml', 'https://www.foo.com/sitemap-1.xml'] + + # Call the function (directly, without Prefect context) + result = fetch_sitemap_urls(sitemap_index_url='dummy_url') + + # Check if the result matches the expected output + self.assertEqual(result, expected_output) + + + @patch.object(requests, 'get') + def test_fetch_sitemap(self, mockget): + # Mock the response from requests.get + mockresponse = Mock() + mockget.return_value = mockresponse + mockresponse.text = """ + + + https://www.foo.come/terms-service + daily + 0.7 + + + https://www.foo.come/policy + daily + 0.7 + + + https://www.foo.come/policy/security + daily + 0.7 + + + """ + + # Expected output for the mock sitemap response + expected_output = [ + {'scraped_at': SCRAPED_AT, 'url': 'https://www.foo.come/terms-service'}, + {'scraped_at': SCRAPED_AT, 'url': 'https://www.foo.come/policy'}, + {'scraped_at': SCRAPED_AT, 'url': 'https://www.foo.come/policy/security'}, + ] + + # Manually pass SCRAPED_AT (instead of using Prefect context) + sitemap_url = 'https://www.foo.com/sitemap-0.xml' + sitemap_filename, sitemap_json = fetch_sitemap(sitemap_url=sitemap_url, scraped_at=SCRAPED_AT) + # Check if the result matches the expected output + self.assertEqual((sitemap_filename, json.loads(sitemap_json)), ('sitemap-0', expected_output)) -@patch.object(requests, 'get') -def test_fetch_sitemap(mockget): - mockresponse = Mock() - mockget.return_value = mockresponse - mockresponse.text = """ - # noqa: E501 - - https://www.foo.come/terms-service - daily - 0.7 - - - https://www.foo.come/policy - daily - 0.7 - - - https://www.foo.come/policy/security - daily - 0.7 - - - """ - expected_output = [ - {'scraped_at': SCRAPED_AT, 'url': 'https://www.foo.come/terms-service'}, - {'scraped_at': SCRAPED_AT, 'url': 'https://www.foo.come/policy'}, - {'scraped_at': SCRAPED_AT, 'url': 'https://www.foo.come/policy/security'}, - ] - task = fetch_sitemap - with context(date=SCRAPED_AT): - sitemap_filename, sitemap_json = task.run(sitemap_url='https://www.foo.com/sitemap-0.xml') - assert (sitemap_filename, json.loads(sitemap_json)) == ('sitemap-0', expected_output) +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_snowflake.py b/tests/test_snowflake.py index 9af501b..9ddb940 100755 --- a/tests/test_snowflake.py +++ b/tests/test_snowflake.py @@ -1,7 +1,7 @@ #!/usr/bin/env python """ -Tests for Snowflake utils in the `edx_prefectutils` package. +Tests for Snowflake utils in the `edx_argoutils` package. """ import json @@ -14,7 +14,7 @@ from pytest_mock import mocker # noqa: F401 from snowflake.connector import ProgrammingError -from edx_prefectutils import snowflake +from edx_argoutils import snowflake def test_qualified_table_name(): @@ -306,7 +306,7 @@ def test_export_snowflake_table_to_s3_with_exception(mock_sf_connection): def test_export_snowflake_table_to_s3_overwrite(mock_sf_connection): # noqa: F811 mock_cursor = mock_sf_connection.cursor() - with mock.patch('edx_prefectutils.s3.delete_s3_directory.run') as mock_delete_s3_directory: + with mock.patch('edx_argoutils.s3.delete_s3_directory.run') as mock_delete_s3_directory: with Flow("test") as f: snowflake.export_snowflake_table_to_s3( sf_credentials={}, @@ -336,7 +336,7 @@ def test_export_snowflake_table_to_s3_overwrite(mock_sf_connection): # noqa: F8 def test_export_snowflake_table_to_s3_no_escape(mock_sf_connection): # noqa: F811 mock_cursor = mock_sf_connection.cursor() - with mock.patch('edx_prefectutils.s3.delete_s3_directory.run'): + with mock.patch('edx_argoutils.s3.delete_s3_directory.run'): with Flow("test") as f: snowflake.export_snowflake_table_to_s3( sf_credentials={}, @@ -363,7 +363,7 @@ def test_export_snowflake_table_to_s3_no_escape(mock_sf_connection): # noqa: F8 def test_export_snowflake_table_to_s3_no_enclosure(mock_sf_connection): # noqa: F811 mock_cursor = mock_sf_connection.cursor() - with mock.patch('edx_prefectutils.s3.delete_s3_directory.run'): + with mock.patch('edx_argoutils.s3.delete_s3_directory.run'): with Flow("test") as f: snowflake.export_snowflake_table_to_s3( sf_credentials={}, @@ -390,7 +390,7 @@ def test_export_snowflake_table_to_s3_no_enclosure(mock_sf_connection): # noqa: def test_export_snowflake_table_to_s3_no_null_if(mock_sf_connection): # noqa: F811 mock_cursor = mock_sf_connection.cursor() - with mock.patch('edx_prefectutils.s3.delete_s3_directory.run'): + with mock.patch('edx_argoutils.s3.delete_s3_directory.run'): with Flow("test") as f: snowflake.export_snowflake_table_to_s3( sf_credentials={}, diff --git a/tests/test_vault_secrets.py b/tests/test_vault_secrets.py index 30d0179..5330c03 100644 --- a/tests/test_vault_secrets.py +++ b/tests/test_vault_secrets.py @@ -1,13 +1,13 @@ #!/usr/bin/env python """ -Tests for Hashicorp Vault secrets utils in the `edx_prefectutils` package. +Tests for Hashicorp Vault secrets utils in the `edx_argoutils` package. """ from prefect import Flow, task, unmapped from pytest_mock import mocker # noqa: F401 -from edx_prefectutils import vault_secrets +from edx_argoutils import vault_secrets @task diff --git a/tox.ini b/tox.ini index 1c2301b..a311852 100644 --- a/tox.ini +++ b/tox.ini @@ -20,7 +20,7 @@ deps = -r{toxinidir}/requirements/test.txt commands = touch tests/__init__.py - flake8 edx_prefectutils tests + flake8 edx_argoutils tests rm tests/__init__.py - isort --check-only --diff edx_prefectutils tests + isort --check-only --diff edx_argoutils tests