Skip to content

Commit

Permalink
Schedule should be monday
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Jul 23, 2024
1 parent a80b5f9 commit de30d84
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 39 deletions.
15 changes: 5 additions & 10 deletions plugins/onix_workflow_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ def get_start_of_interval(self, time: DateTime) -> DateTime:
:return: The previous runtime
"""

# Get the previous sunday
days_delta = time.weekday() + 1
if days_delta == 7: # Is a sunday, don't alter the input
days_delta = 0
start_time = time - timedelta(days_delta)
# Get the previous monday
start_time = time - timedelta(time.weekday())

# Don't allow the start date to cross the 5th of the month
if time >= time.replace(day=5) and start_time <= time.replace(day=5):
Expand All @@ -43,10 +40,8 @@ def get_end_of_interval(self, time: DateTime) -> DateTime:
:return: The end of the interval
"""

# Get the next sunday
days_delta = 7 - (time.weekday() + 1)
if days_delta == 0: # Is a sunday, skip ahead 7 days
days_delta += 7
# Get the next monday
days_delta = 7 - (time.weekday())
end_time = time + timedelta(days_delta)

# Don't allow the end date to cross the 5th of the month
Expand All @@ -63,7 +58,7 @@ def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:

# Start of interval - the end of the previous sunday or the 5th
start = self.get_start_of_interval(run_after)
end = self.get_end_of_interval(start)
end = self.get_end_of_interval(run_after)

return DataInterval(start=start, end=end)

Expand Down
58 changes: 29 additions & 29 deletions tests/onix_workflow/test_onix_workflow_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def test_dag_run(self):
env = SandboxEnvironment()
with env.create():
dag_start_date = pendulum.datetime(year=2020, month=1, day=1, tz=pendulum.UTC)
expected_start_date = pendulum.datetime(year=2020, month=1, day=26, tz=pendulum.UTC) # Sunday
expected_end_date = pendulum.datetime(year=2020, month=2, day=2, tz=pendulum.UTC) # Sunday
now = pendulum.datetime(year=2020, month=2, day=1, tz=pendulum.UTC) # Saturday
expected_start_date = pendulum.datetime(year=2020, month=1, day=27, tz=pendulum.UTC) # Monday
expected_end_date = pendulum.datetime(year=2020, month=2, day=3, tz=pendulum.UTC) # Monday
now = pendulum.datetime(year=2020, month=2, day=2, tz=pendulum.UTC) # Sunday
with time_machine.travel(now):
dag = make_test_dag(dag_start_date)
dag_run_info = dag.next_dagrun_info(last_automated_dagrun=None)
Expand All @@ -70,7 +70,7 @@ def test_dag_run_with_catchup(self):
"""Test the schedule when it's called by airflow's next_dagrun_info function with the catchup setting on"""
env = SandboxEnvironment()
with env.create():
start_date = pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC) # Sunday
start_date = pendulum.datetime(year=2020, month=1, day=7, tz=pendulum.UTC) # Monday
dag = make_test_dag(start_date, catchup=True)
# The timetable raises a value error but it's caught by airflow. So we check that info==None
info = dag.next_dagrun_info(last_automated_dagrun=None)
Expand All @@ -82,16 +82,16 @@ def test_get_start_of_interval(self):
inputs = [
pendulum.datetime(year=2020, month=1, day=1, tz=pendulum.UTC), # Wednesday
pendulum.datetime(year=2020, month=1, day=31, tz=pendulum.UTC), # Friday
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday the 5th
pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), # Monday the 5th
pendulum.datetime(year=2020, month=1, day=7, tz=pendulum.UTC), # Tueday
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # Wednesday
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.timezone("Etc/GMT+1")), # Altered timezone
]
expected_outputs = [
pendulum.datetime(year=2019, month=12, day=29, tz=pendulum.UTC), # Sunday
pendulum.datetime(year=2020, month=1, day=26, tz=pendulum.UTC), # Sunday
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday The 5th
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday The 5th
pendulum.datetime(year=2019, month=12, day=30, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=1, day=27, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), # Monday The 5th
pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # The 5th
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # The 5th @ UTC
]
Expand All @@ -106,18 +106,18 @@ def test_get_end_of_interval(self):
inputs = [
pendulum.datetime(year=2020, month=1, day=1, tz=pendulum.UTC), # Wednesday
pendulum.datetime(year=2020, month=1, day=31, tz=pendulum.UTC), # Friday
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday the 5th
pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), # Monday the 5th
pendulum.datetime(year=2020, month=1, day=7, tz=pendulum.UTC), # Tuesday
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # Wednesday
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.timezone("Etc/GMT+1")), # Altered timezone
]
expected_outputs = [
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday the 5th
pendulum.datetime(year=2020, month=2, day=2, tz=pendulum.UTC), # Sunday
pendulum.datetime(year=2020, month=1, day=12, tz=pendulum.UTC), # Sunday (1 week after start)
pendulum.datetime(year=2020, month=1, day=12, tz=pendulum.UTC), # Sunday
pendulum.datetime(year=2020, month=2, day=9, tz=pendulum.UTC), # Sunday
pendulum.datetime(year=2020, month=2, day=9, tz=pendulum.UTC), # Sunday @ UTC
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # The 5th
pendulum.datetime(year=2020, month=2, day=3, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=10, day=12, tz=pendulum.UTC), # Monday (1 week after start)
pendulum.datetime(year=2020, month=1, day=13, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=2, day=10, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=2, day=10, tz=pendulum.UTC), # Monday @ UTC
]
for i, eo in zip(inputs, expected_outputs):
logging.info(f"Input time: {i}")
Expand All @@ -130,35 +130,35 @@ def test_infer_manual_data_interval(self):
inputs = [
pendulum.datetime(year=2020, month=1, day=1, tz=pendulum.UTC), # Wednesday
pendulum.datetime(year=2020, month=1, day=31, tz=pendulum.UTC), # Friday
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC), # Sunday the 5th
pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC), # Monday
pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC), # Monday the 5th
pendulum.datetime(year=2020, month=1, day=7, tz=pendulum.UTC), # Tuesday
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC), # Wednesday
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.timezone("Etc/GMT+1")), # Altered timezone
]
expected_outputs = [
DataInterval(
pendulum.datetime(year=2019, month=12, day=29, tz=pendulum.UTC),
pendulum.datetime(year=2019, month=12, day=30, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC),
),
DataInterval(
pendulum.datetime(year=2020, month=1, day=26, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=2, day=2, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=1, day=27, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=2, day=3, tz=pendulum.UTC),
),
DataInterval(
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=1, day=12, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=10, day=5, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=10, day=12, tz=pendulum.UTC),
),
DataInterval(
pendulum.datetime(year=2020, month=1, day=5, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=1, day=12, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=1, day=6, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=1, day=13, tz=pendulum.UTC),
),
DataInterval(
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=2, day=9, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=2, day=10, tz=pendulum.UTC),
),
DataInterval(
pendulum.datetime(year=2020, month=2, day=5, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=2, day=9, tz=pendulum.UTC),
pendulum.datetime(year=2020, month=2, day=10, tz=pendulum.UTC),
),
]
for i, eo in zip(inputs, expected_outputs):
Expand Down

0 comments on commit de30d84

Please sign in to comment.