Improve parallelism and memory management (#8)
* Remove first-pass Docker container on second pass

* Spin down first-pass Valhalla container before second pass

* Reduce chunk and batch size

* Upgrade s3fs to fix MultiPartUpload chunksize

fsspec/s3fs#789

* Disable Valhalla shortcuts

See: valhalla/valhalla#4956

* Add flag to use fixed_upload_size

* Bump swapfile size to fix OoM errors
dfsnow authored Dec 31, 2024
1 parent b0c0ab0 commit 40f8b46
Showing 8 changed files with 69 additions and 22 deletions.
17 changes: 16 additions & 1 deletion .github/workflows/calculate-times.yaml
@@ -188,7 +188,7 @@ jobs:
echo "chunks=$(cat chunks.txt)" >> $GITHUB_OUTPUT
# If override chunks are set, use those instead
chunks_parsed=($(echo "$chunks" | jq -r '.[]'))
chunks_parsed=($(cat chunks.txt | jq -r '.[]'))
if [ -n "${{ inputs.override_chunks }}" ]; then
override_chunks_parsed=($(echo "${{ inputs.override_chunks }}" | tr -d ' ' | tr ',' ' '))
for chunk in "${override_chunks_parsed[@]}"; do
@@ -280,6 +280,21 @@ jobs:
           tar -xf ./build/valhalla_tiles.tar.zst -C ./build
           rm -f ./build/valhalla_tiles.tar.zst
+      # In rare cases the runner gets killed due to OoM errors. This bumps swap
+      # to 90% of the space remaining on disk
+      - name: Increase swapfile
+        run: |
+          space_left=$(df /dev/root -B 1 --output=avail | grep -v Avail)
+          space_mult=0.9
+          space_alloc=$(echo "${space_left}*${space_mult}" | bc)
+          space_alloc_rnd=$(printf %.0f $(echo ${space_alloc}))
+          sudo swapoff -a
+          sudo fallocate -l ${space_alloc_rnd} /swapfile
+          sudo chmod 600 /swapfile
+          sudo mkswap /swapfile
+          sudo swapon /swapfile
+          sudo swapon --show
       - name: Run job chunk
         shell: bash
         working-directory: 'data'
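As a quick sanity check of the swap-size arithmetic in the step above, here is the same 90% rule worked through in Python (the free-space figure is illustrative, not from an actual runner):

    space_left = 40 * 1024**3                  # e.g. 40 GiB available on /dev/root
    space_alloc_rnd = round(space_left * 0.9)  # same 90% rule as the bc/printf pipeline
    print(space_alloc_rnd)                     # 38654705664, the byte count passed to `fallocate -l`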
4 changes: 2 additions & 2 deletions data/params.yaml
@@ -20,7 +20,7 @@ actions:

   # The minimum number of origins to include in a job. Higher = fewer jobs
   # that take longer. Lower = more jobs that finish quicker
-  origin_min_chunk_size: 900
+  origin_min_chunk_size: 1000

   # The max number of destination splits to create for a workflow
   destination_n_chunks: 4
@@ -46,7 +46,7 @@ times:

   # Maximum size of chunk of origins AND destinations to process in a single call to
   # Valhalla. This is necessary because larger matrices will cause it to choke
-  max_split_size: 150
+  max_split_size: 100

   # Coordinates are snapped to the OSM street network before time calculation.
   # Setting this to true will use the snapped coordinates directly in the
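For context on how these knobs interact: max_split_size caps the block of origins and destinations sent to a single Valhalla matrix call, so shrinking it trades more requests for a smaller, less memory-hungry matrix per request. A rough sketch of the resulting call count, mirroring the nested-range loop in TravelTimeCalculator.many_to_many shown further down (the job sizes here are hypothetical):

    import math

    n_origins, n_destinations, max_split_size = 1000, 500, 100  # hypothetical job

    # One Valhalla call per origin-block x destination-block pair
    n_calls = math.ceil(n_origins / max_split_size) * math.ceil(
        n_destinations / max_split_size
    )
    print(n_calls)  # 50 calls at split size 100, versus 28 at the old size of 150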
4 changes: 3 additions & 1 deletion data/src/calculate_times.py
@@ -65,7 +65,9 @@ def main() -> None:
         inputs.destinations, config.args.mode
     )

-    # Calculate times for each chunk and return a single DataFrame
+    # Calculate times for each chunk and return a single DataFrame. Assumes
+    # there are Valhalla services running locally at localhost:8002 (and :8003
+    # if second-pass is enabled)
     logger.info("Tiles loaded and coordinates ready, starting routing")
     tt_calc = TravelTimeCalculator(config, inputs)
     results_df = tt_calc.many_to_many()
34 changes: 27 additions & 7 deletions data/src/utils/times.py
@@ -3,6 +3,8 @@
 import logging
 import os
 import re
+import shutil
+import subprocess
 import time
 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
@@ -104,7 +106,11 @@ def __init__(
         self.compression_level: int = compression_level
         self.endpoint_url: str | None = endpoint_url
         self.storage_options = {
-            "s3": {"client_kwargs": {"endpoint_url": endpoint_url}},
+            "s3": {
+                # https://github.com/fsspec/s3fs/pull/888
+                "client_kwargs": {"endpoint_url": endpoint_url},
+                "fixed_upload_size": True,
+            },
             "local": {},
         }
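The "s3" entry of this dict is what ultimately reaches the s3fs.S3FileSystem constructor via pandas' storage_options; fixed_upload_size (added in fsspec/s3fs#888) keeps the multipart upload chunk size constant rather than letting it grow. A minimal sketch of the same options on a direct write (bucket and endpoint are placeholders):

    import pandas as pd

    df = pd.DataFrame({"duration_sec": [42.0, 187.5]})
    df.to_parquet(
        "s3://example-bucket/times.parquet",  # placeholder path
        storage_options={
            "client_kwargs": {"endpoint_url": "https://s3.example.com"},
            "fixed_upload_size": True,  # passed through to s3fs.S3FileSystem
        },
    )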

@@ -331,6 +337,7 @@ def __init__(
         args: argparse.Namespace,
         params: dict,
         logger: logging.Logger,
+        ncpu: int | None = None,
         verbose: bool = False,
     ) -> None:
         self.args = TravelTimeArgs(args, params)
@@ -344,6 +351,7 @@ def __init__(
             endpoint_url=self.params["s3"]["endpoint_url"],
         )
         self.logger = logger
+        self.ncpu = ncpu if ncpu else os.cpu_count()
         self.verbose = verbose

     def _load_od_file(self, path: str) -> pd.DataFrame:
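The new ncpu argument caps the thread pool for both routing passes; leaving it as None falls back to os.cpu_count(). Since max_workers is ThreadPoolExecutor's first positional parameter, the ThreadPoolExecutor(self.config.ncpu) calls below are equivalent to passing it by keyword. A small illustration (the cap of 4 is hypothetical):

    from concurrent.futures import ThreadPoolExecutor

    ncpu = 4  # hypothetical cap; None would mean the executor's own default
    with ThreadPoolExecutor(ncpu) as executor:  # positional max_workers
        results = list(executor.map(lambda x: x * x, range(8)))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]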
@@ -566,7 +574,7 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame:
         m_spl_d = self.inputs.max_split_size_destinations
         n_dc = self.inputs.n_destinations

-        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
+        with ThreadPoolExecutor(self.config.ncpu) as executor:
             futures = []
             for o in range(0, n_oc, max_spl_o):
                 for d in range(0, n_dc, m_spl_d):
@@ -608,6 +616,22 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame:
         # Check for completeness in the output. If any times are missing
         # after the first pass, run a second pass with the fallback router
         if results_df.isnull().values.any() and second_pass:
+            # Stop the first-pass Docker container with the goal of freeing
+            # the memory used by Valhalla caching
+            if shutil.which("docker"):
+                self.config.logger.info(
+                    "Stopping first-pass Valhalla Docker container"
+                )
+                subprocess.run(
+                    ["docker", "compose", "down", "valhalla-run-fp"],
+                    check=True,
+                    text=True,
+                )
+            else:
+                self.config.logger.warning(
+                    "Tried to stop the first-pass Valhalla Docker container, "
+                    "but Docker is not installed on this machine"
+                )
             missing = results_df[results_df["duration_sec"].isnull()]
             self.config.logger.info(
                 "Starting second pass for %s missing pairs (out of %s total)",
@@ -651,11 +675,7 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame:
             o_ids = missing_set["origin_id"].unique()
             d_ids = missing_set["destination_id"].unique()

-            # Don't use the all cores here as it tends to choke
-            ncpu = os.cpu_count()
-            ncpu = ncpu - 1 if ncpu is not None and ncpu > 1 else 1
-
-            with ThreadPoolExecutor(max_workers=ncpu) as executor:
+            with ThreadPoolExecutor(self.config.ncpu) as executor:
                 futures = []
                 for o in range(0, len(o_ids), max_spl_o):
                     for d in range(0, len(d_ids), m_spl_d):
2 changes: 1 addition & 1 deletion data/valhalla.json
@@ -128,7 +128,7 @@
"max_cache_size": 3000000000,
"max_concurrent_reader_users": 1,
"reclassify_links": true,
"shortcuts": true,
"shortcuts": false,
"tile_dir": "/custom_files/tiles",
"tile_extract": "/custom_files/tiles.tar",
"timezone": "/custom_files/timezone_data/timezones.sqlite",
2 changes: 1 addition & 1 deletion data/valhalla_sp.json
@@ -130,7 +130,7 @@
"max_cache_size": 3000000000,
"max_concurrent_reader_users": 1,
"reclassify_links": true,
"shortcuts": true,
"shortcuts": false,
"tile_dir": "/custom_files/tiles",
"tile_extract": "/custom_files/tiles.tar",
"timezone": "/custom_files/timezone_data/timezones.sqlite",
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -40,7 +40,8 @@ data = [
"pandas==2.2.3",
"pyarrow==17.0.0",
"pyyaml==6.0.2",
"requests==2.32.3"
"requests==2.32.3",
"s3fs==2024.12.0",
]
site = [
"boto3==1.35.35",
25 changes: 17 additions & 8 deletions uv.lock

Some generated files are not rendered by default.
