feat: create metadata json from core fields (#1136)
* feat: create metadata json from core fields

* feat: check metadata json corrupt

* feat: ignore corrupt core jsons

* refactor: create_metadata_json method
helen-m-lin authored Nov 4, 2024
1 parent 7b2a2a4 commit 3c2c99d
Showing 2 changed files with 248 additions and 7 deletions.
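
Note: the corruption check referenced in the commit message ("check metadata json corrupt", "ignore corrupt core jsons") relies on is_dict_corrupt, which this commit imports from aind_data_schema.base but whose body is not part of the diff. A minimal illustrative sketch of such a check, assuming it flags keys that a document store cannot persist (an assumption for illustration, not the actual implementation):

def is_dict_corrupt(input_dict: dict) -> bool:
    """Illustrative only: treat a dict as corrupt if any key, at any depth,
    contains characters such as '$' or '.' that a document database may reject."""
    if not isinstance(input_dict, dict):
        return True
    for key, value in input_dict.items():
        if "$" in key or "." in key:
            return True
        if isinstance(value, dict) and is_dict_corrupt(value):
            return True
    return False
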
44 changes: 43 additions & 1 deletion src/aind_data_schema/core/metadata.py
@@ -1,6 +1,8 @@
"""Generic metadata class for Data Asset Records."""

import inspect
import json
import logging
from datetime import datetime
from enum import Enum
from typing import Dict, List, Literal, Optional, get_args
@@ -18,7 +20,7 @@
model_validator,
)

from aind_data_schema.base import AindCoreModel
from aind_data_schema.base import AindCoreModel, is_dict_corrupt
from aind_data_schema.core.acquisition import Acquisition
from aind_data_schema.core.data_description import DataDescription
from aind_data_schema.core.instrument import Instrument
@@ -286,3 +288,43 @@ def validate_rig_session_compatibility(self):
check = RigSessionCompatibility(self.rig, self.session)
check.run_compatibility_check()
return self


def create_metadata_json(
    name: str,
    location: str,
    core_jsons: Dict[str, Optional[dict]],
    optional_created: Optional[datetime] = None,
    optional_external_links: Optional[dict] = None,
) -> dict:
    """Creates a Metadata dict from a dictionary of core schema fields."""
    # Extract basic parameters and non-corrupt core schema fields
    params = {
        "name": name,
        "location": location,
    }
    if optional_created is not None:
        params["created"] = optional_created
    if optional_external_links is not None:
        params["external_links"] = optional_external_links
    core_fields = dict()
    for key, value in core_jsons.items():
        if key in CORE_FILES and value is not None:
            if is_dict_corrupt(value):
                logging.warning(f"Provided {key} is corrupt! It will be ignored.")
            else:
                core_fields[key] = value
    # Create Metadata object and convert to JSON
    # If there are any validation errors, still create it
    # but set metadata_status to Invalid
    try:
        metadata = Metadata.model_validate({**params, **core_fields})
        metadata_json = json.loads(metadata.model_dump_json(by_alias=True))
    except Exception as e:
        logging.warning(f"Issue with metadata construction! {e.args}")
        metadata = Metadata.model_validate(params)
        metadata_json = json.loads(metadata.model_dump_json(by_alias=True))
        for key, value in core_fields.items():
            metadata_json[key] = value
        metadata_json["metadata_status"] = MetadataStatus.INVALID.value
    return metadata_json
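
For reference, a minimal usage sketch of the new helper, based on the tests added below; the subject.json path is a hypothetical example, and core files that are missing can simply be passed as None:

import json

from aind_data_schema.core.metadata import create_metadata_json

# Load whichever core schema files exist for the data asset (path is hypothetical)
with open("subject.json", "r") as f:
    subject_json = json.load(f)

metadata_json = create_metadata_json(
    name="ecephys_655019_2023-04-03_18-17-09",
    location="s3://bucket/ecephys_655019_2023-04-03_18-17-09",
    core_jsons={"subject": subject_json, "procedures": None},
)

# "Valid" if validation passes; otherwise the record is still built, the provided
# core fields are kept, and metadata_status is set to "Invalid".
print(metadata_json["metadata_status"])
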
211 changes: 205 additions & 6 deletions tests/test_metadata.py
@@ -3,37 +3,89 @@
import json
import re
import unittest
from datetime import time
from datetime import datetime, time, timezone
from unittest.mock import MagicMock, call, patch

from aind_data_schema_models.modalities import Modality
from aind_data_schema_models.organizations import Organization
from aind_data_schema_models.pid_names import PIDName
from aind_data_schema_models.platforms import Platform
from aind_data_schema_models.modalities import Modality
from pydantic import ValidationError
from pydantic import __version__ as pyd_version

from aind_data_schema.components.devices import MousePlatform
from aind_data_schema.core.acquisition import Acquisition
from aind_data_schema.core.data_description import DataDescription
from aind_data_schema.core.data_description import DataDescription, Funding
from aind_data_schema.core.instrument import Instrument
from aind_data_schema.core.metadata import Metadata, MetadataStatus
from aind_data_schema.core.metadata import ExternalPlatforms, Metadata, MetadataStatus, create_metadata_json
from aind_data_schema.core.procedures import (
IontophoresisInjection,
NanojectInjection,
Procedures,
Surgery,
ViralMaterial,
)
from aind_data_schema.core.processing import Processing
from aind_data_schema.core.processing import PipelineProcess, Processing
from aind_data_schema.core.rig import Rig
from aind_data_schema.core.session import Session
from aind_data_schema.core.subject import BreedingInfo, Sex, Species, Subject
from aind_data_schema.core.subject import BreedingInfo, Housing, Sex, Species, Subject

PYD_VERSION = re.match(r"(\d+.\d+).\d+", pyd_version).group(1)


class TestMetadata(unittest.TestCase):
"""Class to test Metadata model"""

@classmethod
def setUpClass(cls) -> None:
"""Set up the test class."""
subject = Subject(
species=Species.MUS_MUSCULUS,
subject_id="12345",
sex=Sex.MALE,
date_of_birth=datetime(2022, 11, 22, 8, 43, 00, tzinfo=timezone.utc).date(),
source=Organization.AI,
breeding_info=BreedingInfo(
breeding_group="Emx1-IRES-Cre(ND)",
maternal_id="546543",
maternal_genotype="Emx1-IRES-Cre/wt; Camk2a-tTa/Camk2a-tTA",
paternal_id="232323",
paternal_genotype="Ai93(TITL-GCaMP6f)/wt",
),
genotype="Emx1-IRES-Cre/wt;Camk2a-tTA/wt;Ai93(TITL-GCaMP6f)/wt",
housing=Housing(home_cage_enrichment=["Running wheel"], cage_id="123"),
background_strain="C57BL/6J",
)
dd = DataDescription(
label="test_data",
modality=[Modality.ECEPHYS],
platform=Platform.ECEPHYS,
subject_id="123456",
data_level="raw",
creation_time=datetime(2022, 11, 22, 8, 43, 00, tzinfo=timezone.utc),
institution=Organization.AIND,
funding_source=[Funding(funder=Organization.NINDS, grant_number="grant001")],
investigators=[PIDName(name="Jane Smith")],
)
procedures = Procedures(
subject_id="12345",
)
processing = Processing(
processing_pipeline=PipelineProcess(processor_full_name="Processor", data_processes=[]),
)

cls.sample_name = "ecephys_655019_2023-04-03_18-17-09"
cls.sample_location = "s3://bucket/ecephys_655019_2023-04-03_18-17-09"
cls.subject = subject
cls.dd = dd
cls.procedures = procedures
cls.processing = processing

cls.subject_json = json.loads(subject.model_dump_json())
cls.dd_json = json.loads(dd.model_dump_json())
cls.procedures_json = json.loads(procedures.model_dump_json())
cls.processing_json = json.loads(processing.model_dump_json())

def test_valid_subject_info(self):
"""Tests that the record is marked as VALID if a valid subject model
is present."""
@@ -344,6 +396,153 @@ def test_validate_old_schema_version(self):

self.assertIsNotNone(m2)

def test_create_from_core_jsons(self):
"""Tests metadata json can be created with valid inputs"""
core_jsons = {
"subject": self.subject_json,
"data_description": None,
"procedures": self.procedures_json,
"session": None,
"rig": None,
"processing": self.processing_json,
"acquisition": None,
"instrument": None,
"quality_control": None,
}
expected_md = Metadata(
name=self.sample_name,
location=self.sample_location,
subject=self.subject,
procedures=self.procedures,
processing=self.processing,
)
expected_result = json.loads(expected_md.model_dump_json(by_alias=True))
# there are some userwarnings when creating Subject from json
with self.assertWarns(UserWarning):
result = create_metadata_json(
name=self.sample_name,
location=self.sample_location,
core_jsons=core_jsons,
)
# check that metadata was created with expected values
self.assertEqual(self.sample_name, result["name"])
self.assertEqual(self.sample_location, result["location"])
self.assertEqual(MetadataStatus.VALID.value, result["metadata_status"])
self.assertEqual(self.subject_json, result["subject"])
self.assertEqual(self.procedures_json, result["procedures"])
self.assertEqual(self.processing_json, result["processing"])
self.assertIsNone(result["acquisition"])
# also check the other fields
# small hack to mock the _id, created, and last_modified fields
expected_result["_id"] = result["_id"]
expected_result["created"] = result["created"]
expected_result["last_modified"] = result["last_modified"]
self.assertDictEqual(expected_result, result)

def test_create_from_core_jsons_optional_overwrite(self):
"""Tests metadata json creation with created and external links"""
created = datetime(2024, 10, 31, 12, 0, 0, tzinfo=timezone.utc)
external_links = {
ExternalPlatforms.CODEOCEAN.value: ["123", "abc"],
}
# there are some userwarnings when creating from json
with self.assertWarns(UserWarning):
result = create_metadata_json(
name=self.sample_name,
location=self.sample_location,
core_jsons={
"subject": self.subject_json,
},
optional_created=created,
optional_external_links=external_links,
)
self.assertEqual(self.sample_name, result["name"])
self.assertEqual(self.sample_location, result["location"])
self.assertEqual("2024-10-31T12:00:00Z", result["created"])
self.assertEqual(external_links, result["external_links"])

@patch("logging.warning")
def test_create_from_core_jsons_invalid(self, mock_warning: MagicMock):
"""Tests that metadata json is marked invalid if there are errors"""
# data_description triggers cross-validation of other fields to fail
core_jsons = {
"subject": self.subject_json,
"data_description": self.dd_json,
"procedures": self.procedures_json,
"session": None,
"rig": None,
"processing": self.processing_json,
"acquisition": None,
"instrument": None,
"quality_control": None,
}
# there are some userwarnings when creating Subject from json
with self.assertWarns(UserWarning):
result = create_metadata_json(
name=self.sample_name,
location=self.sample_location,
core_jsons=core_jsons,
)
# check that metadata was still created
self.assertEqual(self.sample_name, result["name"])
self.assertEqual(self.sample_location, result["location"])
self.assertEqual(self.subject_json, result["subject"])
self.assertEqual(self.dd_json, result["data_description"])
self.assertEqual(self.procedures_json, result["procedures"])
self.assertEqual(self.processing_json, result["processing"])
self.assertIsNone(result["acquisition"])
# check that metadata was marked as invalid
self.assertEqual(MetadataStatus.INVALID.value, result["metadata_status"])
mock_warning.assert_called_once()
self.assertIn("Issue with metadata construction!", mock_warning.call_args_list[0].args[0])

@patch("logging.warning")
@patch("aind_data_schema.core.metadata.is_dict_corrupt")
def test_create_from_core_jsons_corrupt(
self,
mock_is_dict_corrupt: MagicMock,
mock_warning: MagicMock
):
"""Tests metadata json creation ignores corrupt core jsons"""
# mock corrupt procedures and processing
mock_is_dict_corrupt.side_effect = lambda x: (
x == self.procedures_json or x == self.processing_json
)
core_jsons = {
"subject": self.subject_json,
"data_description": None,
"procedures": self.procedures_json,
"session": None,
"rig": None,
"processing": self.processing_json,
"acquisition": None,
"instrument": None,
"quality_control": None,
}
# there are some userwarnings when creating Subject from json
with self.assertWarns(UserWarning):
result = create_metadata_json(
name=self.sample_name,
location=self.sample_location,
core_jsons=core_jsons,
)
# check that metadata was still created
self.assertEqual(self.sample_name, result["name"])
self.assertEqual(self.sample_location, result["location"])
self.assertEqual(self.subject_json, result["subject"])
self.assertIsNone(result["acquisition"])
self.assertEqual(MetadataStatus.VALID.value, result["metadata_status"])
# check that corrupt core jsons were ignored
self.assertIsNone(result["procedures"])
self.assertIsNone(result["processing"])
mock_warning.assert_has_calls(
[
call("Provided processing is corrupt! It will be ignored."),
call("Provided procedures is corrupt! It will be ignored."),
],
any_order=True,
)


if __name__ == "__main__":
unittest.main()
