Skip to content

Commit

Permalink
merge master into feature branch
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidLandup0 committed Dec 6, 2024
2 parents 3c3d3b3 + 7357c4e commit 3acca0f
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions pytd/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,25 +461,36 @@ def write_dataframe(
# chunk number of records should not exceed 200 to avoid OSError
_chunk_record_size = max(chunk_record_size, num_rows // 200)
try:
range_func = (
tqdm(
range(0, num_rows, _chunk_record_size),
desc="Chunking into msgpack",
)
if show_progress
else range(0, num_rows, _chunk_record_size)
)
for start in range_func:
records = dataframe.iloc[
start : start + _chunk_record_size
].to_dict(orient="records")
fp = tempfile.NamedTemporaryFile(
suffix=".msgpack.gz", delete=False
with ThreadPoolExecutor(max_workers=max_workers) as executor:
range_func = (
tqdm(
range(0, num_rows, _chunk_record_size),
desc="Chunking into msgpack",
)
if show_progress
else range(0, num_rows, _chunk_record_size)
)
fp = self._write_msgpack_stream(records, fp)
fps.append(fp)
stack.callback(os.unlink, fp.name)
stack.callback(fp.close)

futures = []
for start in range_func:
records = dataframe.iloc[
start : start + _chunk_record_size
].to_dict(orient="records")
fp = tempfile.NamedTemporaryFile(
suffix=".msgpack.gz", delete=False
)
futures.append(
(
start,
executor.submit(
self._write_msgpack_stream, records, fp
),
)
)
stack.callback(os.unlink, fp.name)
stack.callback(fp.close)
for start, future in sorted(futures):
fps.append(future.result())
except OSError as e:
raise RuntimeError(
"failed to create a temporary file. "
Expand Down

0 comments on commit 3acca0f

Please sign in to comment.