From 2a1a4e8268905a67916c01f10cb5f3dc414f4cb5 Mon Sep 17 00:00:00 2001 From: Sylvain Lemouzy Date: Fri, 2 Dec 2022 14:48:31 -0600 Subject: [PATCH] less memory usage when writing a dataframe. Instead of converting the whole DataFrame into a list of dicts (which can double the memory usage), uses a generator to iterate on the rows of the DataFrame converting each row one after the other. --- pandavro/__init__.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/pandavro/__init__.py b/pandavro/__init__.py index 6d5b670..28d59e0 100644 --- a/pandavro/__init__.py +++ b/pandavro/__init__.py @@ -1,6 +1,6 @@ from collections import OrderedDict from pathlib import Path -from typing import Iterable, Optional +from typing import Any, Dict, Generator, Iterable, Optional import fastavro import numpy as np @@ -300,18 +300,24 @@ def from_avro(file_path_or_buffer, schema=None, na_dtypes=False, **kwargs): return read_avro(file_path_or_buffer, schema, na_dtypes=na_dtypes, **kwargs) -def __preprocess_dicts(l): - "Preprocess list of dicts inplace for fastavro compatibility" - for d in l: - for k, v in d.items(): +def __to_fastavro_records(df: pd.DataFrame) -> Generator[Dict[str, Any], None, None]: + "Converts a DataFrame to a fastavro record compatible iterable." + def preprocess_dict(record: Dict[str, Any]) -> Dict[str, Any]: + "Preprocess a dict inplace for fastavro record compatibility." + for k, v in record.items(): # Replace pd.NA with None so fastavro can write it if v is pd.NA: - d[k] = None + record[k] = None # Convert some Pandas dtypes to normal Python dtypes for pandas_type, converter in PANDAS_TO_PYTHON_TYPES.items(): if isinstance(v, pandas_type): - d[k] = converter(v) - return l + record[k] = converter(v) + return record + + record: Dict[str, Any] = dict() + for idx, data in df.iterrows(): + yield preprocess_dict(data.to_dict(into=record)) + record.clear() def to_avro(file_path_or_buffer, df, schema=None, append=False, @@ -329,18 +335,16 @@ def to_avro(file_path_or_buffer, df, schema=None, append=False, times_as_micros: If True (default), save datetimes with microseconds resolution. If False, save with millisecond resolution instead. kwargs: Keyword arguments to fastavro.writer - """ if schema is None: schema = schema_infer(df, times_as_micros) open_mode = 'wb' if not append else 'a+b' - records = __preprocess_dicts(df.to_dict('records')) - if isinstance(file_path_or_buffer, Path): file_path_or_buffer = str(file_path_or_buffer) + records = __to_fastavro_records(df) if isinstance(file_path_or_buffer, str): with open(file_path_or_buffer, open_mode) as f: fastavro.writer(f, schema=schema,