Skip to content

Commit

Permalink
fix: Reduce duplicated work in speed_restrictions ingestor to save me…
Browse files Browse the repository at this point in the history
…mory (#54)

* fix: Reduce duplicated work in speed_restrictions ingestor to save memory

* ok wait change back to months
  • Loading branch information
idreyn authored Sep 1, 2023
1 parent 82e48a1 commit e4f563f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
2 changes: 1 addition & 1 deletion ingestor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def update_ridership(event):
# 7:20am UTC -> 2:20/3:20am ET every day
@app.schedule(Cron(20, 7, "*", "*", "?", "*"))
def update_speed_restrictions(event):
speed_restrictions.update_speed_restrictions()
speed_restrictions.update_speed_restrictions(max_lookback_months=2)


# 7:30am UTC -> 2:30/3:30am ET every day
Expand Down
19 changes: 14 additions & 5 deletions ingestor/chalicelib/speed_restrictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,21 @@ def bucket_entries_by_key(entries: Iterator[SpeedRestrictionEntry]) -> Dict[Entr
return buckets


def load_speed_restriction_entries() -> Iterator[SpeedRestrictionEntry]:
def csv_is_too_old(
    csv_file_name: str,
    max_lookback_months: Union[None, int],
    today: Union[None, date] = None,
) -> bool:
    """Decide whether a monthly CSV falls outside the lookback window.

    The month of the file is parsed from the first seven characters of
    ``csv_file_name``, which must look like ``YYYY-MM``; the file is dated to
    the first day of that month.

    Args:
        csv_file_name: Name of the CSV file, starting with ``YYYY-MM``.
        max_lookback_months: Size of the lookback window in months. Any falsy
            value (``None`` or ``0``) disables the check, so no file is ever
            considered too old.
        today: Reference date to measure the file's age against. Defaults to
            ``date.today()``; pass an explicit date for reproducible results.

    Returns:
        ``True`` if the file is older than the window, ``False`` otherwise.

    Raises:
        ValueError: If ``csv_file_name`` does not start with a ``YYYY-MM``
            prefix (raised by ``datetime.strptime``).
    """
    if not max_lookback_months:
        return False
    if today is None:
        today = date.today()
    csv_date = datetime.strptime(csv_file_name[:7], "%Y-%m").date()
    age_in_days = (today - csv_date).days
    # A month is approximated as 30 days. The extra month accounts for the
    # file being dated to the first day of the month it covers.
    return age_in_days > (1 + max_lookback_months) * 30


def load_speed_restriction_entries(max_lookback_days: Union[None, int]) -> Iterator[SpeedRestrictionEntry]:
req = requests.get(CSV_ZIP_URL)
zip_file = zipfile.ZipFile(BytesIO(req.content))
for csv_file_name in zip_file.namelist():
if not csv_file_name.endswith(".csv"):
if not csv_file_name.endswith(".csv") or csv_is_too_old(csv_file_name, max_lookback_days):
continue
print(csv_file_name)
csv_file = zip_file.open(csv_file_name)
rows = csv.DictReader(TextIOWrapper(csv_file), delimiter=",")
for row in rows:
Expand All @@ -98,8 +107,8 @@ def load_speed_restriction_entries() -> Iterator[SpeedRestrictionEntry]:
yield entry


def update_speed_restrictions():
entries = load_speed_restriction_entries()
def update_speed_restrictions(max_lookback_months: Union[None, int]):
entries = load_speed_restriction_entries(max_lookback_months)
buckets = bucket_entries_by_key(entries)
dynamodb = boto3.resource("dynamodb")
SpeedRestrictions = dynamodb.Table("SpeedRestrictions")
Expand All @@ -116,4 +125,4 @@ def update_speed_restrictions():


if __name__ == "__main__":
update_speed_restrictions()
update_speed_restrictions(max_lookback_months=None)

0 comments on commit e4f563f

Please sign in to comment.