From a67d832ad73e9adaaa6842767e26d9b19024870b Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Sat, 21 Dec 2024 22:54:21 -0600 Subject: [PATCH] Limit CPU cores for second pass --- data/src/calculate_times.py | 4 ++-- data/src/utils/times.py | 15 ++++++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/data/src/calculate_times.py b/data/src/calculate_times.py index 92842d1..6e7bed2 100644 --- a/data/src/calculate_times.py +++ b/data/src/calculate_times.py @@ -40,8 +40,8 @@ def main() -> None: chunk_msg = f", chunk: {config.args.chunk}" if config.args.chunk else "" logger.info( - "Starting routing for version: %s, mode: %s, year: %s, " - "geography: %s, state: %s, centroid type: %s%s", + "Starting times calculation with parameters: version=%s, " + "mode=%s, year=%s, geography=%s, state=%s, centroid_type=%s%s", config.params["times"]["version"], config.args.mode, config.args.year, diff --git a/data/src/utils/times.py b/data/src/utils/times.py index d364407..89fe997 100644 --- a/data/src/utils/times.py +++ b/data/src/utils/times.py @@ -610,7 +610,7 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame: if results_df.isnull().values.any() and second_pass: missing = results_df[results_df["duration_sec"].isnull()] self.config.logger.info( - "Starting second pass for %s missing pairs (%s total)", + "Starting second pass for %s missing pairs (out of %s total)", len(missing), len(results_df), ) @@ -650,9 +650,12 @@ def many_to_many(self, second_pass: bool = True) -> pd.DataFrame: self.config.logger.info("Routing missing set number %s", idx) o_ids = missing_set["origin_id"].unique() d_ids = missing_set["destination_id"].unique() - with ThreadPoolExecutor( - max_workers=os.cpu_count() - ) as executor: + + # Don't use all the cores here as it tends to choke + ncpu = os.cpu_count() + ncpu = ncpu - 1 if ncpu is not None and ncpu > 1 else 1 + + with ThreadPoolExecutor(max_workers=ncpu) as executor: futures = [] for o in range(0, len(o_ids), 
max_spl_o): for d in range(0, len(d_ids), m_spl_d): @@ -712,7 +715,9 @@ def snap_df_to_osm(df: pd.DataFrame, mode: str) -> pd.DataFrame: } ) - response = r.post("http://127.0.0.1:8002/locate", data=request_json) + response = r.post( + DOCKER_ENDPOINT_FIRST_PASS + "/locate", data=request_json + ) response_data = response.json() if response.status_code != 200: raise ValueError(response_data["error"])