Implement appendable tabular data #679

Merged 18 commits on Mar 15, 2024
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,20 @@
<!-- Add the recent changes in the code under the relevant category.
Write the date in place of "Unreleased" when a new version is released. -->

# Changelog

## Unreleased

### Added

* Support specifying the format that uploaded data will be stored in.
* Support storing uploaded tabular data in CSV format.
* Added a new HTTP endpoint, `PATCH /api/v1/table/partition/{path}`,
  for appending rows to a tabular dataset.
* Added a new method, `DataFrameClient.append_partition`, for appending rows
  from the Python client (a usage sketch follows this changelog).

### Removed

### Changed

### Fixed
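
For context, here is a minimal end-to-end sketch of the new append capability, assembled from the tests added further down in this diff. The in-memory catalog setup (`in_memory(writable_storage=...)`) mirrors the test fixtures and is an assumption, not something shown in this changelog; the path is illustrative.

```python
import pandas

from tiled.catalog import in_memory
from tiled.client import Context, from_context
from tiled.server.app import build_app
from tiled.structures.data_source import DataSource
from tiled.structures.table import TableStructure

# Assumed fixture-style setup: a writable in-memory catalog served in-process.
tree = in_memory(writable_storage="/tmp/tiled-demo")  # illustrative path
with Context.from_app(build_app(tree)) as context:
    client = from_context(context, include_data_sources=True)

    df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    x = client.new(
        "table",
        [
            DataSource(
                structure_family="table",
                structure=TableStructure.from_pandas(df),
                mimetype="text/csv",  # store the uploaded table as CSV
            ),
        ],
        key="x",
    )
    x.write(df)

    # New in this PR: append rows to partition 0 instead of overwriting it.
    x.append_partition(pandas.DataFrame({"A": [11, 12], "B": [14, 15]}), 0)
    print(x.read())  # original rows followed by the appended rows
```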
4 changes: 2 additions & 2 deletions tiled/_tests/test_catalog.py
@@ -251,8 +251,8 @@ async def test_write_dataframe_external_direct(a, tmpdir):
                management="external",
                assets=[
                    Asset(
                        parameter="data_uri",
                        num=None,
                        parameter="data_uris",
                        num=0,
                        data_uri=data_uri,
                        is_directory=False,
                    )
1 change: 1 addition & 0 deletions tiled/_tests/test_object_cache.py
@@ -63,6 +63,7 @@ def test_eviction():


@pytest.mark.asyncio
@pytest.mark.xfail(reason="Object Cache pending removal")
async def test_object_cache_hit_and_miss(tmpdir):
    with open(Path(tmpdir, "data.csv"), "w") as file:
        file.write(
100 changes: 97 additions & 3 deletions tiled/_tests/test_writing.py
@@ -13,16 +13,18 @@
import pandas.testing
import pytest
import sparse
from pandas.testing import assert_frame_equal

from ..catalog import in_memory
from ..catalog.adapter import CatalogContainerAdapter
from ..client import Context, from_context, record_history
from ..mimetypes import PARQUET_MIMETYPE
from ..queries import Key
from ..server.app import build_app

# from ..server.object_cache import WARNING_PANDAS_BLOCKS
from ..structures.core import Spec
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import DataSource
from ..structures.sparse import COOStructure
from ..structures.table import TableStructure
from ..validation_registration import ValidationRegistry
from .utils import fail_with_status_code

@@ -454,3 +456,95 @@ async def test_container_export(tree, buffer):
    a = client.create_container("a")
    a.write_array([1, 2, 3], key="b")
    client.export(buffer, format="application/json")


def test_write_with_specified_mimetype(tree):
    with Context.from_app(build_app(tree)) as context:
        client = from_context(context, include_data_sources=True)
        df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
        structure = TableStructure.from_pandas(df)

        for mimetype in [PARQUET_MIMETYPE, "text/csv"]:
            x = client.new(
                "table",
                [
                    DataSource(
                        structure_family=StructureFamily.table,
                        structure=structure,
                        mimetype=mimetype,
                    ),
                ],
            )
            x.write_partition(df, 0)
            x.read()
            x.refresh()
            assert x.data_sources()[0]["mimetype"] == mimetype

        # Specifying an unsupported mimetype raises the expected error.
        with fail_with_status_code(415):
            client.new(
                "table",
                [
                    DataSource(
                        structure_family=StructureFamily.table,
                        structure=structure,
                        mimetype="application/x-does-not-exist",
                    ),
                ],
            )


@pytest.mark.parametrize(
    "orig_file, file_toappend, expected_file",
    [
        (
            {"A": [1, 2, 3], "B": [4, 5, 6]},
            {"A": [11, 12, 13], "B": [14, 15, 16]},
            {"A": [1, 2, 3, 11, 12, 13], "B": [4, 5, 6, 14, 15, 16]},
        ),
        (
            {"A": [1.2, 2.5, 3.7], "B": [4.6, 5.8, 6.9]},
            {"A": [11.2, 12.5, 13.7], "B": [14.6, 15.8, 16.9]},
            {
                "A": [1.2, 2.5, 3.7, 11.2, 12.5, 13.7],
                "B": [4.6, 5.8, 6.9, 14.6, 15.8, 16.9],
            },
        ),
        (
            {"C": ["x", "y"], "D": ["a", "b"]},
            {"C": ["xx", "yy", "zz"], "D": ["aa", "bb", "cc"]},
            {"C": ["x", "y", "xx", "yy", "zz"], "D": ["a", "b", "aa", "bb", "cc"]},
        ),
    ],
)
def test_append_partition(
    tree: CatalogContainerAdapter,
    orig_file: dict,
    file_toappend: dict,
    expected_file: dict,
):
    with Context.from_app(build_app(tree)) as context:
        client = from_context(context, include_data_sources=True)
        df = pandas.DataFrame(orig_file)
        structure = TableStructure.from_pandas(df)

        x = client.new(
            "table",
            [
                DataSource(
                    structure_family="table",
                    structure=structure,
                    mimetype="text/csv",
                ),
            ],
            key="x",
        )
        x.write(df)

        df2 = pandas.DataFrame(file_toappend)

        x.append_partition(df2, 0)

        df3 = pandas.DataFrame(expected_file)

        assert_frame_equal(x.read(), df3, check_dtype=False)
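
In each parametrized case the expected frame is simply the original rows followed by the appended rows; `check_dtype=False` is used presumably because a CSV round trip need not preserve dtypes exactly. A plain-pandas statement of that expectation, for illustration only:

```python
import pandas
from pandas.testing import assert_frame_equal

orig = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
to_append = pandas.DataFrame({"A": [11, 12, 13], "B": [14, 15, 16]})

# Appending a partition should behave like row-wise concatenation, in order.
expected = pandas.concat([orig, to_append], ignore_index=True)
assert_frame_equal(
    expected,
    pandas.DataFrame({"A": [1, 2, 3, 11, 12, 13], "B": [4, 5, 6, 14, 15, 16]}),
)
```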
130 changes: 129 additions & 1 deletion tiled/adapters/csv.py
@@ -1,7 +1,13 @@
from pathlib import Path

import dask.dataframe

from ..server.object_cache import NO_CACHE, get_object_cache
from ..utils import path_from_uri
from ..structures.core import StructureFamily
from ..structures.data_source import Asset, DataSource, Management
from ..structures.table import TableStructure
from ..utils import ensure_uri, path_from_uri
from .array import ArrayAdapter
from .dataframe import DataFrameAdapter


@@ -47,3 +53,125 @@ def read_csv(
""" + (
dask.dataframe.read_csv.__doc__ or ""
)


class CSVAdapter:
    structure_family = StructureFamily.table

    def __init__(
        self,
        data_uris,
        structure=None,
        metadata=None,
        specs=None,
        access_policy=None,
    ):
        # TODO Store data_uris instead and generalize to non-file schemes.
        self._partition_paths = [path_from_uri(uri) for uri in data_uris]
        self._metadata = metadata or {}
        if structure is None:
            table = dask.dataframe.read_csv(self._partition_paths)
            structure = TableStructure.from_dask_dataframe(table)
        self._structure = structure
        self.specs = list(specs or [])
        self.access_policy = access_policy

    def metadata(self):
        return self._metadata

    @property
    def dataframe_adapter(self):
        partitions = []
        for path in self._partition_paths:
            if not Path(path).exists():
                partition = None
            else:
                partition = dask.dataframe.read_csv(path)
            partitions.append(partition)
        return DataFrameAdapter(partitions, self._structure)

    @classmethod
    def init_storage(cls, data_uri, structure):
        from ..server.schemas import Asset

        directory = path_from_uri(data_uri)
        directory.mkdir(parents=True, exist_ok=True)
        assets = [
            Asset(
                data_uri=f"{data_uri}/partition-{i}.csv",
                is_directory=False,
                parameter="data_uris",
                num=i,
            )
            for i in range(structure.npartitions)
        ]
        return assets
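
    # Each partition is backed by one CSV file (created in init_storage above),
    # so appending rows amounts to appending lines to the corresponding file.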

    def append_partition(self, data, partition):
        uri = self._partition_paths[partition]
        data.to_csv(uri, index=False, mode="a", header=False)

    def write_partition(self, data, partition):
        uri = self._partition_paths[partition]
        data.to_csv(uri, index=False)

    def write(self, data):
        if self.structure().npartitions != 1:
            raise NotImplementedError
        uri = self._partition_paths[0]
        data.to_csv(uri, index=False)

    def read(self, *args, **kwargs):
        return self.dataframe_adapter.read(*args, **kwargs)

    def read_partition(self, *args, **kwargs):
        return self.dataframe_adapter.read_partition(*args, **kwargs)

    def structure(self):
        return self._structure

    def get(self, key):
        if key not in self.structure().columns:
            return None
        return ArrayAdapter.from_array(self.read([key])[key].values)

    def generate_data_sources(self, mimetype, dict_or_none, item, is_directory):
        return [
            DataSource(
                structure_family=self.dataframe_adapter.structure_family,
                mimetype=mimetype,
                structure=dict_or_none(self.dataframe_adapter.structure()),
                parameters={},
                management=Management.external,
                assets=[
                    Asset(
                        data_uri=ensure_uri(item),
                        is_directory=is_directory,
                        parameter="data_uris",  # <-- PLURAL!
                        num=0,  # <-- denoting that the Adapter expects a list, and this is the first element
                    )
                ],
            )
        ]

    @classmethod
    def from_single_file(
        cls, data_uri, structure=None, metadata=None, specs=None, access_policy=None
    ):
        return cls(
            [data_uri],
            structure=structure,
            metadata=metadata,
            specs=specs,
            access_policy=access_policy,
        )

    def __getitem__(self, key):
        # Must compute to determine shape.
        return ArrayAdapter.from_array(self.read([key])[key].values)

    def items(self):
        yield from (
            (key, ArrayAdapter.from_array(self.read([key])[key].values))
            for key in self._structure.columns
        )
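
To see the append path in isolation, here is a small standalone sketch that drives the new `CSVAdapter` directly, using only methods defined above. The temporary directory and file name are illustrative, and `ensure_uri` is used as in `generate_data_sources` above to turn a path into a URI; in normal operation the server constructs this adapter from catalog assets rather than by hand.

```python
import tempfile
from pathlib import Path

import pandas

from tiled.adapters.csv import CSVAdapter
from tiled.structures.table import TableStructure
from tiled.utils import ensure_uri

directory = Path(tempfile.mkdtemp())
csv_path = directory / "partition-0.csv"  # illustrative file name

# Seed a single-partition CSV on disk.
df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
df.to_csv(csv_path, index=False)

# Wrap the file in the adapter (one URI per partition).
adapter = CSVAdapter.from_single_file(
    ensure_uri(str(csv_path)),
    structure=TableStructure.from_pandas(df),
)

# Append rows to partition 0; the backing file grows in place.
adapter.append_partition(pandas.DataFrame({"A": [11, 12, 13], "B": [14, 15, 16]}), 0)

assert len(adapter.read()) == 6  # original three rows plus the appended three
```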