Skip to content

Commit

Permalink
Show subprocess logs if verbose flag is set.
Browse files Browse the repository at this point in the history
  • Loading branch information
danielballan committed Dec 13, 2024
1 parent d5ef9aa commit 9e4c9cb
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 1 deletion.
2 changes: 1 addition & 1 deletion tiled/catalog/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,7 @@ def from_uri(
stdout = process.stdout.decode()
stderr = process.stderr.decode()
logger.info(f"Subprocess stdout: {stdout}")
logger.error(f"Subprocess stderr: {stderr}")
logger.info(f"Subprocess stderr: {stderr}")

parsed_url = make_url(uri)
if (parsed_url.get_dialect().name == "sqlite") and (
Expand Down
12 changes: 12 additions & 0 deletions tiled/client/local_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from ..structures.core import StructureFamily


def local_copy(src_node, dest_node):
src_node = src_node.include_data_sources()
if (src_node.structure_family == StructureFamily.container) and (
src_node.data_sources is None
):
for key, value in src_node.items():
local_copy(value, dest_node)
else:
dest_node
237 changes: 237 additions & 0 deletions tiled/client/ragged.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import itertools

import dask
import dask.array
import numpy

from .base import BaseClient
from .utils import export_util, handle_error, params_from_slice


class RaggedClient(BaseClient):
"Describe and read (slices of) a ragged array"

def __init__(self, *args, item, **kwargs):
super().__init__(*args, item=item, **kwargs)

@property
def dims(self):
return self.structure().dims

@property
def shape(self):
return self.structure().shape

@property
def size(self):
return self.structure().size

@property
def dtype(self):
return self.structure().data_type.to_numpy_dtype()

@property
def nbytes(self):
structure = self.structure()
itemsize = structure.data_type.to_numpy_dtype().itemsize
return self.size * itemsize

@property
def chunks(self):
return self.structure().chunks

@property
def ndim(self):
return len(self.structure().shape)

def __repr__(self):
structure = self.structure()
attrs = {
"shape": structure.shape,
"chunks": structure.chunks,
"dtype": structure.data_type.to_numpy_dtype(),
}
if structure.dims:
attrs["dims"] = structure.dims
return (
f"<{type(self).__name__}"
+ "".join(f" {k}={v}" for k, v in attrs.items())
+ ">"
)

def __array__(self, *args, **kwargs):
return self.read().__array__(*args, **kwargs)

def _get_block(self, block, dtype, shape, slice=None):
"""
Fetch the actual data for one block in a chunked (dask) array.
See read_block() for a public version of this. This private version
enables more efficient multi-block access by requiring the caller to
pass in the structure (dtype, shape).
"""
media_type = "application/octet-stream"
if slice is not None:
# TODO The server accepts a slice parameter but we'll need to write
# careful code here to determine what the new shape will be.
raise NotImplementedError(
"Slicing less than one block is not yet supported."
)
if shape:
# Check for special case of shape with 0 in it.
if 0 in shape:
# This is valid, and it has come up in the wild. An array with
# 0 as one of the dimensions never contains data, so we can
# short-circuit here without any further information from the
# service.
return numpy.array([], dtype=dtype).reshape(shape)
expected_shape = ",".join(map(str, shape))
else:
expected_shape = "scalar"
content = handle_error(
self.context.http_client.get(
self.item["links"]["block"],
headers={"Accept": media_type},
params={
"block": ",".join(map(str, block)),
"expected_shape": expected_shape,
},
)
).read()
return numpy.frombuffer(content, dtype=dtype).reshape(shape)

def read_block(self, block, slice=None):
"""
Access the data for one block of this chunked (dask) array.
Optionally, access only a slice *within* this block.
"""
structure = self.structure()
chunks = structure.chunks
dtype = structure.data_type.to_numpy_dtype()
try:
shape = tuple(chunks[dim][i] for dim, i in enumerate(block))
except IndexError:
raise IndexError(f"Block index {block} out of range")
dask_array = dask.array.from_delayed(
dask.delayed(self._get_block)(block, dtype, shape), dtype=dtype, shape=shape
)
# TODO Make the request in _get_block include the slice so that we only
# fetch exactly the data that we want. This will require careful code
# to determine what the shape will be.
if slice is not None:
dask_array = dask_array[slice]
return dask_array

def read(self, slice=None):
"""
Access the entire array or a slice.
The array will be internally chunked with dask.
"""
structure = self.structure()
shape = structure.shape
dtype = structure.data_type.to_numpy_dtype()
# Build a client-side dask array whose chunks pull from a server-side
# dask array.
name = "remote-dask-array-" f"{self.uri}"
chunks = structure.chunks
# Count the number of blocks along each axis.
num_blocks = (range(len(n)) for n in chunks)
# Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... ---
# and build a dask task encoding the method for fetching its data from
# the server.
dask_tasks = {
(name,)
+ block: (
self._get_block,
block,
dtype,
tuple(chunks[dim][i] for dim, i in enumerate(block)),
)
for block in itertools.product(*num_blocks)
}
dask_array = dask.array.Array(
dask=dask_tasks, name=name, chunks=chunks, dtype=dtype, shape=shape
)
if slice is not None:
dask_array = dask_array[slice]
return dask_array

def write(self, array):
handle_error(
self.context.http_client.put(
self.item["links"]["full"],
content=array.tobytes(),
headers={"Content-Type": "application/octet-stream"},
)
)

def write_block(self, array, block):
handle_error(
self.context.http_client.put(
self.item["links"]["block"].format(*block),
content=array.tobytes(),
headers={"Content-Type": "application/octet-stream"},
)
)

def __getitem__(self, slice):
return self.read(slice)

# The default object.__iter__ works as expected here, no need to
# implemented it specifically.

def __len__(self):
# As with numpy, len(arr) is the size of the zeroth axis.
return self.structure().shape[0]

def export(
self, filepath, *, format=None, slice=None, link=None, template_vars=None
):
"""
Download data in some format and write to a file.
Parameters
----------
file: str or buffer
Filepath or writeable buffer.
format : str, optional
If format is None and `file` is a filepath, the format is inferred
from the name, like 'table.csv' implies format="text/csv". The format
may be given as a file extension ("csv") or a media type ("text/csv").
slice : List[slice], optional
List of slice objects. A convenient way to generate these is shown
in the examples.
link: str, optional
Used internally. Refers to a key in the dictionary of links sent
from the server.
template_vars: dict, optional
Used internally.
Examples
--------
Export all.
>>> a.export("numbers.csv")
Export an N-dimensional slice.
>>> import numpy
>>> a.export("numbers.csv", slice=numpy.s_[:10, 50:100])
"""
# For array, this is always 'full', but xarray clients set a custom link.
if link is None:
link = "full"
template_vars = template_vars or {}
params = params_from_slice(slice)
return export_util(
filepath,
format,
self.context.http_client.get,
self.item["links"][link].format(**template_vars),
params=params,
)


16 changes: 16 additions & 0 deletions tiled/commandline/_serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ def serve_directory(
adapters_by_mimetype=adapters_by_mimetype,
)
if verbose:
from tiled.catalog.adapter import logger as catalog_logger

catalog_logger.addHandler(StreamHandler())
catalog_logger.setLevel("INFO")
register_logger.addHandler(StreamHandler())
register_logger.setLevel("INFO")
# Set the API key manually here, rather than letting the server do it,
Expand Down Expand Up @@ -344,6 +348,11 @@ def serve_catalog(
),
log_timestamps: bool = typer.Option(
False, help="Include timestamps in log output."
verbose: bool = typer.Option(
False,
"--verbose",
"-v",
help=("Log details of catalog creation."),
),
):
"Serve a catalog."
Expand Down Expand Up @@ -401,6 +410,13 @@ def serve_catalog(
write.mkdir()
# TODO Hook into server lifecycle hooks to delete this at shutdown.
elif database is None:
if verbose:
from logging import StreamHandler

from tiled.catalog.adapter import logger as catalog_logger

catalog_logger.addHandler(StreamHandler())
catalog_logger.setLevel("INFO")
typer.echo(
"""A catalog must be specified. Either use a temporary catalog:
Expand Down

0 comments on commit 9e4c9cb

Please sign in to comment.