Parquet I/O housekeeping #140

Closed
wants to merge 2 commits
27 changes: 6 additions & 21 deletions spectrum_io/file/parquet.py
@@ -7,12 +7,7 @@
import pyarrow.parquet as pq
import scipy

-# TODO add sparse matrix / anndata support
-# TODO add speed benchmarks
-# TODO add support for HuggingFace datasets API

Pathlike = Union[Path, str]
-Dataset = Union[pd.DataFrame, scipy.sparse.spmatrix]

logger = logging.getLogger(__name__)

@@ -53,36 +48,26 @@ def read_partition(path: Pathlike, dataset_name: str) -> pd.DataFrame:
return df


-def write_file(data: Dataset, path: Pathlike) -> None:
+def write_file(data: pd.DataFrame, path: Pathlike) -> None:
"""Writes a single DataFrame or matrix to a Parquet file.

:param data: Data to store
:param path: Path to write the Parquet file to

-:raises NotImplementedError: if anything else but a Pandas DataFrame is used as the dataset
"""
-if isinstance(data, pd.DataFrame):
-data.to_parquet(path)
-else:
-raise NotImplementedError
+data.to_parquet(path)


-def write_partition(datasets: List[Dataset], path: Pathlike, dataset_names: List[str]) -> None:
+def write_partition(datasets: List[pd.DataFrame], path: Pathlike, dataset_names: List[str]) -> None:
"""
-Write several datasets to a Parquet dataset as a directory containing subdirectories partinioned by dataset name.
+Write several datasets to a Parquet dataset as a directory containing subdirectories partitioned by dataset name.

:param datasets: Datasets to write
:param path: Root path to write the partitioned dataset to
:param dataset_names: Names to assign to the datasets for retrieval. Careful: If all of these are strings of ints,
Parquet will convert them to raw integers!

-:raises NotImplementedError: if anything else but a Pandas DataFrame is used as the dataset
"""
-if all(isinstance(x, pd.DataFrame) for x in datasets):
-df = pd.concat([dataset.assign(dataset=name) for dataset, name in zip(datasets, dataset_names)])
-table = pa.Table.from_pandas(df)
-else:
-raise NotImplementedError
+df = pd.concat([dataset.assign(dataset=name) for dataset, name in zip(datasets, dataset_names)])
+table = pa.Table.from_pandas(df)

if isinstance(path, str):
path = Path(path)
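As a side note for reviewers, here is a minimal usage sketch of the simplified, pandas-only module after this change. The toy DataFrame, file names, and dataset names are illustrative only, and passing string paths is assumed to be fine via the `Pathlike` alias.

```python
# Illustrative sketch only: round-trip the simplified pandas-only API.
import pandas as pd

from spectrum_io.file import parquet

df = pd.DataFrame({"scan_number": [1, 234], "sequence": ["SVFLTFLR", "KTSQIFLAK"]})

# Single file: write_file() now writes a pandas DataFrame directly.
parquet.write_file(df, "spectra.parquet")
same_df = parquet.read_file("spectra.parquet")

# Partitioned dataset: one subdirectory per dataset name.
# Per the docstring caveat, names that are all strings of ints ("1", "2", ...)
# are stored as raw integers by Parquet; read_partition() is expected to handle
# that conversion transparently (see the integer-key test below).
parquet.write_partition([df, df], "partitioned_spectra", ["train", "holdout"])
train_df = parquet.read_partition("partitioned_spectra", "train")
```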
102 changes: 41 additions & 61 deletions tests/unit_tests/test_parquet.py
@@ -1,12 +1,13 @@
import shutil
import sys
import tempfile
import unittest
from contextlib import nullcontext
from pathlib import Path

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
import scipy

if "typeguard" in sys.modules:
@@ -15,87 +16,66 @@
from spectrum_io.file import parquet


-class TestParquet:
+class TestParquet(unittest.TestCase):
"""Test class to check Parquet file I/O."""

-def test_read_file(self, raw_data, tmpdir):
+def setUp(self): # noqa: D102
+# Simple toy MS data containing float, list[float], str, int, and list[int]
+self.raw_data = {
+"scan_number": [1, 234, 5678],
+"intensities": [
+[4e-5, 0.0, -1.0, 0.0, 0.0, -1.0, 0.03, 0.0, -1.0, 0.4],
+[0.3, 0.0, -1.0, 1.0, 0.0, -1.0, 0.4, 0.0, -1.0, 0.05],
+[0.04, 0.0, 0.0, 0.0, 0.0, 0.0, 2e-3, 0.0, 0.0, 0.13],
+],
+"sequence": ["SVFLTFLR", "KTSQIFLAK", "SPVGRVTPKEWR"],
+"precursor_charge_onehot": [
+[0, 1, 0, 0, 0, 0],
+[0, 1, 0, 0, 0, 0],
+[0, 0, 1, 0, 0, 0],
+],
+"collision_energy_normed": [0.250827308624, 0.288798207462, 0.2887064038764],
+}
+self.temp_dir = Path(tempfile.mkdtemp())

+def tearDown(self): # noqa: D102
+shutil.rmtree(self.temp_dir)

+def test_read_file(self):
"""Test read operation for a single dataset."""
-output_path = Path(tmpdir / "table.parquet")
-pq.write_table(pa.Table.from_pydict(raw_data), output_path)
+output_path = self.temp_dir / "table.parquet"
+pq.write_table(pa.Table.from_pydict(self.raw_data), output_path)
df = parquet.read_file(output_path)
output_path.unlink()
-pd.testing.assert_frame_equal(df, pd.DataFrame(raw_data))
+pd.testing.assert_frame_equal(df, pd.DataFrame(self.raw_data))

-def test_write_file(self, raw_data, tmpdir):
+def test_write_file(self):
"""Check write operation for a single dataset."""
-output_path = Path(tmpdir / "table.parquet")
-df = pd.DataFrame(raw_data)
+output_path = self.temp_dir / "table.parquet"
+df = pd.DataFrame(self.raw_data)
parquet.write_file(df, output_path)
pd.testing.assert_frame_equal(df, pd.read_parquet(output_path))
output_path.unlink()

-def test_read_write_partition(self, raw_data, tmpdir):
+def test_read_write_partition(self):
"""Check whether data is unmodified after being written to and then read from a partitioned dataset."""
-output_path = Path(tmpdir / "partition")
-df = pd.DataFrame(raw_data)
+output_path = self.temp_dir / "partition"
+df = pd.DataFrame(self.raw_data)
parquet.write_partition([df, df], output_path, ["dataset_1", "dataset_2"])
read_df = parquet.read_partition(output_path, "dataset_1")
shutil.rmtree(output_path)
pd.testing.assert_frame_equal(read_df, df)

-def test_read_write_partition_integer_key(self, raw_data, tmpdir):
+def test_read_write_partition_integer_key(self):
"""Check whether Parquet's under-the-hood conversion of string to integer keys is handled seamlessly."""
-output_path = Path(tmpdir / "partition")
-df = pd.DataFrame(raw_data)
+output_path = self.temp_dir / "partition"
+df = pd.DataFrame(self.raw_data)
parquet.write_partition([df, df], output_path, ["1", "2"])
read_df = parquet.read_partition(output_path, "1")
shutil.rmtree(output_path)
pd.testing.assert_frame_equal(read_df, df)

-def test_modify_partition(self, raw_data, tmpdir):
+def test_modify_partition(self):
"""Check whether file content stays the same when writing new data to the same partitioned directory."""
-output_path = Path(tmpdir / "partition")
-df = pd.DataFrame(raw_data)
+output_path = self.temp_dir / "partition"
+df = pd.DataFrame(self.raw_data)
parquet.write_partition([df, df], output_path, ["1", "2"])
parquet.write_partition([df, df, df], output_path, ["1", "2", "3"])
read_df = parquet.read_partition(output_path, "2")
shutil.rmtree(output_path)
pd.testing.assert_frame_equal(read_df, df)

-def test_write_not_implemented(self, raw_data, tmpdir):
-"""Check whether write_file() raises a NotImplementedError if provided with an unsupported object."""
-with pytest.raises(NotImplementedError):
-with suppress_type_checks() if "typeguard" in sys.modules else nullcontext():
-output_path = Path(tmpdir / "table.parquet")
-df = pd.DataFrame(raw_data).to_numpy()
-parquet.write_file(df, output_path)
-output_path.unlink()

-def test_read_write_partition_not_implemented(self, raw_data, tmpdir):
-"""Check whether write_partition() raises a NotImplementedError if provided with an unsupported object."""
-with pytest.raises(NotImplementedError):
-with suppress_type_checks() if "typeguard" in sys.modules else nullcontext():
-output_path = Path(tmpdir / "partition")
-df = pd.DataFrame(raw_data).to_numpy()
-parquet.write_partition([df, df], output_path, ["dataset_1", "dataset_2"])
-output_path.unlink()


-@pytest.fixture
-def raw_data():
-"""Simple toy MS data containing float, list[float], str, int, and list[int]."""
-return {
-"scan_number": [1, 234, 5678],
-"intensities": [
-[4e-5, 0.0, -1.0, 0.0, 0.0, -1.0, 0.03, 0.0, -1.0, 0.4],
-[0.3, 0.0, -1.0, 1.0, 0.0, -1.0, 0.4, 0.0, -1.0, 0.05],
-[0.04, 0.0, 0.0, 0.0, 0.0, 0.0, 2e-3, 0.0, 0.0, 0.13],
-],
-"sequence": ["SVFLTFLR", "KTSQIFLAK", "SPVGRVTPKEWR"],
-"precursor_charge_onehot": [
-[0, 1, 0, 0, 0, 0],
-[0, 1, 0, 0, 0, 0],
-[0, 0, 1, 0, 0, 0],
-],
-"collision_energy_normed": [0.250827308624, 0.288798207462, 0.2887064038764],
-}
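One practical follow-up to the switch from pytest fixtures to `unittest.TestCase`: the class should now also run under the standard-library runner. A minimal sketch follows; the discovery path and file name are taken from the diff above, and keeping pytest as the project's runner is assumed to still work, since pytest collects `unittest.TestCase` subclasses.

```python
# Minimal sketch: discover and run the refactored Parquet tests with unittest alone.
import unittest

if __name__ == "__main__":
    suite = unittest.defaultTestLoader.discover("tests/unit_tests", pattern="test_parquet.py")
    unittest.TextTestRunner(verbosity=2).run(suite)
```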