Skip to content

Commit

Permalink
Fix formatting and update CI
Browse files Browse the repository at this point in the history
  • Loading branch information
devinmatte committed Aug 1, 2023
1 parent 7b78481 commit 0ee0253
Show file tree
Hide file tree
Showing 11 changed files with 1,030 additions and 966 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.9', '3.10']
python-version: ["3.10"]
steps:
- name: Checkout repo
uses: actions/checkout@v3
Expand All @@ -28,4 +28,4 @@ jobs:
poetry run flake8 ingestor
- name: Check code format with Black
run: |
poetry run black ingestor
poetry run black --check ingestor
80 changes: 53 additions & 27 deletions ingestor/chalicelib/agg_speed_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dynamodb_json import json_util as ddb_json
import pandas as pd
import concurrent.futures

dynamodb = boto3.resource("dynamodb")


def populate_table(line: Line, range: Range):
    """Backfill the aggregate speed table for one line, from 2016-01-01 through today.

    Args:
        line: line key (e.g. "line-red") passed through to the trips query.
        range: aggregation granularity; selects the target table via constants.TABLE_MAP
            and is forwarded as the "agg" query parameter.
    """
    print(f"Populating {range} table")
    table = constants.TABLE_MAP[range]
    today = datetime.now().strftime(constants.DATE_FORMAT_BACKEND)
    trips = actual_trips_by_line(
        {
            "start_date": "2016-01-01",
            "end_date": today,
            "line": line,
            "agg": range,
        }
    )
    # Round-trip through JSON with parse_float=Decimal so numeric values arrive as
    # Decimal (DynamoDB's number type) rather than float before the batch write.
    dynamo.dynamo_batch_write(json.loads(json.dumps(trips), parse_float=Decimal), table["table_name"])
    print("Done")

Expand All @@ -54,12 +57,14 @@ def update_tables(range: Range):
for line in constants.LINES:
print(f"Updating {line} for {range}")
start = table["update_start"]
trips = actual_trips_by_line({
"start_date": datetime.strftime(start, constants.DATE_FORMAT_BACKEND),
"end_date": datetime.strftime(yesterday, constants.DATE_FORMAT_BACKEND),
"line": line,
"agg": range,
})
trips = actual_trips_by_line(
{
"start_date": datetime.strftime(start, constants.DATE_FORMAT_BACKEND),
"end_date": datetime.strftime(yesterday, constants.DATE_FORMAT_BACKEND),
"line": line,
"agg": range,
}
)
dynamo.dynamo_batch_write(json.loads(json.dumps(trips), parse_float=Decimal), table["table_name"])
print("Done")

Expand All @@ -73,7 +78,10 @@ def query_daily_trips_on_route(table_name: str, route: str, start_date: str, end
def query_daily_trips_on_line(table_name: str, line: Line, start_date: str, end_date: str):
route_keys = constants.LINE_TO_ROUTE_MAP[line]
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(query_daily_trips_on_route, table_name, route_key, start_date, end_date) for route_key in route_keys]
futures = [
executor.submit(query_daily_trips_on_route, table_name, route_key, start_date, end_date)
for route_key in route_keys
]
results = []
for future in concurrent.futures.as_completed(futures):
result = future.result()
def aggregate_actual_trips(actual_trips, agg: Range, start_date: str):
    """Flatten per-route trip results and aggregate them by date, then by week or month.

    Args:
        actual_trips: list of per-route result lists (as produced by the trip queries);
            flattened into a single list of entry dicts.
        agg: "weekly", "monthly", or anything else for plain daily records.
        start_date: ISO date string; used to drop a leading partial week/month.

    Returns:
        A list of record dicts, one per day/week/month.
    """
    # Each element of actual_trips is itself a list of entries; flatten first.
    flat_data = [entry for sublist in actual_trips for entry in sublist]
    df = pd.DataFrame(flat_data)
    df_grouped = group_data_by_date_and_branch(df)
    if agg == "weekly":
        return group_weekly_data(df_grouped, start_date)
    if agg == "monthly":
        return group_monthly_data(df_grouped, start_date)
    # Daily granularity: move the datetime index back into a "date" column.
    return_data = df_grouped.reset_index()
    return return_data.to_dict(orient="records")


def group_monthly_data(df: pd.DataFrame, start_date: str):
    """Aggregate daily rows (datetime-indexed) into calendar months.

    miles_covered and total_time are summed, count takes the median of the
    daily values, and line keeps a representative value. A leading partial
    month (when start_date is not the 1st) is dropped.

    Returns a list of record dicts, each carrying a "date" string for the
    first day of its month.
    """
    df_monthly = df.resample("M").agg(
        {"miles_covered": np.sum, "count": np.nanmedian, "total_time": np.sum, "line": "min"}
    )
    df_monthly = df_monthly.fillna(0)
    # resample labels each bin with the month's last day; relabel to the 1st.
    df_monthly.index = [datetime(x.year, x.month, 1) for x in df_monthly.index.tolist()]
    # Drop the first month if it is incomplete
    if datetime.fromisoformat(start_date).day != 1:
        df_monthly = df_monthly.tail(-1)
    df_monthly["date"] = df_monthly.index.strftime("%Y-%m-%d")
    return df_monthly.to_dict(orient="records")


