
Commit

Merge pull request #351 from EBI-Metagenomics/bugfix/metagenomics-exchange-bug-fixes

Fixes for a few bugs I encountered while testing the command
mberacochea authored Feb 19, 2024
2 parents f0a72ff + 1b031df commit c98171c
Showing 4 changed files with 240 additions and 130 deletions.
128 changes: 83 additions & 45 deletions emgapi/management/commands/populate_metagenomics_exchange.py
@@ -15,15 +15,15 @@
 # limitations under the License.

 import logging
+from datetime import timedelta

 from django.conf import settings
 from django.core.management import BaseCommand
-from django.utils import timezone
 from django.core.paginator import Paginator
-from datetime import timedelta
+from django.utils import timezone

-from emgapi.models import AnalysisJob
 from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI
+from emgapi.models import AnalysisJob

 logger = logging.getLogger(__name__)

@@ -65,7 +65,9 @@ def handle(self, *args, **options):
         self.dry_run = options.get("dry_run")
         self.pipeline_version = options.get("pipeline")

-        self.mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)
+        self.mgx_api = MetagenomicsExchangeAPI(
+            base_url=settings.METAGENOMICS_EXCHANGE_API
+        )

         # never indexed or updated after indexed
         analyses_to_index_and_update = AnalysisJob.objects_for_mgx_indexing.to_add()
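
For reference, the two options wired up at the top of handle() are the command's whole public surface. A minimal sketch of driving it programmatically — the command name is assumed to match the module path, and the option names mirror the `dry_run`/`pipeline` keys read above (the value type for `pipeline` is also an assumption):

```python
from django.core.management import call_command

# Walk all eligible analyses without writing to the real Metagenomics Exchange.
call_command("populate_metagenomics_exchange", dry_run=True)

# Only consider analyses produced by a specific pipeline version.
call_command("populate_metagenomics_exchange", pipeline=5.0)
```
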
@@ -96,63 +98,87 @@ def handle(self, *args, **options):
     def process_to_index_and_update_records(self, analyses_to_index_and_update):
         logging.info(f"Indexing {len(analyses_to_index_and_update)} new analyses")

-        for page in Paginator(analyses_to_index_and_update, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER):
+        for page in Paginator(
+            analyses_to_index_and_update,
+            settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER,
+        ):
             jobs_to_update = []
-            for ajob in page:
+            for annotation_job in page:
+                sequence_accession = ""
+                if annotation_job.run:
+                    sequence_accession = annotation_job.run.accession
+                if annotation_job.assembly:
+                    sequence_accession = annotation_job.assembly.accession
+
                 metadata = self.mgx_api.generate_metadata(
-                    mgya=ajob.accession,
-                    run_accession=ajob.run,
-                    status="public" if not ajob.is_private else "private",
+                    mgya=annotation_job.accession, sequence_accession=sequence_accession
                 )
                 registry_id, metadata_match = self.mgx_api.check_analysis(
-                    source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
+                    mgya=annotation_job.accession,
+                    sequence_accession=sequence_accession,
+                    metadata=metadata,
                 )
                 # The job is not registered
                 if not registry_id:
-                    logging.info(f"Add new {ajob}")
+                    logging.info(f"Add new {annotation_job}")
                     if self.dry_run:
-                        logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
+                        logging.info(
+                            f"Dry-mode run: no addition to real ME for {annotation_job}"
+                        )
                         continue

                     response = self.mgx_api.add_analysis(
-                        mgya=ajob.accession,
-                        run_accession=ajob.run,
-                        public=not ajob.is_private,
+                        mgya=annotation_job.accession,
+                        sequence_accession=sequence_accession,
                     )
                     if response.ok:
-                        logging.info(f"Successfully added {ajob}")
+                        logging.info(f"Successfully added {annotation_job}")
                         registry_id, metadata_match = self.mgx_api.check_analysis(
-                            source_id=ajob.accession, sequence_id=ajob.run)
-                        ajob.mgx_accession = registry_id
-                        ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1)
-                        jobs_to_update.append(ajob)
+                            mgya=annotation_job.accession,
+                            sequence_accession=sequence_accession,
+                        )
+                        annotation_job.mgx_accession = registry_id
+                        annotation_job.last_mgx_indexed = timezone.now() + timedelta(
+                            minutes=1
+                        )
+                        jobs_to_update.append(annotation_job)
                     else:
-                        logging.error(f"Error adding {ajob}: {response.message}")
+                        logging.error(
+                            f"Error adding {annotation_job}: {response.message}"
+                        )

                 # else we have to check if the metadata matches, if not we need to update it
                 else:
                     if not metadata_match:
-                        logging.info(f"Patch existing {ajob}")
+                        logging.info(f"Patch existing {annotation_job}")
                         if self.dry_run:
                             logging.info(
-                                f"Dry-mode run: no patch to real ME for {ajob}"
+                                f"Dry-mode run: no patch to real ME for {annotation_job}"
                             )
                             continue
                         if self.mgx_api.patch_analysis(
                             registry_id=registry_id, data=metadata
                         ):
-                            logging.info(f"Analysis {ajob} updated successfully")
+                            logging.info(
+                                f"Analysis {annotation_job} updated successfully"
+                            )
                             # Just to be safe, update the MGX accession
-                            ajob.mgx_accession = registry_id
-                            ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1)
-                            jobs_to_update.append(ajob)
+                            annotation_job.mgx_accession = registry_id
+                            annotation_job.last_mgx_indexed = (
+                                timezone.now() + timedelta(minutes=1)
+                            )
+                            jobs_to_update.append(annotation_job)
                         else:
-                            logging.error(f"Analysis {ajob} update failed")
+                            logging.error(f"Analysis {annotation_job} update failed")
                     else:
-                        logging.debug(f"No edit for {ajob}, metadata is correct")
+                        logging.debug(
+                            f"No edit for {annotation_job}, metadata is correct"
+                        )

             AnalysisJob.objects.bulk_update(
-                jobs_to_update, ["last_mgx_indexed", "mgx_accession"], batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER
+                jobs_to_update,
+                ["last_mgx_indexed", "mgx_accession"],
+                batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER,
             )
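
The rewritten method keeps the established pattern: paginate the queryset, mutate each object in memory, and persist every page with a single bulk UPDATE. A self-contained sketch of that pattern, using an illustrative field name and page size rather than the real EMG settings:

```python
from django.core.paginator import Paginator
from django.utils import timezone


def stamp_in_pages(queryset, page_size=100):
    """Set last_mgx_indexed in fixed-size pages, one UPDATE per page."""
    for page in Paginator(queryset, page_size):
        to_update = []
        for obj in page:
            obj.last_mgx_indexed = timezone.now()  # in-memory change only
            to_update.append(obj)
        # One bulk statement per page instead of one query per object.
        queryset.model.objects.bulk_update(
            to_update, ["last_mgx_indexed"], batch_size=page_size
        )
```

Because `jobs_to_update` is rebuilt for every page, the `bulk_update` call has to stay inside the pagination loop, as it does above.
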

     def process_to_delete_records(self, analyses_to_delete):
@@ -161,36 +187,48 @@ def process_to_delete_records(self, analyses_to_delete):
         """
         logging.info(f"Processing {len(analyses_to_delete)} analyses to remove")

-        for page in Paginator(analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER):
+        for page in Paginator(
+            analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER
+        ):
             jobs_to_update = []

-            for ajob in page:
+            for annotation_job in page:
+                sequence_accession = ""
+                if annotation_job.run:
+                    sequence_accession = annotation_job.run.accession
+                if annotation_job.assembly:
+                    sequence_accession = annotation_job.assembly.accession
+
                 metadata = self.mgx_api.generate_metadata(
-                    mgya=ajob.accession,
-                    run_accession=ajob.run,
-                    status="public" if not ajob.is_private else "private",
+                    mgya=annotation_job.accession, sequence_accession=sequence_accession
                 )
                 registry_id, _ = self.mgx_api.check_analysis(
-                    source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
+                    mgya=annotation_job.accession,
+                    sequence_accession=sequence_accession,
+                    metadata=metadata,
                 )
                 if registry_id:
-                    logging.info(f"Deleting {ajob}")
+                    logging.info(f"Deleting {annotation_job}")
                     if self.dry_run:
-                        logging.info(f"Dry-mode run: no delete from real ME for {ajob}")
+                        logging.info(
+                            f"Dry-mode run: no delete from real ME for {annotation_job}"
+                        )
                         continue

                     if self.mgx_api.delete_analysis(registry_id):
-                        logging.info(f"{ajob} successfully deleted")
-                        ajob.last_mgx_indexed = timezone.now()
-                        jobs_to_update.append(ajob)
+                        logging.info(f"{annotation_job} successfully deleted")
+                        annotation_job.last_mgx_indexed = timezone.now()
+                        jobs_to_update.append(annotation_job)
                     else:
-                        logging.info(f"{ajob} failed on delete")
+                        logging.info(f"{annotation_job} failed on delete")
                 else:
                     logging.info(
-                        f"{ajob} doesn't exist in the registry, nothing to delete"
+                        f"{annotation_job} doesn't exist in the registry, nothing to delete"
                     )

             # BULK UPDATE #
             AnalysisJob.objects.bulk_update(
-                jobs_to_update, ["last_mgx_indexed"], batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER
+                jobs_to_update,
+                ["last_mgx_indexed"],
+                batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER,
             )
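
Both `process_*` methods now resolve the sequence accession with the same two `if` statements, where the assembly accession, when present, wins because it is assigned last. A hypothetical helper (not part of this commit) that captures the shared logic:

```python
def resolve_sequence_accession(annotation_job) -> str:
    """Return the assembly accession if the job has one, else the run accession.

    Mirrors the inline blocks in process_to_index_and_update_records and
    process_to_delete_records.
    """
    sequence_accession = ""
    if annotation_job.run:
        sequence_accession = annotation_job.run.accession
    if annotation_job.assembly:
        sequence_accession = annotation_job.assembly.accession
    return sequence_accession
```
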
131 changes: 99 additions & 32 deletions emgapi/metagenomics_exchange.py
@@ -15,17 +15,18 @@
 # limitations under the License.

 import logging
-import requests

+import requests
 from django.conf import settings
+from requests.exceptions import HTTPError


 class MetagenomicsExchangeAPI:
     """Metagenomics Exchange API Client"""

     def __init__(self, base_url=None):
         self.base_url = base_url or settings.METAGENOMICS_EXCHANGE_API
-        self.__token = settings.METAGENOMICS_EXCHANGE_API_TOKEN
+        self.__token = f"mgx {settings.METAGENOMICS_EXCHANGE_API_TOKEN}"
         self.broker = settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER

     def get_request(self, endpoint: str, params: dict):
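
The token is now stored with the `mgx` scheme already prefixed. The request helpers are folded out of this diff, so the exact header is not visible here; a sketch of the usual pattern, assuming the credential travels in an `Authorization` header:

```python
import requests


class TokenClient:
    """Illustrative only: attaching an 'mgx <token>' credential to requests."""

    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip("/")
        self.__token = f"mgx {token}"

    def get(self, endpoint: str, params=None):
        response = requests.get(
            f"{self.base_url}/{endpoint}",
            params=params,
            headers={"Authorization": self.__token},  # assumed header name
        )
        # Raising on bad status is consistent with check_analysis() below
        # catching HTTPError from get_request().
        response.raise_for_status()
        return response
```
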
@@ -68,69 +69,135 @@ def patch_request(self, endpoint: str, data: dict):
         )
         return response

-    def generate_metadata(self, mgya, run_accession, status):
+    def generate_metadata(self, mgya, sequence_accession):
+        """Generate the metadata object for the Metagenomics Exchange API.
+        Parameters:
+            mgya : str
+                The MGnify Analysis accession.
+            sequence_accession : str
+                Either the Run accession or the Assembly accession related to the MGYA.
+        Returns:
+            dict
+                A dictionary containing metadata for the Metagenomics Exchange API.
+        """
         return {
             "confidence": "full",
             "endPoint": f"https://www.ebi.ac.uk/metagenomics/analyses/{mgya}",
             "method": ["other_metadata"],
             "sourceID": mgya,
-            "sequenceID": run_accession,
-            "status": status,
+            "sequenceID": sequence_accession,
+            "status": "public",
             "brokerID": self.broker,
         }

-    def add_analysis(self, mgya: str, run_accession: str, public: bool):
-        data = self.generate_metadata(mgya, run_accession, public)
-        response = self.post_request(endpoint="datasets", data=data)
+    def add_analysis(self, mgya: str, sequence_accession: str):
+        """Add an analysis to the M. Exchange.
+        Parameters:
+            mgya : str
+                The MGnify Analysis accession.
+            sequence_accession : str
+                Either the Run accession or the Assembly accession related to the MGYA.
+        Returns:
+            requests.models.Response
+                The response object from the API request.
+        """
+        data = self.generate_metadata(mgya, sequence_accession)
+        try:
+            response = self.post_request(endpoint="datasets", data=data)
+        except HTTPError as http_error:
+            try:
+                response_json = http_error.response.json()
+                logging.error(f"API response content: {response_json}")
+            except:
+                pass
+            raise http_error
         return response

-    def check_analysis(
-        self, source_id: str, sequence_id: str, public=None, metadata=None
-    ):
-        logging.info(f"Check {source_id} {sequence_id}")
-        params = {}
-        if public:
-            params = {
-                "status": "public" if public else "private",
-                "broker": self.broker,
-            }
-        endpoint = f"sequences/{sequence_id}/datasets"
+    def check_analysis(self, mgya: str, sequence_accession: str, metadata=None):
+        """Check if an analysis for the given sequence exists in the M. Exchange.
+        Parameters:
+            mgya : str
+                The MGnify Analysis accession.
+            sequence_accession : str
+                Either the Run accession or the Assembly accession related to the MGYA.
+        Returns:
+            tuple
+                A tuple containing two elements:
+                - analysis_registry_id : str
+                    The analysis registry ID.
+                - metadata_match : boolean
+                    True if the metadata matches.
+        """
+        if not mgya:
+            raise ValueError("mgya is mandatory.")
+        if not sequence_accession:
+            raise ValueError("sequence_accession is mandatory.")
+
+        logging.info(f"Checking {mgya} - {sequence_accession}")
+
+        params = {
+            "broker": self.broker,
+        }
+
+        endpoint = f"sequences/{sequence_accession}/datasets"
         analysis_registry_id = None
-        metadata_match = True
+        metadata_match = False

         try:
             response = self.get_request(endpoint=endpoint, params=params)
-        except:
-            logging.error(f"Get API request failed")
+        except HTTPError as http_error:
+            logging.error(f"Get API request failed. HTTP Error: {http_error}")
+            try:
+                response_json = http_error.response.json()
+                logging.error(f"API response content: {response_json}")
+            except:
+                pass
             return analysis_registry_id, metadata_match

         data = response.json()
         datasets = data.get("datasets", [])

-        # The API will return an emtpy datasets array if it can find the accession
+        # The API will return an empty datasets array if it can't find the accession
         if not len(datasets):
-            logging.info(f"{source_id} does not exist in ME")
+            logging.info(f"{mgya} does not exist in ME")
             return analysis_registry_id, metadata_match

+        # TODO: this code needs some refactoring to improve it:
+        """
+        try:
+            found_record = next(item for item in datasets if item.get("sourceID") == mgya)
+        except StopIteration:
+            ...
+        """
         sourceIDs = [item.get("sourceID") for item in datasets]
-        if source_id in sourceIDs:
-            found_record = [
-                item for item in datasets if item.get("sourceID") == source_id
-            ][0]
-            logging.info(f"{source_id} exists in ME")
+        if mgya in sourceIDs:
+            found_record = [item for item in datasets if item.get("sourceID") == mgya][
+                0
+            ]
+            logging.info(f"{mgya} exists in ME")
             analysis_registry_id = found_record.get("registryID")
+            if not analysis_registry_id:
+                raise ValueError(f"The Metagenomics Exchange 'registryID' for {mgya} is null.")

             if metadata:
                 for metadata_record in metadata:
                     if not (metadata_record in found_record):
-                        metadata_match = False
-                        return analysis_registry_id, metadata_match
+                        return analysis_registry_id, False
                     else:
                         if metadata[metadata_record] != found_record[metadata_record]:
                             metadata_match = False
                             logging.info(
-                                f"Incorrect field {metadata[metadata_record]} != {found_record[metadata_record]})"
+                                f"The metadata doesn't match, for field {metadata[metadata_record]} != {found_record[metadata_record]})"
                             )
                             return analysis_registry_id, metadata_match
+                        else:
+                            metadata_match = True
+                            return analysis_registry_id, metadata_match
-            return analysis_registry_id, metadata_match

         return analysis_registry_id, metadata_match
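
The TODO left in check_analysis sketches the cleaner lookup; fleshed out, the sourceIDs list plus list comprehension collapse into a single pass over the datasets (a sketch of the suggested refactor, not code from this commit):

```python
def find_dataset(datasets: list, mgya: str):
    """Single-pass replacement for the sourceIDs membership test."""
    try:
        return next(item for item in datasets if item.get("sourceID") == mgya)
    except StopIteration:
        return None
```

check_analysis would then branch on `found_record is None` instead of testing `mgya in sourceIDs` and re-scanning the list.
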