Skip to content

Commit

Permalink
Set chunk size by mode (#9)
Browse files Browse the repository at this point in the history
* Set chunk size by mode

* Convert args to int when passed

* Stop n_chunk by mode
  • Loading branch information
dfsnow authored Jan 11, 2025
1 parent 2b2e1a0 commit 2d8e298
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 25 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/calculate-times.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ jobs:
export USER_ID=${{ env.USER_ID }}
export GROUP_ID=${{ env.GROUP_ID }}
uv run ./src/split_chunks.py \
--year ${{ inputs.year }} --geography ${{ inputs.geography }} \
--mode ${{ inputs.mode }} --year ${{ inputs.year }} \
--geography ${{ inputs.geography }} \
--state ${{ inputs.state }} > chunks.txt
echo "chunks=$(cat chunks.txt)" >> $GITHUB_OUTPUT
Expand Down
10 changes: 8 additions & 2 deletions data/params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ actions:

# The minimum number of origins to include in a job. Higher = fewer jobs
# that take longer. Lower = more jobs that finish quicker
origin_min_chunk_size: 1000
origin_min_chunk_size:
auto: 1000
bicycle: 800
pedestrian: 800

# The max number of destination splits to create for a workflow
destination_n_chunks: 4

# The minimum number of destinations included in each job. For reference,
# most states have around 10K Census tract destinations
destination_min_chunk_size: 20000
destination_min_chunk_size:
auto: 20000
bicycle: 10000
pedestrian: 10000

times:
# Travel times output version. Follows SemVer (kind of):
Expand Down
58 changes: 36 additions & 22 deletions data/src/split_chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@


def split_chunks(
mode: str,
year: str,
geography: str,
state: str,
origin_n_chunks: int = 64,
origin_min_chunk_size: int = 500,
destination_n_chunks: int = 4,
destination_min_chunk_size: int = 10000,
origin_n_chunks: str | None = None,
origin_min_chunk_size: str | None = None,
destination_n_chunks: str | None = None,
destination_min_chunk_size: str | None = None,
) -> None:
"""
Split Parquet files into N chunks, where each chunk is at least a certain
Expand All @@ -24,20 +25,19 @@ def split_chunks(
more OD pairs get squeezed into each job as origin_n_chunks decreases.
The maximum number of chunks is equal to
origin_n_chunks * destination_n_chunks. By default, it's 256, which is
the maximum number of GitHub jobs per workflow.
origin_n_chunks * destination_n_chunks. By default, all chunk settings
pull from the params.yaml file.
Args:
mode: Travel mode. Determines the chunk settings pulled
from params.yaml.
year: The year of the input origins data.
geography: The geography type of the origins data.
state: The two-digit state FIPS code of the origins data.
origin_n_chunks: The maximum number of origin chunks. Defaults to 128.
origin_n_chunks: The maximum number of origin chunks.
origin_min_chunk_size: The minimum size of each origin chunk.
Defaults to 500.
destination_n_chunks: The maximum number of destination chunks.
Defaults to 2.
origin_min_chunk_size: The minimum size of each destination chunk.
Defaults to 10000.
"""
origin_file = (
Path.cwd()
Expand All @@ -60,54 +60,68 @@ def split_chunks(

file_chunks = split_od_files_to_json(
origin_file=origin_file,
origin_n_chunks=origin_n_chunks,
origin_min_chunk_size=origin_min_chunk_size,
origin_n_chunks=int(
params["actions"]["origin_n_chunks"]
if not origin_n_chunks
else origin_n_chunks
),
origin_min_chunk_size=int(
params["actions"]["origin_min_chunk_size"][mode]
if not origin_min_chunk_size
else origin_min_chunk_size
),
destination_file=destination_file,
destination_n_chunks=destination_n_chunks,
destination_min_chunk_size=destination_min_chunk_size,
destination_n_chunks=int(
params["actions"]["destination_n_chunks"]
if not destination_n_chunks
else destination_n_chunks
),
destination_min_chunk_size=int(
params["actions"]["destination_min_chunk_size"][mode]
if not destination_min_chunk_size
else destination_min_chunk_size
),
)

print(file_chunks)


def main() -> None:
    """CLI entry point: parse arguments and run the OD file chunk split.

    Required flags identify the dataset (--mode, --year, --geography,
    --state). The four chunk-tuning flags are optional overrides: when
    omitted they remain None so that split_chunks() resolves them from
    params.yaml, where the min-chunk-size settings are keyed by travel
    mode.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", required=True, type=str)
    parser.add_argument("--year", required=True, type=str)
    parser.add_argument("--geography", required=True, type=str)
    parser.add_argument("--state", required=True, type=str)
    # Fix: previously these defaulted to raw params["actions"] values.
    # origin_min_chunk_size / destination_min_chunk_size are now
    # mode-keyed dicts in params.yaml; a dict default is truthy, so
    # split_chunks() would skip its params lookup and crash on int(dict)
    # whenever the flag was omitted. Defaulting to None lets
    # split_chunks() perform the per-mode lookup itself.
    parser.add_argument(
        "--origin_n_chunks",
        required=False,
        type=str,
        default=None,
    )
    parser.add_argument(
        "--origin_min_chunk_size",
        required=False,
        type=str,
        default=None,
    )
    parser.add_argument(
        "--destination_n_chunks",
        required=False,
        type=str,
        default=None,
    )
    parser.add_argument(
        "--destination_min_chunk_size",
        required=False,
        type=str,
        default=None,
    )
    args = parser.parse_args()
    split_chunks(
        args.mode,
        args.year,
        args.geography,
        args.state,
        args.origin_n_chunks,
        args.origin_min_chunk_size,
        args.destination_n_chunks,
        args.destination_min_chunk_size,
    )


Expand Down

0 comments on commit 2d8e298

Please sign in to comment.