Skip to content

Commit

Permalink
Merge pull request #45 from slemouzy/reduce-write-memory-usage
Browse files Browse the repository at this point in the history
less memory usage when writing a dataframe
  • Loading branch information
ynqa authored Mar 16, 2024
2 parents 0ebc59c + 2a1a4e8 commit 996a159
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions pandavro/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import OrderedDict
from pathlib import Path
from typing import Iterable, Optional
from typing import Any, Dict, Generator, Iterable, Optional

import fastavro
import numpy as np
Expand Down Expand Up @@ -300,18 +300,24 @@ def from_avro(file_path_or_buffer, schema=None, na_dtypes=False, **kwargs):
return read_avro(file_path_or_buffer, schema, na_dtypes=na_dtypes, **kwargs)


def __preprocess_dicts(l):
"Preprocess list of dicts inplace for fastavro compatibility"
for d in l:
for k, v in d.items():
def __to_fastavro_records(df: pd.DataFrame) -> Generator[Dict[str, Any], None, None]:
"Converts a DataFrame to a fastavro record compatible iterable."
def preprocess_dict(record: Dict[str, Any]) -> Dict[str, Any]:
"Preprocess a dict inplace for fastavro record compatibility."
for k, v in record.items():
# Replace pd.NA with None so fastavro can write it
if v is pd.NA:
d[k] = None
record[k] = None
# Convert some Pandas dtypes to normal Python dtypes
for pandas_type, converter in PANDAS_TO_PYTHON_TYPES.items():
if isinstance(v, pandas_type):
d[k] = converter(v)
return l
record[k] = converter(v)
return record

record: Dict[str, Any] = dict()
for idx, data in df.iterrows():
yield preprocess_dict(data.to_dict(into=record))
record.clear()


def to_avro(file_path_or_buffer, df, schema=None, append=False,
Expand All @@ -329,18 +335,16 @@ def to_avro(file_path_or_buffer, df, schema=None, append=False,
times_as_micros: If True (default), save datetimes with microseconds resolution. If False, save with millisecond
resolution instead.
kwargs: Keyword arguments to fastavro.writer
"""
if schema is None:
schema = schema_infer(df, times_as_micros)

open_mode = 'wb' if not append else 'a+b'

records = __preprocess_dicts(df.to_dict('records'))

if isinstance(file_path_or_buffer, Path):
file_path_or_buffer = str(file_path_or_buffer)

records = __to_fastavro_records(df)
if isinstance(file_path_or_buffer, str):
with open(file_path_or_buffer, open_mode) as f:
fastavro.writer(f, schema=schema,
Expand Down

0 comments on commit 996a159

Please sign in to comment.