Minor refactor of the MGX indexation process WIP
mberacochea committed Jan 17, 2024
1 parent 89d5560 commit 355bee0
Showing 5 changed files with 139 additions and 154 deletions.
143 changes: 84 additions & 59 deletions emgapi/management/commands/populate_metagenomics_exchange.py
@@ -15,10 +15,10 @@
# limitations under the License.

import logging
import responses

from django.conf import settings
from django.core.management import BaseCommand
from django.utils import timezone

from emgapi.models import AnalysisJob
from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI
@@ -27,8 +27,10 @@

RETRY_COUNT = 5


class Command(BaseCommand):
help = "Check and populate metagenomics exchange (ME)."

def add_arguments(self, parser):
super(Command, self).add_arguments(parser)
parser.add_argument(
@@ -37,7 +39,7 @@ def add_arguments(self, parser):
required=False,
type=str,
help="Study accession list (rather than all)",
nargs='+',
nargs="+",
)
parser.add_argument(
"-p",
@@ -61,12 +63,6 @@ def add_arguments(self, parser):
required=False,
help="Dry mode, no population of ME",
)
# TODO: do I need it?
parser.add_argument(
"--full",
action="store_true",
help="Do a full check of DB",
)

def generate_metadata(self, mgya, run_accession, status):
return {
@@ -76,74 +72,103 @@ def generate_metadata(self, mgya, run_accession, status):
"sourceID": mgya,
"sequenceID": run_accession,
"status": status,
"brokerID": settings.MGNIFY_BROKER,
"brokerID": settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER,
}
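
For orientation, here is a sketch of the payload this method returns, limited to the fields visible in this hunk; the accession values are illustrative, not from the commit:

metadata = {
    "sourceID": "MGYA00000001",  # illustrative MGnify analysis accession
    "sequenceID": "ERR0000001",  # illustrative ENA run accession
    "status": "public",  # or "private"
    "brokerID": settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER,
}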

def handle(self, *args, **options):
self.study_accession = options.get("study")
self.dry_run = options.get("dry_run")
self.pipeline_version = options.get("pipeline")
if options.get("dev"):
base_url = settings.ME_API_DEV
else:
base_url = settings.ME_API
ME = MetagenomicsExchangeAPI(base_url=base_url)

new_analyses = AnalysisJob.objects_for_population.to_add()
removals = AnalysisJob.objects_for_population.to_delete()

mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)

analyses_to_index = AnalysisJob.objects_for_mgx_indexing.to_add()
analyses_to_delete = AnalysisJob.objects_for_mgx_indexing.to_delete()

if self.study_accession:
new_analyses = new_analyses.filter(study__secondary_accession__in=self.study_accession)
removals = removals.filter(study__secondary_accession__in=self.study_accession)
analyses_to_index = analyses_to_index.filter(
study__secondary_accession__in=self.study_accession
)
analyses_to_delete = analyses_to_delete.filter(
study__secondary_accession__in=self.study_accession
)

if self.pipeline_version:
new_analyses = new_analyses.filter(pipeline__pipeline_id=self.pipeline_version)
removals = removals.filter(pipeline__pipeline_id=self.pipeline_version)
logging.info(f"Processing {len(new_analyses)} new analyses")
for ajob in new_analyses:
metadata = self.generate_metadata(mgya=ajob.accession, run_accession=ajob.run,
status="public" if not ajob.is_private else "private")
registryID, metadata_match = ME.check_analysis(source_id=ajob.accession, sequence_id=ajob.run,
metadata=metadata)
if not registryID:
analyses_to_index = analyses_to_index.filter(
pipeline__pipeline_id=self.pipeline_version
)
analyses_to_delete = analyses_to_delete.filter(pipeline__pipeline_id=self.pipeline_version)

logging.info(f"Indexig {len(analyses_to_index)} new analyses")

jobs_to_update = []

for ajob in analyses_to_index:
metadata = self.generate_metadata(
mgya=ajob.accession,
run_accession=ajob.run,
status="public" if not ajob.is_private else "private",
)
registry_id, metadata_match = mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
)
# The job is not registered
if not registry_id:
logging.debug(f"Add new {ajob}")
if not self.dry_run:
response = ME.add_analysis(mgya=ajob.accession, run_accession=ajob.run, public=not ajob.is_private)
if response.ok:
logging.debug(f"Added {ajob}")
else:
logging.debug(f"Error adding {ajob}: {response.message}")
else:
if self.dry_run:
logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
continue

response = mgx_api.add_analysis(
mgya=ajob.accession,
run_accession=ajob.run,
public=not ajob.is_private,
)
if response.ok:
logging.debug(f"Added {ajob}")
ajob.last_mgx_indexed = timezone.now()
jobs_to_update.append(ajob)
else:
logging.error(f"Error adding {ajob}: {response.message}")

# Otherwise check whether the metadata matches; if not, patch it
else:
if not metadata_match:
logging.debug(f"Patch existing {ajob}")
if not self.dry_run:
if ME.patch_analysis(registry_id=registryID, data=metadata):
logging.info(f"Analysis {ajob} updated successfully")
else:
logging.info(f"Analysis {ajob} update failed")
else:
if self.dry_run:
logging.info(f"Dry-mode run: no patch to real ME for {ajob}")
continue
if mgx_api.patch_analysis(registry_id=registry_id, data=metadata):
logging.info(f"Analysis {ajob} updated successfully")
ajob.last_mgx_indexed = timezone.now()
jobs_to_update.append(ajob)
else:
logging.error(f"Analysis {ajob} update failed")
else:
logging.debug(f"No edit for {ajob}, metadata is correct")

logging.info(f"Processing {len(removals)} analyses to remove")
for ajob in removals:
metadata = self.generate_metadata(mgya=ajob.accession, run_accession=ajob.run,
status="public" if not ajob.is_private else "private")
registryID, _ = ME.check_analysis(source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata)
if registryID:
if not self.dry_run:
if ME.delete_analysis(registryID):
logging.info(f"{ajob} successfully deleted")
else:
logging.info(f"{ajob} failed on delete")
else:
# Bulk-update the last_mgx_indexed timestamps in a single query
AnalysisJob.objects.bulk_update(jobs_to_update, ["last_mgx_indexed"])
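
bulk_update persists all the modified timestamps in one batched query; the roughly equivalent per-row version below would issue one UPDATE per job:

for job in jobs_to_update:
    job.save(update_fields=["last_mgx_indexed"])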

logging.info(f"Processing {len(analyses_to_delete)} analyses to remove")
for ajob in analyses_to_delete:
metadata = self.generate_metadata(
mgya=ajob.accession,
run_accession=ajob.run,
status="public" if not ajob.is_private else "private",
)
registry_id, _ = mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
)
if registry_id:
if self.dry_run:
logging.info(f"Dry-mode run: no delete from real ME for {ajob}")
else:
logging.info(f"No {ajob} in ME, nothing to delete")
logging.info("Done")




if mgx_api.delete_analysis(registry_id):
logging.info(f"{ajob} successfully deleted")
else:
logging.info(f"{ajob} failed on delete")
else:
logging.info(f"{ajob} doesn't exist in the registry, nothing to delete")

logging.info("Done")
4 changes: 2 additions & 2 deletions emgapi/metagenomics_exchange.py
@@ -25,8 +25,8 @@ class MetagenomicsExchangeAPI:
"""Metagenomics Exchange API Client"""

def __init__(self, base_url=None):
self.base_url = base_url if base_url else settings.ME_API
self.__token = settings.ME_API_TOKEN
self.base_url = base_url if base_url else settings.METAGENOMICS_EXCHANGE_API
self.__token = settings.METAGENOMICS_EXCHANGE_API_TOKEN
self.broker = settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER

def get_request(self, endpoint: str, params: dict):
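The body of get_request is truncated in this diff, so as a rough sketch only (the header scheme and return handling are assumptions, not part of this commit), the client method might look like:

import requests

def get_request(self, endpoint: str, params: dict):
    # Assumption: the ME API accepts the token directly in an Authorization header
    headers = {"Authorization": self.__token}
    return requests.get(f"{self.base_url}/{endpoint}", params=params, headers=headers)
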
125 changes: 46 additions & 79 deletions emgapi/models.py
@@ -161,7 +161,13 @@ class Meta:
abstract = True


class EbiSearchIndexQueryset(models.QuerySet):
class IndexableModel(models.Model):
last_update = models.DateTimeField(
db_column='LAST_UPDATE',
auto_now=True
)

class Meta:
abstract = True

class IndexableModelQueryset(models.QuerySet):
"""
to_delete: Objects that have been suppressed since they were last indexed,
or that have been indexed but updated since.
@@ -170,7 +176,8 @@ class EbiSearchIndexQueryset(models.QuerySet):
or that have been indexed but updated since.
"""
def to_delete(self):
updated_after_indexing = Q(last_update__gte=F("last_indexed"), last_indexed__isnull=False)
not_indexed_filter = {f"{self._index_field__isnull}": False}
updated_after_indexing = Q(last_update__gte=F(self._index_field), **not_indexed_filter)

try:
self.model._meta.get_field("suppressed_at")
@@ -180,11 +187,12 @@ def to_delete(self):
)
else:
return self.filter(
Q(suppressed_at__gte=F("last_indexed")) | updated_after_indexing
Q(suppressed_at__gte=F(self._index_field)) | updated_after_indexing
)

def to_add(self):
updated_after_indexing = Q(last_update__gte=F("last_indexed"), last_indexed__isnull=False)
not_indexed_filter = {f"{self._index_field__isnull}": False}
updated_after_indexing = Q(last_update__gte=F(self._index_field), **not_indexed_filter)
never_indexed = Q(**{f"{self._index_field}__isnull": True})

try:
@@ -204,19 +212,43 @@
return self.filter(never_indexed | updated_after_indexing, not_suppressed, not_private)
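
The base queryset can stay field-agnostic because Django keyword lookups can be built at runtime and unpacked into Q; for example, with the MGX field defined further down:

field = "last_mgx_indexed"
never_indexed = Q(**{f"{field}__isnull": True})  # same as Q(last_mgx_indexed__isnull=True)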


class EbiSearchIndexedModel(models.Model):
last_update = models.DateTimeField(
db_column='LAST_UPDATE',
auto_now=True
)
last_indexed = models.DateTimeField(
db_column='LAST_INDEXED',

class EBISearchIndexQueryset(IndexableModelQueryset):

_index_field = "last_ebi_search_indexed"


class EBISearchIndexedModel(IndexableModel):

last_ebi_search_indexed = models.DateTimeField(
db_column='LAST_EBI_SEARCH_INDEXED',
null=True,
blank=True,
help_text="Date at which this model was last included in an EBI Search initial/incremental index."
)

objects_for_indexing = EbiSearchIndexQueryset.as_manager()
objects_for_ebisearch_indexing = EBISearchIndexQueryset.as_manager()

class Meta:
abstract = True


class MetagenomicsExchangeQueryset(IndexableModelQueryset):

_index_field = "last_mgx_indexed"


class MetagenomicsExchangeIndexedModel(IndexableModel):
"""Model to track Metagenomics Exchange indexation of analysis jobs
"""
last_mgx_indexed = models.DateTimeField(
db_column='LAST_MGX_INDEXED',
null=True,
blank=True,
help_text="Date at which this model was last indexed in the Metagenomics Exchange"
)

objects_for_mgx_indexing = MetagenomicsExchangeQueryset.as_manager()

class Meta:
abstract = True
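
With both abstract models mixed into a concrete model, each index target gets its own manager and timestamp; a usage sketch with the AnalysisJob model from this commit:

# Analyses that need (re)indexing in the Metagenomics Exchange
AnalysisJob.objects_for_mgx_indexing.to_add()
# Analyses whose EBI Search record should be removed
AnalysisJob.objects_for_ebisearch_indexing.to_delete()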
@@ -904,7 +936,7 @@ def mydata(self, request):
return self.get_queryset().mydata(request)


class Study(ENASyncableModel, EbiSearchIndexedModel):
class Study(ENASyncableModel, EBISearchIndexedModel):

def __init__(self, *args, **kwargs):
super(Study, self).__init__(*args, **kwargs)
@@ -1505,71 +1537,6 @@ class Meta:
def __str__(self):
return 'Assembly:{} - Sample:{}'.format(self.assembly, self.sample)


class MetagenomicsExchangeQueryset(models.QuerySet):
"""
to_delete: Objects that have been suppressed since they were last populated,
or that have been added but updated since.
to_add: Objects that have never been added,
or that have been added but updated since.
"""
def to_delete(self):
updated_after_populating = Q(last_updated_me__gte=F("last_populated_me"), last_populated_me__isnull=False)

try:
self.model._meta.get_field("suppressed_at")
except FieldDoesNotExist:
return self.filter(
updated_after_populating
)
else:
return self.filter(
Q(suppressed_at__gte=F("last_populated_me"))
)

def to_add(self):
updated_after_populating = Q(last_updated_me__gte=F("last_populated_me"), last_populated_me__isnull=False)
never_populated = Q(last_populated_me__isnull=True)

try:
self.model._meta.get_field("is_suppressed")
except FieldDoesNotExist:
not_suppressed = Q()
else:
not_suppressed = Q(is_suppressed=False)

try:
self.model._meta.get_field("is_private")
except FieldDoesNotExist:
not_private = Q()
else:
not_private = Q(is_private=False)

return self.filter(never_populated | updated_after_populating, not_suppressed, not_private)


class MetagenomicsExchangeModel(models.Model):
"""Model to track Metagenomics Exchange population
https://www.ebi.ac.uk/ena/registry/metagenome/api/
"""
last_updated_me = models.DateTimeField(
db_column='LAST_UPDATED_ME',
auto_now=True
)
last_populated_me = models.DateTimeField(
db_column='LAST_POPULATED_ME',
null=True,
blank=True,
help_text="Date at which this model was last appeared in Metagenomics Exchange"
)

objects_for_population = MetagenomicsExchangeQueryset.as_manager()

class Meta:
abstract = True


class AnalysisJobQuerySet(BaseQuerySet, MySQLQuerySet, SuppressQuerySet):

def __init__(self, *args, **kwargs):
@@ -1686,7 +1653,7 @@ class MetagenomicsExchange(models.Model):
last_update = models.DateTimeField(db_column='LAST_UPDATE', auto_now=True)


class AnalysisJob(SuppressibleModel, PrivacyControlledModel, EbiSearchIndexedModel, MetagenomicsExchangeModel):
class AnalysisJob(SuppressibleModel, PrivacyControlledModel, EBISearchIndexedModel, MetagenomicsExchangeIndexedModel):
def __init__(self, *args, **kwargs):
super(AnalysisJob, self).__init__(*args, **kwargs)
setattr(self, 'accession',