def group_weekly_data(df: pd.DataFrame, start_date: str):
    """Aggregate daily rows (datetime-indexed) into Monday-Sunday weeks.

    miles_covered and total_time are summed, count takes the median of the
    daily values, and line keeps a representative value. A leading partial
    week (when start_date is not a Monday) is dropped.

    Returns a list of record dicts, each carrying a "date" string for the
    Monday that starts its week.
    """
    # Group from Monday - Sunday
    df_weekly = df.resample("W-SUN").agg(
        {"miles_covered": np.sum, "count": np.nanmedian, "total_time": np.sum, "line": "min"}
    )
    df_weekly = df_weekly.fillna(0)
    # Pandas resample uses the end date of the range as the index. So we subtract 6 days to convert to first date of the range.
    df_weekly.index = df_weekly.index - pd.Timedelta(days=6)
    # Drop the first week if it is incomplete
    if datetime.fromisoformat(start_date).weekday() != 0:
        df_weekly = df_weekly.tail(-1)
    # Convert date back to string.
    df_weekly["date"] = df_weekly.index.strftime("%Y-%m-%d")
    return df_weekly.to_dict(orient="records")


def group_data_by_date_and_branch(df: pd.DataFrame):
    """Convert data from objects with specific route/date/direction to data by date.

    Sums each date's entries across branches/directions, but if any entry for
    a date has NaN miles_covered the whole date's numeric values become NaN
    (a missing branch invalidates the day). Returns a DataFrame indexed by
    datetime with miles_covered, total_time, count, and line columns.

    NOTE(review): mutates the passed-in df in place when masking NaN dates.
    """
    # Set values for date to NaN when any entry for a different branch/direction has miles_covered as nan.
    df.loc[
        df.groupby("date")["miles_covered"].transform(lambda x: (np.isnan(x)).any()),
        ["count", "total_time", "miles_covered"],
    ] = np.nan
    # Aggregate values: nansum per date, but preserve NaN when every entry is NaN
    # (np.nansum alone would turn an all-NaN group into 0).
    df_grouped = (
        df.groupby("date")
        .agg(
            {
                "miles_covered": lambda x: np.nan if all(np.isnan(i) for i in x) else np.nansum(x),
                "total_time": lambda x: np.nan if all(np.isnan(i) for i in x) else np.nansum(x),
                "count": lambda x: np.nan if all(np.isnan(i) for i in x) else np.nansum(x),
                "line": "first",
            }
        )
        .reset_index()
    )
    # use datetime for index rather than string.
    df_grouped.set_index(pd.to_datetime(df_grouped["date"]), inplace=True)
    # Remove date column (it is the index.)
    df_grouped.drop("date", axis=1, inplace=True)
    return df_grouped
137 changes: 93 additions & 44 deletions ingestor/chalicelib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,104 +2,153 @@
from decimal import Decimal
from chalicelib import stations

# Every (line, branch) pair we track; branch is None for lines without branches.
ALL_ROUTES = [
    ["line-red", "a"],
    ["line-red", "b"],
    ["line-orange", None],
    ["line-blue", None],
    ["line-green", "b"],
    ["line-green", "c"],
    ["line-green", "d"],
    ["line-green", "e"],
]
STATIONS = stations.STATIONS

