diff --git a/data/src/calculate_times.py b/data/src/calculate_times.py
index 8be0c47..3ed75b4 100644
--- a/data/src/calculate_times.py
+++ b/data/src/calculate_times.py
@@ -22,8 +22,6 @@
 with open(DOCKER_INTERNAL_PATH / "params.yaml") as file:
     params = yaml.safe_load(file)
 
-with open(DOCKER_INTERNAL_PATH / "valhalla.json", "r") as f:
-    valhalla_data = json.load(f)
 
 os.environ["AWS_PROFILE"] = params["s3"]["profile"]
 
@@ -57,9 +55,9 @@ def main() -> None:
     )
     logger.info(
         "Routing from %s origins to %s destinations (%s pairs)",
-        f"{len(inputs.origins_chunk):,}",
+        f"{len(inputs.origins):,}",
         f"{inputs.n_destinations:,}",
-        f"{len(inputs.origins_chunk) * inputs.n_destinations:,}",
+        f"{len(inputs.origins) * inputs.n_destinations:,}",
     )
 
     # Initialize the default Valhalla actor bindings
@@ -68,8 +66,8 @@ def main() -> None:
     # Use the Vahalla Locate API to append coordinates that are snapped to OSM
     if config.params["times"]["use_snapped"]:
         logger.info("Snapping coordinates to OSM network")
