Skip to content

Commit

Permalink
less memory usage when writing a dataframe.
Browse files Browse the repository at this point in the history
Instead of converting the whole DataFrame into a list of dicts
(which can double the memory usage), uses a generator to iterate
on the rows of the DataFrame converting each row one after the other.
  • Loading branch information
slemouzy committed Dec 2, 2022
1 parent 6d60fe3 commit bd68b72
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions pandavro/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import OrderedDict
from pathlib import Path
from typing import Optional, Iterable
from typing import Any, Dict, Generator, Iterable, Optional

import fastavro
import numpy as np
Expand Down Expand Up @@ -276,18 +276,24 @@ def from_avro(file_path_or_buffer, schema=None, na_dtypes=False, **kwargs):
return read_avro(file_path_or_buffer, schema, na_dtypes=na_dtypes, **kwargs)


def __preprocess_dicts(l):
"Preprocess list of dicts inplace for fastavro compatibility"
for d in l:
for k, v in d.items():
def __to_fastavro_records(df: pd.DataFrame) -> Generator[Dict[str, Any], None, None]:
"Converts a DataFrame to a fastavro record compatible iterable."
def preprocess_dict(record: Dict[str, Any]) -> Dict[str, Any]:
"Preprocess a dict inplace for fastavro record compatibility."
for k, v in record.items():
# Replace pd.NA with None so fastavro can write it
if v is pd.NA:
d[k] = None
record[k] = None
# Convert some Pandas dtypes to normal Python dtypes
for key, value in PANDAS_TO_PYTHON_TYPES.items():
if isinstance(v, key):
d[k] = value(v)
return l
for pandas_type, converter in PANDAS_TO_PYTHON_TYPES.items():
if isinstance(v, pandas_type):
record[k] = converter(v)
return record

record: Dict[str, Any] = dict()
for idx, data in df.iterrows():
yield preprocess_dict(data.to_dict(into=record))
record.clear()


def to_avro(file_path_or_buffer, df, schema=None, append=False,
Expand All @@ -305,18 +311,16 @@ def to_avro(file_path_or_buffer, df, schema=None, append=False,
times_as_micros: If True (default), save datetimes with microseconds resolution. If False, save with millisecond
resolution instead.
kwargs: Keyword arguments to fastavro.writer
"""
if schema is None:
schema = schema_infer(df, times_as_micros)

open_mode = 'wb' if not append else 'a+b'

records = __preprocess_dicts(df.to_dict('records'))

if isinstance(file_path_or_buffer, Path):
file_path_or_buffer = str(file_path_or_buffer)

records = __to_fastavro_records(df)
if isinstance(file_path_or_buffer, str):
with open(file_path_or_buffer, open_mode) as f:
fastavro.writer(f, schema=schema,
Expand Down

0 comments on commit bd68b72

Please sign in to comment.