Skip to content

Commit

Permalink
use logical date
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Jul 24, 2024
1 parent 8d3ad77 commit fa0a3c2
Show file tree
Hide file tree
Showing 20 changed files with 36 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,7 @@ def cleanup_workflow(release: dict, **context) -> None:
"""Delete all files, folders and XComs associated with this release."""

release = GoogleBooksRelease.from_dict(release)
cleanup(
dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder
)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)

(
move_files_to_in_progress(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def add_new_dataset_releases(release: dict, **context) -> None:
def cleanup_workflow(release: dict, **context) -> None:
"""Delete all files and folders associated with this release."""
release = IrusFulcrumRelease.from_dict(release)
cleanup(dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder)
cleanup(dag_id, workflow_folder=release.workflow_folder)

# Define DAG tasks
task_check = check_dependencies(airflow_conns=[irus_oapen_api_conn_id])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,7 @@ def cleanup_workflow(release: dict, **context) -> None:
"""Delete all files, folders and XComs associated with this release."""

release = IrusOapenRelease.from_dict(release)
cleanup(
dag_id=dag_id,
execution_date=context["execution_date"],
workflow_folder=release.workflow_folder,
)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)

(
call_cloud_function_(data)
Expand Down
6 changes: 1 addition & 5 deletions dags/oaebu_workflows/jstor_telescope/jstor_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,7 @@ def cleanup_workflow(release: dict, **context) -> None:

release = JstorRelease.from_dict(release)
api = make_jstor_api(entity_type, entity_id)
cleanup(
dag_id=dag_id,
execution_date=context["execution_date"],
workflow_folder=release.workflow_folder,
)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)
success = api.add_labels(release.reports)
set_task_state(success, context["ti"].task_id, release=release)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,7 @@ def cleanup_workflow(release: dict, **context) -> None:
"""Delete all files, folders and XComs associated with this release."""

release = OapenMetadataRelease.from_dict(release)
cleanup(
dag_id=dag_id,
execution_date=context["execution_date"],
workflow_folder=release.workflow_folder,
)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)

task_check_dependencies = check_dependencies()
xcom_release = make_release()
Expand Down
4 changes: 1 addition & 3 deletions dags/oaebu_workflows/onix_telescope/onix_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,7 @@ def cleanup_workflow(release: dict, **context) -> None:
"""Delete all files, folders and XComs associated with this release."""

release = OnixRelease.from_dict(release)
cleanup(
dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder
)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)

(
move_files_to_in_progress(data)
Expand Down
4 changes: 2 additions & 2 deletions dags/oaebu_workflows/onix_workflow/onix_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ def make_sensors():
def make_release(**context) -> dict:
"""Creates a release object.
:param context: From Airflow. Contains the execution_date.
:param context: From Airflow. Contains the logical_date.
:return: a dictionary representation of the OnixWorkflowRelease object.
"""

Expand Down Expand Up @@ -1026,7 +1026,7 @@ def cleanup_workflow(release: dict, **context):
"""Cleanup temporary files."""

release = OnixWorkflowRelease.from_dict(release)
cleanup(dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)

# Define DAG tasks
task_check_dependencies = check_dependencies()
Expand Down
2 changes: 1 addition & 1 deletion dags/oaebu_workflows/thoth_telescope/thoth_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def cleanup_workflow(release: dict, **content) -> None:
"""Delete all files, folders and XComs associated with this release."""

release = ThothRelease.from_dict(release)
cleanup(dag_id=dag_id, execution_date=content["execution_date"], workflow_folder=release.workflow_folder)
cleanup(dag_id=dag_id, logical_date=content["logical_date"], workflow_folder=release.workflow_folder)

task_check_dependencies = check_dependencies()
xcom_release = make_release()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def cleanup_workflow(release: dict, **context) -> None:
"""Delete all files, folders and XComs associated with this release."""

release = UclDiscoveryRelease.from_dict(release)
cleanup(dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)

task_check_dependencies = check_dependencies(airflow_conns=[oaebu_service_account_conn_id])
xcom_release = make_release()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def _cleanup_workflow(release: dict, **context) -> None:
"""Delete all files, folders and XComs associated with this release."""

release = UclSalesRelease.from_dict(release)
cleanup(dag_id=dag_id, execution_date=context["execution_date"], workflow_folder=release.workflow_folder)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)

task_check_dependencies = check_dependencies(airflow_conns=[oaebu_service_account_conn_id])
xcom_release = _make_release()
Expand Down
4 changes: 2 additions & 2 deletions tests/google_books_telescope/test_google_books_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def test_telescope(self):
with sftp_server.create() as sftp_root:

