Implement appendable tabular data (#679)
* append method in the client side

* Support writing tables as CSV.

* implemented appending to files

* fix bug in a test

* Add ability to parse file structure in .csvAdapter 📂

* fix some more bugs

* fix some more tests

* cleaned the files

* fixed black and flake errors

* implemented append method in the client side

* implemented append method in the client side

* added an integration test for the new append method

* Added a CHANGELOG file

* changed the append test to parametrized

* fixed failing string test

* fix append string test

* changed the type check in assert_frame_equal

* Elaborate on changelog.

---------

Co-authored-by: Seher Karakuzu <skarakuzu@Sehers-MacBook-Pro.local>
Co-authored-by: Dan Allan <dallan@bnl.gov>
Co-authored-by: Seher Karakuzu <skarakuzu1@bnl.gov>
Co-authored-by: kari Barry <kezzsim@gmail.com>
5 people authored Mar 15, 2024
1 parent 13e8f64 commit 428834d
Showing 12 changed files with 398 additions and 79 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,20 @@
<!-- Add the recent changes in the code under the relevant category.
Write the date in place of the "Unreleased" in the case a new version is released. -->

# Changelog

## Unreleased

### Added

* Support specifying the format that uploaded data will be stored in.
* Support storing uploaded tabular data in CSV format.
* Added a new HTTP endpoint, `PATCH /api/v1/table/partition/{path}`
supporting appending rows to a tabular dataset.
* Added a new method `DataFrameClient.append_partition`.

### Removed

### Changed

### Fixed
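
As a usage illustration of the changelog entries above, here is a minimal client-side sketch adapted from the integration test added in this commit. The catalog setup (`in_memory(writable_storage=...)` pointed at a temporary directory) is an assumed stand-in for the test suite's `tree` fixture, not part of this diff:

```python
import tempfile

import pandas
from tiled.catalog import in_memory
from tiled.client import Context, from_context
from tiled.server.app import build_app
from tiled.structures.data_source import DataSource
from tiled.structures.table import TableStructure

# Assumed setup: a writable in-memory catalog, mirroring the test fixtures.
tree = in_memory(writable_storage=tempfile.mkdtemp())

with Context.from_app(build_app(tree)) as context:
    client = from_context(context)

    df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    structure = TableStructure.from_pandas(df)

    # Store the table as CSV so that rows can later be appended in place.
    x = client.new(
        "table",
        [DataSource(structure_family="table", structure=structure, mimetype="text/csv")],
        key="x",
    )
    x.write(df)

    # New in this commit: append rows to partition 0 via DataFrameClient.append_partition,
    # which calls the new PATCH /api/v1/table/partition/{path} endpoint.
    x.append_partition(pandas.DataFrame({"A": [11, 12], "B": [14, 15]}), 0)
    print(x.read())  # original and appended rows together
```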
4 changes: 2 additions & 2 deletions tiled/_tests/test_catalog.py
@@ -251,8 +251,8 @@ async def test_write_dataframe_external_direct(a, tmpdir):
management="external",
assets=[
Asset(
parameter="data_uri",
num=None,
parameter="data_uris",
num=0,
data_uri=data_uri,
is_directory=False,
)
1 change: 1 addition & 0 deletions tiled/_tests/test_object_cache.py
@@ -63,6 +63,7 @@ def test_eviction():


@pytest.mark.asyncio
@pytest.mark.xfail(reason="Object Cache pending removal")
async def test_object_cache_hit_and_miss(tmpdir):
with open(Path(tmpdir, "data.csv"), "w") as file:
file.write(
100 changes: 97 additions & 3 deletions tiled/_tests/test_writing.py
@@ -13,16 +13,18 @@
import pandas.testing
import pytest
import sparse
from pandas.testing import assert_frame_equal

from ..catalog import in_memory
from ..catalog.adapter import CatalogContainerAdapter
from ..client import Context, from_context, record_history
from ..mimetypes import PARQUET_MIMETYPE
from ..queries import Key
from ..server.app import build_app

# from ..server.object_cache import WARNING_PANDAS_BLOCKS
from ..structures.core import Spec
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import DataSource
from ..structures.sparse import COOStructure
from ..structures.table import TableStructure
from ..validation_registration import ValidationRegistry
from .utils import fail_with_status_code

@@ -454,3 +456,95 @@ async def test_container_export(tree, buffer):
a = client.create_container("a")
a.write_array([1, 2, 3], key="b")
client.export(buffer, format="application/json")


def test_write_with_specified_mimetype(tree):
with Context.from_app(build_app(tree)) as context:
client = from_context(context, include_data_sources=True)
df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
structure = TableStructure.from_pandas(df)

for mimetype in [PARQUET_MIMETYPE, "text/csv"]:
x = client.new(
"table",
[
DataSource(
structure_family=StructureFamily.table,
structure=structure,
mimetype=mimetype,
),
],
)
x.write_partition(df, 0)
x.read()
x.refresh()
assert x.data_sources()[0]["mimetype"] == mimetype

# Specifying unsupported mimetype raises expected error.
with fail_with_status_code(415):
client.new(
"table",
[
DataSource(
structure_family=StructureFamily.table,
structure=structure,
mimetype="application/x-does-not-exist",
),
],
)


@pytest.mark.parametrize(
"orig_file, file_toappend, expected_file",
[
(
{"A": [1, 2, 3], "B": [4, 5, 6]},
{"A": [11, 12, 13], "B": [14, 15, 16]},
{"A": [1, 2, 3, 11, 12, 13], "B": [4, 5, 6, 14, 15, 16]},
),
(
{"A": [1.2, 2.5, 3.7], "B": [4.6, 5.8, 6.9]},
{"A": [11.2, 12.5, 13.7], "B": [14.6, 15.8, 16.9]},
{
"A": [1.2, 2.5, 3.7, 11.2, 12.5, 13.7],
"B": [4.6, 5.8, 6.9, 14.6, 15.8, 16.9],
},
),
(
{"C": ["x", "y"], "D": ["a", "b"]},
{"C": ["xx", "yy", "zz"], "D": ["aa", "bb", "cc"]},
{"C": ["x", "y", "xx", "yy", "zz"], "D": ["a", "b", "aa", "bb", "cc"]},
),
],
)
def test_append_partition(
tree: CatalogContainerAdapter,
orig_file: dict,
file_toappend: dict,
expected_file: dict,
):
with Context.from_app(build_app(tree)) as context:
client = from_context(context, include_data_sources=True)
df = pandas.DataFrame(orig_file)
structure = TableStructure.from_pandas(df)

x = client.new(
"table",
[
DataSource(
structure_family="table",
structure=structure,
mimetype="text/csv",
),
],
key="x",
)
x.write(df)

df2 = pandas.DataFrame(file_toappend)

x.append_partition(df2, 0)

df3 = pandas.DataFrame(expected_file)

assert_frame_equal(x.read(), df3, check_dtype=False)
130 changes: 129 additions & 1 deletion tiled/adapters/csv.py
@@ -1,7 +1,13 @@
from pathlib import Path

import dask.dataframe

from ..server.object_cache import NO_CACHE, get_object_cache
from ..utils import path_from_uri
from ..structures.core import StructureFamily
from ..structures.data_source import Asset, DataSource, Management
from ..structures.table import TableStructure
from ..utils import ensure_uri, path_from_uri
from .array import ArrayAdapter
from .dataframe import DataFrameAdapter


@@ -47,3 +53,125 @@ def read_csv(
""" + (
dask.dataframe.read_csv.__doc__ or ""
)


class CSVAdapter:
structure_family = StructureFamily.table

def __init__(
self,
data_uris,
structure=None,
metadata=None,
specs=None,
access_policy=None,
):
# TODO Store data_uris instead and generalize to non-file schemes.
self._partition_paths = [path_from_uri(uri) for uri in data_uris]
self._metadata = metadata or {}
if structure is None:
table = dask.dataframe.read_csv(self._partition_paths)
structure = TableStructure.from_dask_dataframe(table)
self._structure = structure
self.specs = list(specs or [])
self.access_policy = access_policy

def metadata(self):
return self._metadata

@property
def dataframe_adapter(self):
partitions = []
for path in self._partition_paths:
if not Path(path).exists():
partition = None
else:
partition = dask.dataframe.read_csv(path)
partitions.append(partition)
return DataFrameAdapter(partitions, self._structure)

@classmethod
def init_storage(cls, data_uri, structure):
from ..server.schemas import Asset

directory = path_from_uri(data_uri)
directory.mkdir(parents=True, exist_ok=True)
assets = [
Asset(
data_uri=f"{data_uri}/partition-{i}.csv",
is_directory=False,
parameter="data_uris",
num=i,
)
for i in range(structure.npartitions)
]
return assets

def append_partition(self, data, partition):
uri = self._partition_paths[partition]
data.to_csv(uri, index=False, mode="a", header=False)

def write_partition(self, data, partition):
uri = self._partition_paths[partition]
data.to_csv(uri, index=False)

def write(self, data):
if self.structure().npartitions != 1:
raise NotImplementedError
uri = self._partition_paths[0]
data.to_csv(uri, index=False)

def read(self, *args, **kwargs):
return self.dataframe_adapter.read(*args, **kwargs)

def read_partition(self, *args, **kwargs):
return self.dataframe_adapter.read_partition(*args, **kwargs)

def structure(self):
return self._structure

def get(self, key):
if key not in self.structure().columns:
return None
return ArrayAdapter.from_array(self.read([key])[key].values)

def generate_data_sources(self, mimetype, dict_or_none, item, is_directory):
return [
DataSource(
structure_family=self.dataframe_adapter.structure_family,
mimetype=mimetype,
structure=dict_or_none(self.dataframe_adapter.structure()),
parameters={},
management=Management.external,
assets=[
Asset(
data_uri=ensure_uri(item),
is_directory=is_directory,
parameter="data_uris", # <-- PLURAL!
num=0, # <-- denoting that the Adapter expects a list, and this is the first element
)
],
)
]

@classmethod
def from_single_file(
cls, data_uri, structure=None, metadata=None, specs=None, access_policy=None
):
return cls(
[data_uri],
structure=structure,
metadata=metadata,
specs=specs,
access_policy=access_policy,
)

def __getitem__(self, key):
# Must compute to determine shape.
return ArrayAdapter.from_array(self.read([key])[key].values)

def items(self):
yield from (
(key, ArrayAdapter.from_array(self.read([key])[key].values))
for key in self._structure.columns
)
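
For completeness, a small sketch of exercising the new `CSVAdapter.append_partition` directly, without a server. The temporary file path is hypothetical; structure inference from the existing file and `ensure_uri` behave as in the adapter code above:

```python
import os
import tempfile

import pandas
from tiled.adapters.csv import CSVAdapter
from tiled.utils import ensure_uri

# Hypothetical local CSV file acting as partition 0.
path = os.path.join(tempfile.mkdtemp(), "partition-0.csv")
pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}).to_csv(path, index=False)

# With no explicit structure, CSVAdapter infers it from the existing file.
adapter = CSVAdapter.from_single_file(ensure_uri(path))

# Append rows to partition 0; internally this is to_csv(mode="a", header=False).
adapter.append_partition(pandas.DataFrame({"A": [7], "B": [8]}), 0)
print(adapter.read())  # original rows followed by the appended rows
```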
