Skip to content

Commit

Permalink
Limit CPU cores for second pass
Browse files Browse the repository at this point in the history
  • Loading branch information
dfsnow committed Dec 22, 2024
1 parent 7b0b59b commit a67d832
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
4 changes: 2 additions & 2 deletions data/src/calculate_times.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def main() -> None:

chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else ""
logger.info(
"Starting routing for version: %s, mode: %s, year: %s, "
"geography: %s, state: %s, centroid type: %s%s",
"Starting times calculation with parameters: version=%s, "
"mode=%s, year=%s, geography=%s, state=%s, centroid_type=%s%s",
config.params["times"]["version"],
config.args.mode,
config.args.year,
Expand Down
15 changes: 10 additions & 5 deletions data/src/utils/times.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame:
if results_df.isnull().values.any() and second_pass:
missing = results_df[results_df["duration_sec"].isnull()]
self.config.logger.info(
"Starting second pass for %s missing pairs (%s total)",
"Starting second pass for %s missing pairs (out of %s total)",
len(missing),
len(results_df),
)
Expand Down Expand Up @@ -650,9 +650,12 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame:
self.config.logger.info("Routing missing set number %s", idx)
o_ids = missing_set["origin_id"].unique()
d_ids = missing_set["destination_id"].unique()
with ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:

# Don't use the all cores here as it tends to choke
ncpu = os.cpu_count()
ncpu = ncpu - 1 if ncpu is not None and ncpu > 1 else 1

with ThreadPoolExecutor(max_workers=ncpu) as executor:
futures = []
for o in range(0, len(o_ids), max_spl_o):
for d in range(0, len(d_ids), m_spl_d):
Expand Down Expand Up @@ -712,7 +715,9 @@ def snap_df_to_osm(df: pd.DataFrame, mode: str) -> pd.DataFrame:
}
)

response = r.post("http://127.0.0.1:8002/locate", data=request_json)
response = r.post(
DOCKER_ENDPOINT_FIRST_PASS + "/locate", data=request_json
)
response_data = response.json()
if response.status_code != 200:
raise ValueError(response_data["error"])
Expand Down

0 comments on commit a67d832

Please sign in to comment.