diff --git a/data/params.yaml b/data/params.yaml index 02c2aac..be0bb07 100644 --- a/data/params.yaml +++ b/data/params.yaml @@ -7,7 +7,7 @@ s3: data_bucket: 'opentimes-data' public_bucket: 'opentimes-public' public_data_url: 'https://data.opentimes.org' - endpoint: 'https://fcb279b22cfe4c98f903ad8f9e7ccbb2.r2.cloudflarestorage.com' + endpoint_url: 'https://fcb279b22cfe4c98f903ad8f9e7ccbb2.r2.cloudflarestorage.com' account_id: 'fcb279b22cfe4c98f903ad8f9e7ccbb2' # Parameters used to control the chunking of work on GitHub Actions @@ -17,7 +17,7 @@ actions: # The minimum number of origins to include in a job. Higher = fewer jobs # that take longer. Lower = more jobs that finish quicker - min_chunk_size: 500 + min_chunk_size: 400 times: # Travel times output version. Follows SemVer (kind of): diff --git a/data/src/calculate_times.py b/data/src/calculate_times.py index d6c992e..bd80229 100644 --- a/data/src/calculate_times.py +++ b/data/src/calculate_times.py @@ -1,7 +1,6 @@ import argparse import json import os -import re import time import uuid from pathlib import Path @@ -10,177 +9,20 @@ import valhalla # type: ignore import yaml from utils.constants import DOCKER_INTERNAL_PATH +from utils.logging import create_logger +from utils.times import TravelTimeCalculator, TravelTimeConfig from utils.utils import format_time, get_md5_hash +logger = create_logger(__name__) + with open(DOCKER_INTERNAL_PATH / "params.yaml") as file: params = yaml.safe_load(file) -os.environ["AWS_PROFILE"] = params["s3"]["profile"] with open(DOCKER_INTERNAL_PATH / "valhalla.json", "r") as f: valhalla_data = json.load(f) +os.environ["AWS_PROFILE"] = params["s3"]["profile"] -def calculate_times( - actor, - o_start_idx: int, - d_start_idx: int, - o_end_idx: int, - d_end_idx: int, - origins: pd.DataFrame, - destinations: pd.DataFrame, - max_split_size_origins: int, - max_split_size_destinations: int, - mode: str, -) -> pd.DataFrame: - """Calculates travel times and distances between origins and destinations. - - Args: - actor: Valhalla actor instance for making matrix API requests. - o_start_idx: Starting index for the origins DataFrame. - d_start_idx: Starting index for the destinations DataFrame. - origins: DataFrame containing origin points with 'lat' and 'lon' columns. - destinations: DataFrame containing destination points with 'lat' and 'lon' columns. - max_split_size: Maximum number of points to process in one iteration. - mode: Travel mode for the Valhalla API (e.g., 'auto', 'bicycle'). - - Returns: - DataFrame containing origin IDs, destination IDs, travel durations, and distances. - """ - start_time = time.time() - job_string = ( - f"Routing origin indices {o_start_idx}-{o_end_idx - 1} to " - f"destination indices {d_start_idx}-{d_end_idx - 1}" - ) - print(job_string) - - # Get the subset of origin and destination points and convert them to lists - # then squash them into the request body - origins_list = ( - origins.iloc[o_start_idx:o_end_idx] - .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) - .tolist() - ) - destinations_list = ( - destinations.iloc[d_start_idx:d_end_idx] - .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) - .tolist() - ) - request_json = json.dumps( - { - "sources": origins_list, - "targets": destinations_list, - "costing": mode, - "verbose": False, - } - ) - - # Make the actual request to the matrix API - response = actor.matrix(request_json) - response_data = json.loads(response) - - # Parse the response data and convert it to a dataframe. 
Recover the - # origin and destination indices and append them to the dataframe - durations = response_data["sources_to_targets"]["durations"] - distances = response_data["sources_to_targets"]["distances"] - origin_ids = ( - origins.iloc[o_start_idx:o_end_idx]["id"] - .repeat(d_end_idx - d_start_idx) - .tolist() - ) - destination_ids = destinations.iloc[d_start_idx:d_end_idx][ - "id" - ].tolist() * (o_end_idx - o_start_idx) - - df = pd.DataFrame( - { - "origin_id": origin_ids, - "destination_id": destination_ids, - "duration_sec": [i for sl in durations for i in sl], - "distance_km": [i for sl in distances for i in sl], - } - ) - - elapsed_time = time.time() - start_time - print(job_string, f": {format_time(elapsed_time)}") - return df - - -def calculate_times_with_backoff( - actor, - origins, - destinations, - max_split_size_origins, - max_split_size_destinations, - mode, -): - results = [] - n_origins_chunk = len(origins) - n_destinations_chunk = len(destinations) - - def binary_search_times(o_start_idx, d_start_idx, o_end_idx, d_end_idx): - if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx: - df = pd.merge( - pd.DataFrame( - origins[o_start_idx:o_end_idx], columns=["origin_id"] - ), - pd.DataFrame( - destinations[d_start_idx:d_end_idx], - columns=["destination_id"], - ), - how="cross", - ) - df["distance_km"] = pd.Series([], dtype=float) - df["duration_sec"] = pd.Series([], dtype=float) - - return [df] - try: - times = calculate_times( - actor=actor, - o_start_idx=o_start_idx, - d_start_idx=d_start_idx, - o_end_idx=o_end_idx, - d_end_idx=d_end_idx, - origins=origins, - destinations=destinations, - max_split_size_origins=max_split_size_origins, - max_split_size_destinations=max_split_size_destinations, - mode=mode, - ) - return [times] - except Exception as e: - print(f"Error: {e}, backing off and retrying...") - mid_o = (o_start_idx + o_end_idx) // 2 - mid_d = (d_start_idx + d_end_idx) // 2 - return ( - binary_search_times(o_start_idx, d_start_idx, mid_o, mid_d) - + binary_search_times(mid_o, d_start_idx, o_end_idx, mid_d) - + binary_search_times(o_start_idx, mid_d, mid_o, d_end_idx) - + binary_search_times(mid_o, mid_d, o_end_idx, d_end_idx) - ) - - for o in range(0, n_origins_chunk, max_split_size_origins): - for d in range(0, n_destinations_chunk, max_split_size_destinations): - results.extend( - binary_search_times( - o, - d, - min(o + max_split_size_origins, n_origins_chunk), - min(d + max_split_size_destinations, n_destinations_chunk), - ) - ) - - return results - - -def create_write_path(key: str, out_type: str, output_dict: dict) -> str: - """Tiny helper to create Parquet output write paths.""" - return ( - "s3://" + output_dict[out_type][key].as_posix() - if out_type == "s3" - else output_dict[out_type][key].as_posix() - ) - - -if __name__ == "__main__": +def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--mode", required=True, type=str) parser.add_argument("--year", required=True, type=str) @@ -192,189 +34,46 @@ def create_write_path(key: str, out_type: str, output_dict: dict) -> str: args = parser.parse_args() script_start_time = time.time() - if args.mode not in params["times"]["mode"]: - raise ValueError( - "Invalid mode, must be one of: ", params["times"]["mode"] - ) - if args.centroid_type not in ["weighted", "unweighted"]: - raise ValueError( - "Invalid centroid_type, must be one of: ['weighted', 'unweighted']" - ) - if args.chunk: - if not re.match(r"^\d+-\d+$", args.chunk): - raise ValueError( - "Invalid chunk argument. 
Must be two numbers separated by a dash (e.g., '1-2')." - ) - - # Split and check chunk value - chunk_start_idx, chunk_end_idx = map(int, args.chunk.split("-")) - chunk_end_idx = chunk_end_idx + 1 - chunk_size = chunk_end_idx - chunk_start_idx - chunk_msg = f", chunk: {args.chunk}" if args.chunk else "" - print( - f"Starting routing for version: {params['times']['version']},", - f"mode: {args.mode}, year: {args.year}, geography: {args.geography},", - f"state: {args.state}, centroid type: {args.centroid_type}" - + chunk_msg, - ) - - ##### FILE PATHS ##### - - # Setup file paths for inputs (pre-made network file and OD points) - input = {} - input["main"] = { - "path": Path( - f"year={args.year}/geography={args.geography}/" - f"state={args.state}/{args.state}.parquet", - ) - } - input["dirs"] = { - "valhalla_tiles": Path( - DOCKER_INTERNAL_PATH, - f"intermediate/valhalla_tiles/year={args.year}/", - f"geography=state/state={args.state}", - ) - } - input["files"] = { - "valhalla_tiles_file": Path( - DOCKER_INTERNAL_PATH, - f"intermediate/valhalla_tiles/year={args.year}", - f"geography=state/state={args.state}/valhalla_tiles.tar.zst", - ), - "origins_file": Path( - DOCKER_INTERNAL_PATH, - f"intermediate/cenloc/{input['main']['path']}", - ), - "destinations_file": Path( - DOCKER_INTERNAL_PATH, - f"intermediate/destpoint/{input['main']['path']}", - ), - } - - # Setup file paths for all outputs both locally and on the remote - output = {} - output["prefix"] = { - "local": Path(DOCKER_INTERNAL_PATH / "output"), - "s3": Path(params["s3"]["data_bucket"]), - } - output["main"] = { - "path": Path( - f"version={params['times']['version']}/mode={args.mode}/", - f"year={args.year}/geography={args.geography}/state={args.state}/", - f"centroid_type={args.centroid_type}", - ), - "file": Path( - f"part-{args.chunk}.parquet" if args.chunk else "part-0.parquet" - ), - } - output["dirs"] = { - "times": Path("times", output["main"]["path"]), - "origins": Path("points", output["main"]["path"], "point_type=origin"), - "destinations": Path( - "points", output["main"]["path"], "point_type=destination" - ), - "missing_pairs": Path("missing_pairs", output["main"]["path"]), - "metadata": Path("metadata", output["main"]["path"]), - } - for loc in ["local", "s3"]: - output[loc] = { - "times_file": Path( - output["prefix"][loc], - output["dirs"]["times"], - output["main"]["file"], - ), - "origins_file": Path( - output["prefix"][loc], - output["dirs"]["origins"], - output["main"]["file"], - ), - "destinations_file": Path( - output["prefix"][loc], - output["dirs"]["destinations"], - output["main"]["file"], - ), - "missing_pairs_file": Path( - output["prefix"][loc], - output["dirs"]["missing_pairs"], - output["main"]["file"], - ), - "metadata_file": Path( - output["prefix"][loc], - output["dirs"]["metadata"], - output["main"]["file"], - ), - } - - # Make sure outputs have somewhere to write to - for path in output["dirs"].values(): - path = output["prefix"]["local"] / path - path.mkdir(parents=True, exist_ok=True) - - ##### DATA PREP ##### - - # Load origins and destinations - od_cols = { - "weighted": {"geoid": "id", "x_4326_wt": "lon", "y_4326_wt": "lat"}, - "unweighted": {"geoid": "id", "x_4326": "lon", "y_4326": "lat"}, - }[args.centroid_type] - - origins = ( - pd.read_parquet(input["files"]["origins_file"]) - .loc[:, list(od_cols.keys())] - .rename(columns=od_cols) - .sort_values(by="id") - ) - n_origins = len(origins) - - # Subset the origins if a chunk is used - if args.chunk: - origins = 
origins.iloc[chunk_start_idx:chunk_end_idx] - - destinations = ( - pd.read_parquet(input["files"]["destinations_file"]) - .loc[:, list(od_cols.keys())] - .rename(columns=od_cols) - .sort_values(by="id") - ) - n_destinations = len(destinations) - n_origins_chunk = len(origins) - n_destinations_chunk = len(destinations) - - print( - f"Routing from {len(origins)} origins", - f"to {len(destinations)} destinations", - ) - - ##### CALCULATE TIMES ##### - - max_split_size_origins = min(params["times"]["max_split_size"], chunk_size) - max_split_size_destinations = min( - params["times"]["max_split_size"], n_destinations - ) - - # Initialize the Valhalla actor bindings + # Create a travel times configuration and set of origin/destination inputs + config = TravelTimeConfig(args, params=params, logger=logger) + inputs = config.load_default_inputs() + + chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else "" + logger.info( + "Starting routing for version: %s, mode: %s, year: %s, " + "geography: %s, state: %s, centroid type: %s%s", + config.params["times"]["version"], + config.args.mode, + config.args.year, + config.args.geography, + config.args.state, + config.args.centroid_type, + chunk_msg, + ) + logger.info( + "Routing from %s origins to %s destinations", + len(inputs.origins_chunk), + inputs.n_destinations, + ) + + # Initialize the default Valhalla actor bindings actor = valhalla.Actor((Path.cwd() / "valhalla.json").as_posix()) # Calculate times for each chunk and append to a list - results = calculate_times_with_backoff( - actor=actor, - origins=origins, - destinations=destinations, - max_split_size_origins=max_split_size_origins, - max_split_size_destinations=max_split_size_destinations, - mode=args.mode, - ) + tt_calc = TravelTimeCalculator(actor, config, inputs) + results_df = tt_calc.get_times() - print( - "Finished calculating times in", - f"{format_time(time.time() - script_start_time)}", + logger.info( + "Finished calculating times in %s", + format_time(time.time() - script_start_time), + ) + logger.info( + "Routed from %s origins to %s destinations", + inputs.n_origins, + inputs.n_destinations, ) - # Concatenate all results into a single DataFrame - results_df = pd.concat(results, ignore_index=True) - del results - - # Extract missing pairs to a separate dataframe + # Extract missing pairs to a separate DataFrame missing_pairs_df = results_df[results_df["duration_sec"].isnull()] missing_pairs_df = ( pd.DataFrame(missing_pairs_df) @@ -387,90 +86,47 @@ def create_write_path(key: str, out_type: str, output_dict: dict) -> str: by=["origin_id", "destination_id"] ) - ##### SAVE OUTPUTS ##### - - out_types = ["local", "s3"] if args.write_to_s3 else ["local"] - compression_type = params["output"]["compression"]["type"] - compression_level = params["output"]["compression"]["level"] - storage_options = { - "s3": { - "client_kwargs": { - "endpoint_url": params["s3"]["endpoint"], - } - }, - "local": {}, - } - print( - f"Routed from {len(origins)} origins", - f"to {len(destinations)} destinations", - ) - print( - f"Calculated times between {len(results_df)} pairs.", - f"Times missing between {len(missing_pairs_df)} pairs.", - f"Saving outputs to: {', '.join(out_types)}", - ) - # Loop through files and write to both local and remote paths - for out_type in out_types: - results_df.to_parquet( - create_write_path("times_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - 
storage_options=storage_options[out_type], - ) - origins.to_parquet( - create_write_path("origins_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - storage_options=storage_options[out_type], - ) - destinations.to_parquet( - create_write_path("destinations_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - storage_options=storage_options[out_type], - ) - missing_pairs_df.to_parquet( - create_write_path("missing_pairs_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - storage_options=storage_options[out_type], - ) - - ##### SAVE METADATA ##### - + out_locations = ["local", "s3"] if args.write_to_s3 else ["local"] + logger.info( + "Calculated times between %s pairs. Times missing between %s pairs. " + "Saving outputs to: %s", + len(results_df), + len(missing_pairs_df), + ", ".join(out_locations), + ) + for loc in out_locations: + config.paths.write_to_parquet(results_df, "times", loc) + config.paths.write_to_parquet(inputs.origins_chunk, "origins", loc) + config.paths.write_to_parquet(inputs.destinations, "destinations", loc) + config.paths.write_to_parquet(missing_pairs_df, "missing_pairs", loc) + + # Construct and save a metadata DataFrame run_id = str(uuid.uuid4().hex[:8]) git_commit_sha = str(os.getenv("GITHUB_SHA")) git_commit_sha_short = str(git_commit_sha[:8] if git_commit_sha else None) input_file_hashes = { - f: get_md5_hash(input["files"][f]) for f in input["files"].keys() + f: get_md5_hash(config.paths.input["files"][f]) + for f in config.paths.input["files"].keys() } output_file_hashes = { - f: get_md5_hash(output["local"][f]) - for f in output["local"].keys() + f: get_md5_hash(config.paths.output["local"][f]) + for f in config.paths.output["local"].keys() if f != "metadata_file" } # Create a metadata dataframe of all settings and data used for creating inputs # and generating times - metadata = pd.DataFrame( + metadata_df = pd.DataFrame( { "run_id": run_id, "calc_datetime_finished": pd.Timestamp.now(tz="UTC"), "calc_time_elapsed_sec": time.time() - script_start_time, "calc_chunk_id": args.chunk, - "calc_chunk_n_origins": n_origins_chunk, - "calc_chunk_n_destinations": n_destinations_chunk, - "calc_n_origins": n_origins, - "calc_n_destinations": n_destinations, + "calc_chunk_n_origins": inputs.n_origins_chunk, + "calc_chunk_n_destinations": inputs.n_destinations_chunk, + "calc_n_origins": inputs.n_origins, + "calc_n_destinations": inputs.n_destinations, "git_commit_sha_short": git_commit_sha_short, "git_commit_sha_long": git_commit_sha, "param_network_buffer_m": params["input"]["network_buffer_m"], @@ -498,21 +154,22 @@ def create_write_path(key: str, out_type: str, output_dict: dict) -> str: }, index=[0], ) + for loc in out_locations: + config.paths.write_to_parquet(metadata_df, "metadata", loc) - for out_type in out_types: - metadata.to_parquet( - create_write_path("metadata_file", out_type, output), - engine="pyarrow", - compression=compression_type, - compression_level=compression_level, - index=False, - storage_options=storage_options[out_type], - ) - - print( - f"Finished routing for version: {params['times']['version']},", - f"mode: {args.mode}, year: {args.year}, geography: {args.geography},", - f"state: {args.state}, centroid type: {args.centroid_type}" - + chunk_msg, - f"in {format_time(time.time() - script_start_time)}", + 
logger.info(
+        "Finished routing for version: %s, mode: %s, year: %s, "
+        "geography: %s, state: %s, centroid type: %s%s in %s",
+        config.params["times"]["version"],
+        config.args.mode,
+        config.args.year,
+        config.args.geography,
+        config.args.state,
+        config.args.centroid_type,
+        chunk_msg,
+        format_time(time.time() - script_start_time),
     )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/data/src/create_public_site.py b/data/src/create_public_site.py
index b857ea8..cee3314 100644
--- a/data/src/create_public_site.py
+++ b/data/src/create_public_site.py
@@ -19,7 +19,7 @@ with open("params.yaml") as file:
     params = yaml.safe_load(file)
 
 session = boto3.Session(profile_name=params["s3"]["profile"])
-s3 = session.client("s3", endpoint_url=params["s3"]["endpoint"])
+s3 = session.client("s3", endpoint_url=params["s3"]["endpoint_url"])
 
 # Initialize Jinja2 environment and template
 jinja_env = Environment(loader=FileSystemLoader("site/templates"))
diff --git a/data/src/utils/times.py b/data/src/utils/times.py
new file mode 100644
index 0000000..b368382
--- /dev/null
+++ b/data/src/utils/times.py
@@ -0,0 +1,513 @@
+import argparse
+import json
+import logging
+import re
+import time
+from pathlib import Path
+from typing import Any, Literal
+
+import pandas as pd
+import valhalla  # type: ignore
+
+from utils.constants import DOCKER_INTERNAL_PATH
+from utils.utils import format_time
+
+
+class TravelTimeArgs:
+    """
+    Class to hold and validate arguments for travel time calculations.
+
+    Arguments are passed at runtime via the command line and validated
+    against the parameters file ('params.yaml').
+    """
+
+    def __init__(self, args: argparse.Namespace, params: dict) -> None:
+        self.mode: str
+        self.year: str
+        self.geography: str
+        self.state: str
+        self.centroid_type: str
+        self.chunk: str | None
+        self.write_to_s3: bool
+
+        self._args_to_attr(args)
+        self._validate_mode(params, self.mode)
+        self._validate_centroid_type(self.centroid_type)
+        self._validate_chunk(self.chunk)
+
+    def _args_to_attr(self, args: argparse.Namespace) -> None:
+        for k, v in vars(args).items():
+            setattr(self, k.replace("-", "_"), v)
+
+    def _validate_mode(self, params: dict, mode: str) -> None:
+        valid_modes = params["times"]["mode"]
+        if mode not in valid_modes:
+            raise ValueError(f"Invalid mode, must be one of: {valid_modes}")
+
+    def _validate_centroid_type(self, centroid_type: str) -> None:
+        valid_centroid_types = ["weighted", "unweighted"]
+        if centroid_type not in valid_centroid_types:
+            raise ValueError(
+                "Invalid centroid_type, must be one "
+                f"of: {valid_centroid_types}"
+            )
+
+    def _validate_chunk(self, chunk: str | None) -> None:
+        if chunk and not re.match(r"^\d+-\d+$", chunk):
+            raise ValueError(
+                "Invalid chunk argument. Must be two numbers "
+                "separated by a dash (e.g., '1-2')."
+            )
+        if chunk:
+            parts = chunk.split("-")
+            if len(parts[0]) != len(parts[1]):
+                raise ValueError(
+                    "Invalid chunk argument. Both numbers must have "
+                    "the same number of digits (including zero-padding)."
+                )
+
+
+class TravelTimePaths:
+    """
+    Class to manage all input and output paths for travel time calculations.
+
+    Paths are generated based on input arguments. Also holds remote (R2)
+    paths and write settings for Parquet files.
+    """
+
+    def __init__(
+        self,
+        args: TravelTimeArgs,
+        version: str,
+        docker_path: Path,
+        s3_bucket: str,
+        compression_type: Literal["snappy", "gzip", "brotli", "lz4", "zstd"],
+        compression_level: int = 3,
+        endpoint_url: str | None = None,
+    ) -> None:
+        self.args: TravelTimeArgs = args
+        self.version: str = version
+        self.docker_path: Path = docker_path
+        self.s3_bucket: str = s3_bucket
+        self.compression_type: Literal[
+            "snappy", "gzip", "brotli", "lz4", "zstd"
+        ] = compression_type
+        self.compression_level: int = compression_level
+        self.endpoint_url: str | None = endpoint_url
+        self.storage_options = {
+            "s3": {"client_kwargs": {"endpoint_url": endpoint_url}},
+            "local": {},
+        }
+
+        self.input: dict[str, dict] = {}
+        self.output: dict[str, dict] = {}
+        self._setup_paths()
+
+    @property
+    def _main_path(self) -> Path:
+        """Base path for all data."""
+        return Path(
+            f"year={self.args.year}/geography={self.args.geography}/",
+            f"state={self.args.state}",
+        )
+
+    @property
+    def _output_path(self) -> Path:
+        """Base path for output data."""
+        return Path(
+            f"version={self.version}/mode={self.args.mode}/",
+            self._main_path,
+            f"centroid_type={self.args.centroid_type}",
+        )
+
+    @property
+    def _file_name(self) -> str:
+        """Generates file name based on chunk."""
+        return (
+            f"part-{self.args.chunk}.parquet"
+            if self.args.chunk
+            else "part-0.parquet"
+        )
+
+    def _setup_paths(self) -> None:
+        """Sets up all input and output paths."""
+        self.input = self._create_input_paths()
+        self.output = self._create_output_paths()
+        self._create_output_directories()
+
+    def _create_input_paths(self) -> dict[str, dict[str, Path]]:
+        """Creates all input paths and stores them in a dictionary."""
+        return {
+            "main": {"path": self._main_path},
+            "dirs": {
+                "valhalla_tiles": Path(
+                    self.docker_path,
+                    f"intermediate/valhalla_tiles/year={self.args.year}",
+                    f"geography=state/state={self.args.state}",
+                )
+            },
+            "files": {
+                "valhalla_tiles_file": Path(
+                    self.docker_path,
+                    f"intermediate/valhalla_tiles/year={self.args.year}",
+                    f"geography=state/state={self.args.state}/",
+                    "valhalla_tiles.tar.zst",
+                ),
+                "origins_file": Path(
+                    self.docker_path,
+                    "intermediate/cenloc",
+                    self._main_path,
+                    f"{self.args.state}.parquet",
+                ),
+                "destinations_file": Path(
+                    self.docker_path,
+                    "intermediate/destpoint",
+                    self._main_path,
+                    f"{self.args.state}.parquet",
+                ),
+            },
+        }
+
+    def _create_output_paths(self) -> dict[str, dict[str, Any]]:
+        """Creates all output paths and stores them in a dictionary."""
+        output_dirs = {
+            "times": Path("times", self._output_path),
+            "origins": Path("points", self._output_path, "point_type=origin"),
+            "destinations": Path(
+                "points", self._output_path, "point_type=destination"
+            ),
+            "missing_pairs": Path("missing_pairs", self._output_path),
+            "metadata": Path("metadata", self._output_path),
+        }
+
+        prefix = {
+            "local": Path(self.docker_path, "output"),
+            "s3": Path(self.s3_bucket),
+        }
+
+        output_files = {}
+        for loc in ["local", "s3"]:
+            output_files[loc] = {
+                f"{key}_file": f"s3://{Path(prefix[loc], path, self._file_name)}"
+                if loc == "s3"
+                else Path(prefix[loc], path, self._file_name)
+                for key, path in output_dirs.items()
+            }
+
+        return {"prefix": prefix, "dirs": output_dirs, **output_files}
+
+    def _create_output_directories(self) -> None:
+        """Creates local output directories if they don't exist."""
+        for path in self.output["dirs"].values():
+            full_path = self.output["prefix"]["local"] / path
+            full_path.mkdir(parents=True, exist_ok=True)
+
+    def get_path(
+ self, dataset: str, path_type: str = "output", location: str = "local" + ) -> str | Path: + """ + Get a specific path by dataset name, type, and location. + + Args: + dataset: The dataset name (e.g., 'times', 'origins', 'metadata'). + path_type: Either 'input' or 'output'. + location: Either 'local' or 's3'. + """ + if path_type == "output": + path = self.output[location][f"{dataset}_file"] + else: + path = self.input["files"][f"{dataset}_file"] + return str(path) if location == "s3" else path + + def write_to_parquet( + self, df: pd.DataFrame, dataset: str, location: str = "local" + ) -> None: + """ + Write a DataFrame to an output Parquet file. + + Args: + dataset: The dataset name (e.g., 'times', 'origins', 'metadata'). + location: Either 'local' or 's3'. + """ + df.to_parquet( + self.get_path(dataset, path_type="output", location=location), + engine="pyarrow", + compression=self.compression_type, + compression_level=self.compression_level, + index=False, + storage_options=self.storage_options[location], + ) + + +class TravelTimeInputs: + """ + Class to hold input data and chunk settings for travel time calculations. + """ + + def __init__( + self, + origins: pd.DataFrame, + destinations: pd.DataFrame, + chunk: str | None, + max_split_size_origins: int, + max_split_size_destinations: int, + ) -> None: + self.origins = origins + self.destinations = destinations + self.origins_chunk: pd.DataFrame + + self.chunk = chunk + self.chunk_start_idx: int + self.chunk_end_idx: int + self.chunk_size: int = int(10e7) + self._set_chunk_attributes() + self._set_origins_chunk() + + self.n_origins: int = len(self.origins) + self.n_destinations: int = len(self.destinations) + self.n_origins_chunk: int = len(self.origins_chunk) + self.n_destinations_chunk: int = len(self.destinations) + + self.max_split_size_origins = min( + max_split_size_origins, self.chunk_size + ) + self.max_split_size_destinations = min( + max_split_size_destinations, self.n_destinations + ) + + def _set_chunk_attributes(self) -> None: + """Sets the origin chunk indices given the input chunk string.""" + if self.chunk: + self.chunk_start_idx, self.chunk_end_idx = map( + int, self.chunk.split("-") + ) + self.chunk_size = self.chunk_end_idx - self.chunk_start_idx + + def _set_origins_chunk(self) -> None: + """Sets the origins chunk (if chunk is specified).""" + df = self.origins + if self.chunk: + df = df.iloc[self.chunk_start_idx : self.chunk_end_idx] + self.origins_chunk = df + + +class TravelTimeConfig: + """ + Utility class to hold all configuration settings for travel time + calculations. Also includes loaders for the default input data. 
+ """ + + OD_COLS = { + "weighted": {"geoid": "id", "x_4326_wt": "lon", "y_4326_wt": "lat"}, + "unweighted": {"geoid": "id", "x_4326": "lon", "y_4326": "lat"}, + } + + def __init__( + self, + args: argparse.Namespace, + params: dict, + logger: logging.Logger, + ) -> None: + self.args = TravelTimeArgs(args, params) + self.params = params + self.paths = TravelTimePaths( + args=self.args, + version=self.params["times"]["version"], + docker_path=DOCKER_INTERNAL_PATH, + s3_bucket=self.params["s3"]["data_bucket"], + compression_type=self.params["output"]["compression"]["type"], + compression_level=self.params["output"]["compression"]["level"], + endpoint_url=self.params["s3"]["endpoint_url"], + ) + self.logger = logger + + def _load_od_file(self, path: str) -> pd.DataFrame: + """Load an origins or destinations file and prep for Valhalla.""" + df = ( + pd.read_parquet(self.paths.get_path(path, path_type="input")) + .loc[:, list(self.OD_COLS[self.args.centroid_type].keys())] + .rename(columns=self.OD_COLS[self.args.centroid_type]) + .sort_values(by="id") + ) + return df + + def load_default_inputs(self) -> TravelTimeInputs: + """Load default origins and destinations files.""" + origins = self._load_od_file("origins") + destinations = self._load_od_file("destinations") + return TravelTimeInputs( + origins=origins, + destinations=destinations, + chunk=self.args.chunk, + max_split_size_origins=self.params["times"]["max_split_size"], + max_split_size_destinations=self.params["times"]["max_split_size"], + ) + + +class TravelTimeCalculator: + """ + Class to calculate travel times between origins and destinations. + Uses chunked requests to the Valhalla Matrix API for calculation. + """ + + def __init__( + self, + actor: valhalla.Actor, + config: TravelTimeConfig, + inputs: TravelTimeInputs, + ) -> None: + self.actor = actor + self.config = config + self.inputs = inputs + + def _calculate_times( + self, + o_start_idx: int, + d_start_idx: int, + o_end_idx: int, + d_end_idx: int, + ) -> pd.DataFrame: + """ + Calculates travel times and distances between origins and destinations. + + Args: + o_start_idx: Starting index for the origins DataFrame. + d_start_idx: Starting index for the destinations DataFrame. + o_end_idx: Ending index for the origins DataFrame. + d_end_idx: Ending index for the destinations DataFrame. + + Returns: + DataFrame containing origin IDs, destination IDs, travel durations, + and distances. + """ + start_time = time.time() + job_string = ( + f"Routing origin indices {o_start_idx}-{o_end_idx - 1} to " + f"destination indices {d_start_idx}-{d_end_idx - 1}" + ) + self.config.logger.info(job_string) + + # Get the subset of origin and destination points and convert them to + # lists then squash them into the request body + origins_list = ( + self.inputs.origins.iloc[o_start_idx:o_end_idx] + .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) + .tolist() + ) + destinations_list = ( + self.inputs.destinations.iloc[d_start_idx:d_end_idx] + .apply(lambda row: {"lat": row["lat"], "lon": row["lon"]}, axis=1) + .tolist() + ) + request_json = json.dumps( + { + "sources": origins_list, + "targets": destinations_list, + "costing": self.config.args.mode, + "verbose": False, + } + ) + + # Make the actual JSON request to the matrix API + response = self.actor.matrix(request_json) + response_data = json.loads(response) + + # Parse the response data and convert it to a DataFrame. 
Recover the
+        # origin and destination indices and append them to the DataFrame
+        durations = response_data["sources_to_targets"]["durations"]
+        distances = response_data["sources_to_targets"]["distances"]
+        origin_ids = (
+            self.inputs.origins.iloc[o_start_idx:o_end_idx]["id"]
+            .repeat(d_end_idx - d_start_idx)
+            .tolist()
+        )
+        destination_ids = self.inputs.destinations.iloc[d_start_idx:d_end_idx][
+            "id"
+        ].tolist() * (o_end_idx - o_start_idx)
+
+        df = pd.DataFrame(
+            {
+                "origin_id": origin_ids,
+                "destination_id": destination_ids,
+                "duration_sec": [i for sl in durations for i in sl],
+                "distance_km": [i for sl in distances for i in sl],
+            }
+        )
+
+        elapsed_time = time.time() - start_time
+        self.config.logger.info(f"{job_string}: {format_time(elapsed_time)}")
+        return df
+
+    def _binary_search(self, o_start_idx, d_start_idx, o_end_idx, d_end_idx):
+        """
+        Recursively split the origins and destinations into smaller chunks.
+
+        Necessary because Valhalla will terminate certain unroutable requests.
+        Binary searching all origins and destinations will return all routable
+        values AROUND the unroutable ones.
+        """
+        if o_start_idx + 1 >= o_end_idx and d_start_idx + 1 >= d_end_idx:
+            # Cross join the actual point IDs (rather than positional
+            # indices) so unroutable pairs remain identifiable downstream
+            df = pd.merge(
+                self.inputs.origins["id"]
+                .iloc[o_start_idx:o_end_idx]
+                .rename("origin_id")
+                .to_frame(),
+                self.inputs.destinations["id"]
+                .iloc[d_start_idx:d_end_idx]
+                .rename("destination_id")
+                .to_frame(),
+                how="cross",
+            )
+            df["distance_km"] = pd.Series([], dtype=float)
+            df["duration_sec"] = pd.Series([], dtype=float)
+
+            return [df]
+        try:
+            times = self._calculate_times(
+                o_start_idx=o_start_idx,
+                d_start_idx=d_start_idx,
+                o_end_idx=o_end_idx,
+                d_end_idx=d_end_idx,
+            )
+            return [times]
+        except Exception as e:
+            self.config.logger.info(f"Error: {e}, backing off and retrying...")
+            mid_o = (o_start_idx + o_end_idx) // 2
+            mid_d = (d_start_idx + d_end_idx) // 2
+            return (
+                self._binary_search(o_start_idx, d_start_idx, mid_o, mid_d)
+                + self._binary_search(mid_o, d_start_idx, o_end_idx, mid_d)
+                + self._binary_search(o_start_idx, mid_d, mid_o, d_end_idx)
+                + self._binary_search(mid_o, mid_d, o_end_idx, d_end_idx)
+            )
+
+    def get_times(self) -> pd.DataFrame:
+        """
+        Entrypoint to calculate times for all combinations of origins and
+        destinations in inputs.
+
+        Returns:
+            DataFrame containing origin IDs, destination IDs, travel durations,
+            and distances for all inputs.
+        """
+        results = []
+        msso = self.inputs.max_split_size_origins
+        noc = self.inputs.n_origins_chunk
+        mssd = self.inputs.max_split_size_destinations
+        ndc = self.inputs.n_destinations_chunk
+
+        for o in range(0, noc, msso):
+            for d in range(0, ndc, mssd):
+                results.extend(
+                    self._binary_search(
+                        o,
+                        d,
+                        min(o + msso, noc),
+                        min(d + mssd, ndc),
+                    )
+                )
+
+        results_df = pd.concat(results, ignore_index=True)
+        del results
+
+        return results_df
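
Usage sketch (not part of the patch): the --chunk argument validated by TravelTimeArgs is how GitHub Actions splits origins across jobs. The snippet below shows which chunk strings pass the new validation, assuming params.yaml is loaded as in calculate_times.py; the mode, year, geography, and state values are only illustrative.

import argparse

import yaml

from utils.constants import DOCKER_INTERNAL_PATH
from utils.times import TravelTimeArgs

with open(DOCKER_INTERNAL_PATH / "params.yaml") as file:
    params = yaml.safe_load(file)

args = argparse.Namespace(
    mode="auto",               # must be listed in params["times"]["mode"]
    year="2020",
    geography="tract",
    state="17",
    centroid_type="weighted",  # must be "weighted" or "unweighted"
    chunk="0500-0999",         # zero-padded, equal-width start/end indices
    write_to_s3=False,
)
TravelTimeArgs(args, params)   # passes all three validators
# chunk="500-0999" -> ValueError (start and end must have the same width)
# chunk="500:999"  -> ValueError (must be two numbers separated by a dash)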
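Continuing the same hypothetical arguments, this sketch shows the Hive-partitioned layout that TravelTimePaths produces. The bucket name opentimes-data comes from params.yaml; the version string and DOCKER_INTERNAL_PATH expansions in the comments are placeholders, not values taken from the repo.

from utils.logging import create_logger
from utils.times import TravelTimeConfig

config = TravelTimeConfig(args, params=params, logger=create_logger(__name__))
print(config.paths.get_path("times", location="local"))
# <DOCKER_INTERNAL_PATH>/output/times/version=<version>/mode=auto/year=2020/
#   geography=tract/state=17/centroid_type=weighted/part-0500-0999.parquet
print(config.paths.get_path("times", location="s3"))
# s3://opentimes-data/times/version=<version>/mode=auto/year=2020/
#   geography=tract/state=17/centroid_type=weighted/part-0500-0999.parquet

Note that constructing the config also creates the local output directories, since TravelTimePaths calls _create_output_directories() during setup.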
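Finally, a standalone illustration of the retry strategy in TravelTimeCalculator._binary_search: when a Valhalla matrix request fails, the failed block of indices is quartered and each quadrant is retried, recursing until any unroutable pair is isolated as a single origin-destination cell. No Valhalla call is involved here; this only mirrors the index arithmetic of the four recursive calls.

# How one failed 8x8 block of positional indices is split into quadrants
o_start_idx, d_start_idx, o_end_idx, d_end_idx = 0, 0, 8, 8
mid_o = (o_start_idx + o_end_idx) // 2  # 4
mid_d = (d_start_idx + d_end_idx) // 2  # 4
quadrants = [
    (o_start_idx, d_start_idx, mid_o, mid_d),  # origins 0-3, destinations 0-3
    (mid_o, d_start_idx, o_end_idx, mid_d),    # origins 4-7, destinations 0-3
    (o_start_idx, mid_d, mid_o, d_end_idx),    # origins 0-3, destinations 4-7
    (mid_o, mid_d, o_end_idx, d_end_idx),      # origins 4-7, destinations 4-7
]
# Each quadrant is retried; a quadrant that still fails is split again, until
# the base case (a 1x1 block) records NaN duration/distance for that pair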