Skip to content

Commit

Permalink
fix: use dataset_version_id as deep-zoom spatial bucket directory name for spatial assets (#7107)
Browse files Browse the repository at this point in the history

Co-authored-by: kaloster <rkalo@contractor.chanzuckerberg.com>
  • Loading branch information
nayib-jose-gloria and kaloster authored May 28, 2024
1 parent 3cf258f commit 3b0a8a7
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 39 deletions.
6 changes: 4 additions & 2 deletions backend/layers/processing/h5ad_data_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ def __init__(

self.validate_anndata()

def to_cxg(self, output_cxg_directory, sparse_threshold, convert_anndata_colors_to_cxg_colors=True):
def to_cxg(
self, output_cxg_directory, sparse_threshold, dataset_version_id, convert_anndata_colors_to_cxg_colors=True
):
"""
Writes the following attributes of the anndata to CXG: 1) the metadata as metadata attached to an empty
DenseArray, 2) the obs DataFrame as a DenseArray, 3) the var DataFrame as a DenseArray, 4) all valid
Expand All @@ -80,7 +82,7 @@ def to_cxg(self, output_cxg_directory, sparse_threshold, convert_anndata_colors_
convert_dataframe_to_cxg_array(output_cxg_directory, "var", self.var, self.var_index_column_name, ctx)
logging.info("\t...dataset var dataframe saved")

convert_uns_to_cxg_group(output_cxg_directory, self.anndata.uns, "uns", ctx)
convert_uns_to_cxg_group(output_cxg_directory, self.anndata.uns, dataset_version_id, "uns", ctx)
logging.info("\t...dataset uns dataframe saved")

self.write_anndata_embeddings_to_cxg(output_cxg_directory, ctx)
Expand Down
4 changes: 2 additions & 2 deletions backend/layers/processing/process_cxg.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ def process(
self.process_cxg(labeled_h5ad_filename, dataset_version_id, cellxgene_bucket, current_artifacts)

@logit
def make_cxg(self, local_filename):
def make_cxg(self, local_filename, dataset_version_id):
"""
Convert the uploaded H5AD file to the CXG format servicing the cellxgene Explorer.
"""

cxg_output_container = local_filename.replace(".h5ad", ".cxg")
try:
h5ad_data_file = H5ADDataFile(local_filename, var_index_column_name="feature_name")
h5ad_data_file.to_cxg(cxg_output_container, sparse_threshold=25.0)
h5ad_data_file.to_cxg(cxg_output_container, sparse_threshold=25.0, dataset_version_id=dataset_version_id.id)
except Exception as ex:
# TODO use a specialized exception
msg = "CXG conversion failed."
Expand Down
2 changes: 1 addition & 1 deletion backend/layers/processing/process_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def convert_file(
start = datetime.now()
try:
self.update_processing_status(dataset_version_id, processing_status_key, DatasetConversionStatus.CONVERTING)
file_dir = converter(local_filename)
file_dir = converter(local_filename, dataset_version_id)
self.update_processing_status(dataset_version_id, processing_status_key, DatasetConversionStatus.CONVERTED)
self.logger.info(f"Finished converting {converter} in {datetime.now() - start}")
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion backend/layers/processing/process_seurat.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def process(self, dataset_version_id: DatasetVersionId, artifact_bucket: str, da
)

@logit
def make_seurat(self, local_filename):
def make_seurat(self, local_filename, *args, **kwargs):
"""
Create a Seurat rds file from the AnnData file.
"""
Expand Down
4 changes: 2 additions & 2 deletions backend/layers/processing/utils/cxg_generation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def convert_dictionary_to_cxg_group(cxg_container, metadata_dict, group_metadata
metadata_array.meta[key] = value


def convert_uns_to_cxg_group(cxg_container, metadata_dict, group_metadata_name="uns", ctx=None):
def convert_uns_to_cxg_group(cxg_container, metadata_dict, dataset_version_id, group_metadata_name="uns", ctx=None):
"""
Convert uns (unstructured) metadata to CXG output directory specified
Generate deep zoom assets for spatial data
Expand All @@ -53,7 +53,7 @@ def convert_uns_to_cxg_group(cxg_container, metadata_dict, group_metadata_name="
for object_id, content in value.items():
if object_id not in SPATIAL_KEYS_EXCLUDE:
object_filtered = spatial_processor.filter_spatial_data(content, object_id)
spatial_processor.create_deep_zoom_assets(cxg_container, content)
spatial_processor.create_deep_zoom_assets(cxg_container, content, dataset_version_id)

metadata_array.meta[key] = pickle.dumps(object_filtered)

Expand Down
11 changes: 7 additions & 4 deletions backend/layers/processing/utils/spatial.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,26 @@ def _generate_deep_zoom_assets(self, image_array, assets_folder):
image = pyvips.Image.new_from_memory(linear.data, w, h, bands, "uchar")
image.dzsave(os.path.join(assets_folder, "spatial"), suffix=".jpeg")

def _upload_assets(self, assets_folder):
def _upload_assets(self, assets_folder, dataset_version_id):
"""
Upload the deep zoom assets to the S3 bucket.
Args:
assets_folder (str): The folder containing the assets.
dataset_version_id (str): The UUID uniquely identifying the dataset version.
"""
s3_uri = f"s3://{self.bucket_name}/{self.asset_directory}/{os.path.basename(assets_folder)}"
version_id = dataset_version_id.replace(".cxg", "")
s3_uri = f"s3://{self.bucket_name}/{self.asset_directory}/{version_id}"
self.s3_provider.upload_directory(assets_folder, s3_uri)

def create_deep_zoom_assets(self, container_name, content):
def create_deep_zoom_assets(self, container_name, content, dataset_version_id):
"""
Create deep zoom assets for a container.
Args:
container_name (str): The name of the container.
content (dict): The content dictionary containing the image array.
dataset_version_id (str): The UUID uniquely identifying the dataset version.
"""
try:
with tempfile.TemporaryDirectory() as temp_dir:
Expand All @@ -142,7 +145,7 @@ def create_deep_zoom_assets(self, container_name, content):
image_array, _ = self._fetch_image(content)
processed_image = self._process_and_flip_image(image_array)
self._generate_deep_zoom_assets(processed_image, assets_folder)
self._upload_assets(assets_folder)
self._upload_assets(assets_folder, dataset_version_id)
except Exception as e:
logger.exception(f"Failed to create and upload deep zoom assets: {e}")
raise
Expand Down
15 changes: 8 additions & 7 deletions tests/unit/processing/test_h5ad_data_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def setUp(self):
self.sample_h5ad_filename = self._write_anndata_to_file(self.sample_anndata)

self.sample_output_directory = path.splitext(self.sample_h5ad_filename)[0] + ".cxg"
self.dataset_version_id = "test_dataset_version_id"

def tearDown(self):
if self.sample_h5ad_filename:
Expand Down Expand Up @@ -108,31 +109,31 @@ def test__create_h5ad_data_file__obs_and_var_index_names_specified_doesnt_exist_

def test__to_cxg__simple_anndata_no_corpora_and_sparse(self):
h5ad_file = H5ADDataFile(self.sample_h5ad_filename)
h5ad_file.to_cxg(self.sample_output_directory, 100)
h5ad_file.to_cxg(self.sample_output_directory, 100, self.dataset_version_id)

self._validate_cxg_and_h5ad_content_match(self.sample_h5ad_filename, self.sample_output_directory, True)

def test__to_cxg__simple_anndata_with_corpora_and_sparse(self):
h5ad_file = H5ADDataFile(self.sample_h5ad_filename)
h5ad_file.to_cxg(self.sample_output_directory, 100)
h5ad_file.to_cxg(self.sample_output_directory, 100, self.dataset_version_id)

self._validate_cxg_and_h5ad_content_match(self.sample_h5ad_filename, self.sample_output_directory, True)

def test__to_cxg__simple_anndata_no_corpora_and_dense(self):
h5ad_file = H5ADDataFile(self.sample_h5ad_filename)
h5ad_file.to_cxg(self.sample_output_directory, 0)
h5ad_file.to_cxg(self.sample_output_directory, 0, self.dataset_version_id)

self._validate_cxg_and_h5ad_content_match(self.sample_h5ad_filename, self.sample_output_directory, False)

def test__to_cxg__simple_anndata_with_corpora_and_dense(self):
h5ad_file = H5ADDataFile(self.sample_h5ad_filename)
h5ad_file.to_cxg(self.sample_output_directory, 0)
h5ad_file.to_cxg(self.sample_output_directory, 0, self.dataset_version_id)

self._validate_cxg_and_h5ad_content_match(self.sample_h5ad_filename, self.sample_output_directory, False)

def test__to_cxg__simple_anndata_with_corpora_and_dense_using_feature_name_var_index(self):
h5ad_file = H5ADDataFile(self.sample_h5ad_filename, var_index_column_name="feature_name")
h5ad_file.to_cxg(self.sample_output_directory, 0)
h5ad_file.to_cxg(self.sample_output_directory, 0, self.dataset_version_id)

self._validate_cxg_and_h5ad_content_match(self.sample_h5ad_filename, self.sample_output_directory, False)
self._validate_cxg_var_index_column_match(
Expand All @@ -142,7 +143,7 @@ def test__to_cxg__simple_anndata_with_corpora_and_dense_using_feature_name_var_i

def test__to_cxg__simple_anndata_with_different_var_index_than_h5ad(self):
h5ad_file = H5ADDataFile(self.sample_h5ad_filename, var_index_column_name="int_category")
h5ad_file.to_cxg(self.sample_output_directory, 0)
h5ad_file.to_cxg(self.sample_output_directory, 0, self.dataset_version_id)

self._validate_cxg_var_index_column_match(
self.sample_output_directory,
Expand All @@ -155,7 +156,7 @@ def test__to_cxg__with_sparse_column_encoding(self):
sparse_with_column_shift_filename = self._write_anndata_to_file(anndata)

h5ad_file = H5ADDataFile(sparse_with_column_shift_filename)
h5ad_file.to_cxg(self.sample_output_directory, 50)
h5ad_file.to_cxg(self.sample_output_directory, 50, self.dataset_version_id)

self._validate_cxg_and_h5ad_content_match(
sparse_with_column_shift_filename, self.sample_output_directory, False, has_column_encoding=True
Expand Down
49 changes: 29 additions & 20 deletions tests/unit/processing/test_spatial_assets_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ def spatial_processor(s3_provider_mock):


@pytest.fixture
def output_folder():
return "test_output"
def asset_folder():
return "test_asset_folder"


@pytest.fixture
def dataset_version_id():
return "test_dataset_version_id"


@pytest.fixture
Expand Down Expand Up @@ -191,7 +196,7 @@ def test__crop_to_aspect_ratio(spatial_processor, width, height):
), "Crop is not centered correctly"


def test__generate_deep_zoom_assets(spatial_processor, output_folder, mocker):
def test__generate_deep_zoom_assets(spatial_processor, asset_folder, mocker):
"""
Test the method to generate deep zoom assets
"""
Expand All @@ -202,7 +207,7 @@ def test__generate_deep_zoom_assets(spatial_processor, output_folder, mocker):
mock_new_from_memory.return_value = mock_image

with tempfile.TemporaryDirectory() as temp_dir:
assets_folder = os.path.join(temp_dir, output_folder)
assets_folder = os.path.join(temp_dir, asset_folder)
os.makedirs(assets_folder)
spatial_processor._generate_deep_zoom_assets(test_image_array, assets_folder)

Expand All @@ -216,34 +221,34 @@ def test__generate_deep_zoom_assets(spatial_processor, output_folder, mocker):
mock_image.dzsave.assert_called_once_with(expected_output_path, suffix=".jpeg")


def test__upload_assets(spatial_processor, output_folder, mocker):
def test__upload_assets(spatial_processor, asset_folder, dataset_version_id, mocker):
"""
Test upload assets to S3
"""
mock_upload = mocker.patch.object(spatial_processor.s3_provider, "upload_directory")

spatial_processor._upload_assets(output_folder)
expected_s3_uri = f"s3://{spatial_processor.bucket_name}/{spatial_processor.asset_directory}/{output_folder}"
spatial_processor._upload_assets(asset_folder, dataset_version_id)
expected_s3_uri = f"s3://{spatial_processor.bucket_name}/{spatial_processor.asset_directory}/{dataset_version_id}"

# verify that upload_directory was called correctly
mock_upload.assert_called_once_with(output_folder, expected_s3_uri)
mock_upload.assert_called_once_with(asset_folder, expected_s3_uri)


def test__upload_assets_failure(spatial_processor, output_folder, mocker):
def test__upload_assets_failure(spatial_processor, asset_folder, dataset_version_id, mocker):
"""
Test upload assets to S3 when the upload fails
"""
mock_upload = mocker.patch.object(spatial_processor.s3_provider, "upload_directory")
mock_upload.side_effect = Exception("Failed to upload")

with pytest.raises(Exception, match="Failed to upload"):
spatial_processor._upload_assets(output_folder)
spatial_processor._upload_assets(asset_folder, dataset_version_id)

expected_s3_uri = f"s3://{spatial_processor.bucket_name}/{spatial_processor.asset_directory}/{output_folder}"
mock_upload.assert_called_once_with(output_folder, expected_s3_uri)
expected_s3_uri = f"s3://{spatial_processor.bucket_name}/{spatial_processor.asset_directory}/{dataset_version_id}"
mock_upload.assert_called_once_with(asset_folder, expected_s3_uri)


def test__create_deep_zoom_assets(spatial_processor, cxg_container, valid_spatial_data, mocker):
def test__create_deep_zoom_assets(spatial_processor, cxg_container, valid_spatial_data, dataset_version_id, mocker):
mock_fetch_image = mocker.patch.object(spatial_processor, "_fetch_image")
mock_process_and_flip_image = mocker.patch.object(spatial_processor, "_process_and_flip_image")
mock_generate_deep_zoom_assets = mocker.patch.object(spatial_processor, "_generate_deep_zoom_assets")
Expand All @@ -259,18 +264,20 @@ def test__create_deep_zoom_assets(spatial_processor, cxg_container, valid_spatia
mock_process_and_flip_image.return_value = np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8)

# call the method under test
spatial_processor.create_deep_zoom_assets(cxg_container, valid_spatial_data)
spatial_processor.create_deep_zoom_assets(cxg_container, valid_spatial_data, dataset_version_id)

assets_folder = os.path.join(temp_dir_name, cxg_container.replace(".cxg", ""))

# assertions to ensure each step is called
mock_fetch_image.assert_called_once_with(valid_spatial_data)
mock_process_and_flip_image.assert_called_once_with(mock_fetch_image.return_value[0])
mock_generate_deep_zoom_assets.assert_called_once_with(mock_process_and_flip_image.return_value, assets_folder)
mock_upload_assets.assert_called_once_with(assets_folder)
mock_upload_assets.assert_called_once_with(assets_folder, dataset_version_id)


def test__create_deep_zoom_assets_exception(spatial_processor, cxg_container, valid_spatial_data, mocker):
def test__create_deep_zoom_assets_exception(
spatial_processor, cxg_container, valid_spatial_data, dataset_version_id, mocker
):
mock_fetch_image = mocker.patch.object(spatial_processor, "_fetch_image")
mock_process_and_flip_image = mocker.patch.object(spatial_processor, "_process_and_flip_image")
mock_generate_deep_zoom_assets = mocker.patch.object(spatial_processor, "_generate_deep_zoom_assets")
Expand All @@ -281,15 +288,17 @@ def test__create_deep_zoom_assets_exception(spatial_processor, cxg_container, va

# assert that the method raises an exception
with pytest.raises(Exception, match="Test exception"):
spatial_processor.create_deep_zoom_assets(cxg_container, valid_spatial_data)
spatial_processor.create_deep_zoom_assets(cxg_container, valid_spatial_data, dataset_version_id)

mock_fetch_image.assert_called_once_with(valid_spatial_data)
mock_process_and_flip_image.assert_not_called()
mock_generate_deep_zoom_assets.assert_not_called()
mock_upload_assets.assert_not_called()


def test__convert_uns_to_cxg_group(cxg_container, valid_uns, group_metadata_name, ctx, mock_spatial_processor, mocker):
def test__convert_uns_to_cxg_group(
cxg_container, valid_uns, group_metadata_name, ctx, mock_spatial_processor, dataset_version_id, mocker
):
mock_from_numpy = mocker.patch("backend.layers.processing.utils.cxg_generation_utils.tiledb.from_numpy")
mock_tiledb_open = mocker.patch(
"backend.layers.processing.utils.cxg_generation_utils.tiledb.open", mocker.mock_open()
Expand All @@ -302,7 +311,7 @@ def test__convert_uns_to_cxg_group(cxg_container, valid_uns, group_metadata_name
mock_metadata_array = mock_tiledb_open.return_value.__enter__.return_value
mock_metadata_array.meta = {}

convert_uns_to_cxg_group(cxg_container, valid_uns, group_metadata_name, ctx)
convert_uns_to_cxg_group(cxg_container, valid_uns, dataset_version_id, group_metadata_name, ctx)

# check if from_numpy is called correctly
mock_from_numpy.assert_called_once_with(f"{cxg_container}/{group_metadata_name}", np.zeros((1,)))
Expand All @@ -326,5 +335,5 @@ def test__convert_uns_to_cxg_group(cxg_container, valid_uns, group_metadata_name

# check if create_deep_zoom_assets is called correctly
mock_spatial_processor.create_deep_zoom_assets.assert_called_once_with(
cxg_container, valid_uns["spatial"]["library_id_1"]
cxg_container, valid_uns["spatial"]["library_id_1"], dataset_version_id
)

0 comments on commit 3b0a8a7

Please sign in to comment.