# Per line/branch: the stop-ID pairs used as termini (one [origin, destination]
# pair per direction, NB then SB) plus a route "length" as a Decimal
# (presumably round-trip miles — confirm against the speed calculation).
# The Green Line has separate pre-/post-GLX-extension entries; see
# get_route_metadata for the date-based selection.
TERMINI_NEW = {
    "line-red": {
        "a": {
            "line": "line-red",
            "route": "a",
            "stops": [
                [STATIONS["SHAWMUT"]["NB"], STATIONS["DAVIS"]["NB"]],
                [STATIONS["DAVIS"]["SB"], STATIONS["SHAWMUT"]["SB"]],
            ],
            "length": Decimal("20.26"),
        },
        "b": {
            "line": "line-red",
            "route": "b",
            "stops": [
                [STATIONS["QUINCY_ADAMS"]["NB"], STATIONS["DAVIS"]["NB"]],
                [STATIONS["DAVIS"]["SB"], STATIONS["QUINCY_ADAMS"]["SB"]],
            ],
            "length": Decimal("29.64"),
        },
    },
    "line-orange": {
        "line": "line-orange",
        "route": None,
        "stops": [
            [STATIONS["GREEN_STREET"]["NB"], STATIONS["MALDEN_CENTER"]["NB"]],
            [STATIONS["MALDEN_CENTER"]["SB"], STATIONS["GREEN_STREET"]["SB"]],
        ],
        "length": Decimal("19.22"),
    },
    "line-blue": {
        "line": "line-blue",
        "route": None,
        "stops": [
            [STATIONS["GOV_CENTER_BLUE"]["NB"], STATIONS["REVERE_BEACH"]["NB"]],
            [STATIONS["REVERE_BEACH"]["SB"], STATIONS["GOV_CENTER_BLUE"]["SB"]],
        ],
        "length": Decimal("10.75"),
    },
    "line-green-post-glx": {
        "b": {
            "line": "line-green",
            "route": "b",
            "stops": [
                [STATIONS["SOUTH_ST"]["NB"], STATIONS["BOYLSTON"]["NB"]],
                [STATIONS["BOYLSTON"]["SB"], STATIONS["SOUTH_ST"]["SB"]],
            ],
            "length": Decimal("5.39") * 2,
        },
        "c": {
            "line": "line-green",
            "route": "c",
            "stops": [
                [STATIONS["ENGLEWOOD"]["NB"], STATIONS["GOV_CENTER_GREEN"]["NB"]],
                [STATIONS["GOV_CENTER_GREEN"]["SB"], STATIONS["ENGLEWOOD"]["SB"]],
            ],
            "length": Decimal("4.91") * 2,
        },
        "d": {
            "line": "line-green",
            "route": "d",
            "stops": [
                [STATIONS["WOODLAND"]["NB"], STATIONS["LECHMERE"]["NB"]],
                [STATIONS["LECHMERE"]["SB"], STATIONS["WOODLAND"]["SB"]],
            ],
            "length": Decimal("12.81") * 2,
        },
        "e": {
            "line": "line-green",
            "route": "e",
            "stops": [
                [STATIONS["BACK_OF_THE_HILL"]["NB"], STATIONS["BALL_SQ"]["NB"]],
                [STATIONS["BALL_SQ"]["SB"], STATIONS["BACK_OF_THE_HILL"]["SB"]],
            ],
            "length": Decimal("7.88") * 2,
        },
    },
    "line-green-pre-glx": {
        "b": {
            "line": "line-green",
            "route": "b",
            "stops": [
                [STATIONS["SOUTH_ST"]["NB"], STATIONS["BOYLSTON"]["NB"]],
                [STATIONS["BOYLSTON"]["SB"], STATIONS["SOUTH_ST"]["SB"]],
            ],
            "length": Decimal("5.39") * 2,
        },
        "c": {
            "line": "line-green",
            "route": "c",
            "stops": [
                [STATIONS["ENGLEWOOD"]["NB"], STATIONS["GOV_CENTER_GREEN"]["NB"]],
                [STATIONS["GOV_CENTER_GREEN"]["SB"], STATIONS["ENGLEWOOD"]["SB"]],
            ],
            "length": Decimal("4.91") * 2,
        },
        "d": {
            "line": "line-green",
            "route": "d",
            "stops": [
                [STATIONS["WOODLAND"]["NB"], STATIONS["PARK_ST"]["NB"]],
                [STATIONS["PARK_ST"]["SB"], STATIONS["WOODLAND"]["SB"]],
            ],
            "length": Decimal("11.06") * 2,
        },
        "e": {
            "line": "line-green",
            "route": "e",
            "stops": [
                [STATIONS["BACK_OF_THE_HILL"]["NB"], STATIONS["GOV_CENTER_GREEN"]["NB"]],
                [STATIONS["GOV_CENTER_GREEN"]["SB"], STATIONS["BACK_OF_THE_HILL"]["SB"]],
            ],
            "length": Decimal("3.73") * 2,
        },
    },
}


def get_route_metadata(line, date, route=None):
    """Look up the TERMINI_NEW entry for a line (and optional branch/route).

    For the Green Line, selects the pre- or post-extension entry by comparing
    date against GLX_EXTENSION_DATE; Green Line lookups require a route key.
    For other lines, route is optional — without it the line-level entry is
    returned directly.
    """
    if line == "line-green":
        if date < GLX_EXTENSION_DATE:
            return TERMINI_NEW[f"{line}-pre-glx"][route]
        return TERMINI_NEW[f"{line}-post-glx"][route]
    if route:
        return TERMINI_NEW[line][route]
    return TERMINI_NEW[line]


LINES = ["line-red", "line-orange", "line-blue", "line-green"]
# Maps our lowercase line keys to the capitalized variants — presumably the
# key format used by the ridership data source; verify against its schema.
RIDERSHIP_KEYS = {
    "line-red": "line-Red",
    "line-orange": "line-Orange",
    "line-blue": "line-Blue",
    "line-green": "line-Green",
}
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
DATE_FORMAT_BACKEND = "%Y-%m-%d"
GLX_EXTENSION_DATE = datetime.strptime("2023-03-19", DATE_FORMAT_BACKEND)
Expand Down Expand Up @@ -147,5 +196,5 @@ def get_weekly_table_update_start():
"line-red": ["line-red-a", "line-red-b"],
"line-green": ["line-green-b", "line-green-c", "line-green-d", "line-green-e"],
"line-blue": ["line-blue"],
"line-orange": ["line-orange"]
"line-orange": ["line-orange"],
}
Loading

0 comments on commit 0ee0253

Please sign in to comment.