feat(metadata): parse unencrypted metadata #21

Merged · 2 commits · Nov 15, 2024
23 changes: 18 additions & 5 deletions pims/api/operations.py
@@ -75,17 +75,23 @@

cytomine_logger = logging.getLogger("pims.cytomine")

REQUIRED_DIRECTORIES = ["images", "metadata"]
REQUIRED_DIRECTORIES = ["IMAGES", "METADATA"]
WRITING_PATH = get_settings().writing_path


def is_dataset_structured(dataset_path: str) -> bool:
"""Check the structure of a dataset."""

actual_directories = {
d.upper()
for d in os.listdir(dataset_path)
if os.path.isdir(os.path.join(dataset_path, d))
}

missing_directories = [
directory
for directory in REQUIRED_DIRECTORIES
if not os.path.isdir(os.path.join(dataset_path, directory))
if directory not in actual_directories
]

return missing_directories == []
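Note: the structure check is now case-insensitive. Directory names found on disk are upper-cased before being compared against the upper-case required names, so `images/`, `Images/` and `IMAGES/` all satisfy it. A standalone sketch of that behaviour, using a throwaway temporary directory as the dataset:

```python
import os
import tempfile

REQUIRED_DIRECTORIES = ["IMAGES", "METADATA"]


def is_dataset_structured(dataset_path: str) -> bool:
    """Check the structure of a dataset, ignoring the case of directory names."""
    actual_directories = {
        d.upper()
        for d in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, d))
    }
    missing_directories = [
        directory
        for directory in REQUIRED_DIRECTORIES
        if directory not in actual_directories
    ]
    return missing_directories == []


with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "images"))    # lower case on disk
    os.makedirs(os.path.join(root, "Metadata"))  # mixed case on disk
    print(is_dataset_structured(root))           # True: both required directories found
```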
@@ -135,6 +141,8 @@ def import_dataset(
if not storage:
raise CytomineProblem(f"Storage {storage_id} not found")

dataset_uploaded = []
metadata_uploaded = []
for dataset in datasets:
uploaded_files = run_import_from_path(
dataset,
@@ -151,17 +159,22 @@
)
abstract_images.append(AbstractImage().populate(data))

dataset_name = os.path.basename(dataset)
success = import_metadata(dataset, abstract_images)
if success:
metadata_uploaded.append(dataset_name)

project = Project(name=os.path.basename(dataset)).save()
project = Project(name=dataset_name).save()

for image in abstract_images:
ImageInstance(id_abstract_image=image.id, id_project=project.id).save()

dataset_uploaded.append(dataset_name)

return JSONResponse(
content={
"image_upload": len(uploaded_files) != 0,
"metadata_upload": success,
"dataset_uploaded": dataset_uploaded,
"metadata_uploaded": metadata_uploaded,
}
)
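Note: the endpoint response now lists which datasets were imported and which of those also had metadata imported, instead of the single `metadata_upload` flag. A hypothetical payload for two datasets, where only the first carried parsable metadata (the names are invented):

```python
# Illustrative JSONResponse content after this change (dataset names are invented).
content = {
    "image_upload": True,                            # len(uploaded_files) != 0
    "dataset_uploaded": ["dataset-a", "dataset-b"],  # every dataset that was imported
    "metadata_uploaded": ["dataset-a"],              # dataset-b had no valid metadata
}
```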

67 changes: 48 additions & 19 deletions pims/importer/importer.py
@@ -16,20 +16,23 @@
import os
import shutil
from base64 import b64decode
from datetime import datetime
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple
from typing import Any, List, Optional, Tuple
from uuid import UUID

from bigpicture_metadata_interface import BPInterface
from celery import group, signature
from celery.result import allow_join_result
from crypt4gh_fsspec import Crypt4GHFileSystem
from cytomine.cytomine import Cytomine
from crypt4gh_fsspec.crypt4gh_file import Crypt4GHMagic
from cytomine.models import (
AbstractImage,
ProjectCollection,
PropertyCollection,
Property,
UploadedFile,
)
from isodate.duration import Duration
from nacl.public import PrivateKey
from nacl.secret import SecretBox

@@ -643,7 +646,7 @@ def run_import_from_path(
"""Run importer from a given path."""

uploaded_files = []
images_path = Path(os.path.join(dataset_path, "images"))
images_path = Path(os.path.join(dataset_path, "IMAGES"))
for item in images_path.iterdir():
if not item.is_dir():
continue
@@ -680,22 +683,40 @@ def run_import_from_path(
return uploaded_files


def import_metadata(dataset_path: str, abstract_images: List[AbstractImage]) -> bool:
"""Import metadata from a given path."""
def is_encrypted(file_path: Path) -> bool:
"""Check if the file is encrypted."""

with open(file_path, "rb") as file:
if Crypt4GHMagic(file).is_crypt4gh():
return True

return False


metadata_path = os.path.join(dataset_path, "metadata")
def parse_metadata(dataset_path: str) -> Optional[Tuple[Any, Any, Any]]:
"""Parse metadata from a given path."""

metadata_path = os.path.join(dataset_path, "METADATA")
files = [
file
for file in os.listdir(metadata_path)
if os.path.isfile(os.path.join(metadata_path, file))
]
encrypted = any(is_encrypted(os.path.join(metadata_path, file)) for file in files)

if not encrypted:
if not BPInterface.validate(dataset_path):
return None

return BPInterface.parse_xml_files(dataset_path)

settings = get_settings()
fs = Crypt4GHFileSystem(
decode_key(settings.crypt4gh_private_key),
)

with TemporaryDirectory() as tmp_dir:
metadata_directory_path = os.path.join(tmp_dir, "metadata")
metadata_directory_path = os.path.join(tmp_dir, "METADATA")
os.makedirs(metadata_directory_path, exist_ok=True)

for file in files:
@@ -706,36 +727,44 @@ def import_metadata(dataset_path: str, abstract_images: List[AbstractImage]) ->
fp.write(decrypted_data)

with fs.open(
os.path.join(dataset_path, "private", "dac.xml.c4gh"),
os.path.join(dataset_path, "PRIVATE", "dac.xml.c4gh"),
"rb",
) as fp:
decrypted_data = fp.read()

private_directory_path = os.path.join(tmp_dir, "metadata")
private_directory_path = os.path.join(tmp_dir, "PRIVATE")
os.makedirs(private_directory_path, exist_ok=True)
with open(os.path.join(private_directory_path, "dac.xml"), "wb") as fp:
fp.write(decrypted_data)

if not BPInterface.validate(tmp_dir):
return False
return None

studies, beings, datasets = BPInterface.parse_xml_files(tmp_dir)
return BPInterface.parse_xml_files(tmp_dir)

return None


def import_metadata(dataset_path: str, abstract_images: List[AbstractImage]) -> bool:
"""Import metadata from a given path."""

data = parse_metadata(dataset_path)
if data is None:
return False

studies, beings, datasets = data
metadata_parser = BPMetadataParser(studies, beings, datasets)

# Upload metadata file
for ai in abstract_images:
metadata = metadata_parser.parse({"image": ai.originalFilename})

properties = PropertyCollection(ai)
for key, value in metadata.items():
properties.append(
v = str(value) if isinstance(value, (datetime, Duration, UUID)) else value
Property(
ai,
f"MSMDAD.{key}",
value,
)

properties.save()
v,
).save()

return True
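Note: metadata handling is now split in two. `is_encrypted` probes a file for the Crypt4GH magic bytes, and `parse_metadata` either validates and parses the plain XML under `METADATA/` in place or decrypts everything into a temporary directory first; `import_metadata` then attaches the parsed values to the images. A minimal sketch of the intended calling pattern, assuming the module path `pims.importer.importer` and an illustrative dataset path:

```python
# Sketch of how the new helpers are expected to be used; the module path and
# dataset path below are assumptions, not taken from the PR.
from pims.importer.importer import import_metadata, parse_metadata

dataset_path = "/data/upload/dataset-a"  # hypothetical dataset root

# parse_metadata picks the branch itself: unencrypted METADATA/ is validated and
# parsed in place, Crypt4GH-encrypted files are decrypted into a temp dir first.
parsed = parse_metadata(dataset_path)    # None when validation fails
if parsed is not None:
    studies, beings, datasets = parsed   # same triple BPInterface.parse_xml_files returns

# import_metadata wraps the same parse and saves the values as image properties.
success = import_metadata(dataset_path, abstract_images=[])
```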

35 changes: 22 additions & 13 deletions pims/processing/metadata.py
@@ -13,20 +13,24 @@
CodeAttributes,
DACContact,
Dataset,
File,
Sample,
Stain,
Statement,
Study,
Reference,
)

BP_MODEL = (
AttributesObject,
BaseObject,
Code,
DACContact,
File,
Sample,
Stain,
Statement,
Reference,
)


@@ -92,17 +96,17 @@ def _parse_primitive(self, primitive: Any, prefix: str) -> None:
self.parsed[prefix] = primitive

# Filters
substring = f"{self.filters['image']}.slide.alias"
substring = f"{self.filters['image']}.slide.identifier"
if self.filters["image"] and substring in prefix:
self.filters["slide"] = self.parsed[prefix]

substring = f"slides.{self.filters['slide']}.id"
if self.filters["slide"] and substring in prefix:
if "Observation_" in prefix:
start = prefix.find("Observation_")
if "OBSERVATION_" in prefix:
start = prefix.find("OBSERVATION_")
self.filters["obs"] = prefix[start : start + 22]
else:
start = prefix.find("BiologicalBeing_")
start = prefix.find("BIOLOGICAL_BEING_")
self.filters["being"] = prefix[start : start + 26]

def _parse_dict(self, d: dict, prefix: str) -> None:
@@ -129,7 +133,11 @@ def _parse_list(self, l: list, prefix: str) -> None:
self._parse_primitive(l, prefix)

for item in l:
suffix = item.alias if hasattr(item, "alias") else type(item).__name__
suffix = (
item.reference.identifier
if hasattr(item, "reference")
else type(item).__name__
)
self.parser.get(type(item), self.parser["any"])(item, f"{prefix}.{suffix}")

def parse_dataclass(self, data: dataclass, prefix: str = None) -> None:
@@ -144,13 +152,14 @@ def parse_dataclass(self, data: dataclass, prefix: str = None) -> None:
"""

if prefix is None:
prefix = data.alias
prefix = data.reference.identifier

for field in fields(data):
attribute = getattr(data, field.name)

self.parser.get(type(attribute), self.parser["any"])(
attribute, f"{prefix}.{field.name}"
attribute,
f"{prefix}.{field.name}",
)

def _filter(self) -> dict:
@@ -165,10 +174,10 @@ def _filter(self) -> dict:
if self.filters["image"] is None:
return self.parsed

dataset = {k: v for k, v in self.parsed.items() if k.startswith("Dataset")}
base = {k: v for k, v in self.parsed.items() if k.startswith("Study")}
dataset = {k: v for k, v in self.parsed.items() if k.startswith("DATASET")}
base = {k: v for k, v in self.parsed.items() if k.startswith("STUDY")}
beings = {
k: v for k, v in self.parsed.items() if k.startswith("BiologicalBeing")
k: v for k, v in self.parsed.items() if k.startswith("BIOLOGICAL_BEING")
}

# Filter the Dataset with the Observation linked to the slide
@@ -208,13 +217,13 @@ def parse(self, filters: dict = None) -> None:
self.filters.update(filters)

for dataset in self.datasets:
self.parser.get("any")(dataset, dataset.alias)
self.parser.get("any")(dataset, dataset.reference.identifier)

for study in self.studies:
self.parser.get("any")(study, study.alias)
self.parser.get("any")(study, study.reference.identifier)

for being in self.beings:
self.parser.get("any")(being, being.alias)
self.parser.get("any")(being, being.reference.identifier)

self._remove_empty_variables()
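Note: since the parser now keys objects on `reference.identifier` rather than `alias`, the flattened property names begin with the upper-case identifiers (`DATASET…`, `STUDY…`, `OBSERVATION_…`, `BIOLOGICAL_BEING_…`), which is what the updated prefix filters match. A rough illustration with invented identifiers and values:

```python
# Invented identifiers, only to show the shape of the flattened keys the
# updated _filter() prefixes are matching.
parsed = {
    "STUDY_01.title": "Example study",
    "DATASET_01.slides.SLIDE_07.id": "SLIDE_07",
    "DATASET_01.OBSERVATION_0000000001.statement": "freetext",
    "BIOLOGICAL_BEING_000000001.sex": "F",
}

dataset = {k: v for k, v in parsed.items() if k.startswith("DATASET")}
base = {k: v for k, v in parsed.items() if k.startswith("STUDY")}
beings = {k: v for k, v in parsed.items() if k.startswith("BIOLOGICAL_BEING")}
```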