-        inputs.origins_chunk = snap_df_to_osm(
-            inputs.origins_chunk, config.args.mode, actor
+        inputs.origins = snap_df_to_osm(
+            inputs.origins, config.args.mode, actor
         )
         inputs.destinations = snap_df_to_osm(
             inputs.destinations, config.args.mode, actor
@@ -85,7 +83,7 @@ def main() -> None:
     )
     logger.info(
         "Routed from %s origins to %s destinations",
-        f"{inputs.n_origins_chunk:,}",
+        f"{inputs.n_origins:,}",
         f"{inputs.n_destinations:,}",
     )
 
@@ -104,8 +102,8 @@ def main() -> None:
 
         # Create a new input class, keeping only pairs that were unroutable
         inputs_sp = TravelTimeInputs(
-            origins=inputs.origins_chunk[
-                inputs.origins_chunk["id"].isin(
+            origins=inputs.origins[
+                inputs.origins["id"].isin(
                     missing_pairs_df.index.get_level_values("origin_id")
                 )
             ].reset_index(drop=True),
@@ -151,7 +149,7 @@ def main() -> None:
     )
     for loc in out_locations:
         config.paths.write_to_parquet(results_df, "times", loc)
-        config.paths.write_to_parquet(inputs.origins_chunk, "origins", loc)
+        config.paths.write_to_parquet(inputs.origins, "origins", loc)
         config.paths.write_to_parquet(inputs.destinations, "destinations", loc)
         config.paths.write_to_parquet(missing_pairs_df, "missing_pairs", loc)
 
@@ -171,15 +169,19 @@ def main() -> None:
 
     # Create a metadata dataframe of all settings and data used for creating inputs
     # and generating times
+    with open(DOCKER_INTERNAL_PATH / "valhalla.json", "r") as f:
+        valhalla_data = json.load(f)
+    with open(DOCKER_INTERNAL_PATH / "valhalla_sp.json", "r") as f:
+        valhalla_data_sp = json.load(f)
     metadata_df = pd.DataFrame(
         {
            "run_id": run_id,
            "calc_datetime_finished": pd.Timestamp.now(tz="UTC"),
            "calc_time_elapsed_sec": time.time() - script_start_time,
            "calc_chunk_id": args.chunk,
-            "calc_chunk_n_origins": inputs.n_origins_chunk,
-            "calc_chunk_n_destinations": inputs.n_destinations_chunk,
-            "calc_n_origins": inputs.n_origins,
+            "calc_chunk_n_origins": inputs.n_origins,
+            "calc_chunk_n_destinations": inputs.n_destinations,
+            "calc_n_origins": inputs.n_origins_full,
            "calc_n_destinations": inputs.n_destinations,
            "git_commit_sha_short": git_commit_sha_short,
            "git_commit_sha_long": git_commit_sha,
@@ -205,6 +207,9 @@ def main() -> None:
            "valhalla_config_data": json.dumps(
                 valhalla_data, separators=(",", ":")
             ),
+            "valhalla_config_data_second_pass": json.dumps(
+                valhalla_data_sp, separators=(",", ":")
+            ),
         },
         index=[0],
     )
diff --git a/data/src/utils/times.py b/data/src/utils/times.py
index 6d17fbd..708cbf2 100644
--- a/data/src/utils/times.py
+++ b/data/src/utils/times.py
@@ -254,20 +254,17 @@ def __init__(
     ) -> None:
         self.origins = origins
         self.destinations = destinations
-        self.origins_chunk: pd.DataFrame
+        self.n_origins_full: int = len(self.origins)
 
         self.chunk = chunk
         self.chunk_start_idx: int
         self.chunk_end_idx: int
         self.chunk_size: int = int(10e7)
         self._set_chunk_attributes()
-        self._set_origins_chunk()
+        self._subset_origins()
 
         self.n_origins: int = len(self.origins)
         self.n_destinations: int = len(self.destinations)
-        self.n_origins_chunk: int = len(self.origins_chunk)
-        self.n_destinations_chunk: int = len(self.destinations)
-
         self.max_split_size_origins = min(
             max_split_size_origins, self.chunk_size
         )
@@ -283,12 +280,12 @@ def _set_chunk_attributes(self) -> None:
         self.chunk_end_idx = int(chunk_end_idx) + 1
         self.chunk_size = self.chunk_end_idx - self.chunk_start_idx
 
-    def _set_origins_chunk(self) -> None:
+    def _subset_origins(self) -> None:
         """Sets the origins chunk (if chunk is specified)."""
-        df = self.origins
         if self.chunk:
-            df = df.iloc[self.chunk_start_idx : self.chunk_end_idx]
-        self.origins_chunk = df
+            self.origins = self.origins.iloc[
+                self.chunk_start_idx : self.chunk_end_idx
+            ]
 
 
 class TravelTimeConfig:
@@ -389,7 +386,7 @@ def col_dict(x, snapped=self.config.params["times"]["use_snapped"]):
         # Get the subset of origin and destination points and convert them to
         # lists, then squash them into the request body
         origins_list = (
-            self.inputs.origins_chunk.iloc[o_start_idx:o_end_idx]
+            self.inputs.origins.iloc[o_start_idx:o_end_idx]
             .apply(col_dict, axis=1)
             .tolist()
         )
@@ -463,13 +460,11 @@ def _binary_search(
 
         if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx:
             df = pd.merge(
-                pd.DataFrame(
-                    self.inputs.origins[o_start_idx:o_end_idx],
-                    columns=["origin_id"],
+                self.inputs.origins["id"][o_start_idx:o_end_idx].rename(
+                    "origin_id"
                 ),
-                pd.DataFrame(
-                    self.inputs.destinations[d_start_idx:d_end_idx],
-                    columns=["destination_id"],
+                self.inputs.destinations["id"][d_start_idx:d_end_idx].rename(
+                    "destination_id"
                 ),
                 how="cross",
             )
@@ -518,9 +513,9 @@ def get_times(self) -> pd.DataFrame:
         """
         results = []
         max_spl_o = self.inputs.max_split_size_origins
-        n_oc = self.inputs.n_origins_chunk
+        n_oc = self.inputs.n_origins
         m_spl_d = self.inputs.max_split_size_destinations
-        n_dc = self.inputs.n_destinations_chunk
+        n_dc = self.inputs.n_destinations
 
         for o in range(0, n_oc, max_spl_o):
             for d in range(0, n_dc, m_spl_d):
@@ -582,29 +577,27 @@ def snap_df_to_osm(
 
     # Use the first element of nodes to populate the snapped lat/lon, otherwise
     # fallback to the correlated lat/lon from edges
+    def get_col(x: dict, col: str):
+        return (
+            x["nodes"][0][col]
+            if x["nodes"]
+            else (x["edges"][0][f"correlated_{col}"] if x["edges"] else None)
+        )
+
     response_df = pd.DataFrame(
         [
             {
-                "lon_snapped": item["nodes"][0]["lon"]
-                if item["nodes"]
-                else (
-                    item["edges"][0]["correlated_lon"]
-                    if item["edges"]
-                    else None
-                ),
-                "lat_snapped": item["nodes"][0]["lat"]
-                if item["nodes"]
-                else (
-                    item["edges"][0]["correlated_lat"]
-                    if item["edges"]
-                    else None
-                ),
+                "lon_snapped": get_col(item, "lon"),
+                "lat_snapped": get_col(item, "lat"),
             }
             for item in response_data
         ]
     )
 
-    df = pd.concat([df, response_df], axis=1)
+    df = pd.concat(
+        [df.reset_index(drop=True), response_df.reset_index(drop=True)],
+        axis=1,
+    )
     df.fillna({"lon_snapped": df["lon"]}, inplace=True)
     df.fillna({"lat_snapped": df["lat"]}, inplace=True)
     df["is_snapped"] = df["lon"] != df["lon_snapped"]