diff --git a/.github/actions/fetch-locations/action.yaml b/.github/actions/fetch-locations/action.yaml index 2dc8db4..01060a4 100644 --- a/.github/actions/fetch-locations/action.yaml +++ b/.github/actions/fetch-locations/action.yaml @@ -25,7 +25,7 @@ runs: shell: bash working-directory: 'data' run: | - dvc pull --no-run-cache create_destpoint_by_state \ + uv run dvc pull --no-run-cache create_destpoint_by_state \ create_cenloc_national create_cenloc_by_state - name: Cache save location input data diff --git a/.github/actions/fetch-valhalla-tiles/action.yaml b/.github/actions/fetch-valhalla-tiles/action.yaml index 3a53c05..e7597a1 100644 --- a/.github/actions/fetch-valhalla-tiles/action.yaml +++ b/.github/actions/fetch-valhalla-tiles/action.yaml @@ -31,7 +31,7 @@ runs: working-directory: 'data' run: | tile_path=year=${{ inputs.year }}/geography=state/state=${{ inputs.state }} - dvc pull --no-run-cache \ + uv run dvc pull --no-run-cache \ ./intermediate/valhalla_tiles/"$tile_path"/valhalla_tiles.tar.zst - name: Cache save tile input data diff --git a/.github/actions/setup-dvc/action.yaml b/.github/actions/setup-dvc/action.yaml index 6aa0795..122bfee 100644 --- a/.github/actions/setup-dvc/action.yaml +++ b/.github/actions/setup-dvc/action.yaml @@ -7,19 +7,15 @@ runs: using: composite steps: - name: Install uv - uses: astral-sh/setup-uv@v3 + uses: astral-sh/setup-uv@v5 with: enable-cache: true cache-suffix: "dvc" - - name: Install Python - uses: actions/setup-python@v5 - with: - python-version-file: .python-version - - name: Install DVC id: install_python_deps shell: bash - run: uv pip install "dvc[s3]" - env: - UV_SYSTEM_PYTHON: 1 + run: | + uv python install + uv venv + uv pip install "dvc[s3]" diff --git a/.github/workflows/calculate-times.yaml b/.github/workflows/calculate-times.yaml index 4627125..177f6b9 100644 --- a/.github/workflows/calculate-times.yaml +++ b/.github/workflows/calculate-times.yaml @@ -121,7 +121,6 @@ env: # See: 
https://github.com/aws/aws-cli/issues/5262#issuecomment-705832151 AWS_EC2_METADATA_DISABLED: true PYTHONUNBUFFERED: "1" - UV_SYSTEM_PYTHON: 1 jobs: # Using the location data, split the origins into N jobs (max 256) @@ -156,20 +155,37 @@ jobs: docker load --input /tmp/opentimes.tar docker image ls -a + - name: Install uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + cache-suffix: "site-data" + cache-dependency-glob: | + pyproject.toml + uv.lock + + - name: Install Python dependencies + id: install-python-dependencies + shell: bash + run: | + uv python install + uv venv + uv pip install ".[site,data]" + - name: Fetch locations data uses: ./.github/actions/fetch-locations - name: Create job chunks id: create-job-chunks + working-directory: 'data' shell: bash run: | export USER_ID=${{ env.USER_ID }} export GROUP_ID=${{ env.GROUP_ID }} - chunks=$(docker compose run --rm --quiet-pull \ - --entrypoint=python valhalla-run /data/src/split_chunks.py \ + chunks=$(uv run python ./src/split_chunks.py \ --year ${{ inputs.year }} --geography ${{ inputs.geography }} \ - --state ${{ inputs.state }}) - echo "chunks=$chunks" >> $GITHUB_OUTPUT + --state ${{ inputs.state }}) + echo "chunks=$chunks" >> $GITHUB_OUTPUT # If override chunks are set, use those instead chunks_parsed=($(echo "$chunks" | jq -r '.[]')) @@ -238,6 +254,23 @@ jobs: docker load --input /tmp/opentimes.tar docker image ls -a + - name: Install uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + cache-suffix: "site-data" + cache-dependency-glob: | + pyproject.toml + uv.lock + + - name: Install Python dependencies + id: install-python-dependencies + shell: bash + run: | + uv python install + uv venv + uv pip install ".[site,data]" + - name: Extract tiles shell: bash working-directory: 'data' @@ -253,8 +286,8 @@ run: | export USER_ID=${{ env.USER_ID }} export GROUP_ID=${{ env.GROUP_ID }} - docker compose run --rm --quiet-pull --entrypoint=python \ - valhalla-run
/data/src/calculate_times.py \ + docker compose up --quiet-pull valhalla-run-fp valhalla-run-sp -d + uv run python ./src/calculate_times.py \ --mode ${{ inputs.mode }} --year ${{ inputs.year }} \ --geography ${{ inputs.geography }} --state ${{ inputs.state }} \ --centroid-type ${{ inputs.centroid_type }} \ diff --git a/.github/workflows/create-public-files.yaml b/.github/workflows/create-public-files.yaml index fa0216f..91edd3e 100644 --- a/.github/workflows/create-public-files.yaml +++ b/.github/workflows/create-public-files.yaml @@ -42,7 +42,6 @@ env: # See: https://github.com/aws/aws-cli/issues/5262#issuecomment-705832151 AWS_EC2_METADATA_DISABLED: true PYTHONUNBUFFERED: "1" - UV_SYSTEM_PYTHON: 1 jobs: setup-jobs: @@ -79,7 +78,7 @@ jobs: uses: actions/checkout@v4 - name: Install uv - uses: astral-sh/setup-uv@v3 + uses: astral-sh/setup-uv@v5 with: enable-cache: true cache-suffix: "site-data" @@ -87,15 +86,12 @@ jobs: pyproject.toml uv.lock - - name: Install Python - uses: actions/setup-python@v5 - with: - python-version-file: .python-version - - name: Install Python dependencies id: install-python-dependencies shell: bash - run: uv pip install ".[site]" ".[data]" + run: | + uv python install + uv pip install ".[site,data]" - name: Setup Cloudflare credentials uses: ./.github/actions/setup-cloudflare-s3 @@ -110,7 +106,7 @@ jobs: run: | datasets_parsed=($(echo "${{ inputs.dataset }}" | tr -d ' ' | tr ',' ' ')) for dataset in "${datasets_parsed[@]}"; do - python ./src/create_public_files.py \ + uv run python ./src/create_public_files.py \ --dataset "$dataset" --version ${{ inputs.version }} \ --mode ${{ matrix.mode }} --year ${{ matrix.year }} \ --geography ${{ matrix.geography }} diff --git a/.github/workflows/create-valhalla-tiles.yaml b/.github/workflows/create-valhalla-tiles.yaml index 332ac4f..3b40624 100644 --- a/.github/workflows/create-valhalla-tiles.yaml +++ b/.github/workflows/create-valhalla-tiles.yaml @@ -30,7 +30,6 @@ env: # See: 
https://github.com/aws/aws-cli/issues/5262#issuecomment-705832151 AWS_EC2_METADATA_DISABLED: true PYTHONUNBUFFERED: "1" - UV_SYSTEM_PYTHON: 1 jobs: setup-jobs: @@ -110,7 +109,7 @@ shell: bash working-directory: 'data' run: | - dvc pull --no-run-cache \ + uv run dvc pull --no-run-cache \ ./intermediate/osmextract/year=${{ inputs.year }}/geography=state/state=${{ matrix.state }}/${{ matrix.state }}.osm.pbf - name: Run job chunk @@ -120,7 +119,7 @@ # Disable elevation for Alaska (which requires 22GB of tiles) BUILD_ELEVATION: ${{ matrix.state == '02' && 'False' || 'True' }} run: | - dvc repro -s create_valhalla_tiles@${{ inputs.year }}-${{ matrix.state }} + uv run dvc repro -s create_valhalla_tiles@${{ inputs.year }}-${{ matrix.state }} - name: Write tile files to S3 shell: bash diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml index 735ada8..a847112 100644 --- a/.github/workflows/pre-commit.yaml +++ b/.github/workflows/pre-commit.yaml @@ -7,7 +7,6 @@ name: pre-commit env: PYTHONUNBUFFERED: "1" - UV_SYSTEM_PYTHON: 1 jobs: pre-commit: @@ -17,19 +16,17 @@ jobs: uses: actions/checkout@v4 - name: Install uv - uses: astral-sh/setup-uv@v3 + uses: astral-sh/setup-uv@v5 with: enable-cache: true cache-suffix: "pre-commit" - - name: Install Python - uses: actions/setup-python@v5 - with: - python-version-file: .python-version - - name: Install pre-commit shell: bash - run: uv pip install pre-commit + run: | + uv python install + uv venv + uv pip install pre-commit - name: Cache pre-commit environment uses: actions/cache@v4 @@ -39,4 +36,4 @@ - name: Run pre-commit shell: bash - run: pre-commit run --show-diff-on-failure --color=always --all-files + run: uv run pre-commit run --show-diff-on-failure --color=always --all-files diff --git a/.github/workflows/pypi-publish.yaml b/.github/workflows/pypi-publish.yaml index 24986af..65f2978 100644 @@ -7,7 +7,6
@@ name: pypi-publish env: PYTHONUNBUFFERED: "1" - UV_SYSTEM_PYTHON: 1 jobs: pypi-publish: @@ -23,15 +22,7 @@ jobs: uses: actions/checkout@v4 - name: Install uv - uses: astral-sh/setup-uv@v3 - - - name: Install Python - uses: actions/setup-python@v5 - with: - python-version-file: .python-version - - - name: Install Python dependencies - run: uv pip install . + uses: astral-sh/setup-uv@v5 - name: Build Python dist run: uv build diff --git a/.github/workflows/update-data-site.yaml b/.github/workflows/update-data-site.yaml index 6116ee4..cfd6d42 100644 --- a/.github/workflows/update-data-site.yaml +++ b/.github/workflows/update-data-site.yaml @@ -14,7 +14,6 @@ env: # See: https://github.com/aws/aws-cli/issues/5262#issuecomment-705832151 AWS_EC2_METADATA_DISABLED: true PYTHONUNBUFFERED: "1" - UV_SYSTEM_PYTHON: 1 jobs: update-site: @@ -25,7 +24,7 @@ jobs: uses: actions/checkout@v4 - name: Install uv - uses: astral-sh/setup-uv@v3 + uses: astral-sh/setup-uv@v5 with: enable-cache: true cache-suffix: "site-data" @@ -33,15 +32,13 @@ jobs: pyproject.toml uv.lock - - name: Install Python - uses: actions/setup-python@v5 - with: - python-version-file: .python-version - - name: Install Python dependencies id: install-python-dependencies shell: bash - run: uv pip install ".[site]" ".[data]" + run: | + uv python install + uv venv + uv pip install ".[site,data]" - name: Setup Cloudflare credentials uses: ./.github/actions/setup-cloudflare-s3 @@ -57,7 +54,7 @@ jobs: echo "::add-mask::${{ secrets.CLOUDFLARE_CACHE_API_TOKEN }}" echo "::add-mask::${{ secrets.CLOUDFLARE_CACHE_ZONE_ID }}" bucket=$(yq e '.s3.public_bucket' params.yaml) - python ./src/create_public_site.py + uv run python ./src/create_public_site.py env: CLOUDFLARE_CACHE_API_TOKEN: ${{ secrets.CLOUDFLARE_CACHE_API_TOKEN }} CLOUDFLARE_CACHE_ZONE_ID: ${{ secrets.CLOUDFLARE_CACHE_ZONE_ID }} diff --git a/data/Dockerfile b/data/Dockerfile index e9c17ef..7851cec 100644 --- a/data/Dockerfile +++ b/data/Dockerfile @@ -1,17 +1,7 @@ 
FROM ghcr.io/nilsnolde/docker-valhalla/valhalla:latest - SHELL ["/bin/bash", "-o", "pipefail", "-c"] -WORKDIR /custom_files USER root -# Install Python package dependencies -RUN apt-get update && \ - apt-get install -y --no-install-recommends \ - osmium-tool=1.16.0-1build1 libzstd1=1.5.5+dfsg2-2build1.1 \ - libudunits2-dev=2.2.28-7build1 libproj-dev=9.4.0-1build2 \ - gdal-bin=3.8.4+dfsg-3ubuntu3 geos-bin=3.12.1-3build1 && \ - rm -rf /var/lib/apt/lists/* - # Create a new valhalla user with correct ids # https://jtreminio.com/blog/running-docker-containers-as-current-host-user ARG USER_ID=59999 @@ -21,10 +11,10 @@ RUN userdel -f valhalla && \ if getent group ${GROUP_ID}; then groupdel $(getent group ${GROUP_ID} | cut -d: -f1); fi && \ groupadd -g ${GROUP_ID} valhalla && \ useradd -l -u ${USER_ID} -g valhalla valhalla && \ - install -d -m 0755 -o valhalla -g valhalla /home/valhalla + install -d -m 0755 -o valhalla -g valhalla /home/valhalla && \ + rm -rf /custom_files && mkdir -p /custom_files && \ + chmod 0775 /custom_files && \ + chown valhalla:valhalla /custom_files -# Install Python dependencies for opentimes work inside docker -COPY --from=ghcr.io/astral-sh/uv:0.5.0 /uv /uvx /bin/ -COPY pyproject.toml uv.lock ./ -RUN uv pip install --no-cache-dir --system --break-system-packages .[data] +WORKDIR /custom_files USER valhalla diff --git a/data/params.yaml b/data/params.yaml index 103cd7c..56d4c2c 100644 --- a/data/params.yaml +++ b/data/params.yaml @@ -20,14 +20,14 @@ actions: # The minimum number of origins to include in a job. Higher = fewer jobs # that take longer. Lower = more jobs that finish quicker - origin_min_chunk_size: 500 + origin_min_chunk_size: 900 # The max number of destination splits to create for a workflow destination_n_chunks: 4 # The minimum number of destinations included in each job. 
For reference, # most states have around 10K Census tract destinations - destination_min_chunk_size: 10000 + destination_min_chunk_size: 20000 times: # Travel times output version. Follows SemVer (kind of): @@ -46,7 +46,7 @@ times: # Maximum size of chunk of origins AND destinations to process in a single call to # Valhalla. This is necessary because larger matrices will cause it to choke - max_split_size: 250 + max_split_size: 150 # Coordinates are snapped to the OSM street network before time calculation. # Setting this to true will use the snapped coordinates directly in the diff --git a/data/src/calculate_times.py b/data/src/calculate_times.py index b286d43..92842d1 100644 --- a/data/src/calculate_times.py +++ b/data/src/calculate_times.py @@ -6,9 +6,7 @@ from pathlib import Path import pandas as pd -import valhalla # type: ignore import yaml -from utils.constants import DOCKER_INTERNAL_PATH from utils.logging import create_logger from utils.times import ( TravelTimeCalculator, @@ -19,7 +17,7 @@ logger = create_logger(__name__) -with open(DOCKER_INTERNAL_PATH / "params.yaml") as file: +with open(Path.cwd() / "params.yaml") as file: params = yaml.safe_load(file) os.environ["AWS_PROFILE"] = params["s3"]["profile"] @@ -53,29 +51,23 @@ def main() -> None: chunk_msg, ) logger.info( - "Routing from %s origins to %s destinations (%s pairs)", + "Starting with %s origins to %s destinations (%s pairs)", len(inputs.origins), inputs.n_destinations, len(inputs.origins) * inputs.n_destinations, ) - # Initialize the Valhalla router Python bindings. 
The _sp version is a - # more expensive fallback router used as a second pass - actor = valhalla.Actor((Path.cwd() / "valhalla.json").as_posix()) - actor_sp = valhalla.Actor((Path.cwd() / "valhalla_sp.json").as_posix()) - # Use the Vahalla Locate API to append coordinates that are snapped to OSM if config.params["times"]["use_snapped"]: logger.info("Snapping coordinates to OSM network") - inputs.origins = snap_df_to_osm( - inputs.origins, config.args.mode, actor - ) + inputs.origins = snap_df_to_osm(inputs.origins, config.args.mode) inputs.destinations = snap_df_to_osm( - inputs.destinations, config.args.mode, actor + inputs.destinations, config.args.mode ) # Calculate times for each chunk and return a single DataFrame - tt_calc = TravelTimeCalculator(actor, actor_sp, config, inputs) + logger.info("Tiles loaded and coodinates ready, starting routing") + tt_calc = TravelTimeCalculator(config, inputs) results_df = tt_calc.many_to_many() logger.info( "Finished calculating times for %s pairs in %s", @@ -126,9 +118,9 @@ def main() -> None: # Create a metadata DataFrame of all settings and data used for creating # inputs and generating times - with open(DOCKER_INTERNAL_PATH / "valhalla.json", "r") as f: + with open(Path.cwd() / "valhalla.json", "r") as f: valhalla_data = json.load(f) - with open(DOCKER_INTERNAL_PATH / "valhalla_sp.json", "r") as f: + with open(Path.cwd() / "valhalla_sp.json", "r") as f: valhalla_data_sp = json.load(f) metadata_df = pd.DataFrame( { diff --git a/data/src/split_chunks.py b/data/src/split_chunks.py index e704f14..6b6873d 100644 --- a/data/src/split_chunks.py +++ b/data/src/split_chunks.py @@ -1,10 +1,10 @@ import argparse +from pathlib import Path import yaml -from utils.constants import DOCKER_INTERNAL_PATH from utils.utils import split_od_files_to_json -with open(DOCKER_INTERNAL_PATH / "params.yaml") as file: +with open(Path.cwd() / "params.yaml") as file: params = yaml.safe_load(file) @@ -40,7 +40,7 @@ def split_chunks( Defaults to 10000. 
""" origin_file = ( - DOCKER_INTERNAL_PATH + Path.cwd() / "intermediate" / "cenloc" / f"year={year}" @@ -49,7 +49,7 @@ def split_chunks( / f"{state}.parquet" ) destination_file = ( - DOCKER_INTERNAL_PATH + Path.cwd() / "intermediate" / "destpoint" / f"year={year}" diff --git a/data/src/utils/constants.py b/data/src/utils/constants.py index 314ac92..d663e1d 100644 --- a/data/src/utils/constants.py +++ b/data/src/utils/constants.py @@ -1,8 +1,7 @@ -import os -from pathlib import Path - -# Path relative to the mounts within the Valhalla Docker container -DOCKER_INTERNAL_PATH = Path(os.environ.get("DOCKER_INTERNAL_PATH", Path.cwd())) +# Local endpoints for Docker containers running Valhalla service. See the +# Compose file for endpoint setup +DOCKER_ENDPOINT_FIRST_PASS = "http://127.0.0.1:8002" +DOCKER_ENDPOINT_SECOND_PASS = "http://127.0.0.1:8003" # This is a dictionary that determines the construction of the public # OpenTimes files. partition_levels is the number of directories present in diff --git a/data/src/utils/times.py b/data/src/utils/times.py index 3e42d46..d364407 100644 --- a/data/src/utils/times.py +++ b/data/src/utils/times.py @@ -1,21 +1,25 @@ import argparse import json import logging +import os import re import time +from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Any, Literal import pandas as pd -import valhalla # type: ignore +import requests as r -from utils.constants import DOCKER_INTERNAL_PATH +from utils.constants import ( + DOCKER_ENDPOINT_FIRST_PASS, + DOCKER_ENDPOINT_SECOND_PASS, +) from utils.utils import ( create_empty_df, format_time, group_by_column_sets, merge_overlapping_df_list, - suppress_stdout, ) @@ -86,7 +90,6 @@ def __init__( self, args: TravelTimeArgs, version: str, - docker_path: Path, s3_bucket: str, compression_type: Literal["snappy", "gzip", "brotli", "lz4", "zstd"], compression_level: int = 3, @@ -94,7 +97,6 @@ def __init__( ) -> None: self.args: TravelTimeArgs = args 
self.version: str = version - self.docker_path: Path = docker_path self.s3_bucket: str = s3_bucket self.compression_type: Literal[ "snappy", "gzip", "brotli", "lz4", "zstd" @@ -148,26 +150,26 @@ def _create_input_paths(self) -> dict[str, dict[str, Path]]: "main": {"path": self._main_path}, "dirs": { "valhalla_tiles": Path( - self.docker_path, + Path.cwd(), f"intermediate/valhalla_tiles/year={self.args.year}", f"geography=state/state={self.args.state}", ) }, "files": { "valhalla_tiles_file": Path( - self.docker_path, + Path.cwd(), f"intermediate/valhalla_tiles/year={self.args.year}", f"geography=state/state={self.args.state}/", "valhalla_tiles.tar.zst", ), "origins_file": Path( - self.docker_path, + Path.cwd(), "intermediate/cenloc", self._main_path, f"{self.args.state}.parquet", ), "destinations_file": Path( - self.docker_path, + Path.cwd(), "intermediate/destpoint", self._main_path, f"{self.args.state}.parquet", @@ -188,7 +190,7 @@ def _create_output_paths(self) -> dict[str, dict[str, Any]]: } prefix = { - "local": Path(self.docker_path, "output"), + "local": Path(Path.cwd(), "output"), "s3": Path(self.s3_bucket), } @@ -336,7 +338,6 @@ def __init__( self.paths = TravelTimePaths( args=self.args, version=self.params["times"]["version"], - docker_path=DOCKER_INTERNAL_PATH, s3_bucket=self.params["s3"]["data_bucket"], compression_type=self.params["output"]["compression"]["type"], compression_level=self.params["output"]["compression"]["level"], @@ -376,13 +377,9 @@ class TravelTimeCalculator: def __init__( self, - actor: valhalla.Actor, - actor_fallback: valhalla.Actor, config: TravelTimeConfig, inputs: TravelTimeInputs, ) -> None: - self.actor = actor - self.actor_fallback = actor_fallback self.config = config self.inputs = inputs @@ -390,7 +387,7 @@ def _calculate_times( self, origins: pd.DataFrame, destinations: pd.DataFrame, - actor: valhalla.Actor, + endpoint: str, ) -> pd.DataFrame: """ Sends the travel time calculation request to the Valhalla Matrix API. 
@@ -420,11 +417,11 @@ def _col_dict(x, snapped=self.config.params["times"]["use_snapped"]): } ) - # Get the actual JSON response from the API. Suppressing stdout here - # since Valhalla prints a bunch of useless output - with suppress_stdout(): - response = actor.matrix(request_json) - response_data = json.loads(response) + # Get the actual JSON response from the API + response = r.post(endpoint + "/sources_to_targets", data=request_json) + response_data = response.json() + if response.status_code != 200: + raise ValueError(response_data["error"]) # Parse the response data and convert it to a DataFrame. Recover the # origin and destination indices and append them to the DataFrame @@ -454,7 +451,7 @@ def _binary_search( cur_depth: int, origins: pd.DataFrame, destinations: pd.DataFrame, - actor: valhalla.Actor, + endpoint: str, ) -> list[pd.DataFrame]: """ Recursively split the origins and destinations into smaller chunks. @@ -467,14 +464,6 @@ def _binary_search( simply querying the same unroutable values over and over. 
""" start_time = time.time() - if print_log or self.config.verbose: - self.config.logger.info( - "Routing origin indices %s-%s to destination indices %s-%s", - o_start_idx, - o_end_idx, - d_start_idx, - d_end_idx, - ) # If indices are out-of-bounds return an empty list if o_start_idx >= o_end_idx or d_start_idx >= d_end_idx: @@ -486,7 +475,7 @@ def _binary_search( df = self._calculate_times( origins=origins.iloc[o_start_idx:o_end_idx], destinations=destinations.iloc[d_start_idx:d_end_idx], - actor=actor, + endpoint=endpoint, ) except Exception as e: df = create_empty_df( @@ -526,13 +515,17 @@ def _binary_search( times = self._calculate_times( origins=origins.iloc[o_start_idx:o_end_idx], destinations=destinations.iloc[d_start_idx:d_end_idx], - actor=actor, + endpoint=endpoint, ) if print_log or self.config.verbose: elapsed_time = time.time() - start_time self.config.logger.info( - "Routed %s pairs (%s missing) in %s", + "From origins %s-%s to destinations %s-%s, routed %s pairs (%s missing) in %s", + o_start_idx, + o_end_idx, + d_start_idx, + d_end_idx, (o_end_idx - o_start_idx) * (d_end_idx - d_start_idx), len(times[times["duration_sec"].isnull()]), format_time(elapsed_time), @@ -549,10 +542,10 @@ def _binary_search( mo, md = (osi + oei) // 2, (dsi + dei) // 2 # fmt: off return ( - self._binary_search(osi, dsi, mo, md, False, cur_depth + 1, origins, destinations, actor) - + self._binary_search(mo, dsi, oei, md, False, cur_depth + 1, origins, destinations, actor) - + self._binary_search(osi, md, mo, dei, False, cur_depth + 1, origins, destinations, actor) - + self._binary_search(mo, md, oei, dei, False, cur_depth + 1, origins, destinations, actor) + self._binary_search(osi, dsi, mo, md, False, cur_depth + 1, origins, destinations, endpoint) + + self._binary_search(mo, dsi, oei, md, False, cur_depth + 1, origins, destinations, endpoint) + + self._binary_search(osi, md, mo, dei, False, cur_depth + 1, origins, destinations, endpoint) + + self._binary_search(mo, md, 
oei, dei, False, cur_depth + 1, origins, destinations, endpoint) ) # fmt: on @@ -573,21 +566,26 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame: m_spl_d = self.inputs.max_split_size_destinations n_dc = self.inputs.n_destinations - for o in range(0, n_oc, max_spl_o): - for d in range(0, n_dc, m_spl_d): - results.extend( - self._binary_search( - o_start_idx=o, - d_start_idx=d, - o_end_idx=min(o + max_spl_o, n_oc), - d_end_idx=min(d + m_spl_d, n_dc), - print_log=True, - cur_depth=0, - origins=self.inputs.origins, - destinations=self.inputs.destinations, - actor=self.actor, + with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: + futures = [] + for o in range(0, n_oc, max_spl_o): + for d in range(0, n_dc, m_spl_d): + futures.append( + executor.submit( + self._binary_search, + o_start_idx=o, + d_start_idx=d, + o_end_idx=min(o + max_spl_o, n_oc), + d_end_idx=min(d + m_spl_d, n_dc), + print_log=True, + cur_depth=0, + origins=self.inputs.origins, + destinations=self.inputs.destinations, + endpoint=DOCKER_ENDPOINT_FIRST_PASS, + ) ) - ) + for future in futures: + results.extend(future.result()) # Return empty result set if nothing is routable if len(results) == 0: @@ -652,25 +650,35 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame: self.config.logger.info("Routing missing set number %s", idx) o_ids = missing_set["origin_id"].unique() d_ids = missing_set["destination_id"].unique() - for o in range(0, len(o_ids), max_spl_o): - for d in range(0, len(d_ids), m_spl_d): - results_sp.extend( - self._binary_search( - o_start_idx=o, - d_start_idx=d, - o_end_idx=min(o + max_spl_o, len(o_ids)), - d_end_idx=min(d + m_spl_d, len(d_ids)), - print_log=True, - cur_depth=0, - origins=self.inputs.origins[ - self.inputs.origins["id"].isin(o_ids) - ], - destinations=self.inputs.destinations[ - self.inputs.destinations["id"].isin(d_ids) - ], - actor=self.actor_fallback, + with ThreadPoolExecutor( + max_workers=os.cpu_count() + ) as executor: + 
futures = [] + for o in range(0, len(o_ids), max_spl_o): + for d in range(0, len(d_ids), m_spl_d): + futures.append( + executor.submit( + self._binary_search, + o_start_idx=o, + d_start_idx=d, + o_end_idx=min(o + max_spl_o, len(o_ids)), + d_end_idx=min(d + m_spl_d, len(d_ids)), + print_log=True, + cur_depth=0, + origins=self.inputs.origins[ + self.inputs.origins["id"].isin(o_ids) + ], + destinations=self.inputs.destinations[ + self.inputs.destinations["id"].isin( + d_ids + ) + ], + endpoint=DOCKER_ENDPOINT_SECOND_PASS, + ) ) - ) + + for future in futures: + results_sp.extend(future.result()) # Merge the results from the second pass with the first pass results_sp_df = ( @@ -684,9 +692,7 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame: return results_df -def snap_df_to_osm( - df: pd.DataFrame, mode: str, actor: valhalla.Actor -) -> pd.DataFrame: +def snap_df_to_osm(df: pd.DataFrame, mode: str) -> pd.DataFrame: """ Snap a DataFrame of lat/lon points to the OpenStreetMap network using the Valhalla Locate API. @@ -694,7 +700,6 @@ def snap_df_to_osm( Args: df: DataFrame containing the columns 'id', 'lat', and 'lon'. mode: Travel mode to use for snapping. - actor: Valhalla Actor object for making API requests. 
""" df_list = df.apply( lambda x: {"lat": x["lat"], "lon": x["lon"]}, axis=1 @@ -707,8 +712,10 @@ def snap_df_to_osm( } ) - response = actor.locate(request_json) - response_data = json.loads(response) + response = r.post("http://127.0.0.1:8002/locate", data=request_json) + response_data = response.json() + if response.status_code != 200: + raise ValueError(response_data["error"]) # Use the first element of nodes to populate the snapped lat/lon, otherwise # fallback to the correlated lat/lon from edges diff --git a/data/src/utils/utils.py b/data/src/utils/utils.py index 4d87d21..3b7fe1f 100644 --- a/data/src/utils/utils.py +++ b/data/src/utils/utils.py @@ -1,9 +1,6 @@ import hashlib import itertools import math -import os -import sys -from contextlib import contextmanager from copy import deepcopy from pathlib import Path @@ -205,23 +202,3 @@ def split_range( chunk_ranges[-1] = (start, n) return chunk_ranges - - -# https://stackoverflow.com/a/17954769 -@contextmanager -def suppress_stdout(): - """Redirect stdout to /dev/null. 
Useful for sinking Valhalla output.""" - fd = sys.stdout.fileno() - - def _redirect_stdout(to): - sys.stdout.close() - os.dup2(to.fileno(), fd) - sys.stdout = os.fdopen(fd, "w") - - with os.fdopen(os.dup(fd), "w") as old_stdout: - with open(os.devnull, "w") as file: - _redirect_stdout(to=file) - try: - yield - finally: - _redirect_stdout(to=old_stdout) diff --git a/data/valhalla.json b/data/valhalla.json index 74fcc48..2c98e53 100644 --- a/data/valhalla.json +++ b/data/valhalla.json @@ -15,8 +15,6 @@ "loki": { "actions": [ "locate", - "route", - "height", "sources_to_targets", "status" ], @@ -110,7 +108,7 @@ "use_rest_area": false, "use_urban_tag": false }, - "global_synchronized_cache": true, + "global_synchronized_cache": false, "hierarchy": true, "id_table_size": 1300000000, "import_bike_share_stations": false, @@ -127,7 +125,7 @@ "type": "std_out" }, "lru_mem_cache_hard_control": false, - "max_cache_size": 12000000000, + "max_cache_size": 3000000000, "max_concurrent_reader_users": 1, "reclassify_links": true, "shortcuts": true, @@ -138,7 +136,7 @@ "transit_dir": "/custom_files/transit_tiles", "transit_feeds_dir": "/gtfs_feeds", "transit_pbf_limit": 200000, - "use_lru_mem_cache": true, + "use_lru_mem_cache": false, "use_simple_mem_cache": false, "default_speeds_config": "/custom_files/default_speeds.json" }, diff --git a/data/valhalla_sp.json b/data/valhalla_sp.json index a5e2d91..e697eef 100644 --- a/data/valhalla_sp.json +++ b/data/valhalla_sp.json @@ -110,7 +110,7 @@ "use_rest_area": false, "use_urban_tag": false }, - "global_synchronized_cache": true, + "global_synchronized_cache": false, "hierarchy": true, "id_table_size": 1300000000, "import_bike_share_stations": false, @@ -127,7 +127,7 @@ "type": "std_out" }, "lru_mem_cache_hard_control": false, - "max_cache_size": 12000000000, + "max_cache_size": 3000000000, "max_concurrent_reader_users": 1, "reclassify_links": true, "shortcuts": true, @@ -138,7 +138,7 @@ "transit_dir": "/custom_files/transit_tiles", 
"transit_feeds_dir": "/gtfs_feeds", "transit_pbf_limit": 200000, - "use_lru_mem_cache": true, + "use_lru_mem_cache": false, "use_simple_mem_cache": false, "default_speeds_config": "/custom_files/default_speeds.json" }, @@ -279,7 +279,7 @@ "max_reserved_labels_count_bidir_astar": 10000000, "max_reserved_labels_count_bidir_dijkstras": 20000000, "max_reserved_labels_count_dijkstras": 40000000, - "max_reserved_locations_costmatrix": 100000, + "max_reserved_locations_costmatrix": 50000, "service": { "proxy": "ipc:///tmp/thor" }, diff --git a/docker-compose.yaml b/docker-compose.yaml index 56bdfe8..8d14ef3 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,3 +1,18 @@ +x-valhalla-run: &valhalla-run + image: opentimes:latest + user: ${USER_ID}:${GROUP_ID} + build: + context: . + dockerfile: data/Dockerfile + args: + - USER_ID=${USER_ID} + - GROUP_ID=${GROUP_ID} + environment: + - use_tiles_ignore_pbf=True + - update_existing_config=False + - force_rebuild=False + - tileset_name=tiles + - traffic_name=traffic services: # https://github.com/nilsnolde/docker-valhalla/blob/master/docker-compose.yml @@ -15,7 +30,6 @@ services: - ./data/build/:/custom_files:rw - ./data/valhalla.json:/custom_files/valhalla.json:rw environment: - - PYTHONUNBUFFERED="1" - serve_tiles=False - tileset_name=tiles # Optional so we can exclude Alaska (which requires 22GB of tiles) @@ -29,37 +43,27 @@ services: - use_default_speeds_config=True - traffic_name=traffic - valhalla-run: - image: opentimes:latest - container_name: valhalla-build - user: ${USER_ID}:${GROUP_ID} - build: - context: . 
- dockerfile: data/Dockerfile - args: - - USER_ID=59999 - - GROUP_ID=59999 + valhalla-run-fp: + <<: *valhalla-run + container_name: valhalla-run-fp ports: - 8002:8002 volumes: - - ./data/build/:/custom_files:rw - - ./data/valhalla.json:/custom_files/valhalla.json:rw - - ./data/valhalla.json:/data/valhalla.json:rw - - ./data/valhalla_sp.json:/data/valhalla_sp.json:rw - - ./data/valhalla_sp.json:/custom_files/valhalla_sp.json:rw + - ./data/build:/custom_files:rw - ./data/output:/data/output:rw - # Read only volumes to serve data to the container - - $HOME/.aws/credentials:/home/valhalla/.aws/credentials:ro - - ./data/src:/data/src:ro - ./data/input:/data/input:ro - ./data/intermediate:/data/intermediate:ro - - ./data/params.yaml:/data/params.yaml:ro - environment: - - GITHUB_SHA # For passing to the metadata output dataframe - - PYTHONUNBUFFERED="1" # For proper printing in GitHub Actions - - DOCKER_INTERNAL_PATH=/data # Used to set wd within Python scripts - - use_tiles_ignore_pbf=True - - update_existing_config=False - - force_rebuild=False - - tileset_name=tiles - - traffic_name=traffic + # All volumes are identical for first/second pass except the config file + - ./data/valhalla.json:/custom_files/valhalla.json:ro + + valhalla-run-sp: + <<: *valhalla-run + container_name: valhalla-run-sp + ports: + - 8003:8002 + volumes: + - ./data/build:/custom_files:rw + - ./data/output:/data/output:rw + - ./data/input:/data/input:ro + - ./data/intermediate:/data/intermediate:ro + - ./data/valhalla_sp.json:/custom_files/valhalla.json:ro