# Setup DAG
execution_date = pendulum.datetime(year=2021, month=3, day=31)
logical_date = pendulum.datetime(year=2021, month=3, day=31)
sales_partner = partner_from_str("google_books_sales")
sales_partner.bq_dataset_id = dataset_id
traffic_partner = partner_from_str("google_books_traffic")
Expand All @@ -151,7 +151,7 @@ def test_telescope(self):
env.add_connection(
Connection(conn_id=sftp_service_conn_id, uri=f"ssh://:password@localhost:{self.sftp_port}")
)
with env.create_dag_run(dag, execution_date):
with env.create_dag_run(dag, logical_date=logical_date):
# Test that all dependencies are specified: no error should be thrown
ti = env.run_task("check_dependencies")
self.assertEqual(ti.state, State.SUCCESS)
Expand Down
4 changes: 2 additions & 2 deletions tests/irus_fulcrum_telescope/test_irus_fulcrum_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def test_telescope(self):
# Create the Observatory environment and run tests
with env.create():
# Setup DAG
execution_date = pendulum.datetime(year=2022, month=4, day=7)
logical_date = pendulum.datetime(year=2022, month=4, day=7)
data_partner = partner_from_str("irus_fulcrum")
data_partner.bq_dataset_id = env.add_dataset()
api_bq_dataset_id = env.add_dataset()
Expand All @@ -121,7 +121,7 @@ def test_telescope(self):
env.add_connection(Connection(conn_id="irus_api", uri=f"http://fake_api_login:@"))

# Add the fake requestor ID as a connection
with env.create_dag_run(dag, execution_date):
with env.create_dag_run(dag, logical_date=logical_date):
# Test that all dependencies are specified: no error should be thrown
ti = env.run_task("check_dependencies")
self.assertEqual(ti.state, State.SUCCESS)
Expand Down
6 changes: 3 additions & 3 deletions tests/irus_oapen_telescope/test_irus_oapen_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def test_telescope(self, mock_auth_session):
env = SandboxEnvironment(self.project_id, self.data_location)

# Setup DAG
execution_date = pendulum.datetime(year=2021, month=2, day=14)
logical_date = pendulum.datetime(year=2021, month=2, day=14)
data_partner = partner_from_str("irus_oapen")
data_partner.bq_dataset_id = env.add_dataset()
dag_id = "irus_oapen_test"
Expand Down Expand Up @@ -172,7 +172,7 @@ def test_telescope(self, mock_auth_session):

# Create the Observatory environment and run tests
with env.create(task_logging=True):
with env.create_dag_run(dag, execution_date):
with env.create_dag_run(dag, logical_date=logical_date):

# Add airflow connections
geoip_license_conn_id = "geoip_license_key"
Expand Down Expand Up @@ -381,7 +381,7 @@ def assert_mocks(create: bool, update: bool):
env = SandboxEnvironment(
self.project_id, self.data_location, api_host="localhost", api_port=find_free_port()
)
with env.create_dag_run(dag, pendulum.datetime(year=2023, month=1, day=1)):
with env.create_dag_run(dag, logical_date=pendulum.datetime(year=2023, month=1, day=1)):

ti = env.run_task("fetch_releases")

Expand Down
8 changes: 4 additions & 4 deletions tests/jstor_telescope/test_jstor_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def test_telescope_publisher(self, mock_account_credentials, mock_build):
env.add_connection(dummy_gmail_connection())

# Setup Telescope
execution_date = pendulum.datetime(year=2020, month=11, day=1)
logical_date = pendulum.datetime(year=2020, month=11, day=1)
country_partner = partner_from_str("jstor_country")
dataset_id = env.add_dataset()
country_partner.bq_dataset_id = dataset_id
Expand All @@ -211,7 +211,7 @@ def test_telescope_publisher(self, mock_account_credentials, mock_build):
)

# Begin DAG run
with env.create_dag_run(dag, execution_date):
with env.create_dag_run(dag, logical_date=logical_date):
# Test that all dependencies are specified: no error should be thrown
ti = env.run_task("check_dependencies")
self.assertEqual(ti.state, State.SUCCESS)
Expand Down Expand Up @@ -490,7 +490,7 @@ def test_telescope_collection(self, mock_account_credentials, mock_build):
env.add_connection(dummy_gmail_connection())

# Setup DAG
execution_date = pendulum.datetime(year=2023, month=10, day=4)
logical_date = pendulum.datetime(year=2023, month=10, day=4)
country_partner = partner_from_str("jstor_country_collection")
dataset_id = env.add_dataset()
country_partner.bq_dataset_id = dataset_id
Expand All @@ -510,7 +510,7 @@ def test_telescope_collection(self, mock_account_credentials, mock_build):
)

# Begin DAG run
with env.create_dag_run(dag, execution_date):
with env.create_dag_run(dag, logical_date=logical_date):
# Test that all dependencies are specified: no error should be thrown
ti = env.run_task("check_dependencies")
self.assertEqual(ti.state, State.SUCCESS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_telescope(self):
)

# first run
with env.create_dag_run(dag, pendulum.datetime(year=2021, month=2, day=1)):
with env.create_dag_run(dag, logical_date=pendulum.datetime(year=2021, month=2, day=1)):
# Test that all dependencies are specified: no error should be thrown
ti = env.run_task("check_dependencies")
self.assertEqual(ti.state, State.SUCCESS)
Expand Down
6 changes: 3 additions & 3 deletions tests/onix_telescope/test_onix_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def test_telescope(self):

