Fix pandas indexing for null return case
dfsnow committed Nov 30, 2024
1 parent dfde348 commit 724ec95
Showing 2 changed files with 44 additions and 46 deletions.
data/src/calculate_times.py (31 changes: 18 additions & 13 deletions)
@@ -22,8 +22,6 @@

with open(DOCKER_INTERNAL_PATH / "params.yaml") as file:
params = yaml.safe_load(file)
with open(DOCKER_INTERNAL_PATH / "valhalla.json", "r") as f:
valhalla_data = json.load(f)
os.environ["AWS_PROFILE"] = params["s3"]["profile"]


@@ -57,9 +55,9 @@ def main() -> None:
)
logger.info(
"Routing from %s origins to %s destinations (%s pairs)",
f"{len(inputs.origins_chunk):,}",
f"{len(inputs.origins):,}",
f"{inputs.n_destinations:,}",
f"{len(inputs.origins_chunk) * inputs.n_destinations:,}",
f"{len(inputs.origins) * inputs.n_destinations:,}",
)

# Initialize the default Valhalla actor bindings
@@ -68,8 +66,8 @@ def main() -> None:
    # Use the Valhalla Locate API to append coordinates that are snapped to OSM
if config.params["times"]["use_snapped"]:
logger.info("Snapping coordinates to OSM network")
-        inputs.origins_chunk = snap_df_to_osm(
-            inputs.origins_chunk, config.args.mode, actor
+        inputs.origins = snap_df_to_osm(
+            inputs.origins, config.args.mode, actor
)
inputs.destinations = snap_df_to_osm(
inputs.destinations, config.args.mode, actor
@@ -85,7 +83,7 @@ def main() -> None:
)
logger.info(
"Routed from %s origins to %s destinations",
f"{inputs.n_origins_chunk:,}",
f"{inputs.n_origins:,}",
f"{inputs.n_destinations:,}",
)

@@ -104,8 +102,8 @@ def main() -> None:

# Create a new input class, keeping only pairs that were unroutable
inputs_sp = TravelTimeInputs(
-            origins=inputs.origins_chunk[
-                inputs.origins_chunk["id"].isin(
+            origins=inputs.origins[
+                inputs.origins["id"].isin(
missing_pairs_df.index.get_level_values("origin_id")
)
].reset_index(drop=True),
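
The retry pass above keeps only origins whose id appears in the missing-pairs index. A minimal, self-contained sketch of that isin filter, using hypothetical IDs and a hypothetical missing_pairs_df layout (only the MultiIndex level name is taken from the hunk):

    import pandas as pd

    # Hypothetical unroutable pairs, indexed by (origin_id, destination_id)
    missing_pairs_df = pd.DataFrame(
        {"duration_sec": [None, None]},
        index=pd.MultiIndex.from_tuples(
            [("o2", "d1"), ("o2", "d3")], names=["origin_id", "destination_id"]
        ),
    )
    origins = pd.DataFrame({"id": ["o1", "o2", "o3"]})

    # Keep only origins that show up in at least one unroutable pair
    retry_origins = origins[
        origins["id"].isin(missing_pairs_df.index.get_level_values("origin_id"))
    ].reset_index(drop=True)
    print(retry_origins["id"].tolist())  # ['o2']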
@@ -151,7 +149,7 @@ def main() -> None:
)
for loc in out_locations:
config.paths.write_to_parquet(results_df, "times", loc)
-        config.paths.write_to_parquet(inputs.origins_chunk, "origins", loc)
+        config.paths.write_to_parquet(inputs.origins, "origins", loc)
config.paths.write_to_parquet(inputs.destinations, "destinations", loc)
config.paths.write_to_parquet(missing_pairs_df, "missing_pairs", loc)

@@ -171,15 +169,19 @@ def main() -> None:

# Create a metadata dataframe of all settings and data used for creating inputs
# and generating times
with open(DOCKER_INTERNAL_PATH / "valhalla.json", "r") as f:
valhalla_data = json.load(f)
with open(DOCKER_INTERNAL_PATH / "valhalla_sp.json", "r") as f:
valhalla_data_sp = json.load(f)
metadata_df = pd.DataFrame(
{
"run_id": run_id,
"calc_datetime_finished": pd.Timestamp.now(tz="UTC"),
"calc_time_elapsed_sec": time.time() - script_start_time,
"calc_chunk_id": args.chunk,
"calc_chunk_n_origins": inputs.n_origins_chunk,
"calc_chunk_n_destinations": inputs.n_destinations_chunk,
"calc_n_origins": inputs.n_origins,
"calc_chunk_n_origins": inputs.n_origins,
"calc_chunk_n_destinations": inputs.n_destinations,
"calc_n_origins": inputs.n_origins_full,
"calc_n_destinations": inputs.n_destinations,
"git_commit_sha_short": git_commit_sha_short,
"git_commit_sha_long": git_commit_sha,
@@ -205,6 +207,9 @@ def main() -> None:
"valhalla_config_data": json.dumps(
valhalla_data, separators=(",", ":")
),
"valhalla_config_data_second_pass": json.dumps(
valhalla_data_sp, separators=(",", ":")
),
},
index=[0],
)
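
The metadata row now stores both Valhalla configs as compact JSON strings. A small standalone sketch of that pattern, with made-up config dicts standing in for the real valhalla.json / valhalla_sp.json contents, showing what separators=(",", ":") and index=[0] do:

    import json

    import pandas as pd

    # Hypothetical stand-ins for the two Valhalla config files
    valhalla_data = {"mjolnir": {"hierarchy": True}}
    valhalla_data_sp = {"mjolnir": {"hierarchy": False}}

    metadata_df = pd.DataFrame(
        {
            # separators=(",", ":") drops the default spaces, keeping the
            # embedded config string as small as possible
            "valhalla_config_data": json.dumps(valhalla_data, separators=(",", ":")),
            "valhalla_config_data_second_pass": json.dumps(
                valhalla_data_sp, separators=(",", ":")
            ),
        },
        index=[0],  # all values are scalars, so one explicit row index is needed
    )
    print(metadata_df.loc[0, "valhalla_config_data"])  # {"mjolnir":{"hierarchy":true}}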
data/src/utils/times.py (59 changes: 26 additions & 33 deletions)
@@ -254,20 +254,17 @@ def __init__(
) -> None:
self.origins = origins
self.destinations = destinations
-        self.origins_chunk: pd.DataFrame
+        self.n_origins_full: int = len(self.origins)

self.chunk = chunk
self.chunk_start_idx: int
self.chunk_end_idx: int
self.chunk_size: int = int(10e7)
self._set_chunk_attributes()
self._set_origins_chunk()
self._subset_origins()

self.n_origins: int = len(self.origins)
self.n_destinations: int = len(self.destinations)
-        self.n_origins_chunk: int = len(self.origins_chunk)
-        self.n_destinations_chunk: int = len(self.destinations)

self.max_split_size_origins = min(
max_split_size_origins, self.chunk_size
)
Expand All @@ -283,12 +280,12 @@ def _set_chunk_attributes(self) -> None:
self.chunk_end_idx = int(chunk_end_idx) + 1
self.chunk_size = self.chunk_end_idx - self.chunk_start_idx

-    def _set_origins_chunk(self) -> None:
+    def _subset_origins(self) -> None:
"""Sets the origins chunk (if chunk is specified)."""
-        df = self.origins
if self.chunk:
-            df = df.iloc[self.chunk_start_idx : self.chunk_end_idx]
-        self.origins_chunk = df
+            self.origins = self.origins.iloc[
+                self.chunk_start_idx : self.chunk_end_idx
+            ]


class TravelTimeConfig:
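
A rough sketch of what the refactored subsetting above amounts to, assuming origins is a plain DataFrame and the chunk indices have already been computed by _set_chunk_attributes (values here are hypothetical): the chunk is carved directly out of self.origins with iloc, and the full count is captured beforehand as n_origins_full.

    import pandas as pd

    # Hypothetical 5-row origins frame; the real columns come from the input data
    origins = pd.DataFrame(
        {"id": ["a", "b", "c", "d", "e"], "lon": [0.0] * 5, "lat": [0.0] * 5}
    )

    n_origins_full = len(origins)          # recorded before any chunking -> 5
    chunk_start_idx, chunk_end_idx = 2, 4  # normally derived from the chunk argument

    # In-place positional subset, mirroring _subset_origins()
    origins = origins.iloc[chunk_start_idx:chunk_end_idx]
    n_origins = len(origins)               # size of this chunk only -> 2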
@@ -389,7 +386,7 @@ def col_dict(x, snapped=self.config.params["times"]["use_snapped"]):
# Get the subset of origin and destination points and convert them to
# lists, then squash them into the request body
origins_list = (
-            self.inputs.origins_chunk.iloc[o_start_idx:o_end_idx]
+            self.inputs.origins.iloc[o_start_idx:o_end_idx]
.apply(col_dict, axis=1)
.tolist()
)
@@ -463,13 +460,11 @@ def _binary_search(

if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx:
df = pd.merge(
-                pd.DataFrame(
-                    self.inputs.origins[o_start_idx:o_end_idx],
-                    columns=["origin_id"],
+                self.inputs.origins["id"][o_start_idx:o_end_idx].rename(
+                    "origin_id"
),
-                pd.DataFrame(
-                    self.inputs.destinations[d_start_idx:d_end_idx],
-                    columns=["destination_id"],
+                self.inputs.destinations["id"][d_start_idx:d_end_idx].rename(
+                    "destination_id"
),
how="cross",
)
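
This hunk is the "null return case" from the commit title. Passing an existing DataFrame to the pd.DataFrame constructor together with a columns= list selects those columns from the source, and since origins has no origin_id column the old code produced a frame of NaNs; selecting the id column and renaming the Series keeps the real values. A minimal reproduction with made-up IDs:

    import pandas as pd

    origins = pd.DataFrame({"id": ["o1", "o2", "o3"]})
    destinations = pd.DataFrame({"id": ["d1", "d2"]})

    # Old approach: columns=["origin_id"] asks for a column that does not exist,
    # so every value comes back as NaN (the "null return case")
    broken = pd.DataFrame(origins[0:1], columns=["origin_id"])
    print(broken["origin_id"].isna().all())  # True

    # Fixed approach: take the real "id" values and rename the Series instead
    fixed = pd.merge(
        origins["id"][0:1].rename("origin_id"),
        destinations["id"][0:2].rename("destination_id"),
        how="cross",
    )
    print(fixed)  # one origin crossed with two destinations, real IDs preserved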
@@ -518,9 +513,9 @@ def get_times(self) -> pd.DataFrame:
"""
results = []
max_spl_o = self.inputs.max_split_size_origins
-        n_oc = self.inputs.n_origins_chunk
+        n_oc = self.inputs.n_origins
m_spl_d = self.inputs.max_split_size_destinations
-        n_dc = self.inputs.n_destinations_chunk
+        n_dc = self.inputs.n_destinations

for o in range(0, n_oc, max_spl_o):
for d in range(0, n_dc, m_spl_d):
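
The loop body is truncated here, but the visible part shows how the origin-destination matrix is tiled by the two max split sizes. A toy sketch of that tiling with hypothetical sizes:

    # Hypothetical sizes: 5 origins x 3 destinations, split into 2x2 tiles
    n_oc, n_dc = 5, 3
    max_spl_o, m_spl_d = 2, 2

    tiles = []
    for o in range(0, n_oc, max_spl_o):
        for d in range(0, n_dc, m_spl_d):
            # Each tile covers origins [o, o + max_spl_o) and destinations
            # [d, d + m_spl_d), clipped to the matrix bounds
            tiles.append(
                ((o, min(o + max_spl_o, n_oc)), (d, min(d + m_spl_d, n_dc)))
            )

    print(len(tiles))  # 6 tiles: 3 origin splits x 2 destination splits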
@@ -582,29 +577,27 @@ def snap_df_to_osm(

# Use the first element of nodes to populate the snapped lat/lon, otherwise
# fallback to the correlated lat/lon from edges
+    def get_col(x: dict, col: str):
+        return (
+            x["nodes"][0][col]
+            if x["nodes"]
+            else (x["edges"][0][f"correlated_{col}"] if x["edges"] else None)
+        )
+
response_df = pd.DataFrame(
[
{
"lon_snapped": item["nodes"][0]["lon"]
if item["nodes"]
else (
item["edges"][0]["correlated_lon"]
if item["edges"]
else None
),
"lat_snapped": item["nodes"][0]["lat"]
if item["nodes"]
else (
item["edges"][0]["correlated_lat"]
if item["edges"]
else None
),
"lon_snapped": get_col(item, "lon"),
"lat_snapped": get_col(item, "lat"),
}
for item in response_data
]
)

-    df = pd.concat([df, response_df], axis=1)
+    df = pd.concat(
+        [df.reset_index(drop=True), response_df.reset_index(drop=True)],
+        axis=1,
+    )
df.fillna({"lon_snapped": df["lon"]}, inplace=True)
df.fillna({"lat_snapped": df["lat"]}, inplace=True)
df["is_snapped"] = df["lon"] != df["lon_snapped"]
