From fa0a3c2fbce20f286832a088c00e40b2bd9ecaf4 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 24 Jul 2024 14:46:53 +0800 Subject: [PATCH] use logical date --- .../google_books_telescope/google_books_telescope.py | 4 +--- .../irus_fulcrum_telescope/irus_fulcrum_telescope.py | 2 +- .../irus_oapen_telescope/irus_oapen_telescope.py | 6 +----- dags/oaebu_workflows/jstor_telescope/jstor_telescope.py | 6 +----- .../oapen_metadata_telescope/oapen_metadata_telescope.py | 6 +----- dags/oaebu_workflows/onix_telescope/onix_telescope.py | 4 +--- dags/oaebu_workflows/onix_workflow/onix_workflow.py | 4 ++-- dags/oaebu_workflows/thoth_telescope/thoth_telescope.py | 2 +- .../ucl_discovery_telescope/ucl_discovery_telescope.py | 2 +- .../ucl_sales_telescope/ucl_sales_telescope.py | 2 +- .../google_books_telescope/test_google_books_telescope.py | 4 ++-- .../irus_fulcrum_telescope/test_irus_fulcrum_telescope.py | 4 ++-- tests/irus_oapen_telescope/test_irus_oapen_telescope.py | 6 +++--- tests/jstor_telescope/test_jstor_telescope.py | 8 ++++---- .../test_oapen_metadata_telescope.py | 2 +- tests/onix_telescope/test_onix_telescope.py | 6 +++--- tests/onix_workflow/test_onix_workflow.py | 8 ++++---- tests/thoth_telescope/test_thoth_telescope.py | 4 ++-- .../test_ucl_discovery_telescope.py | 4 ++-- tests/ucl_sales_telescope/test_ucl_sales_telescope.py | 4 ++-- 20 files changed, 36 insertions(+), 52 deletions(-) diff --git a/dags/oaebu_workflows/google_books_telescope/google_books_telescope.py b/dags/oaebu_workflows/google_books_telescope/google_books_telescope.py index 4495dc77..1022858c 100644 --- a/dags/oaebu_workflows/google_books_telescope/google_books_telescope.py +++ b/dags/oaebu_workflows/google_books_telescope/google_books_telescope.py @@ -361,9 +361,7 @@ def cleanup_workflow(release: dict, **context) -> None: """Delete all files, folders and XComs associated with this release.""" release = GoogleBooksRelease.from_dict(release) - cleanup( - dag_id=dag_id, 
execution_date=context["execution_date"], workflow_folder=release.workflow_folder - ) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) ( move_files_to_in_progress(data) diff --git a/dags/oaebu_workflows/irus_fulcrum_telescope/irus_fulcrum_telescope.py b/dags/oaebu_workflows/irus_fulcrum_telescope/irus_fulcrum_telescope.py index 79b4ec50..3f8422aa 100644 --- a/dags/oaebu_workflows/irus_fulcrum_telescope/irus_fulcrum_telescope.py +++ b/dags/oaebu_workflows/irus_fulcrum_telescope/irus_fulcrum_telescope.py @@ -297,7 +297,7 @@ def add_new_dataset_releases(release: dict, **context) -> None: def cleanup_workflow(release: dict, **context) -> None: """Delete all files and folders associated with this release.""" release = IrusFulcrumRelease.from_dict(release) - cleanup(dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder) + cleanup(dag_id, workflow_folder=release.workflow_folder) # Define DAG tasks task_check = check_dependencies(airflow_conns=[irus_oapen_api_conn_id]) diff --git a/dags/oaebu_workflows/irus_oapen_telescope/irus_oapen_telescope.py b/dags/oaebu_workflows/irus_oapen_telescope/irus_oapen_telescope.py index 31dca095..c04891fd 100644 --- a/dags/oaebu_workflows/irus_oapen_telescope/irus_oapen_telescope.py +++ b/dags/oaebu_workflows/irus_oapen_telescope/irus_oapen_telescope.py @@ -396,11 +396,7 @@ def cleanup_workflow(release: dict, **context) -> None: """Delete all files, folders and XComs associated with this release.""" release = IrusOapenRelease.from_dict(release) - cleanup( - dag_id=dag_id, - execution_date=context["execution_date"], - workflow_folder=release.workflow_folder, - ) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) ( call_cloud_function_(data) diff --git a/dags/oaebu_workflows/jstor_telescope/jstor_telescope.py b/dags/oaebu_workflows/jstor_telescope/jstor_telescope.py index c200a868..6e13f3f0 100644 --- a/dags/oaebu_workflows/jstor_telescope/jstor_telescope.py +++ 
b/dags/oaebu_workflows/jstor_telescope/jstor_telescope.py @@ -405,11 +405,7 @@ def cleanup_workflow(release: dict, **context) -> None: release = JstorRelease.from_dict(release) api = make_jstor_api(entity_type, entity_id) - cleanup( - dag_id=dag_id, - execution_date=context["execution_date"], - workflow_folder=release.workflow_folder, - ) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) success = api.add_labels(release.reports) set_task_state(success, context["ti"].task_id, release=release) diff --git a/dags/oaebu_workflows/oapen_metadata_telescope/oapen_metadata_telescope.py b/dags/oaebu_workflows/oapen_metadata_telescope/oapen_metadata_telescope.py index ed5c675f..9ed7de36 100644 --- a/dags/oaebu_workflows/oapen_metadata_telescope/oapen_metadata_telescope.py +++ b/dags/oaebu_workflows/oapen_metadata_telescope/oapen_metadata_telescope.py @@ -277,11 +277,7 @@ def cleanup_workflow(release: dict, **context) -> None: """Delete all files, folders and XComs associated with this release.""" release = OapenMetadataRelease.from_dict(release) - cleanup( - dag_id=dag_id, - execution_date=context["execution_date"], - workflow_folder=release.workflow_folder, - ) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) task_check_dependencies = check_dependencies() xcom_release = make_release() diff --git a/dags/oaebu_workflows/onix_telescope/onix_telescope.py b/dags/oaebu_workflows/onix_telescope/onix_telescope.py index 5c3d6c5e..991162e5 100644 --- a/dags/oaebu_workflows/onix_telescope/onix_telescope.py +++ b/dags/oaebu_workflows/onix_telescope/onix_telescope.py @@ -305,9 +305,7 @@ def cleanup_workflow(release: dict, **context) -> None: """Delete all files, folders and XComs associated with this release.""" release = OnixRelease.from_dict(release) - cleanup( - dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder - ) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) ( 
move_files_to_in_progress(data) diff --git a/dags/oaebu_workflows/onix_workflow/onix_workflow.py b/dags/oaebu_workflows/onix_workflow/onix_workflow.py index bed04126..bf36722c 100644 --- a/dags/oaebu_workflows/onix_workflow/onix_workflow.py +++ b/dags/oaebu_workflows/onix_workflow/onix_workflow.py @@ -344,7 +344,7 @@ def make_sensors(): def make_release(**context) -> dict: """Creates a release object. - :param context: From Airflow. Contains the execution_date. + :param context: From Airflow. Contains the logical_date. :return: a dictionary representation of the OnixWorkflowRelease object. """ @@ -1026,7 +1026,7 @@ def cleanup_workflow(release: dict, **context): """Cleanup temporary files.""" release = OnixWorkflowRelease.from_dict(release) - cleanup(dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) # Define DAG tasks task_check_dependencies = check_dependencies() diff --git a/dags/oaebu_workflows/thoth_telescope/thoth_telescope.py b/dags/oaebu_workflows/thoth_telescope/thoth_telescope.py index 6e2f85aa..a2a61554 100644 --- a/dags/oaebu_workflows/thoth_telescope/thoth_telescope.py +++ b/dags/oaebu_workflows/thoth_telescope/thoth_telescope.py @@ -253,7 +253,7 @@ def cleanup_workflow(release: dict, **content) -> None: """Delete all files, folders and XComs associated with this release.""" release = ThothRelease.from_dict(release) - cleanup(dag_id=dag_id, execution_date=content["execution_date"], workflow_folder=release.workflow_folder) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) task_check_dependencies = check_dependencies() xcom_release = make_release() diff --git a/dags/oaebu_workflows/ucl_discovery_telescope/ucl_discovery_telescope.py b/dags/oaebu_workflows/ucl_discovery_telescope/ucl_discovery_telescope.py index a8b30647..6f41525a 100644 ---
a/dags/oaebu_workflows/ucl_discovery_telescope/ucl_discovery_telescope.py +++ b/dags/oaebu_workflows/ucl_discovery_telescope/ucl_discovery_telescope.py @@ -322,7 +322,7 @@ def cleanup_workflow(release: dict, **context) -> None: """Delete all files, folders and XComs associated with this release.""" release = UclDiscoveryRelease.from_dict(release) - cleanup(dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) task_check_dependencies = check_dependencies(airflow_conns=[oaebu_service_account_conn_id]) xcom_release = make_release() diff --git a/dags/oaebu_workflows/ucl_sales_telescope/ucl_sales_telescope.py b/dags/oaebu_workflows/ucl_sales_telescope/ucl_sales_telescope.py index 5a560072..211a6df1 100644 --- a/dags/oaebu_workflows/ucl_sales_telescope/ucl_sales_telescope.py +++ b/dags/oaebu_workflows/ucl_sales_telescope/ucl_sales_telescope.py @@ -251,7 +251,7 @@ def _cleanup_workflow(release: dict, **context) -> None: """Delete all files, folders and XComs associated with this release.""" release = UclSalesRelease.from_dict(release) - cleanup(dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder) + cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder) task_check_dependencies = check_dependencies(airflow_conns=[oaebu_service_account_conn_id]) xcom_release = _make_release() diff --git a/tests/google_books_telescope/test_google_books_telescope.py b/tests/google_books_telescope/test_google_books_telescope.py index 2b09ba89..6a8b9afa 100644 --- a/tests/google_books_telescope/test_google_books_telescope.py +++ b/tests/google_books_telescope/test_google_books_telescope.py @@ -129,7 +129,7 @@ def test_telescope(self): with sftp_server.create() as sftp_root: # Setup DAG - execution_date = pendulum.datetime(year=2021, month=3, day=31) + logical_date = pendulum.datetime(year=2021, month=3, day=31) sales_partner = 
partner_from_str("google_books_sales") sales_partner.bq_dataset_id = dataset_id traffic_partner = partner_from_str("google_books_traffic") @@ -151,7 +151,7 @@ def test_telescope(self): env.add_connection( Connection(conn_id=sftp_service_conn_id, uri=f"ssh://:password@localhost:{self.sftp_port}") ) - with env.create_dag_run(dag, execution_date): + with env.create_dag_run(dag, logical_date=logical_date): # Test that all dependencies are specified: no error should be thrown ti = env.run_task("check_dependencies") self.assertEqual(ti.state, State.SUCCESS) diff --git a/tests/irus_fulcrum_telescope/test_irus_fulcrum_telescope.py b/tests/irus_fulcrum_telescope/test_irus_fulcrum_telescope.py index d54800b6..e1600536 100644 --- a/tests/irus_fulcrum_telescope/test_irus_fulcrum_telescope.py +++ b/tests/irus_fulcrum_telescope/test_irus_fulcrum_telescope.py @@ -106,7 +106,7 @@ def test_telescope(self): # Create the Observatory environment and run tests with env.create(): # Setup DAG - execution_date = pendulum.datetime(year=2022, month=4, day=7) + logical_date = pendulum.datetime(year=2022, month=4, day=7) data_partner = partner_from_str("irus_fulcrum") data_partner.bq_dataset_id = env.add_dataset() api_bq_dataset_id = env.add_dataset() @@ -121,7 +121,7 @@ def test_telescope(self): env.add_connection(Connection(conn_id="irus_api", uri=f"http://fake_api_login:@")) # Add the fake requestor ID as a connection - with env.create_dag_run(dag, execution_date): + with env.create_dag_run(dag, logical_date=logical_date): # Test that all dependencies are specified: no error should be thrown ti = env.run_task("check_dependencies") self.assertEqual(ti.state, State.SUCCESS) diff --git a/tests/irus_oapen_telescope/test_irus_oapen_telescope.py b/tests/irus_oapen_telescope/test_irus_oapen_telescope.py index 57be0413..0e9a76e4 100644 --- a/tests/irus_oapen_telescope/test_irus_oapen_telescope.py +++ b/tests/irus_oapen_telescope/test_irus_oapen_telescope.py @@ -127,7 +127,7 @@ def 
test_telescope(self, mock_auth_session): env = SandboxEnvironment(self.project_id, self.data_location) # Setup DAG - execution_date = pendulum.datetime(year=2021, month=2, day=14) + logical_date = pendulum.datetime(year=2021, month=2, day=14) data_partner = partner_from_str("irus_oapen") data_partner.bq_dataset_id = env.add_dataset() dag_id = "irus_oapen_test" @@ -172,7 +172,7 @@ def test_telescope(self, mock_auth_session): # Create the Observatory environment and run tests with env.create(task_logging=True): - with env.create_dag_run(dag, execution_date): + with env.create_dag_run(dag, logical_date=logical_date): # Add airflow connections geoip_license_conn_id = "geoip_license_key" @@ -381,7 +381,7 @@ def assert_mocks(create: bool, update: bool): env = SandboxEnvironment( self.project_id, self.data_location, api_host="localhost", api_port=find_free_port() ) - with env.create_dag_run(dag, pendulum.datetime(year=2023, month=1, day=1)): + with env.create_dag_run(dag, logical_date=pendulum.datetime(year=2023, month=1, day=1)): ti = env.run_task("fetch_releases") diff --git a/tests/jstor_telescope/test_jstor_telescope.py b/tests/jstor_telescope/test_jstor_telescope.py index 762d3d98..3cec697b 100644 --- a/tests/jstor_telescope/test_jstor_telescope.py +++ b/tests/jstor_telescope/test_jstor_telescope.py @@ -191,7 +191,7 @@ def test_telescope_publisher(self, mock_account_credentials, mock_build): env.add_connection(dummy_gmail_connection()) # Setup Telescope - execution_date = pendulum.datetime(year=2020, month=11, day=1) + logical_date = pendulum.datetime(year=2020, month=11, day=1) country_partner = partner_from_str("jstor_country") dataset_id = env.add_dataset() country_partner.bq_dataset_id = dataset_id @@ -211,7 +211,7 @@ def test_telescope_publisher(self, mock_account_credentials, mock_build): ) # Begin DAG run - with env.create_dag_run(dag, execution_date): + with env.create_dag_run(dag, logical_date=logical_date): # Test that all dependencies are specified: no 
error should be thrown ti = env.run_task("check_dependencies") self.assertEqual(ti.state, State.SUCCESS) @@ -490,7 +490,7 @@ def test_telescope_collection(self, mock_account_credentials, mock_build): env.add_connection(dummy_gmail_connection()) # Setup DAG - execution_date = pendulum.datetime(year=2023, month=10, day=4) + logical_date = pendulum.datetime(year=2023, month=10, day=4) country_partner = partner_from_str("jstor_country_collection") dataset_id = env.add_dataset() country_partner.bq_dataset_id = dataset_id @@ -510,7 +510,7 @@ def test_telescope_collection(self, mock_account_credentials, mock_build): ) # Begin DAG run - with env.create_dag_run(dag, execution_date): + with env.create_dag_run(dag, logical_date=logical_date): # Test that all dependencies are specified: no error should be thrown ti = env.run_task("check_dependencies") self.assertEqual(ti.state, State.SUCCESS) diff --git a/tests/oapen_metadata_telescope/test_oapen_metadata_telescope.py b/tests/oapen_metadata_telescope/test_oapen_metadata_telescope.py index 5e43fc2a..cee5db15 100644 --- a/tests/oapen_metadata_telescope/test_oapen_metadata_telescope.py +++ b/tests/oapen_metadata_telescope/test_oapen_metadata_telescope.py @@ -120,7 +120,7 @@ def test_telescope(self): ) # first run - with env.create_dag_run(dag, pendulum.datetime(year=2021, month=2, day=1)): + with env.create_dag_run(dag, logical_date=pendulum.datetime(year=2021, month=2, day=1)): # Test that all dependencies are specified: no error should be thrown ti = env.run_task("check_dependencies") self.assertEqual(ti.state, State.SUCCESS) diff --git a/tests/onix_telescope/test_onix_telescope.py b/tests/onix_telescope/test_onix_telescope.py index 983c1e1f..d9db2a01 100644 --- a/tests/onix_telescope/test_onix_telescope.py +++ b/tests/onix_telescope/test_onix_telescope.py @@ -123,7 +123,7 @@ def test_telescope(self): with env.create(), sftp_server.create() as sftp_root: # Setup DAG - execution_date = pendulum.datetime(year=2021, month=3, 
day=31) + logical_date = pendulum.datetime(year=2021, month=3, day=31) metadata_partner = partner_from_str("onix", metadata_partner=True) metadata_partner.bq_dataset_id = env.add_dataset() api_bq_dataset_id = env.add_dataset() @@ -143,7 +143,7 @@ def test_telescope(self): # Add SFTP connection conn = Connection(conn_id=sftp_service_conn_id, uri=f"ssh://:password@localhost:{self.sftp_port}") env.add_connection(conn) - with env.create_dag_run(dag, execution_date): + with env.create_dag_run(dag, logical_date=logical_date): # Test that all dependencies are specified: no error should be thrown ti = env.run_task("check_dependencies") self.assertEqual(ti.state, State.SUCCESS) @@ -218,7 +218,7 @@ def test_telescope(self): self.assertEqual(len(dataset_releases), 0) # Add dataset release task - now = pendulum.now() + now = pendulum.now() with patch("oaebu_workflows.onix_telescope.onix_telescope.pendulum.now") as mock_now: mock_now.return_value = now ti = env.run_task("process_release.add_new_dataset_releases", map_index=0) diff --git a/tests/onix_workflow/test_onix_workflow.py b/tests/onix_workflow/test_onix_workflow.py index da36e5fb..62092224 100644 --- a/tests/onix_workflow/test_onix_workflow.py +++ b/tests/onix_workflow/test_onix_workflow.py @@ -1034,16 +1034,16 @@ def vcr_ignore_condition(request): sensor.grace_period = timedelta(seconds=1) # Run Dummy Dags - execution_date = pendulum.datetime(year=2021, month=5, day=17) + logical_date = pendulum.datetime(year=2021, month=5, day=17) for sensor_id in sensor_dag_ids: - dummy_dag = make_dummy_dag(sensor_id, execution_date) - with env.create_dag_run(dummy_dag, execution_date): + dummy_dag = make_dummy_dag(sensor_id, logical_date) + with env.create_dag_run(dummy_dag, logical_date=logical_date): # Running all of a DAGs tasks sets the DAG to finished ti = env.run_task("dummy_task") self.assertEqual(ti.state, State.SUCCESS) # Run end to end tests for DAG - with env.create_dag_run(dag, data_interval=DataInterval(execution_date, 
execution_date.add(days=7))): + with env.create_dag_run(dag, data_interval=DataInterval(logical_date, logical_date.add(days=7))): # Run dependency check ti = env.run_task("check_dependencies") self.assertEqual(ti.state, State.SUCCESS) diff --git a/tests/thoth_telescope/test_thoth_telescope.py b/tests/thoth_telescope/test_thoth_telescope.py index 5d52723d..7d36114b 100644 --- a/tests/thoth_telescope/test_thoth_telescope.py +++ b/tests/thoth_telescope/test_thoth_telescope.py @@ -124,7 +124,7 @@ def test_telescope(self): # Create the Observatory environment and run tests with env.create(): # Setup Telescope - execution_date = pendulum.datetime(year=2022, month=12, day=1) + logical_date = pendulum.datetime(year=2022, month=12, day=1) metadata_partner = partner_from_str("thoth", metadata_partner=True) metadata_partner.bq_dataset_id = env.add_dataset() dag_id = "thoth_telescope_test" @@ -139,7 +139,7 @@ def test_telescope(self): api_bq_dataset_id=api_bq_dataset_id, ) - with env.create_dag_run(dag, execution_date): + with env.create_dag_run(dag, logical_date=logical_date): # Check dependencies task ti = env.run_task("check_dependencies") self.assertEqual(ti.state, State.SUCCESS) diff --git a/tests/ucl_discovery_telescope/test_ucl_discovery_telescope.py b/tests/ucl_discovery_telescope/test_ucl_discovery_telescope.py index 94c0892a..ab3ce462 100644 --- a/tests/ucl_discovery_telescope/test_ucl_discovery_telescope.py +++ b/tests/ucl_discovery_telescope/test_ucl_discovery_telescope.py @@ -105,10 +105,10 @@ def test_telescope(self): max_threads=1, api_bq_dataset_id=api_bq_dataset_id, ) - execution_date = pendulum.datetime(year=2023, month=6, day=1) + logical_date = pendulum.datetime(year=2023, month=6, day=1) # Create the Observatory environment and run tests - with env.create(), env.create_dag_run(dag, execution_date): + with env.create(), env.create_dag_run(dag, logical_date=logical_date): # Mock return values of download function interval_start = 
pendulum.instance(env.dag_run.data_interval_start) sheet_return = [ diff --git a/tests/ucl_sales_telescope/test_ucl_sales_telescope.py b/tests/ucl_sales_telescope/test_ucl_sales_telescope.py index 00507545..cbcc2770 100644 --- a/tests/ucl_sales_telescope/test_ucl_sales_telescope.py +++ b/tests/ucl_sales_telescope/test_ucl_sales_telescope.py @@ -111,10 +111,10 @@ def test_telescope(self): data_partner=data_partner, api_bq_dataset_id=api_bq_dataset_id, ) - execution_date = pendulum.datetime(year=2024, month=2, day=4) + logical_date = pendulum.datetime(year=2024, month=2, day=4) # Create the Observatory environment and run tests - with env.create(), env.create_dag_run(dag, execution_date): + with env.create(), env.create_dag_run(dag, logical_date=logical_date): # Mock return values of download function sheet_return = [ ["Year", "Month", "Free/Paid/Return?", "Country", "ISBN", "Book", "Qty"],