with env.create(), sftp_server.create() as sftp_root:
# Setup DAG
execution_date = pendulum.datetime(year=2021, month=3, day=31)
logical_date = pendulum.datetime(year=2021, month=3, day=31)
metadata_partner = partner_from_str("onix", metadata_partner=True)
metadata_partner.bq_dataset_id = env.add_dataset()
api_bq_dataset_id = env.add_dataset()
Expand All @@ -143,7 +143,7 @@ def test_telescope(self):
# Add SFTP connection
conn = Connection(conn_id=sftp_service_conn_id, uri=f"ssh://:password@localhost:{self.sftp_port}")
env.add_connection(conn)
with env.create_dag_run(dag, execution_date):
with env.create_dag_run(dag, logical_date=logical_date):
# Test that all dependencies are specified: no error should be thrown
ti = env.run_task("check_dependencies")
self.assertEqual(ti.state, State.SUCCESS)
Expand Down Expand Up @@ -218,7 +218,7 @@ def test_telescope(self):
self.assertEqual(len(dataset_releases), 0)

# Add dataset release task
now = pendulum.now()
now = pendulum.now()
with patch("oaebu_workflows.onix_telescope.onix_telescope.pendulum.now") as mock_now:
mock_now.return_value = now
ti = env.run_task("process_release.add_new_dataset_releases", map_index=0)
Expand Down
8 changes: 4 additions & 4 deletions tests/onix_workflow/test_onix_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,16 +1034,16 @@ def vcr_ignore_condition(request):
sensor.grace_period = timedelta(seconds=1)

# Run Dummy Dags
execution_date = pendulum.datetime(year=2021, month=5, day=17)
logical_date = pendulum.datetime(year=2021, month=5, day=17)
for sensor_id in sensor_dag_ids:
dummy_dag = make_dummy_dag(sensor_id, execution_date)
with env.create_dag_run(dummy_dag, execution_date):
dummy_dag = make_dummy_dag(sensor_id, logical_date)
with env.create_dag_run(dummy_dag, logical_date=logical_date):
# Running all of a DAGs tasks sets the DAG to finished
ti = env.run_task("dummy_task")
self.assertEqual(ti.state, State.SUCCESS)

# Run end to end tests for DAG
with env.create_dag_run(dag, data_interval=DataInterval(execution_date, execution_date.add(days=7))):
with env.create_dag_run(dag, data_interval=DataInterval(logical_date, logical_date.add(days=7))):
# Run dependency check
ti = env.run_task("check_dependencies")
self.assertEqual(ti.state, State.SUCCESS)
Expand Down
4 changes: 2 additions & 2 deletions tests/thoth_telescope/test_thoth_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_telescope(self):
# Create the Observatory environment and run tests
with env.create():
# Setup Telescope
execution_date = pendulum.datetime(year=2022, month=12, day=1)
logical_date = pendulum.datetime(year=2022, month=12, day=1)
metadata_partner = partner_from_str("thoth", metadata_partner=True)
metadata_partner.bq_dataset_id = env.add_dataset()
dag_id = "thoth_telescope_test"
Expand All @@ -139,7 +139,7 @@ def test_telescope(self):
api_bq_dataset_id=api_bq_dataset_id,
)

with env.create_dag_run(dag, execution_date):
with env.create_dag_run(dag, logical_date=logical_date):
# Check dependencies task
ti = env.run_task("check_dependencies")
self.assertEqual(ti.state, State.SUCCESS)
Expand Down
4 changes: 2 additions & 2 deletions tests/ucl_discovery_telescope/test_ucl_discovery_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ def test_telescope(self):
max_threads=1,
api_bq_dataset_id=api_bq_dataset_id,
)
execution_date = pendulum.datetime(year=2023, month=6, day=1)
logical_date = pendulum.datetime(year=2023, month=6, day=1)

# Create the Observatory environment and run tests
with env.create(), env.create_dag_run(dag, execution_date):
with env.create(), env.create_dag_run(dag, logical_date=logical_date):
# Mock return values of download function
interval_start = pendulum.instance(env.dag_run.data_interval_start)
sheet_return = [
Expand Down
4 changes: 2 additions & 2 deletions tests/ucl_sales_telescope/test_ucl_sales_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ def test_telescope(self):
data_partner=data_partner,
api_bq_dataset_id=api_bq_dataset_id,
)
execution_date = pendulum.datetime(year=2024, month=2, day=4)
logical_date = pendulum.datetime(year=2024, month=2, day=4)

# Create the Observatory environment and run tests
with env.create(), env.create_dag_run(dag, execution_date):
with env.create(), env.create_dag_run(dag, logical_date=logical_date):
# Mock return values of download function
sheet_return = [
["Year", "Month", "Free/Paid/Return?", "Country", "ISBN", "Book", "Qty"],
Expand Down

0 comments on commit fa0a3c2

Please sign in to comment.