diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 000000000..9cd900c70
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,20 @@
+
+
+# Changelog
+
+## Unreleased
+
+### Added
+
+* Support specifying the format that uploaded data will be stored in.
+* Support storing uploaded tabular data in CSV format.
+* Added a new HTTP endpoint, `PATCH /api/v1/table/partition/{path}`,
+  which supports appending rows to a tabular dataset.
+* Added a new method `DataFrameClient.append_partition`.
+
+### Removed
+
+### Changed
+
+### Fixed
diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py
index 63446130a..22daa33da 100644
--- a/tiled/_tests/test_catalog.py
+++ b/tiled/_tests/test_catalog.py
@@ -251,8 +251,8 @@ async def test_write_dataframe_external_direct(a, tmpdir):
             management="external",
             assets=[
                 Asset(
-                    parameter="data_uri",
-                    num=None,
+                    parameter="data_uris",
+                    num=0,
                     data_uri=data_uri,
                     is_directory=False,
                 )
diff --git a/tiled/_tests/test_object_cache.py b/tiled/_tests/test_object_cache.py
index 1bebc1e7f..8ca335dd2 100644
--- a/tiled/_tests/test_object_cache.py
+++ b/tiled/_tests/test_object_cache.py
@@ -63,6 +63,7 @@ def test_eviction():
 
 
 @pytest.mark.asyncio
+@pytest.mark.xfail(reason="Object Cache pending removal")
 async def test_object_cache_hit_and_miss(tmpdir):
     with open(Path(tmpdir, "data.csv"), "w") as file:
         file.write(
diff --git a/tiled/_tests/test_writing.py b/tiled/_tests/test_writing.py
index 7b335fab4..8aaeda912 100644
--- a/tiled/_tests/test_writing.py
+++ b/tiled/_tests/test_writing.py
@@ -13,16 +13,18 @@
 import pandas.testing
 import pytest
 import sparse
+from pandas.testing import assert_frame_equal
 
 from ..catalog import in_memory
+from ..catalog.adapter import CatalogContainerAdapter
 from ..client import Context, from_context, record_history
+from ..mimetypes import PARQUET_MIMETYPE
 from ..queries import Key
 from ..server.app import build_app
-
-# from ..server.object_cache import WARNING_PANDAS_BLOCKS
-from ..structures.core import Spec
+from ..structures.core import Spec, StructureFamily
 from ..structures.data_source import DataSource
 from ..structures.sparse import COOStructure
+from ..structures.table import TableStructure
 from ..validation_registration import ValidationRegistry
 from .utils import fail_with_status_code
@@ -454,3 +456,95 @@ async def test_container_export(tree, buffer):
     a = client.create_container("a")
     a.write_array([1, 2, 3], key="b")
     client.export(buffer, format="application/json")
+
+
+def test_write_with_specified_mimetype(tree):
+    with Context.from_app(build_app(tree)) as context:
+        client = from_context(context, include_data_sources=True)
+        df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+        structure = TableStructure.from_pandas(df)
+
+        for mimetype in [PARQUET_MIMETYPE, "text/csv"]:
+            x = client.new(
+                "table",
+                [
+                    DataSource(
+                        structure_family=StructureFamily.table,
+                        structure=structure,
+                        mimetype=mimetype,
+                    ),
+                ],
+            )
+            x.write_partition(df, 0)
+            x.read()
+            x.refresh()
+            assert x.data_sources()[0]["mimetype"] == mimetype
+
+        # Specifying unsupported mimetype raises expected error.
+        with fail_with_status_code(415):
+            client.new(
+                "table",
+                [
+                    DataSource(
+                        structure_family=StructureFamily.table,
+                        structure=structure,
+                        mimetype="application/x-does-not-exist",
+                    ),
+                ],
+            )
+
+
+@pytest.mark.parametrize(
+    "orig_file, file_toappend, expected_file",
+    [
+        (
+            {"A": [1, 2, 3], "B": [4, 5, 6]},
+            {"A": [11, 12, 13], "B": [14, 15, 16]},
+            {"A": [1, 2, 3, 11, 12, 13], "B": [4, 5, 6, 14, 15, 16]},
+        ),
+        (
+            {"A": [1.2, 2.5, 3.7], "B": [4.6, 5.8, 6.9]},
+            {"A": [11.2, 12.5, 13.7], "B": [14.6, 15.8, 16.9]},
+            {
+                "A": [1.2, 2.5, 3.7, 11.2, 12.5, 13.7],
+                "B": [4.6, 5.8, 6.9, 14.6, 15.8, 16.9],
+            },
+        ),
+        (
+            {"C": ["x", "y"], "D": ["a", "b"]},
+            {"C": ["xx", "yy", "zz"], "D": ["aa", "bb", "cc"]},
+            {"C": ["x", "y", "xx", "yy", "zz"], "D": ["a", "b", "aa", "bb", "cc"]},
+        ),
+    ],
+)
+def test_append_partition(
+    tree: CatalogContainerAdapter,
+    orig_file: dict,
+    file_toappend: dict,
+    expected_file: dict,
+):
+    with Context.from_app(build_app(tree)) as context:
+        client = from_context(context, include_data_sources=True)
+        df = pandas.DataFrame(orig_file)
+        structure = TableStructure.from_pandas(df)
+
+        x = client.new(
+            "table",
+            [
+                DataSource(
+                    structure_family="table",
+                    structure=structure,
+                    mimetype="text/csv",
+                ),
+            ],
+            key="x",
+        )
+        x.write(df)
+
+        df2 = pandas.DataFrame(file_toappend)
+
+        x.append_partition(df2, 0)
+
+        df3 = pandas.DataFrame(expected_file)
+
+        assert_frame_equal(x.read(), df3, check_dtype=False)
diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py
index 344bbbf5d..f58fb3860 100644
--- a/tiled/adapters/csv.py
+++ b/tiled/adapters/csv.py
@@ -1,7 +1,13 @@
+from pathlib import Path
+
 import dask.dataframe
 
 from ..server.object_cache import NO_CACHE, get_object_cache
-from ..utils import path_from_uri
+from ..structures.core import StructureFamily
+from ..structures.data_source import Asset, DataSource, Management
+from ..structures.table import TableStructure
+from ..utils import ensure_uri, path_from_uri
+from .array import ArrayAdapter
 from .dataframe import DataFrameAdapter
@@ -47,3 +53,125 @@ def read_csv(
 """ + (
     dask.dataframe.read_csv.__doc__ or ""
 )
+
+
+class CSVAdapter:
+    structure_family = StructureFamily.table
+
+    def __init__(
+        self,
+        data_uris,
+        structure=None,
+        metadata=None,
+        specs=None,
+        access_policy=None,
+    ):
+        # TODO Store data_uris instead and generalize to non-file schemes.
+        self._partition_paths = [path_from_uri(uri) for uri in data_uris]
+        self._metadata = metadata or {}
+        if structure is None:
+            table = dask.dataframe.read_csv(self._partition_paths)
+            structure = TableStructure.from_dask_dataframe(table)
+        self._structure = structure
+        self.specs = list(specs or [])
+        self.access_policy = access_policy
+
+    def metadata(self):
+        return self._metadata
+
+    @property
+    def dataframe_adapter(self):
+        partitions = []
+        for path in self._partition_paths:
+            if not Path(path).exists():
+                partition = None
+            else:
+                partition = dask.dataframe.read_csv(path)
+            partitions.append(partition)
+        return DataFrameAdapter(partitions, self._structure)
+
+    @classmethod
+    def init_storage(cls, data_uri, structure):
+        from ..server.schemas import Asset
+
+        directory = path_from_uri(data_uri)
+        directory.mkdir(parents=True, exist_ok=True)
+        assets = [
+            Asset(
+                data_uri=f"{data_uri}/partition-{i}.csv",
+                is_directory=False,
+                parameter="data_uris",
+                num=i,
+            )
+            for i in range(structure.npartitions)
+        ]
+        return assets
+
+    def append_partition(self, data, partition):
+        uri = self._partition_paths[partition]
+        data.to_csv(uri, index=False, mode="a", header=False)
+
+    def write_partition(self, data, partition):
+        uri = self._partition_paths[partition]
+        data.to_csv(uri, index=False)
+
+    def write(self, data):
+        if self.structure().npartitions != 1:
+            raise NotImplementedError
+        uri = self._partition_paths[0]
+        data.to_csv(uri, index=False)
+
+    def read(self, *args, **kwargs):
+        return self.dataframe_adapter.read(*args, **kwargs)
+
+    def read_partition(self, *args, **kwargs):
+        return self.dataframe_adapter.read_partition(*args, **kwargs)
+
+    def structure(self):
+        return self._structure
+
+    def get(self, key):
+        if key not in self.structure().columns:
+            return None
+        return ArrayAdapter.from_array(self.read([key])[key].values)
+
+    def generate_data_sources(self, mimetype, dict_or_none, item, is_directory):
+        return [
+            DataSource(
+                structure_family=self.dataframe_adapter.structure_family,
+                mimetype=mimetype,
+                structure=dict_or_none(self.dataframe_adapter.structure()),
+                parameters={},
+                management=Management.external,
+                assets=[
+                    Asset(
+                        data_uri=ensure_uri(item),
+                        is_directory=is_directory,
+                        parameter="data_uris",  # <-- PLURAL!
+                        num=0,  # <-- denoting that the Adapter expects a list, and this is the first element
+                    )
+                ],
+            )
+        ]
+
+    @classmethod
+    def from_single_file(
+        cls, data_uri, structure=None, metadata=None, specs=None, access_policy=None
+    ):
+        return cls(
+            [data_uri],
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
+
+    def __getitem__(self, key):
+        # Must compute to determine shape.
+        return ArrayAdapter.from_array(self.read([key])[key].values)
+
+    def items(self):
+        yield from (
+            (key, ArrayAdapter.from_array(self.read([key])[key].values))
+            for key in self._structure.columns
+        )
diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py
index 661f7c414..909fabc21 100644
--- a/tiled/catalog/adapter.py
+++ b/tiled/catalog/adapter.py
@@ -72,18 +72,21 @@
     StructureFamily.table: PARQUET_MIMETYPE,
     StructureFamily.sparse: SPARSE_BLOCKS_PARQUET_MIMETYPE,
 }
-DEFAULT_INIT_STORAGE = OneShotCachedMap(
+INIT_STORAGE = OneShotCachedMap(
     {
-        StructureFamily.array: lambda: importlib.import_module(
+        ZARR_MIMETYPE: lambda: importlib.import_module(
             "...adapters.zarr", __name__
         ).ZarrArrayAdapter.init_storage,
-        StructureFamily.awkward: lambda: importlib.import_module(
+        AWKWARD_BUFFERS_MIMETYPE: lambda: importlib.import_module(
             "...adapters.awkward_buffers", __name__
         ).AwkwardBuffersAdapter.init_storage,
-        StructureFamily.table: lambda: importlib.import_module(
+        PARQUET_MIMETYPE: lambda: importlib.import_module(
             "...adapters.parquet", __name__
         ).ParquetDatasetAdapter.init_storage,
-        StructureFamily.sparse: lambda: importlib.import_module(
+        "text/csv": lambda: importlib.import_module(
+            "...adapters.csv", __name__
+        ).CSVAdapter.init_storage,
+        SPARSE_BLOCKS_PARQUET_MIMETYPE: lambda: importlib.import_module(
             "...adapters.sparse_blocks_parquet", __name__
         ).SparseBlocksParquetAdapter.init_storage,
     }
@@ -412,62 +415,54 @@ async def lookup_adapter(
         )
 
     async def get_adapter(self):
-        num_data_sources = len(self.data_sources)
-        if num_data_sources > 1:
-            raise NotImplementedError
-        if num_data_sources == 1:
-            (data_source,) = self.data_sources
-            try:
-                adapter_factory = self.context.adapters_by_mimetype[
-                    data_source.mimetype
-                ]
-            except KeyError:
-                raise RuntimeError(
-                    f"Server configuration has no adapter for mimetype {data_source.mimetype!r}"
+        (data_source,) = self.data_sources
+        try:
+            adapter_factory = self.context.adapters_by_mimetype[data_source.mimetype]
+        except KeyError:
+            raise RuntimeError(
+                f"Server configuration has no adapter for mimetype {data_source.mimetype!r}"
+            )
+        parameters = collections.defaultdict(list)
+        for asset in data_source.assets:
+            if asset.parameter is None:
+                continue
+            scheme = urlparse(asset.data_uri).scheme
+            if scheme != "file":
+                raise NotImplementedError(
+                    f"Only 'file://...' scheme URLs are currently supported, not {asset.data_uri}"
                 )
-            parameters = collections.defaultdict(list)
-            for asset in data_source.assets:
-                if asset.parameter is None:
-                    continue
-                scheme = urlparse(asset.data_uri).scheme
-                if scheme != "file":
-                    raise NotImplementedError(
-                        f"Only 'file://...' scheme URLs are currently supported, not {asset.data_uri}"
-                    )
-                if scheme == "file":
-                    # Protect against misbehaving clients reading from unintended
-                    # parts of the filesystem.
-                    asset_path = path_from_uri(asset.data_uri)
-                    for readable_storage in self.context.readable_storage:
-                        if Path(
-                            os.path.commonpath(
-                                [path_from_uri(readable_storage), asset_path]
-                            )
-                        ) == path_from_uri(readable_storage):
-                            break
-                    else:
-                        raise RuntimeError(
-                            f"Refusing to serve {asset.data_uri} because it is outside "
-                            "the readable storage area for this server."
+            if scheme == "file":
+                # Protect against misbehaving clients reading from unintended
+                # parts of the filesystem.
+                asset_path = path_from_uri(asset.data_uri)
+                for readable_storage in self.context.readable_storage:
+                    if Path(
+                        os.path.commonpath(
+                            [path_from_uri(readable_storage), asset_path]
                         )
-                if asset.num is None:
-                    parameters[asset.parameter] = asset.data_uri
+                    ) == path_from_uri(readable_storage):
+                        break
                 else:
-                    parameters[asset.parameter].append(asset.data_uri)
-            adapter_kwargs = dict(parameters)
-            adapter_kwargs.update(data_source.parameters)
-            adapter_kwargs["specs"] = self.node.specs
-            adapter_kwargs["metadata"] = self.node.metadata_
-            adapter_kwargs["structure"] = data_source.structure
-            adapter_kwargs["access_policy"] = self.access_policy
-            adapter = await anyio.to_thread.run_sync(
-                partial(adapter_factory, **adapter_kwargs)
-            )
-            for query in self.queries:
-                adapter = adapter.search(query)
-            return adapter
-        else:  # num_data_sources == 0
-            assert False
+                    raise RuntimeError(
+                        f"Refusing to serve {asset.data_uri} because it is outside "
+                        "the readable storage area for this server."
+                    )
+            if asset.num is None:
+                parameters[asset.parameter] = asset.data_uri
+            else:
+                parameters[asset.parameter].append(asset.data_uri)
+        adapter_kwargs = dict(parameters)
+        adapter_kwargs.update(data_source.parameters)
+        adapter_kwargs["specs"] = self.node.specs
+        adapter_kwargs["metadata"] = self.node.metadata_
+        adapter_kwargs["structure"] = data_source.structure
+        adapter_kwargs["access_policy"] = self.access_policy
+        adapter = await anyio.to_thread.run_sync(
+            partial(adapter_factory, **adapter_kwargs)
+        )
+        for query in self.queries:
+            adapter = adapter.search(query)
+        return adapter
 
     def new_variation(
         self,
@@ -597,14 +592,23 @@ async def create_node(
             if data_source.management != Management.external:
                 if structure_family == StructureFamily.container:
                     raise NotImplementedError(structure_family)
-                data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
-                    data_source.structure_family
-                ]
+                if data_source.mimetype is None:
+                    data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
+                        data_source.structure_family
+                    ]
                 data_source.parameters = {}
                 data_uri = str(self.context.writable_storage) + "".join(
                     f"/{quote_plus(segment)}" for segment in (self.segments + [key])
                 )
-                init_storage = DEFAULT_INIT_STORAGE[data_source.structure_family]
+                if data_source.mimetype not in INIT_STORAGE:
+                    raise HTTPException(
+                        status_code=415,
+                        detail=(
+                            f"The given data source mimetype, {data_source.mimetype}, "
+                            "is not one that the Tiled server knows how to write."
+                        ),
+                    )
+                init_storage = INIT_STORAGE[data_source.mimetype]
                 assets = await ensure_awaitable(
                     init_storage, data_uri, data_source.structure
                 )
@@ -954,6 +958,9 @@ class CatalogSparseAdapter(CatalogArrayAdapter):
 
 
 class CatalogTableAdapter(CatalogNodeAdapter):
+    async def get(self, *args, **kwargs):
+        return (await self.get_adapter()).get(*args, **kwargs)
+
     async def read(self, *args, **kwargs):
         return await ensure_awaitable((await self.get_adapter()).read, *args, **kwargs)
 
@@ -970,6 +977,11 @@ async def write_partition(self, *args, **kwargs):
             (await self.get_adapter()).write_partition, *args, **kwargs
         )
 
+    async def append_partition(self, *args, **kwargs):
+        return await ensure_awaitable(
+            (await self.get_adapter()).append_partition, *args, **kwargs
+        )
+
 
 def delete_asset(data_uri, is_directory):
     url = urlparse(data_uri)
diff --git a/tiled/client/container.py b/tiled/client/container.py
index 55a9dc59e..bfb8e821c 100644
--- a/tiled/client/container.py
+++ b/tiled/client/container.py
@@ -612,10 +612,13 @@ def new(
         body = dict(item["attributes"])
         if key is not None:
            body["id"] = key
+
+        # if check:
         if any(data_source.assets for data_source in data_sources):
             endpoint = self.uri.replace("/metadata/", "/register/", 1)
         else:
             endpoint = self.uri
+
         document = handle_error(
             self.context.http_client.post(
                 endpoint,
diff --git a/tiled/client/dataframe.py b/tiled/client/dataframe.py
index dd83b73a0..0287a8773 100644
--- a/tiled/client/dataframe.py
+++ b/tiled/client/dataframe.py
@@ -221,6 +221,15 @@ def write_partition(self, dataframe, partition):
             )
         )
 
+    def append_partition(self, dataframe, partition):
+        handle_error(
+            self.context.http_client.patch(
+                self.item["links"]["partition"].format(index=partition),
+                content=bytes(serialize_arrow(dataframe, {})),
+                headers={"Content-Type": APACHE_ARROW_FILE_MIME_TYPE},
+            )
+        )
+
     def export(self, filepath, columns=None, *, format=None):
         """
         Download data in some format and write to a file.
diff --git a/tiled/client/register.py b/tiled/client/register.py
index f28dbb042..97271e50b 100644
--- a/tiled/client/register.py
+++ b/tiled/client/register.py
@@ -9,7 +9,10 @@
 import httpx
 import watchfiles
 
-from ..mimetypes import DEFAULT_ADAPTERS_BY_MIMETYPE, DEFAULT_MIMETYPES_BY_FILE_EXT
+from ..mimetypes import (
+    DEFAULT_MIMETYPES_BY_FILE_EXT,
+    DEFAULT_REGISTERATION_ADAPTERS_BY_MIMETYPE,
+)
 from ..structures.core import StructureFamily
 from ..structures.data_source import Asset, DataSource, Management
 from ..utils import ensure_uri, import_object
@@ -107,7 +110,7 @@ def init(
         if isinstance(value, str):
             adapters_by_mimetype[key] = import_object(value)
     merged_adapters_by_mimetype = collections.ChainMap(
-        adapters_by_mimetype, DEFAULT_ADAPTERS_BY_MIMETYPE
+        adapters_by_mimetype, DEFAULT_REGISTERATION_ADAPTERS_BY_MIMETYPE
     )
     if isinstance(key_from_filename, str):
         key_from_filename = import_object(key_from_filename)
@@ -301,13 +304,15 @@ async def register_single_item(
         logger.exception("    SKIPPED: Error constructing adapter for '%s':", item)
         return
     key = settings.key_from_filename(item.name)
-    return await create_node_or_drop_collision(
-        node,
-        key=key,
-        structure_family=adapter.structure_family,
-        metadata=dict(adapter.metadata()),
-        specs=adapter.specs,
-        data_sources=[
+    if hasattr(adapter, "generate_data_sources"):
+        # Let the Adapter describe the DataSource(s).
+        data_sources = adapter.generate_data_sources(
+            mimetype, dict_or_none, item, is_directory
+        )
+    else:
+        # Back-compat: Assume one Asset passed as a
+        # parameter named 'data_uri'.
+        data_sources = [
             DataSource(
                 structure_family=adapter.structure_family,
                 mimetype=mimetype,
@@ -322,7 +327,14 @@
                     )
                 ],
             )
-        ],
+        ]
+    return await create_node_or_drop_collision(
+        node,
+        key=key,
+        structure_family=adapter.structure_family,
+        metadata=dict(adapter.metadata()),
+        specs=adapter.specs,
+        data_sources=data_sources,
     )
diff --git a/tiled/mimetypes.py b/tiled/mimetypes.py
index ce67040a4..fddb6eda2 100644
--- a/tiled/mimetypes.py
+++ b/tiled/mimetypes.py
@@ -1,3 +1,4 @@
+import copy
 import importlib
 
 from .serialization.table import XLSX_MIME_TYPE
@@ -20,7 +21,10 @@
     ).TiffSequenceAdapter.from_uris,
     "text/csv": lambda: importlib.import_module(
         "..adapters.csv", __name__
-    ).read_csv,
+    ).CSVAdapter,
+    # "text/csv": lambda: importlib.import_module(
+    #     "..adapters.csv", __name__
+    # ).CSVAdapter.from_single_file,
     XLSX_MIME_TYPE: lambda: importlib.import_module(
         "..adapters.excel", __name__
     ).ExcelAdapter.from_uri,
@@ -45,6 +49,16 @@
     }
 )
 
+DEFAULT_REGISTERATION_ADAPTERS_BY_MIMETYPE = copy.deepcopy(DEFAULT_ADAPTERS_BY_MIMETYPE)
+
+DEFAULT_REGISTERATION_ADAPTERS_BY_MIMETYPE.set(
+    "text/csv",
+    lambda: importlib.import_module(
+        "..adapters.csv", __name__
+    ).CSVAdapter.from_single_file,
+)
+
+
 # We can mostly rely on mimetypes.types_map for the common ones
 # ('.csv' -> 'text/csv', etc.) but we supplement here for some
 # of the more exotic ones that not all platforms know about.
diff --git a/tiled/serialization/table.py b/tiled/serialization/table.py
index b205a230c..999ce1aa7 100644
--- a/tiled/serialization/table.py
+++ b/tiled/serialization/table.py
@@ -43,6 +43,13 @@ def serialize_csv(df, metadata, preserve_index=False):
     return file.getvalue().encode()
 
 
+@deserialization_registry.register(StructureFamily.table, "text/csv")
+def deserialize_csv(buffer):
+    import pandas
+
+    return pandas.read_csv(io.BytesIO(buffer), header=None)
+
+
 serialization_registry.register(StructureFamily.table, "text/csv", serialize_csv)
 serialization_registry.register(
     StructureFamily.table, "text/x-comma-separated-values", serialize_csv
diff --git a/tiled/server/router.py b/tiled/server/router.py
index f017c144f..a09a85540 100644
--- a/tiled/server/router.py
+++ b/tiled/server/router.py
@@ -1298,6 +1298,25 @@ async def put_table_partition(
     return json_or_msgpack(request, None)
 
 
+@router.patch("/table/partition/{path:path}")
+async def patch_table_partition(
+    partition: int,
+    request: Request,
+    entry=SecureEntry(scopes=["write:data"]),
+    deserialization_registry=Depends(get_deserialization_registry),
+):
+    if not hasattr(entry, "write_partition"):
+        raise HTTPException(
+            status_code=405, detail="This node does not support writing a partition."
+        )
+    body = await request.body()
+    media_type = request.headers["content-type"]
+    deserializer = deserialization_registry.dispatch(StructureFamily.table, media_type)
+    data = await ensure_awaitable(deserializer, body)
+    await ensure_awaitable(entry.append_partition, data, partition)
+    return json_or_msgpack(request, None)
+
+
 @router.put("/awkward/full/{path:path}")
 async def put_awkward_full(
     request: Request,
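Taken together, the changes above let a client pick CSV storage when a table is created and then grow it in place. Below is a minimal client-side sketch, modeled on `test_append_partition` in this diff; the in-process `build_app(in_memory(...))` setup and the scratch-directory path are illustrative assumptions, not part of the change itself:

```python
import pandas

from tiled.catalog import in_memory
from tiled.client import Context, from_context
from tiled.server.app import build_app
from tiled.structures.data_source import DataSource
from tiled.structures.table import TableStructure

# Stand up a writable, in-process Tiled server (scratch path is an assumption).
tree = in_memory(writable_storage="/tmp/tiled-scratch")
with Context.from_app(build_app(tree)) as context:
    client = from_context(context, include_data_sources=True)

    df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    structure = TableStructure.from_pandas(df)

    # Ask the server to store this table as CSV instead of the default Parquet.
    x = client.new(
        "table",
        [DataSource(structure_family="table", structure=structure, mimetype="text/csv")],
        key="x",
    )
    x.write(df)

    # Append more rows to partition 0; this issues
    # PATCH /api/v1/table/partition/{path} under the hood.
    x.append_partition(pandas.DataFrame({"A": [11, 12], "B": [14, 15]}), 0)

    print(x.read())
```

The `append_partition` call exercises the new PATCH endpoint, which the catalog routes to `CSVAdapter.append_partition`; that method appends the rows to the backing `partition-0.csv` with `mode="a"` and no header, so the partition simply grows on disk.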