From cc2d15242682ac873b0790838780b5575cf85d2b Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Fri, 29 Nov 2024 17:13:04 -0600 Subject: [PATCH 1/6] Refactor times calc to OOP --- data/src/calculate_times.py | 465 ++++------------------------------- data/src/utils/times.py | 475 ++++++++++++++++++++++++++++++++++++ 2 files changed, 529 insertions(+), 411 deletions(-) create mode 100644 data/src/utils/times.py diff --git a/data/src/calculate_times.py b/data/src/calculate_times.py index d6c992e..a375db5 100644 --- a/data/src/calculate_times.py +++ b/data/src/calculate_times.py @@ -1,7 +1,6 @@ import argparse import json import os -import re import time import uuid from pathlib import Path @@ -10,177 +9,20 @@ import valhalla # type: ignore import yaml from utils.constants import DOCKER_INTERNAL_PATH +from utils.logging import create_logger +from utils.times import TravelTimeCalculator, TravelTimeConfig from utils.utils import format_time, get_md5_hash +logger = create_logger(__name__) + with open(DOCKER_INTERNAL_PATH / "params.yaml") as file: params = yaml.safe_load(file) -os.environ["AWS_PROFILE"] = params["s3"]["profile"] with open(DOCKER_INTERNAL_PATH / "valhalla.json", "r") as f: valhalla_data = json.load(f) +os.environ["AWS_PROFILE"] = params["s3"]["profile"] -def calculate_times( - actor, - o_start_idx: int, - d_start_idx: int, - o_end_idx: int, - d_end_idx: int, - origins: pd.DataFrame, - destinations: pd.DataFrame, - max_split_size_origins: int, - max_split_size_destinations: int, - mode: str, -) -> pd.DataFrame: - """Calculates travel times and distances between origins and destinations. - - Args: - actor: Valhalla actor instance for making matrix API requests. - o_start_idx: Starting index for the origins DataFrame. - d_start_idx: Starting index for the destinations DataFrame. - origins: DataFrame containing origin points with 'lat' and 'lon' columns. - destinations: DataFrame containing destination points with 'lat' and 'lon' columns. - max_split_size: Maximum number of points to process in one iteration. - mode: Travel mode for the Valhalla API (e.g., 'auto', 'bicycle'). - - Returns: - DataFrame containing origin IDs, destination IDs, travel durations, and distances. - """ - start_time = time.time() - job_string = ( - f"Routing origin indices {o_start_idx}-{o_end_idx - 1} to " - f"destination indices {d_start_idx}-{d_end_idx - 1}" - ) - print(job_string) - - # Get the subset of origin and destination points and convert them to lists - # then squash them into the request body - origins_list = ( - origins.iloc[o_start_idx:o_end_idx] - .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) - .tolist() - ) - destinations_list = ( - destinations.iloc[d_start_idx:d_end_idx] - .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) - .tolist() - ) - request_json = json.dumps( - { - "sources": origins_list, - "targets": destinations_list, - "costing": mode, - "verbose": False, - } - ) - - # Make the actual request to the matrix API - response = actor.matrix(request_json) - response_data = json.loads(response) - - # Parse the response data and convert it to a dataframe. 
Recover the - # origin and destination indices and append them to the dataframe - durations = response_data["sources_to_targets"]["durations"] - distances = response_data["sources_to_targets"]["distances"] - origin_ids = ( - origins.iloc[o_start_idx:o_end_idx]["id"] - .repeat(d_end_idx - d_start_idx) - .tolist() - ) - destination_ids = destinations.iloc[d_start_idx:d_end_idx][ - "id" - ].tolist() * (o_end_idx - o_start_idx) - - df = pd.DataFrame( - { - "origin_id": origin_ids, - "destination_id": destination_ids, - "duration_sec": [i for sl in durations for i in sl], - "distance_km": [i for sl in distances for i in sl], - } - ) - - elapsed_time = time.time() - start_time - print(job_string, f": {format_time(elapsed_time)}") - return df - - -def calculate_times_with_backoff( - actor, - origins, - destinations, - max_split_size_origins, - max_split_size_destinations, - mode, -): - results = [] - n_origins_chunk = len(origins) - n_destinations_chunk = len(destinations) - - def binary_search_times(o_start_idx, d_start_idx, o_end_idx, d_end_idx): - if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx: - df = pd.merge( - pd.DataFrame( - origins[o_start_idx:o_end_idx], columns=["origin_id"] - ), - pd.DataFrame( - destinations[d_start_idx:d_end_idx], - columns=["destination_id"], - ), - how="cross", - ) - df["distance_km"] = pd.Series([], dtype=float) - df["duration_sec"] = pd.Series([], dtype=float) - - return [df] - try: - times = calculate_times( - actor=actor, - o_start_idx=o_start_idx, - d_start_idx=d_start_idx, - o_end_idx=o_end_idx, - d_end_idx=d_end_idx, - origins=origins, - destinations=destinations, - max_split_size_origins=max_split_size_origins, - max_split_size_destinations=max_split_size_destinations, - mode=mode, - ) - return [times] - except Exception as e: - print(f"Error: {e}, backing off and retrying...") - mid_o = (o_start_idx + o_end_idx) // 2 - mid_d = (d_start_idx + d_end_idx) // 2 - return ( - binary_search_times(o_start_idx, d_start_idx, mid_o, mid_d) - + binary_search_times(mid_o, d_start_idx, o_end_idx, mid_d) - + binary_search_times(o_start_idx, mid_d, mid_o, d_end_idx) - + binary_search_times(mid_o, mid_d, o_end_idx, d_end_idx) - ) - - for o in range(0, n_origins_chunk, max_split_size_origins): - for d in range(0, n_destinations_chunk, max_split_size_destinations): - results.extend( - binary_search_times( - o, - d, - min(o + max_split_size_origins, n_origins_chunk), - min(d + max_split_size_destinations, n_destinations_chunk), - ) - ) - - return results - - -def create_write_path(key: str, out_type: str, output_dict: dict) -> str: - """Tiny helper to create Parquet output write paths.""" - return ( - "s3://" + output_dict[out_type][key].as_posix() - if out_type == "s3" - else output_dict[out_type][key].as_posix() - ) - - -if __name__ == "__main__": +def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--mode", required=True, type=str) parser.add_argument("--year", required=True, type=str) @@ -192,187 +34,37 @@ def create_write_path(key: str, out_type: str, output_dict: dict) -> str: args = parser.parse_args() script_start_time = time.time() - if args.mode not in params["times"]["mode"]: - raise ValueError( - "Invalid mode, must be one of: ", params["times"]["mode"] - ) - if args.centroid_type not in ["weighted", "unweighted"]: - raise ValueError( - "Invalid centroid_type, must be one of: ['weighted', 'unweighted']" - ) - if args.chunk: - if not re.match(r"^\d+-\d+$", args.chunk): - raise ValueError( - "Invalid chunk argument. 
Must be two numbers separated by a dash (e.g., '1-2')." - ) - - # Split and check chunk value - chunk_start_idx, chunk_end_idx = map(int, args.chunk.split("-")) - chunk_end_idx = chunk_end_idx + 1 - chunk_size = chunk_end_idx - chunk_start_idx - chunk_msg = f", chunk: {args.chunk}" if args.chunk else "" - print( - f"Starting routing for version: {params['times']['version']},", - f"mode: {args.mode}, year: {args.year}, geography: {args.geography},", - f"state: {args.state}, centroid type: {args.centroid_type}" - + chunk_msg, - ) - - ##### FILE PATHS ##### - - # Setup file paths for inputs (pre-made network file and OD points) - input = {} - input["main"] = { - "path": Path( - f"year={args.year}/geography={args.geography}/" - f"state={args.state}/{args.state}.parquet", - ) - } - input["dirs"] = { - "valhalla_tiles": Path( - DOCKER_INTERNAL_PATH, - f"intermediate/valhalla_tiles/year={args.year}/", - f"geography=state/state={args.state}", - ) - } - input["files"] = { - "valhalla_tiles_file": Path( - DOCKER_INTERNAL_PATH, - f"intermediate/valhalla_tiles/year={args.year}", - f"geography=state/state={args.state}/valhalla_tiles.tar.zst", - ), - "origins_file": Path( - DOCKER_INTERNAL_PATH, - f"intermediate/cenloc/{input['main']['path']}", - ), - "destinations_file": Path( - DOCKER_INTERNAL_PATH, - f"intermediate/destpoint/{input['main']['path']}", - ), - } - - # Setup file paths for all outputs both locally and on the remote - output = {} - output["prefix"] = { - "local": Path(DOCKER_INTERNAL_PATH / "output"), - "s3": Path(params["s3"]["data_bucket"]), - } - output["main"] = { - "path": Path( - f"version={params['times']['version']}/mode={args.mode}/", - f"year={args.year}/geography={args.geography}/state={args.state}/", - f"centroid_type={args.centroid_type}", - ), - "file": Path( - f"part-{args.chunk}.parquet" if args.chunk else "part-0.parquet" - ), - } - output["dirs"] = { - "times": Path("times", output["main"]["path"]), - "origins": Path("points", output["main"]["path"], "point_type=origin"), - "destinations": Path( - "points", output["main"]["path"], "point_type=destination" - ), - "missing_pairs": Path("missing_pairs", output["main"]["path"]), - "metadata": Path("metadata", output["main"]["path"]), - } - for loc in ["local", "s3"]: - output[loc] = { - "times_file": Path( - output["prefix"][loc], - output["dirs"]["times"], - output["main"]["file"], - ), - "origins_file": Path( - output["prefix"][loc], - output["dirs"]["origins"], - output["main"]["file"], - ), - "destinations_file": Path( - output["prefix"][loc], - output["dirs"]["destinations"], - output["main"]["file"], - ), - "missing_pairs_file": Path( - output["prefix"][loc], - output["dirs"]["missing_pairs"], - output["main"]["file"], - ), - "metadata_file": Path( - output["prefix"][loc], - output["dirs"]["metadata"], - output["main"]["file"], - ), - } - - # Make sure outputs have somewhere to write to - for path in output["dirs"].values(): - path = output["prefix"]["local"] / path - path.mkdir(parents=True, exist_ok=True) - - ##### DATA PREP ##### - - # Load origins and destinations - od_cols = { - "weighted": {"geoid": "id", "x_4326_wt": "lon", "y_4326_wt": "lat"}, - "unweighted": {"geoid": "id", "x_4326": "lon", "y_4326": "lat"}, - }[args.centroid_type] - - origins = ( - pd.read_parquet(input["files"]["origins_file"]) - .loc[:, list(od_cols.keys())] - .rename(columns=od_cols) - .sort_values(by="id") - ) - n_origins = len(origins) + config = TravelTimeConfig(args, params=params, logger=logger) + inputs = config.load_default_inputs() 
- # Subset the origins if a chunk is used - if args.chunk: - origins = origins.iloc[chunk_start_idx:chunk_end_idx] - - destinations = ( - pd.read_parquet(input["files"]["destinations_file"]) - .loc[:, list(od_cols.keys())] - .rename(columns=od_cols) - .sort_values(by="id") + chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else "" + logger.info( + f"Starting routing for version: {config.params['times']['version']},", + f"mode: {config.args.mode}, year: {config.args.year},", + f"geography: {config.args.geography}, state: {config.args.state},", + f"centroid type: {config.args.centroid_type}" + chunk_msg, ) - n_destinations = len(destinations) - n_origins_chunk = len(origins) - n_destinations_chunk = len(destinations) - print( - f"Routing from {len(origins)} origins", - f"to {len(destinations)} destinations", + logger.info( + f"Routing from {inputs.n_origins_chunk} origins", + f"to {inputs.n_destinations} destinations", ) - ##### CALCULATE TIMES ##### - - max_split_size_origins = min(params["times"]["max_split_size"], chunk_size) - max_split_size_destinations = min( - params["times"]["max_split_size"], n_destinations - ) - - # Initialize the Valhalla actor bindings + # Initialize the default Valhalla actor bindings actor = valhalla.Actor((Path.cwd() / "valhalla.json").as_posix()) # Calculate times for each chunk and append to a list - results = calculate_times_with_backoff( - actor=actor, - origins=origins, - destinations=destinations, - max_split_size_origins=max_split_size_origins, - max_split_size_destinations=max_split_size_destinations, - mode=args.mode, - ) + tt_calc = TravelTimeCalculator(actor, config, inputs) + results_df = tt_calc.get_times() - print( + logger.info( "Finished calculating times in", f"{format_time(time.time() - script_start_time)}", ) - - # Concatenate all results into a single DataFrame - results_df = pd.concat(results, ignore_index=True) - del results + logger.info( + f"Routed from {inputs.n_origins} origins", + f"to {inputs.n_destinations} destinations", + ) # Extract missing pairs to a separate dataframe missing_pairs_df = results_df[results_df["duration_sec"].isnull()] @@ -387,90 +79,45 @@ def create_write_path(key: str, out_type: str, output_dict: dict) -> str: by=["origin_id", "destination_id"] ) - ##### SAVE OUTPUTS ##### - - out_types = ["local", "s3"] if args.write_to_s3 else ["local"] - compression_type = params["output"]["compression"]["type"] - compression_level = params["output"]["compression"]["level"] - storage_options = { - "s3": { - "client_kwargs": { - "endpoint_url": params["s3"]["endpoint"], - } - }, - "local": {}, - } - print( - f"Routed from {len(origins)} origins", - f"to {len(destinations)} destinations", - ) - print( + out_locations = ["local", "s3"] if args.write_to_s3 else ["local"] + logger.info( f"Calculated times between {len(results_df)} pairs.", f"Times missing between {len(missing_pairs_df)} pairs.", - f"Saving outputs to: {', '.join(out_types)}", + f"Saving outputs to: {', '.join(out_locations)}", ) - # Loop through files and write to both local and remote paths - for out_type in out_types: - results_df.to_parquet( - create_write_path("times_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - storage_options=storage_options[out_type], - ) - origins.to_parquet( - create_write_path("origins_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - 
storage_options=storage_options[out_type], - ) - destinations.to_parquet( - create_write_path("destinations_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - storage_options=storage_options[out_type], - ) - missing_pairs_df.to_parquet( - create_write_path("missing_pairs_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - storage_options=storage_options[out_type], - ) - - ##### SAVE METADATA ##### + for loc in out_locations: + config.paths.write_to_parquet(results_df, "times", loc) + config.paths.write_to_parquet(inputs.origins_chunk, "origins", loc) + config.paths.write_to_parquet(inputs.destinations, "destinations", loc) + config.paths.write_to_parquet(missing_pairs_df, "missing_pairs", loc) + # Construct and save a metadata DataFrame run_id = str(uuid.uuid4().hex[:8]) git_commit_sha = str(os.getenv("GITHUB_SHA")) git_commit_sha_short = str(git_commit_sha[:8] if git_commit_sha else None) input_file_hashes = { - f: get_md5_hash(input["files"][f]) for f in input["files"].keys() + f: get_md5_hash(config.paths.input["files"][f]) + for f in config.paths.input["files"].keys() } output_file_hashes = { - f: get_md5_hash(output["local"][f]) - for f in output["local"].keys() + f: get_md5_hash(config.paths.output["local"][f]) + for f in config.paths.output["local"].keys() if f != "metadata_file" } # Create a metadata dataframe of all settings and data used for creating inputs # and generating times - metadata = pd.DataFrame( + metadata_df = pd.DataFrame( { "run_id": run_id, "calc_datetime_finished": pd.Timestamp.now(tz="UTC"), "calc_time_elapsed_sec": time.time() - script_start_time, "calc_chunk_id": args.chunk, - "calc_chunk_n_origins": n_origins_chunk, - "calc_chunk_n_destinations": n_destinations_chunk, - "calc_n_origins": n_origins, - "calc_n_destinations": n_destinations, + "calc_chunk_n_origins": inputs.n_origins_chunk, + "calc_chunk_n_destinations": inputs.n_destinations_chunk, + "calc_n_origins": inputs.n_origins, + "calc_n_destinations": inputs.n_destinations, "git_commit_sha_short": git_commit_sha_short, "git_commit_sha_long": git_commit_sha, "param_network_buffer_m": params["input"]["network_buffer_m"], @@ -498,21 +145,17 @@ def create_write_path(key: str, out_type: str, output_dict: dict) -> str: }, index=[0], ) + for loc in out_locations: + config.paths.write_to_parquet(metadata_df, "metadata", loc) - for out_type in out_types: - metadata.to_parquet( - create_write_path("metadata_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - storage_options=storage_options[out_type], - ) - - print( - f"Finished routing for version: {params['times']['version']},", - f"mode: {args.mode}, year: {args.year}, geography: {args.geography},", - f"state: {args.state}, centroid type: {args.centroid_type}" - + chunk_msg, + logger.info( + f"Finished routing for version: {config.params['times']['version']},", + f"mode: {config.args.mode}, year: {config.args.year},", + f"geography: {config.args.geography}, state: {config.args.state}," + f"centroid type: {config.args.centroid_type}" + chunk_msg, f"in {format_time(time.time() - script_start_time)}", ) + + +if __name__ == "__main__": + main() diff --git a/data/src/utils/times.py b/data/src/utils/times.py new file mode 100644 index 0000000..dd9477e --- /dev/null +++ b/data/src/utils/times.py @@ -0,0 +1,475 @@ +import 
argparse +import json +import logging +import re +import time +from pathlib import Path +from typing import Literal + +import pandas as pd +import valhalla # type: ignore + +from utils.constants import DOCKER_INTERNAL_PATH +from utils.utils import format_time + + +class S3Path(Path): + """Custom Path class that maintains 's3://' prefix.""" + + def __new__(cls, *args): + return super().__new__(cls, *args) + + def __str__(self): + """Preserve 's3://' prefix when converting to string.""" + return ( + f"s3://{super().__str__()}" + if str(self).startswith("s3/") + else str(super()) + ) + + def __truediv__(self, key): + """Maintain S3Path type when joining paths.""" + return type(self)(super().__truediv__(key)) + + +class TravelTimeArgs: + def __init__(self, args: argparse.Namespace, params: dict) -> None: + self.mode: str + self.year: str + self.geography: str + self.state: str + self.centroid_type: str + self.chunk: str | None + self.write_to_s3: bool + + self._args_to_attr(args) + self._validate_mode(params, self.mode) + self._validate_centroid_type(self.centroid_type) + self._validate_chunk(self.chunk) + + def _args_to_attr(self, args: argparse.Namespace) -> None: + for k, v in vars(args).items(): + setattr(self, k.replace("-", "_"), v) + + def _validate_mode(self, params: dict, mode: str) -> None: + valid_modes = params["times"]["mode"] + if mode not in valid_modes: + raise ValueError(f"Invalid mode, must be one of: {valid_modes}") + + def _validate_centroid_type(self, centroid_type: str) -> None: + valid_centroid_types = ["weighted", "unweighted"] + if centroid_type not in valid_centroid_types: + raise ValueError( + f"Invalid centroid_type, must be one of: {valid_centroid_types}" + ) + + def _validate_chunk(self, chunk: str | None) -> None: + if chunk and not re.match(r"^\d+-\d+$", chunk): + raise ValueError( + "Invalid chunk argument. Must be two numbers" + "separated by a dash (e.g., '1-2')." 
+ ) + + +class TravelTimePaths: + def __init__( + self, + args: TravelTimeArgs, + version: str, + docker_path: Path, + s3_bucket: str, + compression_type: Literal["snappy", "gzip", "brotli", "lz4", "zstd"], + compression_level: int = 3, + endpoint_url: str | None = None, + ) -> None: + self.args: TravelTimeArgs = args + self.version: str = version + self.docker_path: Path = docker_path + self.s3_bucket: S3Path = S3Path(s3_bucket) + self.compression_type: Literal[ + "snappy", "gzip", "brotli", "lz4", "zstd" + ] = compression_type + self.compression_level: int = compression_level + self.endpoint_url: str | None = endpoint_url + self.storage_options = { + "s3": {"client_kwargs": {"endpoint_url": endpoint_url}}, + "local": {}, + } + + self.input: dict[str, dict] = {} + self.output: dict[str, dict] = {} + self._setup_paths() + + @property + def _main_path(self) -> Path: + """Base path for state data.""" + return Path( + f"year={self.args.year}/geography={self.args.geography}/", + f"state={self.args.state}", + ) + + @property + def _output_path(self) -> Path: + """Base path for output data.""" + return Path( + f"version={self.version}/mode={self.args.mode}/" + f"year={self.args.year}/geography={self.args.geography}/" + f"state={self.args.state}/centroid_type={self.args.centroid_type}" + ) + + @property + def _file_name(self) -> str: + """Generates file name based on chunk.""" + return ( + f"part-{self.args.chunk}.parquet" + if self.args.chunk + else "part-0.parquet" + ) + + def _setup_paths(self) -> None: + """Sets up all input and output paths.""" + self.input = self._create_input_paths() + self.output = self._create_output_paths() + self._create_output_directories() + + def _create_input_paths(self) -> dict[str, dict[str, Path]]: + """Creates all input paths.""" + return { + "main": {"path": self._main_path}, + "dirs": { + "valhalla_tiles": Path( + self.docker_path, + f"intermediate/valhalla_tiles/year={self.args.year}", + f"geography=state/state={self.args.state}", + ) + }, + "files": { + "valhalla_tiles_file": Path( + self.docker_path, + f"intermediate/valhalla_tiles/year={self.args.year}", + f"geography=state/state={self.args.state}/valhalla_tiles.tar.zst", + ), + "origins_file": Path( + self.docker_path, + "intermediate/cenloc", + self._main_path, + f"{self.args.state}.parquet", + ), + "destinations_file": Path( + self.docker_path, + "intermediate/destpoint", + self._main_path, + f"{self.args.state}.parquet", + ), + }, + } + + def _create_output_paths(self) -> dict[str, dict[str, Path]]: + """Creates all output paths.""" + output_dirs = { + "times": Path("times", self._output_path), + "origins": Path("points", self._output_path, "point_type=origin"), + "destinations": Path( + "points", self._output_path, "point_type=destination" + ), + "missing_pairs": Path("missing_pairs", self._output_path), + "metadata": Path("metadata", self._output_path), + } + + prefix = { + "local": Path(self.docker_path, "output"), + "s3": self.s3_bucket, + } + + output_files = {} + for loc in ["local", "s3"]: + output_files[loc] = { + f"{key}_file": Path(prefix[loc], path, self._file_name) + for key, path in output_dirs.items() + } + + return {"prefix": prefix, "dirs": output_dirs, **output_files} + + def _create_output_directories(self) -> None: + """Creates local output directories if they don't exist.""" + for path in self.output["dirs"].values(): + full_path = self.output["prefix"]["local"] / path + full_path.mkdir(parents=True, exist_ok=True) + + def get_path( + self, dataset: str, path_type: str = "output", 
location: str = "local" + ) -> str | Path: + """ + Get a specific path by type and location. + + Args: + dataset: The type of path (e.g., 'times', 'origins', 'metadata') + location: Either 'local' or 's3' + """ + if path_type == "output": + path = self.output[location][f"{dataset}_file"] + else: + path = self.input["files"][f"{dataset}_file"] + return str(path) if location == "s3" else path + + def write_to_parquet( + self, df: pd.DataFrame, dataset: str, location: str = "local" + ) -> None: + df.to_parquet( + self.get_path(dataset, location), + engine="pyarrow", + compression=self.compression_type, + compression_level=self.compression_level, + index=False, + storage_options=self.storage_options[location], + ) + + +class TravelTimeInputs: + def __init__( + self, + origins: pd.DataFrame, + destinations: pd.DataFrame, + chunk: str | None, + max_split_size_origins: int, + max_split_size_destinations: int, + ) -> None: + self.origins = origins + self.destinations = destinations + self.origins_chunk: pd.DataFrame + + self.chunk = chunk + self.chunk_start_idx: int + self.chunk_end_idx: int + self.chunk_size: int + self._set_chunk_attributes() + self._set_origins_chunk() + + self.n_origins: int = len(self.origins) + self.n_destinations: int = len(self.destinations) + self.n_origins_chunk: int = len(self.origins_chunk) + self.n_destinations_chunk: int = len(self.destinations) + + self.max_split_size_origins = min( + max_split_size_origins, self.chunk_size + ) + self.max_split_size_destinations = min( + max_split_size_destinations, self.n_destinations + ) + + def _set_chunk_attributes(self) -> None: + if self.chunk: + self.chunk_start_idx, self.chunk_end_idx = map( + int, self.chunk.split("-") + ) + self.chunk_size = self.chunk_end_idx - self.chunk_start_idx + + def _set_origins_chunk(self) -> None: + df = self.origins + if self.chunk: + df = df.iloc[self.chunk_start_idx : self.chunk_end_idx] + self.origins_chunk = df + + +class TravelTimeConfig: + """Configuration for time calculations with validation.""" + + OD_COLS = { + "weighted": {"geoid": "id", "x_4326_wt": "lon", "y_4326_wt": "lat"}, + "unweighted": {"geoid": "id", "x_4326": "lon", "y_4326": "lat"}, + } + + def __init__( + self, + args: argparse.Namespace, + params: dict, + logger: logging.Logger, + ) -> None: + self.args = TravelTimeArgs(args, params) + self.params = params + self.paths = TravelTimePaths( + args=self.args, + version=self.params["times"]["version"], + docker_path=DOCKER_INTERNAL_PATH, + s3_bucket=self.params["s3"]["data_bucket"], + compression_type=self.params["output"]["compression"]["type"], + compression_level=self.params["output"]["compression"]["level"], + ) + self.logger = logger + + def _load_od_file(self, path: str) -> pd.DataFrame: + df = ( + pd.read_parquet(self.paths.get_path(path, path_type="input")) + .loc[:, list(self.OD_COLS[self.args.centroid_type].keys())] + .rename(columns=self.OD_COLS[self.args.centroid_type]) + .sort_values(by="id") + ) + return df + + def load_default_inputs(self) -> TravelTimeInputs: + origins = self._load_od_file("origins") + destinations = self._load_od_file("destinations") + return TravelTimeInputs( + origins=origins, + destinations=destinations, + chunk=self.args.chunk, + max_split_size_origins=self.params["times"]["max_split_size"], + max_split_size_destinations=self.params["times"]["max_split_size"], + ) + + +class TravelTimeCalculator: + def __init__( + self, + actor: valhalla.Actor, + config: TravelTimeConfig, + inputs: TravelTimeInputs, + ) -> None: + self.actor = actor + 
self.config = config + self.inputs = inputs + + def calculate_times( + self, + o_start_idx: int, + d_start_idx: int, + o_end_idx: int, + d_end_idx: int, + ) -> pd.DataFrame: + """Calculates travel times and distances between origins and destinations. + + Args: + o_start_idx: Starting index for the origins DataFrame. + d_start_idx: Starting index for the destinations DataFrame. + + Returns: + DataFrame containing origin IDs, destination IDs, travel durations, and distances. + """ + start_time = time.time() + job_string = ( + f"Routing origin indices {o_start_idx}-{o_end_idx - 1} to " + f"destination indices {d_start_idx}-{d_end_idx - 1}" + ) + self.config.logger.info(job_string) + + # Get the subset of origin and destination points and convert them to lists + # then squash them into the request body + origins_list = ( + self.inputs.origins.iloc[o_start_idx:o_end_idx] + .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) + .tolist() + ) + destinations_list = ( + self.inputs.destinations.iloc[d_start_idx:d_end_idx] + .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) + .tolist() + ) + request_json = json.dumps( + { + "sources": origins_list, + "targets": destinations_list, + "costing": self.config.args.mode, + "verbose": False, + } + ) + + # Make the actual request to the matrix API + response = self.actor.matrix(request_json) + response_data = json.loads(response) + + # Parse the response data and convert it to a dataframe. Recover the + # origin and destination indices and append them to the dataframe + durations = response_data["sources_to_targets"]["durations"] + distances = response_data["sources_to_targets"]["distances"] + origin_ids = ( + self.inputs.origins.iloc[o_start_idx:o_end_idx]["id"] + .repeat(d_end_idx - d_start_idx) + .tolist() + ) + destination_ids = self.inputs.destinations.iloc[d_start_idx:d_end_idx][ + "id" + ].tolist() * (o_end_idx - o_start_idx) + + df = pd.DataFrame( + { + "origin_id": origin_ids, + "destination_id": destination_ids, + "duration_sec": [i for sl in durations for i in sl], + "distance_km": [i for sl in distances for i in sl], + } + ) + + elapsed_time = time.time() - start_time + self.config.logger.info(job_string, f": {format_time(elapsed_time)}") + return df + + def get_times(self): + results = [] + + def binary_search_times( + o_start_idx, d_start_idx, o_end_idx, d_end_idx + ): + if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx: + df = pd.merge( + pd.DataFrame( + self.inputs.origins[o_start_idx:o_end_idx], + columns=["origin_id"], + ), + pd.DataFrame( + self.inputs.destinations[d_start_idx:d_end_idx], + columns=["destination_id"], + ), + how="cross", + ) + df["distance_km"] = pd.Series([], dtype=float) + df["duration_sec"] = pd.Series([], dtype=float) + + return [df] + try: + times = self.calculate_times( + o_start_idx=o_start_idx, + d_start_idx=d_start_idx, + o_end_idx=o_end_idx, + d_end_idx=d_end_idx, + ) + return [times] + except Exception as e: + self.config.logger.info( + f"Error: {e}, backing off and retrying..." 
+ ) + mid_o = (o_start_idx + o_end_idx) // 2 + mid_d = (d_start_idx + d_end_idx) // 2 + return ( + binary_search_times(o_start_idx, d_start_idx, mid_o, mid_d) + + binary_search_times(mid_o, d_start_idx, o_end_idx, mid_d) + + binary_search_times(o_start_idx, mid_d, mid_o, d_end_idx) + + binary_search_times(mid_o, mid_d, o_end_idx, d_end_idx) + ) + + for o in range( + 0, self.inputs.n_origins_chunk, self.inputs.max_split_size_origins + ): + for d in range( + 0, + self.inputs.n_destinations_chunk, + self.inputs.max_split_size_destinations, + ): + results.extend( + binary_search_times( + o, + d, + min( + o + self.inputs.max_split_size_origins, + self.inputs.n_origins_chunk, + ), + min( + d + self.inputs.max_split_size_destinations, + self.inputs.n_destinations_chunk, + ), + ) + ) + + results_df = pd.concat(results, ignore_index=True) + del results + + return results_df From d6f84058c57b262eaa117fad782570f67f261567 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Fri, 29 Nov 2024 17:40:56 -0600 Subject: [PATCH 2/6] Finalize working times refactor --- data/src/calculate_times.py | 52 ++++++++++------ data/src/utils/times.py | 114 ++++++++++++++++-------------------- 2 files changed, 85 insertions(+), 81 deletions(-) diff --git a/data/src/calculate_times.py b/data/src/calculate_times.py index a375db5..49c1109 100644 --- a/data/src/calculate_times.py +++ b/data/src/calculate_times.py @@ -39,15 +39,20 @@ def main() -> None: chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else "" logger.info( - f"Starting routing for version: {config.params['times']['version']},", - f"mode: {config.args.mode}, year: {config.args.year},", - f"geography: {config.args.geography}, state: {config.args.state},", - f"centroid type: {config.args.centroid_type}" + chunk_msg, + "Starting routing for version: %s, mode: %s, year: %s, " + "geography: %s, state: %s, centroid type: %s%s", + config.params["times"]["version"], + config.args.mode, + config.args.year, + config.args.geography, + config.args.state, + config.args.centroid_type, + chunk_msg, ) - logger.info( - f"Routing from {inputs.n_origins_chunk} origins", - f"to {inputs.n_destinations} destinations", + "Routing from %s origins to %s destinations", + len(inputs.origins_chunk), + inputs.n_destinations, ) # Initialize the default Valhalla actor bindings @@ -58,12 +63,13 @@ def main() -> None: results_df = tt_calc.get_times() logger.info( - "Finished calculating times in", - f"{format_time(time.time() - script_start_time)}", + "Finished calculating times in %s", + format_time(time.time() - script_start_time), ) logger.info( - f"Routed from {inputs.n_origins} origins", - f"to {inputs.n_destinations} destinations", + "Routed from %s origins to %s destinations", + inputs.n_origins, + inputs.n_destinations, ) # Extract missing pairs to a separate dataframe @@ -81,10 +87,13 @@ def main() -> None: out_locations = ["local", "s3"] if args.write_to_s3 else ["local"] logger.info( - f"Calculated times between {len(results_df)} pairs.", - f"Times missing between {len(missing_pairs_df)} pairs.", - f"Saving outputs to: {', '.join(out_locations)}", + "Calculated times between %s pairs. Times missing between %s pairs. 
" + "Saving outputs to: %s", + len(results_df), + len(missing_pairs_df), + ", ".join(out_locations), ) + # Loop through files and write to both local and remote paths for loc in out_locations: config.paths.write_to_parquet(results_df, "times", loc) @@ -149,11 +158,16 @@ def main() -> None: config.paths.write_to_parquet(metadata_df, "metadata", loc) logger.info( - f"Finished routing for version: {config.params['times']['version']},", - f"mode: {config.args.mode}, year: {config.args.year},", - f"geography: {config.args.geography}, state: {config.args.state}," - f"centroid type: {config.args.centroid_type}" + chunk_msg, - f"in {format_time(time.time() - script_start_time)}", + "Finished routing for version: %s, mode: %s, year: %s, " + "geography: %s, state: %s, centroid type: %s%s in %s", + config.params["times"]["version"], + config.args.mode, + config.args.year, + config.args.geography, + config.args.state, + config.args.centroid_type, + chunk_msg, + format_time(time.time() - script_start_time), ) diff --git a/data/src/utils/times.py b/data/src/utils/times.py index dd9477e..de5e870 100644 --- a/data/src/utils/times.py +++ b/data/src/utils/times.py @@ -216,7 +216,7 @@ def write_to_parquet( self, df: pd.DataFrame, dataset: str, location: str = "local" ) -> None: df.to_parquet( - self.get_path(dataset, location), + self.get_path(dataset, path_type="output", location=location), engine="pyarrow", compression=self.compression_type, compression_level=self.compression_level, @@ -329,21 +329,23 @@ def __init__( self.config = config self.inputs = inputs - def calculate_times( + def _calculate_times( self, o_start_idx: int, d_start_idx: int, o_end_idx: int, d_end_idx: int, ) -> pd.DataFrame: - """Calculates travel times and distances between origins and destinations. + """ + Calculates travel times and distances between origins and destinations. Args: o_start_idx: Starting index for the origins DataFrame. d_start_idx: Starting index for the destinations DataFrame. Returns: - DataFrame containing origin IDs, destination IDs, travel durations, and distances. + DataFrame containing origin IDs, destination IDs, travel durations, + and distances. 
""" start_time = time.time() job_string = ( @@ -400,72 +402,60 @@ def calculate_times( ) elapsed_time = time.time() - start_time - self.config.logger.info(job_string, f": {format_time(elapsed_time)}") + self.config.logger.info(f"{job_string}: {format_time(elapsed_time)}") return df + def _binary_search(self, o_start_idx, d_start_idx, o_end_idx, d_end_idx): + if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx: + df = pd.merge( + pd.DataFrame( + self.inputs.origins[o_start_idx:o_end_idx], + columns=["origin_id"], + ), + pd.DataFrame( + self.inputs.destinations[d_start_idx:d_end_idx], + columns=["destination_id"], + ), + how="cross", + ) + df["distance_km"] = pd.Series([], dtype=float) + df["duration_sec"] = pd.Series([], dtype=float) + + return [df] + try: + times = self._calculate_times( + o_start_idx=o_start_idx, + d_start_idx=d_start_idx, + o_end_idx=o_end_idx, + d_end_idx=d_end_idx, + ) + return [times] + except Exception as e: + self.config.logger.info(f"Error: {e}, backing off and retrying...") + mid_o = (o_start_idx + o_end_idx) // 2 + mid_d = (d_start_idx + d_end_idx) // 2 + return ( + self._binary_search(o_start_idx, d_start_idx, mid_o, mid_d) + + self._binary_search(mid_o, d_start_idx, o_end_idx, mid_d) + + self._binary_search(o_start_idx, mid_d, mid_o, d_end_idx) + + self._binary_search(mid_o, mid_d, o_end_idx, d_end_idx) + ) + def get_times(self): results = [] + msso = self.inputs.max_split_size_origins + noc = self.inputs.n_origins_chunk + mssd = self.inputs.max_split_size_destinations + ndc = self.inputs.n_destinations_chunk - def binary_search_times( - o_start_idx, d_start_idx, o_end_idx, d_end_idx - ): - if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx: - df = pd.merge( - pd.DataFrame( - self.inputs.origins[o_start_idx:o_end_idx], - columns=["origin_id"], - ), - pd.DataFrame( - self.inputs.destinations[d_start_idx:d_end_idx], - columns=["destination_id"], - ), - how="cross", - ) - df["distance_km"] = pd.Series([], dtype=float) - df["duration_sec"] = pd.Series([], dtype=float) - - return [df] - try: - times = self.calculate_times( - o_start_idx=o_start_idx, - d_start_idx=d_start_idx, - o_end_idx=o_end_idx, - d_end_idx=d_end_idx, - ) - return [times] - except Exception as e: - self.config.logger.info( - f"Error: {e}, backing off and retrying..." 
- ) - mid_o = (o_start_idx + o_end_idx) // 2 - mid_d = (d_start_idx + d_end_idx) // 2 - return ( - binary_search_times(o_start_idx, d_start_idx, mid_o, mid_d) - + binary_search_times(mid_o, d_start_idx, o_end_idx, mid_d) - + binary_search_times(o_start_idx, mid_d, mid_o, d_end_idx) - + binary_search_times(mid_o, mid_d, o_end_idx, d_end_idx) - ) - - for o in range( - 0, self.inputs.n_origins_chunk, self.inputs.max_split_size_origins - ): - for d in range( - 0, - self.inputs.n_destinations_chunk, - self.inputs.max_split_size_destinations, - ): + for o in range(0, noc, msso): + for d in range(0, ndc, mssd): results.extend( - binary_search_times( + self._binary_search( o, d, - min( - o + self.inputs.max_split_size_origins, - self.inputs.n_origins_chunk, - ), - min( - d + self.inputs.max_split_size_destinations, - self.inputs.n_destinations_chunk, - ), + min(o + msso, noc), + min(d + mssd, ndc), ) ) From 1625aa881085afafe253d2dca22b7c5fb7183e0d Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Fri, 29 Nov 2024 17:44:35 -0600 Subject: [PATCH 3/6] Lower chunk size to 400 --- data/params.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/params.yaml b/data/params.yaml index 02c2aac..1da0a05 100644 --- a/data/params.yaml +++ b/data/params.yaml @@ -17,7 +17,7 @@ actions: # The minimum number of origins to include in a job. Higher = fewer jobs # that take longer. Lower = more jobs that finish quicker - min_chunk_size: 500 + min_chunk_size: 400 times: # Travel times output version. Follows SemVer (kind of): From 75b43ae22a61bd09553445c3503f6c3e49257a61 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Fri, 29 Nov 2024 18:09:51 -0600 Subject: [PATCH 4/6] Fix S3 path references --- data/params.yaml | 2 +- data/src/calculate_times.py | 6 +++--- data/src/create_public_site.py | 2 +- data/src/utils/times.py | 32 ++++++++------------------------ 4 files changed, 13 insertions(+), 29 deletions(-) diff --git a/data/params.yaml b/data/params.yaml index 1da0a05..be0bb07 100644 --- a/data/params.yaml +++ b/data/params.yaml @@ -7,7 +7,7 @@ s3: data_bucket: 'opentimes-data' public_bucket: 'opentimes-public' public_data_url: 'https://data.opentimes.org' - endpoint: 'https://fcb279b22cfe4c98f903ad8f9e7ccbb2.r2.cloudflarestorage.com' + endpoint_url: 'https://fcb279b22cfe4c98f903ad8f9e7ccbb2.r2.cloudflarestorage.com' account_id: 'fcb279b22cfe4c98f903ad8f9e7ccbb2' # Parameters used to control the chunking of work on GitHub Actions diff --git a/data/src/calculate_times.py b/data/src/calculate_times.py index 49c1109..bd80229 100644 --- a/data/src/calculate_times.py +++ b/data/src/calculate_times.py @@ -34,6 +34,7 @@ def main() -> None: args = parser.parse_args() script_start_time = time.time() + # Create a travel times configuration and set of origin/destination inputs config = TravelTimeConfig(args, params=params, logger=logger) inputs = config.load_default_inputs() @@ -72,7 +73,7 @@ def main() -> None: inputs.n_destinations, ) - # Extract missing pairs to a separate dataframe + # Extract missing pairs to a separate DataFrame missing_pairs_df = results_df[results_df["duration_sec"].isnull()] missing_pairs_df = ( pd.DataFrame(missing_pairs_df) @@ -85,6 +86,7 @@ def main() -> None: by=["origin_id", "destination_id"] ) + # Loop through files and write to both local and remote paths out_locations = ["local", "s3"] if args.write_to_s3 else ["local"] logger.info( "Calculated times between %s pairs. Times missing between %s pairs. 
" @@ -93,8 +95,6 @@ def main() -> None: len(missing_pairs_df), ", ".join(out_locations), ) - - # Loop through files and write to both local and remote paths for loc in out_locations: config.paths.write_to_parquet(results_df, "times", loc) config.paths.write_to_parquet(inputs.origins_chunk, "origins", loc) diff --git a/data/src/create_public_site.py b/data/src/create_public_site.py index b857ea8..cee3314 100644 --- a/data/src/create_public_site.py +++ b/data/src/create_public_site.py @@ -19,7 +19,7 @@ with open("params.yaml") as file: params = yaml.safe_load(file) session = boto3.Session(profile_name=params["s3"]["profile"]) -s3 = session.client("s3", endpoint_url=params["s3"]["endpoint"]) +s3 = session.client("s3", endpoint_url=params["s3"]["endpoint_url"]) # Initialize Jinja2 environment and template jinja_env = Environment(loader=FileSystemLoader("site/templates")) diff --git a/data/src/utils/times.py b/data/src/utils/times.py index de5e870..f23391e 100644 --- a/data/src/utils/times.py +++ b/data/src/utils/times.py @@ -4,7 +4,7 @@ import re import time from pathlib import Path -from typing import Literal +from typing import Any, Literal import pandas as pd import valhalla # type: ignore @@ -13,25 +13,6 @@ from utils.utils import format_time -class S3Path(Path): - """Custom Path class that maintains 's3://' prefix.""" - - def __new__(cls, *args): - return super().__new__(cls, *args) - - def __str__(self): - """Preserve 's3://' prefix when converting to string.""" - return ( - f"s3://{super().__str__()}" - if str(self).startswith("s3/") - else str(super()) - ) - - def __truediv__(self, key): - """Maintain S3Path type when joining paths.""" - return type(self)(super().__truediv__(key)) - - class TravelTimeArgs: def __init__(self, args: argparse.Namespace, params: dict) -> None: self.mode: str @@ -85,7 +66,7 @@ def __init__( self.args: TravelTimeArgs = args self.version: str = version self.docker_path: Path = docker_path - self.s3_bucket: S3Path = S3Path(s3_bucket) + self.s3_bucket: str = s3_bucket self.compression_type: Literal[ "snappy", "gzip", "brotli", "lz4", "zstd" ] = compression_type @@ -164,7 +145,7 @@ def _create_input_paths(self) -> dict[str, dict[str, Path]]: }, } - def _create_output_paths(self) -> dict[str, dict[str, Path]]: + def _create_output_paths(self) -> dict[str, dict[str, Any]]: """Creates all output paths.""" output_dirs = { "times": Path("times", self._output_path), @@ -178,13 +159,15 @@ def _create_output_paths(self) -> dict[str, dict[str, Path]]: prefix = { "local": Path(self.docker_path, "output"), - "s3": self.s3_bucket, + "s3": Path(self.s3_bucket), } output_files = {} for loc in ["local", "s3"]: output_files[loc] = { - f"{key}_file": Path(prefix[loc], path, self._file_name) + f"{key}_file": f"s3://{Path(prefix[loc], path, self._file_name)}" + if loc == "s3" + else Path(prefix[loc], path, self._file_name) for key, path in output_dirs.items() } @@ -294,6 +277,7 @@ def __init__( s3_bucket=self.params["s3"]["data_bucket"], compression_type=self.params["output"]["compression"]["type"], compression_level=self.params["output"]["compression"]["level"], + endpoint_url=self.params["s3"]["endpoint_url"], ) self.logger = logger From 98ef45cef14a90998d24070aaee21752b7ec89e7 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Fri, 29 Nov 2024 18:35:12 -0600 Subject: [PATCH 5/6] Finalize times cleanup --- data/src/utils/times.py | 97 ++++++++++++++++++++++++++++++++--------- 1 file changed, 77 insertions(+), 20 deletions(-) diff --git a/data/src/utils/times.py 
b/data/src/utils/times.py index f23391e..664a289 100644 --- a/data/src/utils/times.py +++ b/data/src/utils/times.py @@ -14,6 +14,13 @@ class TravelTimeArgs: + """ + Class to hold and validate arguments for travel time calculations. + + Arguments are passed at runtime via the command line and validated + against the parameters file ('params.yaml'). + """ + def __init__(self, args: argparse.Namespace, params: dict) -> None: self.mode: str self.year: str @@ -41,18 +48,26 @@ def _validate_centroid_type(self, centroid_type: str) -> None: valid_centroid_types = ["weighted", "unweighted"] if centroid_type not in valid_centroid_types: raise ValueError( - f"Invalid centroid_type, must be one of: {valid_centroid_types}" + "Invalid centroid_type, must be one " + f"of: {valid_centroid_types}" ) def _validate_chunk(self, chunk: str | None) -> None: if chunk and not re.match(r"^\d+-\d+$", chunk): raise ValueError( - "Invalid chunk argument. Must be two numbers" + "Invalid chunk argument. Must be two numbers " "separated by a dash (e.g., '1-2')." ) class TravelTimePaths: + """ + Class to manage all input and output paths for travel time calculations. + + Paths are generated based on input arguments. Also holds remote (R2) + paths and write settings for Parquet files. + """ + def __init__( self, args: TravelTimeArgs, @@ -83,7 +98,7 @@ def __init__( @property def _main_path(self) -> Path: - """Base path for state data.""" + """Base path for all data.""" return Path( f"year={self.args.year}/geography={self.args.geography}/", f"state={self.args.state}", @@ -93,9 +108,9 @@ def _main_path(self) -> Path: def _output_path(self) -> Path: """Base path for output data.""" return Path( - f"version={self.version}/mode={self.args.mode}/" - f"year={self.args.year}/geography={self.args.geography}/" - f"state={self.args.state}/centroid_type={self.args.centroid_type}" + f"version={self.version}/mode={self.args.mode}/", + self._main_path, + f"centroid_type={self.args.centroid_type}", ) @property @@ -114,7 +129,7 @@ def _setup_paths(self) -> None: self._create_output_directories() def _create_input_paths(self) -> dict[str, dict[str, Path]]: - """Creates all input paths.""" + """Creates all input paths and stores them in a dictionary.""" return { "main": {"path": self._main_path}, "dirs": { @@ -128,7 +143,8 @@ def _create_input_paths(self) -> dict[str, dict[str, Path]]: "valhalla_tiles_file": Path( self.docker_path, f"intermediate/valhalla_tiles/year={self.args.year}", - f"geography=state/state={self.args.state}/valhalla_tiles.tar.zst", + f"geography=state/state={self.args.state}/", + "valhalla_tiles.tar.zst", ), "origins_file": Path( self.docker_path, @@ -146,7 +162,7 @@ def _create_input_paths(self) -> dict[str, dict[str, Path]]: } def _create_output_paths(self) -> dict[str, dict[str, Any]]: - """Creates all output paths.""" + """Creates all input paths and stores them in a dictionary.""" output_dirs = { "times": Path("times", self._output_path), "origins": Path("points", self._output_path, "point_type=origin"), @@ -183,11 +199,12 @@ def get_path( self, dataset: str, path_type: str = "output", location: str = "local" ) -> str | Path: """ - Get a specific path by type and location. + Get a specific path by dataset name, type, and location. Args: - dataset: The type of path (e.g., 'times', 'origins', 'metadata') - location: Either 'local' or 's3' + dataset: The dataset name (e.g., 'times', 'origins', 'metadata'). + path_type: Either 'input' or 'output'. + location: Either 'local' or 's3'. 
""" if path_type == "output": path = self.output[location][f"{dataset}_file"] @@ -198,6 +215,13 @@ def get_path( def write_to_parquet( self, df: pd.DataFrame, dataset: str, location: str = "local" ) -> None: + """ + Write a DataFrame to an output Parquet file. + + Args: + dataset: The dataset name (e.g., 'times', 'origins', 'metadata'). + location: Either 'local' or 's3'. + """ df.to_parquet( self.get_path(dataset, path_type="output", location=location), engine="pyarrow", @@ -209,6 +233,10 @@ def write_to_parquet( class TravelTimeInputs: + """ + Class to hold input data and chunk settings for travel time calculations. + """ + def __init__( self, origins: pd.DataFrame, @@ -224,7 +252,7 @@ def __init__( self.chunk = chunk self.chunk_start_idx: int self.chunk_end_idx: int - self.chunk_size: int + self.chunk_size: int = int(10e7) self._set_chunk_attributes() self._set_origins_chunk() @@ -241,6 +269,7 @@ def __init__( ) def _set_chunk_attributes(self) -> None: + """Sets the origin chunk indices given the input chunk string.""" if self.chunk: self.chunk_start_idx, self.chunk_end_idx = map( int, self.chunk.split("-") @@ -248,6 +277,7 @@ def _set_chunk_attributes(self) -> None: self.chunk_size = self.chunk_end_idx - self.chunk_start_idx def _set_origins_chunk(self) -> None: + """Sets the origins chunk (if chunk is specified).""" df = self.origins if self.chunk: df = df.iloc[self.chunk_start_idx : self.chunk_end_idx] @@ -255,7 +285,10 @@ def _set_origins_chunk(self) -> None: class TravelTimeConfig: - """Configuration for time calculations with validation.""" + """ + Utility class to hold all configuration settings for travel time + calculations. Also includes loaders for the default input data. + """ OD_COLS = { "weighted": {"geoid": "id", "x_4326_wt": "lon", "y_4326_wt": "lat"}, @@ -282,6 +315,7 @@ def __init__( self.logger = logger def _load_od_file(self, path: str) -> pd.DataFrame: + """Load an origins or destinations file and prep for Valhalla.""" df = ( pd.read_parquet(self.paths.get_path(path, path_type="input")) .loc[:, list(self.OD_COLS[self.args.centroid_type].keys())] @@ -291,6 +325,7 @@ def _load_od_file(self, path: str) -> pd.DataFrame: return df def load_default_inputs(self) -> TravelTimeInputs: + """Load default origins and destinations files.""" origins = self._load_od_file("origins") destinations = self._load_od_file("destinations") return TravelTimeInputs( @@ -303,6 +338,11 @@ def load_default_inputs(self) -> TravelTimeInputs: class TravelTimeCalculator: + """ + Class to calculate travel times between origins and destinations. + Uses chunked requests to the Valhalla Matrix API for calculation. + """ + def __init__( self, actor: valhalla.Actor, @@ -326,6 +366,8 @@ def _calculate_times( Args: o_start_idx: Starting index for the origins DataFrame. d_start_idx: Starting index for the destinations DataFrame. + o_end_idx: Ending index for the origins DataFrame. + d_end_idx: Ending index for the destinations DataFrame. 
Returns: DataFrame containing origin IDs, destination IDs, travel durations, @@ -338,8 +380,8 @@ def _calculate_times( ) self.config.logger.info(job_string) - # Get the subset of origin and destination points and convert them to lists - # then squash them into the request body + # Get the subset of origin and destination points and convert them to + # lists then squash them into the request body origins_list = ( self.inputs.origins.iloc[o_start_idx:o_end_idx] .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) @@ -359,12 +401,12 @@ def _calculate_times( } ) - # Make the actual request to the matrix API + # Make the actual JSON request to the matrix API response = self.actor.matrix(request_json) response_data = json.loads(response) - # Parse the response data and convert it to a dataframe. Recover the - # origin and destination indices and append them to the dataframe + # Parse the response data and convert it to a DataFrame. Recover the + # origin and destination indices and append them to the DataFrame durations = response_data["sources_to_targets"]["durations"] distances = response_data["sources_to_targets"]["distances"] origin_ids = ( @@ -390,6 +432,13 @@ def _calculate_times( return df def _binary_search(self, o_start_idx, d_start_idx, o_end_idx, d_end_idx): + """ + Recursively split the origins and destinations into smaller chunks. + + Necessary because Valhalla will terminate certain unroutable requests. + Binary searching all origins and destinations will return all routable + values AROUND the unroutable ones. + """ if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx: df = pd.merge( pd.DataFrame( @@ -425,7 +474,15 @@ def _binary_search(self, o_start_idx, d_start_idx, o_end_idx, d_end_idx): + self._binary_search(mid_o, mid_d, o_end_idx, d_end_idx) ) - def get_times(self): + def get_times(self) -> pd.DataFrame: + """ + Entrypoint to calculate times for all combinations of origins and + destinations in inputs. + + Returns: + DataFrame containing origin IDs, destination IDs, travel durations, + and distances for all inputs. + """ results = [] msso = self.inputs.max_split_size_origins noc = self.inputs.n_origins_chunk From 35b925679308cb8463abbeacea6a034f387bfdba Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Fri, 29 Nov 2024 18:40:18 -0600 Subject: [PATCH 6/6] Add check of chunk digits --- data/src/utils/times.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/data/src/utils/times.py b/data/src/utils/times.py index 664a289..b368382 100644 --- a/data/src/utils/times.py +++ b/data/src/utils/times.py @@ -58,6 +58,13 @@ def _validate_chunk(self, chunk: str | None) -> None: "Invalid chunk argument. Must be two numbers " "separated by a dash (e.g., '1-2')." ) + if chunk: + parts = chunk.split("-") + if len(parts[0]) != len(parts[1]): + raise ValueError( + "Invalid chunk argument. Both numbers must have" + "the same number of digits (including zero-padding)." + ) class TravelTimePaths:
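
---

For readers skimming the diffs: the core technique in `TravelTimeCalculator` is the bisection backoff described in the `_binary_search` docstring added in PATCH 5/6. Below is a minimal standalone sketch of that approach; it assumes `origins` and `destinations` are DataFrames with an "id" column (as prepared by `TravelTimeConfig._load_od_file`) and uses a hypothetical caller-supplied `route_fn` in place of the Valhalla matrix request, so it illustrates the idea rather than reproducing the repository code.

    # Sketch only: `route_fn(o_df, d_df)` stands in for the Valhalla matrix
    # call; it is assumed to return origin_id/destination_id/duration_sec/
    # distance_km rows and to raise when a block is unroutable.
    import pandas as pd


    def route_matrix_with_backoff(origins, destinations, route_fn):
        def search(o_lo, d_lo, o_hi, d_hi):
            # Base case: a block reduced to a single pair is recorded with
            # null duration/distance so it can be reported as missing
            if o_lo + 1 >= o_hi and d_lo + 1 >= d_hi:
                df = pd.merge(
                    origins.iloc[o_lo:o_hi][["id"]]
                    .rename(columns={"id": "origin_id"}),
                    destinations.iloc[d_lo:d_hi][["id"]]
                    .rename(columns={"id": "destination_id"}),
                    how="cross",
                )
                df["duration_sec"] = pd.Series([], dtype=float)
                df["distance_km"] = pd.Series([], dtype=float)
                return [df]
            try:
                return [
                    route_fn(
                        origins.iloc[o_lo:o_hi],
                        destinations.iloc[d_lo:d_hi],
                    )
                ]
            except Exception:
                # On failure, quarter the block and retry: routable quadrants
                # succeed immediately, unroutable ones keep shrinking
                o_mid = (o_lo + o_hi) // 2
                d_mid = (d_lo + d_hi) // 2
                return (
                    search(o_lo, d_lo, o_mid, d_mid)
                    + search(o_mid, d_lo, o_hi, d_mid)
                    + search(o_lo, d_mid, o_mid, d_hi)
                    + search(o_mid, d_mid, o_hi, d_hi)
                )

        return pd.concat(
            search(0, 0, len(origins), len(destinations)), ignore_index=True
        )

Any block that Valhalla rejects is repeatedly quartered until the unroutable pairs are isolated as single rows with null duration_sec/distance_km, which calculate_times.py then splits off into the missing_pairs output.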
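The chunk-argument validation that closes the series (PATCH 1/6 adds the format check, PATCH 6/6 the digit-length check) amounts to the following; `validate_chunk_arg` is an illustrative free function, whereas the repository keeps this logic in `TravelTimeArgs._validate_chunk`.

    # Sketch of the combined chunk validation: two dash-separated numbers
    # with the same number of digits (zero-padding included).
    import re


    def validate_chunk_arg(chunk: str | None) -> None:
        if chunk is None:
            return
        if not re.match(r"^\d+-\d+$", chunk):
            raise ValueError(
                "Invalid chunk argument. Must be two numbers "
                "separated by a dash (e.g., '1-2')."
            )
        first, second = chunk.split("-")
        if len(first) != len(second):
            raise ValueError(
                "Invalid chunk argument. Both numbers must have "
                "the same number of digits (including zero-padding)."
            )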