From de30d845adc30490369a50f3bacea98a07073046 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Tue, 23 Jul 2024 16:58:59 +0800 Subject: [PATCH] Schedule should be monday --- plugins/onix_workflow_schedule.py | 15 ++--- .../test_onix_workflow_schedule.py | 58 +++++++++---------- 2 files changed, 34 insertions(+), 39 deletions(-) diff --git a/plugins/onix_workflow_schedule.py b/plugins/onix_workflow_schedule.py index 821fa564..bba0ea6f 100644 --- a/plugins/onix_workflow_schedule.py +++ b/plugins/onix_workflow_schedule.py @@ -25,11 +25,8 @@ def get_start_of_interval(self, time: DateTime) -> DateTime: :return: The previous runtime """ - # Get the previous sunday - days_delta = time.weekday() + 1 - if days_delta == 7: # Is a sunday, don't alter the input - days_delta = 0 - start_time = time - timedelta(days_delta) + # Get the previous monday + start_time = time - timedelta(time.weekday()) # Don't allow the start date to cross the 5th of the month if time >= time.replace(day=5) and start_time <= time.replace(day=5): @@ -43,10 +40,8 @@ def get_end_of_interval(self, time: DateTime) -> DateTime: :return: The end of the interval """ - # Get the next sunday - days_delta = 7 - (time.weekday() + 1) - if days_delta == 0: # Is a sunday, skip ahead 7 days - days_delta += 7 + # Get the next monday + days_delta = 7 - (time.weekday()) end_time = time + timedelta(days_delta) # Don't allow the end date to cross the 5th of the month @@ -63,7 +58,7 @@ def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval: # Start of interval - the end of the previous sunday or the 5th start = self.get_start_of_interval(run_after) - end = self.get_end_of_interval(start) + end = self.get_end_of_interval(run_after) return DataInterval(start=start, end=end) diff --git a/tests/onix_workflow/test_onix_workflow_schedule.py b/tests/onix_workflow/test_onix_workflow_schedule.py index b1a2756b..f536ed38 100644 --- a/tests/onix_workflow/test_onix_workflow_schedule.py +++ b/tests/onix_workflow/test_onix_workflow_schedule.py @@ -48,9 +48,9 @@ def test_dag_run(self): env = SandboxEnvironment() with env.create(): dag_start_date = pendulum.datetime(year=2020, month=1, day=1, tz=pendulum.UTC) - expected_start_date = pendulum.datetime(year=2020, month=1, day=26, tz=pendulum.UTC) # Sunday - expected_end_date = pendulum.datetime(year=2020, month=2, day=2, tz=pendulum.UTC) # Sunday - now = pendulum.datetime(year=2020, month=2, day=1, tz=pendulum.UTC) # Saturday + expected_start_date = pendulum.datetime(year=2020, month=1, day=27, tz=pendulum.UTC) # Monday + expected_end_date = pendulum.datetime(year=2020, month=2, day=3, tz=pendulum.UTC) # Monday + now = pendulum.datetime(year=2020, month=2, day=2, tz=pendulum.UTC) # Sunday with time_machine.travel(now): dag = make_test_dag(dag_start_date) dag_run_info = dag.next_dagrun_info(last_automated_dagrun=None) @@ -70,7 +70,7 @@ def test_dag_run_with_catchup(self): """Test the schedule when it's called by airflow's next_dagrun_info function with the catchup setting on""" env = SandboxEnvironment() with env.create(): - start_date = pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC) # Sunday + start_date = pendulum.datetime(year=2020, month=1, day=7, tz=pendulum.UTC) # Monday dag = make_test_dag(start_date, catchup=True) # The timetable raises a value error but it's caught by airflow. So we check that info==None info = dag.next_dagrun_info(last_automated_dagrun=None) @@ -82,16 +82,16 @@ def test_get_start_of_interval(self): inputs = [ pendulum.datetime(year=2020, month=1, day=1, tz=pendulum.UTC), # Wednesday pendulum.datetime(year=2020, month=1, day=31, tz=pendulum.UTC), # Friday - pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday the 5th - pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), # Monday + pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), # Monday the 5th + pendulum.datetime(year=2020, month=1, day=7, tz=pendulum.UTC), # Tueday pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # Wednesday pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.timezone("Etc/GMT+1")), # Altered timezone ] expected_outputs = [ - pendulum.datetime(year=2019, month=12, day=29, tz=pendulum.UTC), # Sunday - pendulum.datetime(year=2020, month=1, day=26, tz=pendulum.UTC), # Sunday - pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday The 5th - pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday The 5th + pendulum.datetime(year=2019, month=12, day=30, tz=pendulum.UTC), # Monday + pendulum.datetime(year=2020, month=1, day=27, tz=pendulum.UTC), # Monday + pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), # Monday The 5th + pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), # Monday pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # The 5th pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # The 5th @ UTC ] @@ -106,18 +106,18 @@ def test_get_end_of_interval(self): inputs = [ pendulum.datetime(year=2020, month=1, day=1, tz=pendulum.UTC), # Wednesday pendulum.datetime(year=2020, month=1, day=31, tz=pendulum.UTC), # Friday - pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday the 5th - pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), # Monday + pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), # Monday the 5th + pendulum.datetime(year=2020, month=1, day=7, tz=pendulum.UTC), # Tuesday pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # Wednesday pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.timezone("Etc/GMT+1")), # Altered timezone ] expected_outputs = [ - pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday the 5th - pendulum.datetime(year=2020, month=2, day=2, tz=pendulum.UTC), # Sunday - pendulum.datetime(year=2020, month=1, day=12, tz=pendulum.UTC), # Sunday (1 week after start) - pendulum.datetime(year=2020, month=1, day=12, tz=pendulum.UTC), # Sunday - pendulum.datetime(year=2020, month=2, day=9, tz=pendulum.UTC), # Sunday - pendulum.datetime(year=2020, month=2, day=9, tz=pendulum.UTC), # Sunday @ UTC + pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # The 5th + pendulum.datetime(year=2020, month=2, day=3, tz=pendulum.UTC), # Monday + pendulum.datetime(year=2020, month=10, day=12, tz=pendulum.UTC), # Monday (1 week after start) + pendulum.datetime(year=2020, month=1, day=13, tz=pendulum.UTC), # Monday + pendulum.datetime(year=2020, month=2, day=10, tz=pendulum.UTC), # Monday + pendulum.datetime(year=2020, month=2, day=10, tz=pendulum.UTC), # Monday @ UTC ] for i, eo in zip(inputs, expected_outputs): logging.info(f"Input time: {i}") @@ -130,35 +130,35 @@ def test_infer_manual_data_interval(self): inputs = [ pendulum.datetime(year=2020, month=1, day=1, tz=pendulum.UTC), # Wednesday pendulum.datetime(year=2020, month=1, day=31, tz=pendulum.UTC), # Friday - pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday the 5th - pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), # Monday + pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), # Monday the 5th + pendulum.datetime(year=2020, month=1, day=7, tz=pendulum.UTC), # Tuesday pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # Wednesday pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.timezone("Etc/GMT+1")), # Altered timezone ] expected_outputs = [ DataInterval( - pendulum.datetime(year=2019, month=12, day=29, tz=pendulum.UTC), + pendulum.datetime(year=2019, month=12, day=30, tz=pendulum.UTC), pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), ), DataInterval( - pendulum.datetime(year=2020, month=1, day=26, tz=pendulum.UTC), - pendulum.datetime(year=2020, month=2, day=2, tz=pendulum.UTC), + pendulum.datetime(year=2020, month=1, day=27, tz=pendulum.UTC), + pendulum.datetime(year=2020, month=2, day=3, tz=pendulum.UTC), ), DataInterval( - pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), - pendulum.datetime(year=2020, month=1, day=12, tz=pendulum.UTC), + pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), + pendulum.datetime(year=2020, month=10, day=12, tz=pendulum.UTC), ), DataInterval( - pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), - pendulum.datetime(year=2020, month=1, day=12, tz=pendulum.UTC), + pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), + pendulum.datetime(year=2020, month=1, day=13, tz=pendulum.UTC), ), DataInterval( pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), - pendulum.datetime(year=2020, month=2, day=9, tz=pendulum.UTC), + pendulum.datetime(year=2020, month=2, day=10, tz=pendulum.UTC), ), DataInterval( pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), - pendulum.datetime(year=2020, month=2, day=9, tz=pendulum.UTC), + pendulum.datetime(year=2020, month=2, day=10, tz=pendulum.UTC), ), ] for i, eo in zip(inputs, expected_outputs):