From e61924763ee2cf56a6a36882677075bb748b0d86 Mon Sep 17 00:00:00 2001 From: DavidLandup0 Date: Fri, 6 Dec 2024 14:25:13 +0900 Subject: [PATCH] refactor --- pytd/writer.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pytd/writer.py b/pytd/writer.py index e9614eb..989dca5 100644 --- a/pytd/writer.py +++ b/pytd/writer.py @@ -462,17 +462,16 @@ def write_dataframe( _chunk_record_size = max(chunk_record_size, num_rows // 200) try: with ThreadPoolExecutor(max_workers=max_workers) as executor: - range_func = ( + futures = [] + chunk_range = ( tqdm( range(0, num_rows, _chunk_record_size), - desc="Chunking into msgpack", + desc="Chunking data", ) if show_progress else range(0, num_rows, _chunk_record_size) ) - - futures = [] - for start in range_func: + for start in chunk_range: records = dataframe.iloc[ start : start + _chunk_record_size ].to_dict(orient="records") @@ -489,7 +488,12 @@ def write_dataframe( ) stack.callback(os.unlink, fp.name) stack.callback(fp.close) - for start, future in sorted(futures): + resolve_range = ( + tqdm(sorted(futures), desc="Resolving futures") + if show_progress + else sorted(futures) + ) + for start, future in resolve_range: fps.append(future.result()) except OSError as e: raise RuntimeError(