Merge pull request #965 from cal-itp/explore-road-interp-results
switch back to vp-sjoined-to-roads, aggregate shapes up to route-direction with stop_pairs
tiffanychu90 authored Dec 13, 2023
2 parents c247ae5 + 949de26 commit 60d020b
Showing 16 changed files with 670 additions and 438 deletions.
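
The second half of the commit message, aggregating shapes up to route-direction with stop_pairs, amounts to a groupby roll-up. A minimal sketch of that kind of aggregation, assuming a hypothetical segment-level frame with route_id, direction_id, stop_pair, and speed_mph columns (illustrative names, not necessarily the columns this pipeline uses):

    import pandas as pd

    # Hypothetical shape/segment-level observations; column names are illustrative.
    segments = pd.DataFrame({
        "route_id": ["r1", "r1", "r1", "r2"],
        "direction_id": [0, 0, 0, 1],
        "stop_pair": ["a-b", "a-b", "b-c", "x-y"],
        "speed_mph": [12.0, 14.0, 20.0, 18.0],
    })

    # Roll shapes up to route-direction, keeping stop_pair as the segment key:
    # every shape serving the same stop_pair for a route-direction collapses
    # into one aggregated row.
    route_dir = (
        segments
        .groupby(["route_id", "direction_id", "stop_pair"], as_index=False)
        .agg(mean_speed_mph=("speed_mph", "mean"),
             n_obs=("speed_mph", "size"))
    )
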
2 changes: 1 addition & 1 deletion gtfs_funnel/route_typologies.py
@@ -189,7 +189,7 @@ def pop_density_by_shape(shape_df: gpd.GeoDataFrame):
TRIP_EXPORT = CONFIG_DICT["trip_metrics_file"]
ROUTE_DIR_EXPORT = CONFIG_DICT["route_direction_metrics_file"]

for date in analysis_date_list[:1]:
for date in analysis_date_list:
trip_metrics = assemble_scheduled_trip_metrics(date)

trip_metrics.to_parquet(
61 changes: 35 additions & 26 deletions gtfs_funnel/stop_times_with_direction.py
@@ -2,13 +2,12 @@
Create a schedule stop_times table with direction of travel
between stops.
"""
import dask.dataframe as dd
import dask_geopandas as dg
import datetime
import geopandas as gpd
import numpy as np
import pandas as pd

from dask import delayed, compute
from typing import Literal

from calitp_data_analysis import utils
@@ -17,7 +16,7 @@
from segment_speed_utils.project_vars import RT_SCHED_GCS, PROJECT_CRS


def prep_scheduled_stop_times(analysis_date: str) -> dg.GeoDataFrame:
def prep_scheduled_stop_times(analysis_date: str) -> gpd.GeoDataFrame:
"""
Attach stop geometry to stop_times data, and also
add in trip_instance_key (from trips table), so
@@ -33,7 +32,7 @@ def prep_scheduled_stop_times(analysis_date: str) -> dg.GeoDataFrame:
stop_times = helpers.import_scheduled_stop_times(
analysis_date,
columns = ["feed_key", "trip_id", "stop_id", "stop_sequence"]
)
).compute()

trips = helpers.import_scheduled_trips(
analysis_date,
@@ -44,31 +43,33 @@
get_pandas = True
)

st_with_trip = dd.merge(
st_with_trip = pd.merge(
stop_times,
trips,
on = ["feed_key", "trip_id"],
how = "inner"
)

st_with_stop = dd.merge(
st_with_stop = pd.merge(
st_with_trip,
stops,
on = ["feed_key", "stop_id"],
how = "inner"
).drop(columns = ["feed_key", "trip_id"])

st_with_stop = dg.from_dask_dataframe(
st_with_stop, geometry = "geometry").set_crs(PROJECT_CRS)
st_with_stop = gpd.GeoDataFrame(
st_with_stop, geometry = "geometry", crs = PROJECT_CRS)

return st_with_stop


def get_projected_stop_meters(
stop_times: dd.DataFrame,
stop_times: pd.DataFrame,
shapes: gpd.GeoDataFrame
) -> dd.DataFrame:
) -> pd.DataFrame:
"""
Project the stop's position to the shape and
get stop_meters (meters from start of the shape).
"""
gdf = pd.merge(
stop_times,
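
get_projected_stop_meters above projects each stop onto its shape to measure distance from the start of the shape. A minimal sketch of the underlying shapely operation, assuming a shape LineString and a stop Point already in the same projected CRS (meters):

    from shapely.geometry import LineString, Point

    shape_geom = LineString([(0, 0), (1000, 0), (1000, 500)])  # toy shape, meters
    stop_geom = Point(600, 5)                                   # stop just off the shape

    # Distance along the shape from its start to the point nearest the stop.
    stop_meters = shape_geom.project(stop_geom)   # -> 600.0
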
@@ -92,42 +93,48 @@ def find_prior_stop(
For trip-stop, find the previous stop (using stop sequence).
Attach the previous stop's geometry.
"""
prior_stop = stop_times[trip_stop_cols].compute().sort_values(
prior_stop = stop_times[trip_stop_cols].sort_values(
trip_stop_cols).reset_index(drop=True)

prior_stop = prior_stop.assign(
prior_stop_sequence = (prior_stop.groupby("trip_instance_key")
.stop_sequence.shift(1)),
subseq_stop_sequence = (prior_stop.groupby("trip_instance_key")
.stop_sequence.shift(-1)),
)


prior_stop_geom = (stop_times[trip_stop_cols + ["geometry"]]
.rename(columns = {
"stop_sequence": "prior_stop_sequence",
"stop_id": "prior_stop_id",
"geometry": "prior_geometry"
})
.set_geometry("prior_geometry")
.repartition(npartitions=1)
)

stop_times_with_prior = dd.merge(
stop_times_with_prior = pd.merge(
stop_times,
prior_stop,
on = trip_stop_cols,
how = "left"
)

stop_times_with_prior_geom = dd.merge(
stop_times_with_prior_geom = pd.merge(
stop_times_with_prior,
prior_stop_geom,
on = ["trip_instance_key", "prior_stop_sequence"],
how = "left"
).astype({
"prior_stop_sequence": "Int64",
"subseq_stop_sequence": "Int64"
})
}).fillna({"prior_stop_id": ""})


stop_times_with_prior_geom = stop_times_with_prior_geom.assign(
stop_pair = stop_times_with_prior_geom.apply(
lambda x:
str(x.prior_stop_id) + "-" + str(x.stop_id),
axis=1,
)
).drop(columns = "prior_stop_id")

return stop_times_with_prior_geom
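
The stop_pair column above is built with a row-wise apply; an equivalent, typically faster vectorized string concatenation is sketched below (a sketch only, assuming prior_stop_id and stop_id are string-like and missing prior stops have already been filled with an empty string):

    stop_times_with_prior_geom = stop_times_with_prior_geom.assign(
        stop_pair = (stop_times_with_prior_geom.prior_stop_id.astype(str)
                     + "-"
                     + stop_times_with_prior_geom.stop_id.astype(str))
    )
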

@@ -144,10 +151,9 @@ def assemble_stop_times_with_direction(analysis_date: str, dict_inputs: dict):

EXPORT_FILE = dict_inputs["stop_times_direction_file"]

scheduled_stop_times = prep_scheduled_stop_times(analysis_date).repartition(
npartitions=1).persist()
scheduled_stop_times = prep_scheduled_stop_times(analysis_date)

trip_stop_cols = ["trip_instance_key", "stop_sequence"]
trip_stop_cols = ["trip_instance_key", "stop_sequence", "stop_id"]

scheduled_stop_times2 = find_prior_stop(
scheduled_stop_times, trip_stop_cols
@@ -163,12 +169,12 @@ def assemble_stop_times_with_direction(analysis_date: str, dict_inputs: dict):

first_stop = first_stop.assign(
stop_primary_direction = "Unknown"
).drop(columns = "prior_geometry").compute()
).drop(columns = "prior_geometry")

other_stops_no_geom = other_stops.drop(columns = ["prior_geometry"]).compute()
other_stops_no_geom = other_stops.drop(columns = ["prior_geometry"])

prior_geom = other_stops.prior_geometry.compute()
current_geom = other_stops.geometry.compute()
prior_geom = other_stops.prior_geometry
current_geom = other_stops.geometry

# Create a column with readable direction like westbound, eastbound, etc
stop_direction = np.vectorize(
@@ -196,7 +202,7 @@ def assemble_stop_times_with_direction(analysis_date: str, dict_inputs: dict):
[first_stop, other_stops_no_geom],
axis=0
)

df = scheduled_stop_times_with_direction.sort_values(
trip_stop_cols).reset_index(drop=True)

@@ -212,6 +218,9 @@ def assemble_stop_times_with_direction(analysis_date: str, dict_inputs: dict):
end = datetime.datetime.now()
print(f"execution time: {end - start}")

del scheduled_stop_times, scheduled_stop_times2
del other_stops_no_geom, scheduled_stop_times_with_direction, df

return
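
The stop_primary_direction column in the hunks above comes from comparing each stop's geometry against the prior stop's geometry; the np.vectorize call is only partially shown here. A minimal sketch of the idea, using a hypothetical helper (not the repo's rt_utils function) that labels the dominant displacement axis between two projected points:

    from shapely.geometry import Point

    def primary_direction(prior_geom: Point, current_geom: Point) -> str:
        # Hypothetical helper: compare displacement along x vs y and return
        # a readable compass label for the larger component.
        dx = current_geom.x - prior_geom.x
        dy = current_geom.y - prior_geom.y
        if abs(dx) >= abs(dy):
            return "Eastbound" if dx > 0 else "Westbound"
        return "Northbound" if dy > 0 else "Southbound"

    primary_direction(Point(0, 0), Point(50, 10))        # 'Eastbound'
    primary_direction(Point(100, 100), Point(100, 300))  # 'Northbound'

In the pipeline, something like this helper is applied elementwise over the prior and current geometry columns, with the first stop of each trip left as "Unknown".
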

