From f4b3af6ed1ec074eb333ba7f77729e488c76424b Mon Sep 17 00:00:00 2001 From: Martin Beracochea Date: Wed, 28 Apr 2021 17:22:26 +0100 Subject: [PATCH 1/2] Fix for private/public analysis issue. Also initial refactor of the auth mechanism. --- emgapi/models.py | 153 ++++-- tests/api/test_permissions.py | 526 ++++++++++++++++++--- tests/webuploader/test_import_checksums.py | 2 +- 3 files changed, 577 insertions(+), 104 deletions(-) diff --git a/emgapi/models.py b/emgapi/models.py index fccc57d75..858ea5e48 100644 --- a/emgapi/models.py +++ b/emgapi/models.py @@ -51,9 +51,23 @@ def __init__(self, **kwargs): class BaseQuerySet(models.QuerySet): """Auth mechanism to filter private models """ + # TODO: the QuerySet should not have to handle the request + # if should recieve the username + # move the requests bits to the filters and serializers as needed def available(self, request=None): - + """ + Filter data based on the status or other properties. + Status table: + 1 draft + 2 private + 3 cancelled + 4 public + 5 suppressed + 6 killed + 7 temporary_suppressed + 8 temporary_killed + """ _query_filters = { 'StudyQuerySet': { 'all': [Q(is_public=1), ], @@ -74,18 +88,11 @@ def available(self, request=None): Q(status_id=4), ], }, - 'AnalysisJobQuerySet': { - 'all': [ - # TMP: IS_PUBLIC = 5 is suppressed - ~Q(sample__is_public=5), - Q(run__status_id=4) | Q(assembly__status_id=4), - Q(analysis_status_id=3) | Q(analysis_status_id=6) - ], - }, 'AnalysisJobDownloadQuerySet': { 'all': [ # TMP: IS_PUBLIC = 5 is suppressed ~Q(job__sample__is_public=5), + Q(job__study__is_public=1), Q(job__run__status_id=4) | Q(job__assembly__status_id=4), Q(job__analysis_status_id=3) | Q(job__analysis_status_id=6) ], @@ -108,33 +115,29 @@ def available(self, request=None): [Q(samples__studies__submission_account_id=_username, status_id=2) | Q(status_id=4)] - _query_filters['AnalysisJobQuerySet']['authenticated'] = \ - [Q(study__submission_account_id=_username, - run__status_id=2) | - Q(study__submission_account_id=_username, - assembly__status_id=2) | - Q(run__status_id=4) | Q(assembly__status_id=4)] _query_filters['AnalysisJobDownloadQuerySet']['authenticated'] = \ [Q(job__study__submission_account_id=_username, job__run__status_id=2) | Q(job__study__submission_account_id=_username, job__assembly__status_id=2) | Q(job__run__status_id=4) | Q(job__assembly__status_id=4)] + + filters = _query_filters.get(self.__class__.__name__) + + if filters: + return self._apply_filters(filters, request) + return self + def _apply_filters(self, filters, request): + """Apply the QS filters for "all" and "authenticated" users + """ q = list() - try: - _instance = _query_filters[self.__class__.__name__] - if isinstance(self, self.__class__): - if request is not None and request.user.is_authenticated(): - if not request.user.is_superuser: - q.extend(_instance['authenticated']) - else: - q.extend(_instance['all']) - return self.distinct().filter(*q) - except KeyError: - pass - # TODO: should return nothing - return self + if request is not None and request.user.is_authenticated(): + if not request.user.is_superuser: + q.extend(filters['authenticated']) + else: + q.extend(filters['all']) + return self.distinct().filter(*q) class PipelineTool(models.Model): @@ -171,12 +174,13 @@ def multiple_pk(self): class PipelineManager(models.Manager): def get_queryset(self): - return BaseQuerySet(self.model, using=self._db) + return super().get_queryset() class PipelineAnnotatedManager(models.Manager): def get_queryset(self): - return BaseQuerySet(self.model, using=self._db) \ + return super() \ + .get_queryset() \ .annotate( analyses_count=Count( 'analyses', filter=( @@ -240,6 +244,26 @@ class Meta: class AnalysisStatus(models.Model): + """ + Current values: + 1 scheduled + 2 running + 3 completed + 4 failed + 5 suppressed + 6 QC not passed + 7 Unable to process + 8 unknown + """ + SCHEDULED = 1 + RUNNING = 2 + COMPLETED = 3 + FAILED = 4 + SUPPRESSED = 5 + QC_NOT_PASSED = 6 + UNABLE_TO_PROCESS = 7 + UNKNOWN = 8 + analysis_status_id = models.AutoField( db_column='ANALYSIS_STATUS_ID', primary_key=True) analysis_status = models.CharField( @@ -506,7 +530,7 @@ class AnalysisJobDownloadQuerySet(BaseQuerySet): class AnalysisJobDownloadManager(models.Manager): def get_queryset(self): - return BaseQuerySet(self.model, using=self._db).\ + return AnalysisJobDownloadQuerySet(self.model, using=self._db).\ select_related( 'job', 'pipeline', @@ -840,6 +864,11 @@ def available(self, request, prefetch=False): class Sample(models.Model): + # is_public possible values + PUBLIC = 1 + PRIVATE = 0 + SUPPRESSED = 5 # TODO: this should not be in is_public + sample_id = models.AutoField( db_column='SAMPLE_ID', primary_key=True) accession = models.CharField( @@ -1006,6 +1035,19 @@ def __str__(self): class Status(models.Model): + """Status + Current values as constants to make + the code easier to read + """ + DRAFT = 1 + PRIVATE = 2 + CANCELLED = 3 + PUBLIC = 4 + SUPRESSED = 5 + KILLED = 6 + TEMPORARY_SUPPRESSED = 7 + TEMPORARY_KILLED = 8 + status_id = models.SmallIntegerField( db_column='STATUS_ID', primary_key=True) status = models.CharField( @@ -1019,9 +1061,13 @@ def __str__(self): return self.status +class RunQuerySet(BaseQuerySet): + pass + + class RunManager(models.Manager): def get_queryset(self): - return BaseQuerySet(self.model, using=self._db) \ + return RunQuerySet(self.model, using=self._db) \ .select_related( 'status_id', 'sample', @@ -1158,11 +1204,46 @@ def __str__(self): return 'Assembly:{} - Sample:{}'.format(self.assembly, self.sample) +class AnalysisJobQuerySet(BaseQuerySet): + def available(self, request=None): + """Override BaseQuerySet with the AnalysisJob auth rules + Use cases + - all | has access to public analyses + - authenticated | has access to public and private analyses they own + + Filtered out analyses for SUPPRESSED samples + """ + query_filters = { + "all": [ + Q(study__is_public=1), + ~Q(sample__is_public=Sample.SUPPRESSED), + Q(run__status_id=Status.PUBLIC) | Q(assembly__status_id=Status.PUBLIC), + Q(analysis_status_id=AnalysisStatus.COMPLETED) + | Q(analysis_status_id=AnalysisStatus.QC_NOT_PASSED), + ], + } + + if request is not None and request.user.is_authenticated(): + username = request.user.username + query_filters["authenticated"] = [ + ~Q(sample__is_public=Sample.SUPPRESSED), + Q(study__submission_account_id=username, run__status_id=Status.PRIVATE) + | Q( + study__submission_account_id=username, + assembly__status_id=Status.PRIVATE, + ) + | Q(run__status_id=Status.PUBLIC) + | Q(assembly__status_id=Status.PUBLIC) + ] + + return self._apply_filters(query_filters, request) + + class AnalysisJobManager(models.Manager): def get_queryset(self): _qs = AnalysisJobAnn.objects.all() \ .select_related('var') - return BaseQuerySet(self.model, using=self._db) \ + return AnalysisJobQuerySet(self.model, using=self._db) \ .select_related( 'pipeline', 'analysis_status', @@ -1601,10 +1682,14 @@ class Meta: db_table = 'GENOME_GEOGRAPHIC_RANGE' +class ReleaseManagerQuerySet(BaseQuerySet): + pass + + class ReleaseManager(models.Manager): def get_queryset(self): - qs = BaseQuerySet(self.model, using=self._db) + qs = ReleaseManagerQuerySet(self.model, using=self._db) return qs.annotate(genome_count=Count('genomes')) def available(self, request): diff --git a/tests/api/test_permissions.py b/tests/api/test_permissions.py index 8770a717d..92e419967 100644 --- a/tests/api/test_permissions.py +++ b/tests/api/test_permissions.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Copyright 2020 EMBL - European Bioinformatics Institute +# Copyright 2017-2021 EMBL - European Bioinformatics Institute # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,69 +16,219 @@ import pytest - from django.core.urlresolvers import reverse - -from rest_framework import status - +from emgapi import models as emg_models from model_bakery import baker - -from test_utils.emg_fixtures import * # noqa +from rest_framework import status +from test_utils.emg_fixtures import * # noqa @pytest.mark.django_db -class TestPermissionsAPI(object): - +class TestPermissionsAPI: @pytest.fixture(autouse=True) - def setup_method(self, db): - _biome = baker.make('emgapi.Biome', biome_name="foo", - lineage="root:foo", pk=123) + def setup_method(self, db, pipelines, experiment_type, analysis_status): + biome = baker.make("emgapi.Biome", biome_name="foo", lineage="root:foo", pk=123) + pipeline_v5 = pipelines.filter(release_version="5.0")[0] # Webin-000 public - baker.make("emgapi.Study", pk=111, secondary_accession="SRP0111", - is_public=1, submission_account_id='Webin-000', - biome=_biome) - baker.make("emgapi.Study", pk=112, secondary_accession="SRP0112", - is_public=1, submission_account_id='Webin-000', - biome=_biome) + webin_zero_pub_one = baker.make( + "emgapi.Study", + pk=111, + secondary_accession="SRP0111", + is_public=1, + submission_account_id="Webin-000", + biome=biome, + ) + webin_zero_pub_two = baker.make( + "emgapi.Study", + pk=112, + secondary_accession="SRP0112", + is_public=1, + submission_account_id="Webin-000", + biome=biome, + ) # Webin-000 private - baker.make("emgapi.Study", pk=113, secondary_accession="SRP0113", - is_public=0, submission_account_id='Webin-000', - biome=_biome) + webin_zero_priv = baker.make( + "emgapi.Study", + pk=113, + secondary_accession="SRP0113", + is_public=0, + submission_account_id="Webin-000", + biome=biome, + ) # Webin-111 public - baker.make("emgapi.Study", pk=114, secondary_accession="SRP0114", - is_public=1, submission_account_id='Webin-111', - biome=_biome) + webin_one_pub = baker.make( + "emgapi.Study", + pk=114, + secondary_accession="SRP0114", + is_public=1, + submission_account_id="Webin-111", + biome=biome, + ) # Webin-111 private - baker.make("emgapi.Study", pk=115, secondary_accession="SRP0115", - is_public=0, submission_account_id='Webin-111', - biome=_biome) + webin_one_priv = baker.make( + "emgapi.Study", + pk=115, + secondary_accession="SRP0115", + is_public=0, + submission_account_id="Webin-111", + biome=biome, + ) # unknown public - baker.make("emgapi.Study", pk=120, secondary_accession="SRP0120", - is_public=1, submission_account_id=None, biome=_biome) + baker.make( + "emgapi.Study", + pk=120, + secondary_accession="SRP0120", + is_public=1, + submission_account_id=None, + biome=biome, + ) # unknown private - baker.make("emgapi.Study", pk=121, secondary_accession="SRP0121", - is_public=0, submission_account_id=None, biome=_biome) + baker.make( + "emgapi.Study", + pk=121, + secondary_accession="SRP0121", + is_public=0, + submission_account_id=None, + biome=biome, + ) + + # public run (needeed for auth filtering) + public_status = baker.make( + "emgapi.Status", + pk=4, + status="public", + ) + public_run = baker.make( + "emgapi.Run", + run_id=1234, + accession="ABC01234", + status_id=public_status, + experiment_type=experiment_type, + ) + + # private (needeed for auth filtering) + private_status = baker.make( + "emgapi.Status", + pk=2, + status="private", + ) + run_private = baker.make( + "emgapi.Run", + run_id=9821, + accession="ABC01234", + status_id=private_status, + experiment_type=experiment_type, + ) + + # public analyses + public_id = 1 + # public analyses (ids) + # - MGYA00000001 + # - MGYA00000002 + # - MGYA00000003 + # AJ Downloads (ids) + # - id_1 + # - id_2 + # - id_3 + for study in [webin_zero_pub_one, webin_zero_pub_two, webin_one_pub]: + aj = baker.make( + "emgapi.AnalysisJob", + pk=public_id, + study=study, + analysis_status=analysis_status, # 3 + run_status_id=emg_models.Status.PUBLIC, + run=public_run, + experiment_type=experiment_type, + pipeline=pipeline_v5, + ) + baker.make( + "emgapi.AnalysisJobDownload", + pk=public_id, + job=aj, + pipeline=pipeline_v5, + alias="id_" + str(public_id), + ) + public_id += 1 + + # private analyses + private_id = 10 + # private analyses (ids) + # - MGYA00000010 - Webin-000 + # - MGYA00000011 - Webin-111 + # AJ Downloads (ids) + # - id_10 - Webin-000 + # - id_11 - Webin-111 + for study in [webin_zero_priv, webin_one_priv]: + aj = baker.make( + "emgapi.AnalysisJob", + pk=private_id, + study=study, + analysis_status=analysis_status, # 3 + run_status_id=emg_models.Status.PRIVATE, + run=run_private, + experiment_type=experiment_type, + pipeline=pipeline_v5, + ) + baker.make( + "emgapi.AnalysisJobDownload", + pk=private_id, + job=aj, + pipeline=pipeline_v5, + alias="id_" + str(private_id), + ) + private_id += 1 + + # AnalysisJob with run_status_id = Public + # but in a private study (should not be visible) + # belongs to Webin-000 + baker.make( + "emgapi.AnalysisJob", + pk=999, + study=webin_zero_priv, + analysis_status=analysis_status, # 3 + run_status_id=emg_models.Status.PUBLIC, + run=run_private, + experiment_type=experiment_type, + pipeline=pipeline_v5, + ) @pytest.mark.parametrize( - 'view, username, count, ids, bad_ids', + "view, username, count, ids, bad_ids", [ # private - ('emgapi_v1:studies-list', 'Webin-111', 5, - ['MGYS00000111', 'MGYS00000112', 'MGYS00000114', 'MGYS00000115', - 'MGYS00000120'], - ['MGYS00000113', 'MGYS00000121']), + ( + "emgapi_v1:studies-list", + "Webin-111", + 5, + [ + "MGYS00000111", + "MGYS00000112", + "MGYS00000114", + "MGYS00000115", + "MGYS00000120", + ], + ["MGYS00000113", "MGYS00000121"], + ), # mydata - ('emgapi_v1:mydata-list', 'Webin-111', 2, - ['MGYS00000114', 'MGYS00000115'], - []), + ( + "emgapi_v1:mydata-list", + "Webin-111", + 2, + ["MGYS00000114", "MGYS00000115"], + [], + ), # public - ('emgapi_v1:studies-list', None, 4, - ['MGYS00000111', 'MGYS00000112', 'MGYS00000114', 'MGYS00000120'], - ['MGYS00000113', 'MGYS00000115', 'MGYS00000121']), - ] + ( + "emgapi_v1:studies-list", + None, + 4, + ["MGYS00000111", "MGYS00000112", "MGYS00000114", "MGYS00000120"], + ["MGYS00000113", "MGYS00000115", "MGYS00000121"], + ), + ], ) def test_list(self, apiclient, view, username, count, ids, bad_ids): auth = None @@ -88,9 +238,10 @@ def test_list(self, apiclient, view, username, count, ids, bad_ids): "password": "secret", } rsp = apiclient.post( - reverse('obtain_jwt_token_v1'), data=data, format='json') - token = rsp.json()['data']['token'] - auth = 'Bearer {}'.format(token) + reverse("obtain_jwt_token_v1"), data=data, format="json" + ) + token = rsp.json()["data"]["token"] + auth = "Bearer {}".format(token) url = reverse(view) if auth is not None: @@ -101,49 +252,286 @@ def test_list(self, apiclient, view, username, count, ids, bad_ids): rsp = response.json() # Meta - assert rsp['meta']['pagination']['page'] == 1 - assert rsp['meta']['pagination']['pages'] == 1 - assert rsp['meta']['pagination']['count'] == count + assert rsp["meta"]["pagination"]["page"] == 1 + assert rsp["meta"]["pagination"]["pages"] == 1 + assert rsp["meta"]["pagination"]["count"] == count # Data - assert len(rsp['data']) == count - assert set(ids) - set([d['id'] for d in rsp['data']]) == set() + assert len(rsp["data"]) == count + assert set(ids) - set([d["id"] for d in rsp["data"]]) == set() ids.extend(bad_ids) - assert set(ids) - set([d['id'] for d in rsp['data']]) == set(bad_ids) + assert set(ids) - set([d["id"] for d in rsp["data"]]) == set(bad_ids) def test_detail(self, apiclient): data = { "username": "Webin-000", "password": "secret", } - rsp = apiclient.post( - reverse('obtain_jwt_token_v1'), data=data, format='json') - token = rsp.json()['data']['token'] + rsp = apiclient.post(reverse("obtain_jwt_token_v1"), data=data, format="json") + token = rsp.json()["data"]["token"] - url = reverse("emgapi_v1:studies-detail", args=['SRP0113']) - response = apiclient.get( - url, HTTP_AUTHORIZATION='Bearer {}'.format(token)) + url = reverse("emgapi_v1:studies-detail", args=["SRP0113"]) + response = apiclient.get(url, HTTP_AUTHORIZATION="Bearer {}".format(token)) assert response.status_code == status.HTTP_200_OK rsp = response.json() - assert rsp['data']['id'] == 'MGYS00000113' + assert rsp["data"]["id"] == "MGYS00000113" - url = reverse("emgapi_v1:studies-detail", args=['MGYS00000115']) - response = apiclient.get( - url, HTTP_AUTHORIZATION='Bearer {}'.format(token)) + url = reverse("emgapi_v1:studies-detail", args=["MGYS00000115"]) + response = apiclient.get(url, HTTP_AUTHORIZATION="Bearer {}".format(token)) assert response.status_code == status.HTTP_404_NOT_FOUND - url = reverse("emgapi_v1:studies-detail", args=['MGYS00000121']) - response = apiclient.get( - url, HTTP_AUTHORIZATION='Bearer {}'.format(token)) + url = reverse("emgapi_v1:studies-detail", args=["MGYS00000121"]) + response = apiclient.get(url, HTTP_AUTHORIZATION="Bearer {}".format(token)) assert response.status_code == status.HTTP_404_NOT_FOUND - @pytest.mark.parametrize('accession', [ - 'MGYS00000113', 'MGYS00000115', 'MGYS00000121', - 'SRP0113', 'SRP0115', 'SRP0121' - ]) + @pytest.mark.parametrize( + "accession", + [ + "MGYS00000113", + "MGYS00000115", + "MGYS00000121", + "SRP0113", + "SRP0115", + "SRP0121", + ], + ) def test_not_found(self, apiclient, accession): url = reverse("emgapi_v1:studies-detail", args=[accession]) response = apiclient.get(url) assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_private_analyses(self, apiclient): + """Test that the analyses list endpoint returns public and + the private analyses that correspond to a particular user. + """ + data = { + "username": "Webin-000", + "password": "secret", + } + rsp = apiclient.post(reverse("obtain_jwt_token_v1"), data=data, format="json") + token = rsp.json()["data"]["token"] + auth = "Bearer {}".format(token) + + url = reverse("emgapi_v1:analyses-list") + + response = apiclient.get(url, HTTP_AUTHORIZATION=auth) + assert response.status_code == status.HTTP_200_OK + + response_data = response.json() + count = 5 + + assert response_data["meta"]["pagination"]["page"] == 1 + assert response_data["meta"]["pagination"]["pages"] == 1 + assert response_data["meta"]["pagination"]["count"] == count + assert len(response_data["data"]) == count + assert ( + set( + [ + "MGYA00000001", + "MGYA00000002", + "MGYA00000003", + "MGYA00000010", + "MGYA00000999", # AJ with tagged as Public but with private Study + ] + ) + == set([d["attributes"]["accession"] for d in response_data["data"]]) + ) + + def test_private_analyses_anonymous(self, apiclient): + """Test that the analyses list endpoint is filtering out + private analyses for anonymous users + """ + url = reverse("emgapi_v1:analyses-list") + response = apiclient.get(url) + assert response.status_code == status.HTTP_200_OK + + rsp = response.json() + count = 3 + + assert rsp["meta"]["pagination"]["page"] == 1 + assert rsp["meta"]["pagination"]["pages"] == 1 + assert rsp["meta"]["pagination"]["count"] == count + assert len(rsp["data"]) == count + assert ( + set( + [ + "MGYA00000001", + "MGYA00000002", + "MGYA00000003", + ] + ) + == set([d["attributes"]["accession"] for d in rsp["data"]]) + ) + + @pytest.mark.parametrize("username", ["Webin-000", "Webin-000", None]) + @pytest.mark.parametrize("analysis", ["MGYA00000001", "MGYA00000002"]) + def test_public_analysis(self, apiclient, username, analysis): + """Test that anonymous and authenticated users can + access public analysis. + """ + auth_token = None + url = reverse("emgapi_v1:analyses-detail", args=[analysis]) + if username: + data = { + "username": username, + "password": "secret", + } + token_response = apiclient.post( + reverse("obtain_jwt_token_v1"), data=data, format="json" + ) + token = token_response.json()["data"]["token"] + auth_token = "Bearer {}".format(token) + + if auth_token: + response = apiclient.get(url, HTTP_AUTHORIZATION=auth_token) + else: + response = apiclient.get(url) + assert response.status_code == status.HTTP_200_OK + + response_data = response.json() + attributes = response_data["data"]["attributes"] + assert attributes["analysis-status"] == "3" + assert attributes["accession"] == analysis + assert attributes["experiment-type"] == "metagenomic" + + @pytest.mark.parametrize( + "username,accession_and_expected_status", + [ + ( + "Webin-000", + ["MGYA00000010", status.HTTP_200_OK], + ), + ( + "Webin-000", + ["MGYA00000011", status.HTTP_404_NOT_FOUND], + ), + ( + "Webin-111", + ["MGYA00000011", status.HTTP_200_OK], + ), + ( + "Webin-111", + ["MGYA00000010", status.HTTP_404_NOT_FOUND], + ), + # anonymous + ( + None, + ["MGYA00000011", status.HTTP_404_NOT_FOUND], + ), + ( + None, + ["MGYA00000010", status.HTTP_404_NOT_FOUND], + ), + ], + ) + def test_private_analysis_detail( + self, apiclient, username, accession_and_expected_status + ): + """Test that only authenticated users can their private analysis.""" + analysis, expected_status = accession_and_expected_status + url = reverse("emgapi_v1:analyses-detail", args=[analysis]) + + auth_token = None + if username: + data = { + "username": username, + "password": "secret", + } + token_response = apiclient.post( + reverse("obtain_jwt_token_v1"), data=data, format="json" + ) + token = token_response.json()["data"]["token"] + auth_token = "Bearer {}".format(token) + + if auth_token: + response = apiclient.get( + url, args=[analysis], HTTP_AUTHORIZATION=auth_token + ) + else: + response = apiclient.get(url) + assert response.status_code == expected_status + + if expected_status == status.HTTP_200_OK: + response_data = response.json() + assert response_data["data"]["attributes"]["accession"] == analysis + + @pytest.mark.parametrize( + "username_result", [("Webin-000", 1), ("Webin-111", 0), (None, 0)] + ) + def test_private_analysis_download_list(self, apiclient, username_result): + """Test that private analysis job download list is avaiable for the users + that own the analysis + """ + username, count = username_result + url = reverse("emgapi_v1:analysisdownload-list", args=["MGYA00000010"]) + + if username: + data = { + "username": username, + "password": "secret", + } + token_response = apiclient.post( + reverse("obtain_jwt_token_v1"), data=data, format="json" + ) + token = token_response.json()["data"]["token"] + auth_token = "Bearer {}".format(token) + + if username: + response = apiclient.get(url, HTTP_AUTHORIZATION=auth_token) + else: + response = apiclient.get(url) + + assert response.status_code == status.HTTP_200_OK + + response_data = response.json() + + assert response_data["meta"]["pagination"]["page"] == 1 + assert response_data["meta"]["pagination"]["pages"] == 1 + assert response_data["meta"]["pagination"]["count"] == count + assert len(response_data["data"]) == count + + @pytest.mark.parametrize( + "test_data", + [ + # username, analysis, analysis_job, expected_response_status + # privates + ("Webin-000", "MGYA00000010", "id_10", status.HTTP_200_OK), + ("Webin-111", "MGYA00000011", "id_11", status.HTTP_200_OK), + # private - wrong MGYA - AJ download id + ("Webin-111", "MGYA00000011", "id_10", status.HTTP_404_NOT_FOUND), + # private - but for a different user + ("Webin-111", "MGYA00000010", "id_10", status.HTTP_404_NOT_FOUND), + # public - logged in + ("Webin-111", "MGYA00000001", "id_1", status.HTTP_200_OK), + # public - anonymous + (None, "MGYA00000002", "id_2", status.HTTP_200_OK), + # private - anonymous + (None, "MGYA00000010", "id_10", status.HTTP_404_NOT_FOUND), + ], + ) + def test_private_analysis_download_detail(self, apiclient, test_data): + """Test that private analysis job download detail page is avaiable + for the users that own the analysis (or anyone if public) + """ + username, aj_accession, ad_id, expected_status = test_data + url = reverse("emgapi_v1:analysisdownload-detail", args=[aj_accession, ad_id]) + + if username: + data = { + "username": username, + "password": "secret", + } + token_response = apiclient.post( + reverse("obtain_jwt_token_v1"), data=data, format="json" + ) + token = token_response.json()["data"]["token"] + auth_token = "Bearer {}".format(token) + + if username: + response = apiclient.get(url, HTTP_AUTHORIZATION=auth_token) + else: + response = apiclient.get(url) + + assert response.status_code == expected_status diff --git a/tests/webuploader/test_import_checksums.py b/tests/webuploader/test_import_checksums.py index 272e7868d..bc9a6b15f 100644 --- a/tests/webuploader/test_import_checksums.py +++ b/tests/webuploader/test_import_checksums.py @@ -75,7 +75,7 @@ def test_empty_checksums(self, client, pipelines, analysis_results): @pytest.mark.django_db def test_import_checksums(self, client, pipelines, analysis_results): - """Assert tha the import worked for genome "" + """Assert that the import worked for genome """ fasta_gz, _ = emg_models.FileFormat.objects.get_or_create(format_name="FASTA", compression=True) tsv, _ = emg_models.FileFormat.objects.get_or_create(format_name="TSV", compression=False) From 37538d417bfc9402a526107270bb84052111aaad Mon Sep 17 00:00:00 2001 From: Martin Beracochea Date: Thu, 29 Apr 2021 18:38:27 +0100 Subject: [PATCH 2/2] Amend test permissions int to constants --- tests/api/test_permissions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/api/test_permissions.py b/tests/api/test_permissions.py index 92e419967..a3b8394b6 100644 --- a/tests/api/test_permissions.py +++ b/tests/api/test_permissions.py @@ -98,7 +98,7 @@ def setup_method(self, db, pipelines, experiment_type, analysis_status): # public run (needeed for auth filtering) public_status = baker.make( "emgapi.Status", - pk=4, + pk=emg_models.Status.PUBLIC, status="public", ) public_run = baker.make( @@ -112,7 +112,7 @@ def setup_method(self, db, pipelines, experiment_type, analysis_status): # private (needeed for auth filtering) private_status = baker.make( "emgapi.Status", - pk=2, + pk=emg_models.Status.PRIVATE, status="private", ) run_private = baker.make(