Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

less memory usage when writing a dataframe #45

Merged
merged 1 commit into from
Mar 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions pandavro/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import OrderedDict
from pathlib import Path
from typing import Iterable, Optional
from typing import Any, Dict, Generator, Iterable, Optional

import fastavro
import numpy as np
Expand Down Expand Up @@ -300,18 +300,24 @@ def from_avro(file_path_or_buffer, schema=None, na_dtypes=False, **kwargs):
return read_avro(file_path_or_buffer, schema, na_dtypes=na_dtypes, **kwargs)


def __preprocess_dicts(l):
"Preprocess list of dicts inplace for fastavro compatibility"
for d in l:
for k, v in d.items():
def __to_fastavro_records(df: pd.DataFrame) -> Generator[Dict[str, Any], None, None]:
"Converts a DataFrame to a fastavro record compatible iterable."
def preprocess_dict(record: Dict[str, Any]) -> Dict[str, Any]:
"Preprocess a dict inplace for fastavro record compatibility."
for k, v in record.items():
# Replace pd.NA with None so fastavro can write it
if v is pd.NA:
d[k] = None
record[k] = None
# Convert some Pandas dtypes to normal Python dtypes
for pandas_type, converter in PANDAS_TO_PYTHON_TYPES.items():
if isinstance(v, pandas_type):
d[k] = converter(v)
return l
record[k] = converter(v)
return record

record: Dict[str, Any] = dict()
for idx, data in df.iterrows():
yield preprocess_dict(data.to_dict(into=record))
record.clear()


def to_avro(file_path_or_buffer, df, schema=None, append=False,
Expand All @@ -329,18 +335,16 @@ def to_avro(file_path_or_buffer, df, schema=None, append=False,
times_as_micros: If True (default), save datetimes with microseconds resolution. If False, save with millisecond
resolution instead.
kwargs: Keyword arguments to fastavro.writer

"""
if schema is None:
schema = schema_infer(df, times_as_micros)

open_mode = 'wb' if not append else 'a+b'

records = __preprocess_dicts(df.to_dict('records'))

if isinstance(file_path_or_buffer, Path):
file_path_or_buffer = str(file_path_or_buffer)

records = __to_fastavro_records(df)
if isinstance(file_path_or_buffer, str):
with open(file_path_or_buffer, open_mode) as f:
fastavro.writer(f, schema=schema,
Expand Down
Loading