Skip to content

Commit

Permalink
wip: update db
Browse files Browse the repository at this point in the history
  • Loading branch information
korikuzma committed Jan 25, 2025
1 parent e761a2f commit 7c0db36
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 50 deletions.
4 changes: 3 additions & 1 deletion src/metakb/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _get_credentials(


_CONSTRAINTS = {
"coding_constraint": "CREATE CONSTRAINT coding_constraint IF NOT EXISTS FOR (c:Coding) REQUIRE (c.code, c.label, c.system) IS UNIQUE;",
"strength_constraint": "CREATE CONSTRAINT coding_constraint IF NOT EXISTS FOR (n:Strength) REQUIRE (n.label, n.primaryCode) IS UNIQUE;",
"gene_id_constraint": "CREATE CONSTRAINT gene_id_constraint IF NOT EXISTS FOR (n:Gene) REQUIRE n.id IS UNIQUE;",
"disease_id_constraint": "CREATE CONSTRAINT disease_id_constraint IF NOT EXISTS FOR (n:Disease) REQUIRE n.id IS UNIQUE;",
"therapy_id_constraint": "CREATE CONSTRAINT therapy_id_constraint IF NOT EXISTS FOR (n:Therapy) REQUIRE n.id IS UNIQUE;",
Expand All @@ -84,6 +84,8 @@ def _get_credentials(
"document_id_constraint": "CREATE CONSTRAINT document_id_constraint IF NOT EXISTS FOR (n:Document) REQUIRE n.id IS UNIQUE;",
"statement_id_constraint": "CREATE CONSTRAINT statement_id_constraint IF NOT EXISTS FOR (n:Statement) REQUIRE n.id IS UNIQUE;",
"method_id_constraint": "CREATE CONSTRAINT method_id_constraint IF NOT EXISTS FOR (n:Method) REQUIRE n.id IS UNIQUE;",
"classification_constraint": "CREATE CONSTRAINT classification_constraint IF NOT EXISTS FOR (n:Classification) REQUIRE n.primaryCode IS UNIQUE;",
"evidence_line_id_constraint": "CREATE CONSTRAINT evidence_line_id_constraint IF NOT EXISTS FOR (n:EvidenceLine) REQUIRE n.id IS UNIQUE;",
}


Expand Down
142 changes: 103 additions & 39 deletions src/metakb/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import logging
import uuid
from pathlib import Path

from neo4j import Driver, ManagedTransaction
Expand Down Expand Up @@ -394,28 +395,17 @@ def _add_obj_id_to_set(obj: dict, ids_set: set[str]) -> None:
return ids_in_stmts


def _add_statement(tx: ManagedTransaction, statement_in: dict) -> None:
"""Add statement node and its relationships
def _get_statement_query(statement: dict) -> str:
"""Generate the initial Cypher query to create a statement node and its
relationships, based on shared properties of evidence and assertion records.
:param tx: Transaction object provided to transaction functions
:param statement_in: Statement CDM object
:param statement: Statement record
:return: The base Cypher query string for creating the statement node and
relationships
"""
statement = statement_in.copy()
statement_type = statement["type"]
statement_keys = _create_parameterized_query(
statement, ("id", "description", "direction", "type")
)

match_line = ""
rel_line = ""

is_reported_in_docs = statement.get("reportedIn", [])
for ri_doc in is_reported_in_docs:
ri_doc_id = ri_doc["id"]
name = f"doc_{ri_doc_id.split(':')[-1]}"
match_line += f"MERGE ({name} {{ id: '{ri_doc_id}'}})\n"
rel_line += f"MERGE (s) -[:IS_REPORTED_IN] -> ({name})\n"

proposition = statement["proposition"]
statement["propositionType"] = proposition["type"]
match_line += "SET s.propositionType=$propositionType\n"
Expand All @@ -439,21 +429,6 @@ def _add_statement(tx: ManagedTransaction, statement_in: dict) -> None:
match_line += f"MERGE (m {{ id: '{method_id}' }})\n"
rel_line += "MERGE (s) -[:IS_SPECIFIED_BY] -> (m)\n"

strength = statement.get("strength")
if strength:
strength_key_fields = ("primaryCode", "label")

strength_keys = _create_parameterized_query(
strength, strength_key_fields, entity_param_prefix="strength_"
)
for k in strength_key_fields:
v = strength.get(k)
if v:
statement[f"strength_{k}"] = v

match_line += f"MERGE (mc:MappableConcept {{ {strength_keys} }})\n"
rel_line += "MERGE (s) -[:HAS_STRENGTH] -> (mc)\n"

variant_id = proposition["subjectVariant"]["id"]
match_line += f"MERGE (v:Variation {{ id: '{variant_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_VARIANT] -> (v)\n"
Expand All @@ -471,12 +446,96 @@ def _add_statement(tx: ManagedTransaction, statement_in: dict) -> None:
match_line += f"MERGE (tt:Condition {{ id: '{tumor_type_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_TUMOR_TYPE] -> (tt)\n"

query = f"""
MERGE (s:{statement_type}:StudyStatement {{ {statement_keys} }})
statement_keys = _create_parameterized_query(
statement, ("id", "description", "direction", "type")
)

return f"""
MERGE (s:{statement['type']}:StudyStatement {{ {statement_keys} }})
{match_line}
{rel_line}
{rel_line}\n
"""


def _add_statement_evidence(tx: ManagedTransaction, statement_in: dict) -> None:
    """Create a Statement node and its relationships for an evidence record.

    Extends the shared statement query (see ``_get_statement_query``) with the
    document(s) the evidence is reported in and, when present, its strength
    concept.

    :param tx: Transaction object provided to transaction functions
    :param statement_in: Statement CDM object for evidence items
    """
    # Work on a copy: extra $-parameters are added below and we must not
    # mutate the caller's object
    statement = statement_in.copy()
    query = _get_statement_query(statement)

    # Link each source document this evidence is reported in
    for doc in statement.get("reportedIn", []):
        doc_id = doc["id"]
        alias = f"doc_{doc_id.split(':')[-1]}"
        query += f"""
        MERGE ({alias} {{ id: '{doc_id}'}})
        MERGE (s) -[:IS_REPORTED_IN] -> ({alias})
        """

    strength = statement.get("strength")
    if strength:
        key_fields = ("primaryCode", "label")
        strength_keys = _create_parameterized_query(
            strength, key_fields, entity_param_prefix="strength_"
        )
        # Copy strength values onto the statement dict so the $strength_*
        # placeholders in the parameterized query can resolve
        for field in key_fields:
            value = strength.get(field)
            if value:
                statement[f"strength_{field}"] = value

        query += f"""
        MERGE (strength:Strength {{ {strength_keys} }})
        MERGE (s) -[:HAS_STRENGTH] -> (strength)
        """

    tx.run(query, **statement)


def _add_statement_assertion(tx: ManagedTransaction, statement_in: dict) -> None:
    """Create a Statement node and its relationships for an assertion record.

    Extends the shared statement query (see ``_get_statement_query``) with the
    assertion's classification and, when present, its evidence lines, each of
    which is linked to the evidence items it aggregates.

    :param tx: Transaction object provided to transaction functions
    :param statement_in: Statement CDM object for assertions
    """
    # Work on a copy: extra $-parameters are added below and we must not
    # mutate the caller's object
    statement = statement_in.copy()
    query = _get_statement_query(statement)

    classification = statement["classification"]
    key_parts = [
        _create_parameterized_query(
            classification, ("primaryCode",), entity_param_prefix="classification_"
        )
    ]
    statement["classification_primaryCode"] = classification["primaryCode"]
    # Appends mapping/extension key fragments to key_parts and normalizes the
    # classification dict so its values can be passed as query parameters
    _add_mappings_and_exts_to_obj(classification, key_parts)
    statement.update(classification)
    classification_keys = ", ".join(key_parts)

    query += f"""
    MERGE (classification:Classification {{ {classification_keys} }})
    MERGE (s) -[:HAS_CLASSIFICATION] -> (classification)
    """

    evidence_lines = statement.get("hasEvidenceLines", [])
    if evidence_lines:
        # Evidence lines carry no stable identifier of their own, so assign a
        # fresh UUID to each before handing them over as query parameters
        for line in evidence_lines:
            line["evidence_line_id"] = str(uuid.uuid4())
            line["evidence_item_ids"] = [
                item["id"] for item in line["hasEvidenceItems"]
            ]

        query += """
        WITH s
        UNWIND $hasEvidenceLines AS el
        MERGE (evidence_line:EvidenceLine {id: el.evidence_line_id, direction: el.directionOfEvidenceProvided})
        MERGE (s)-[:HAS_EVIDENCE_LINE]->(evidence_line)
        WITH evidence_line, el.evidence_item_ids AS evidence_item_ids
        UNWIND evidence_item_ids AS evidence_item_id
        MERGE (evidence:Statement {id: evidence_item_id})
        MERGE (evidence_line)-[:HAS_EVIDENCE_ITEM]->(evidence)
        """

    tx.run(query, **statement)


Expand All @@ -488,8 +547,9 @@ def add_transformed_data(driver: Driver, data: dict) -> None:
"""
# Used to keep track of IDs that are in statements. This is used to prevent adding
# nodes that aren't associated to statements
statements = data.get("statements", [])
ids_in_stmts = _get_ids_from_stmts(statements)
statements_evidence = data.get("statements_evidence", [])
statements_assertions = data.get("statements_assertions", [])
ids_in_stmts = _get_ids_from_stmts(statements_evidence + statements_assertions)

with driver.session() as session:
loaded_stmt_count = 0
Expand All @@ -511,8 +571,12 @@ def add_transformed_data(driver: Driver, data: dict) -> None:
session.execute_write(_add_therapy_or_group, tp, ids_in_stmts)

# This should always be done last
for statement in statements:
session.execute_write(_add_statement, statement)
for statement_evidence_item in statements_evidence:
session.execute_write(_add_statement_evidence, statement_evidence_item)
loaded_stmt_count += 1

for statement_assertion in statements_assertions:
session.execute_write(_add_statement_assertion, statement_assertion)
loaded_stmt_count += 1

_logger.info("Successfully loaded %s statements.", loaded_stmt_count)
Expand Down
11 changes: 8 additions & 3 deletions src/metakb/transformers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ga4gh.va_spec.base import Document, Method, TherapyGroup
from ga4gh.vrs.models import Allele
from gene.schemas import NormalizeService as NormalizedGene
from pydantic import BaseModel, StrictStr, ValidationError
from pydantic import BaseModel, Field, StrictStr, ValidationError
from therapy.schemas import NormalizationService as NormalizedTherapy

from metakb import APP_ROOT, DATE_FMT
Expand Down Expand Up @@ -111,11 +111,16 @@ class ViccConceptVocab(BaseModel):
class TransformedData(BaseModel):
"""Define model for transformed data"""

statements: list[
statements_evidence: list[
VariantTherapeuticResponseStudyStatement
| VariantPrognosticStudyStatement
| VariantDiagnosticStudyStatement
] = []
] = Field([], description="Statement objects for evidence records")
statements_assertions: list[
VariantTherapeuticResponseStudyStatement
| VariantPrognosticStudyStatement
| VariantDiagnosticStudyStatement
] = Field([], description="Statement objects for assertion records")
categorical_variants: list[CategoricalVariant] = []
variations: list[Allele] = []
genes: list[MappableConcept] = []
Expand Down
7 changes: 4 additions & 3 deletions src/metakb/transformers/civic.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,8 @@ def _add_variant_study_stmt(
return

evidence_lines = []
evidence_ids = []
for eid in record["evidence_ids"]:
civic_eid = f"civic.eid:{eid}"
evidence_ids.append(civic_eid)
evidence_item = self._evidence_cache.get(civic_eid)
if evidence_item:
evidence_lines.append(
Expand Down Expand Up @@ -411,6 +409,7 @@ def _add_variant_study_stmt(
mappings = [
ConceptMapping(
coding=Coding(
id=statement_id,
code=str(record["id"]),
system="https://civicdb.org/evidence/"
if is_evidence
Expand Down Expand Up @@ -456,7 +455,9 @@ def _add_variant_study_stmt(

if is_evidence:
self._evidence_cache[statement_id] = statement
self.processed_data.statements.append(statement)
self.processed_data.statements_evidence.append(statement)
else:
self.processed_data.statements_assertions.append(statement)

@staticmethod
def _get_classification(amp_level: str) -> MappableConcept | None:
Expand Down
2 changes: 1 addition & 1 deletion src/metakb/transformers/moa.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ async def _add_variant_study_stmt(self, assertion: dict) -> None:
stmt_params["proposition"] = VariantPrognosticProposition(**prop_params)
statement = VariantPrognosticStudyStatement(**stmt_params)

self.processed_data.statements.append(statement)
self.processed_data.statements_evidence.append(statement)

async def _add_categorical_variants(self, variants: list[dict]) -> None:
"""Create Categorical Variant objects for all MOA variant records.
Expand Down
Loading

0 comments on commit 7c0db36

Please sign in to comment.