Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidLandup0 committed Dec 6, 2024
1 parent 3acca0f commit e619247
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions pytd/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,17 +462,16 @@ def write_dataframe(
_chunk_record_size = max(chunk_record_size, num_rows // 200)
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
range_func = (
futures = []
chunk_range = (
tqdm(
range(0, num_rows, _chunk_record_size),
desc="Chunking into msgpack",
desc="Chunking data",
)
if show_progress
else range(0, num_rows, _chunk_record_size)
)

futures = []
for start in range_func:
for start in chunk_range:
records = dataframe.iloc[
start : start + _chunk_record_size
].to_dict(orient="records")
Expand All @@ -489,7 +488,12 @@ def write_dataframe(
)
stack.callback(os.unlink, fp.name)
stack.callback(fp.close)
for start, future in sorted(futures):
resolve_range = (
tqdm(sorted(futures), desc="Resolving futures")
if show_progress
else sorted(futures)
)
for start, future in resolve_range:
fps.append(future.result())
except OSError as e:
raise RuntimeError(
Expand Down

0 comments on commit e619247

Please sign in to comment.