Skip to content

Commit

Permalink
unit tests for task APIs (#2168)
Browse files Browse the repository at this point in the history
  • Loading branch information
salabh-aot authored Apr 30, 2024
1 parent 83ab15a commit 5ba4608
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 14 deletions.
4 changes: 3 additions & 1 deletion epictrack-api/src/api/resources/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

from flask import jsonify, request
from flask_restx import Namespace, Resource, cors

from api.schemas import request as req
from api.schemas import response as res
from api.services import TaskService
from api.utils import auth, profiletime
from api.utils.util import cors_preflight


API = Namespace("tasks", description="Tasks")


Expand Down Expand Up @@ -99,7 +101,7 @@ def put(event_id):
def get(event_id):
"""Gets the task event"""
req.TaskEventIdPathParameterSchema().load(request.view_args)
task_event = TaskService.find_task_event(event_id)
task_event = TaskService.find_task_event(event_id, exclude_deleted=True)
return res.TaskEventResponseSchema().dump(task_event), HTTPStatus.OK


Expand Down
26 changes: 15 additions & 11 deletions epictrack-api/src/api/services/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
from sqlalchemy import and_, tuple_
from sqlalchemy.orm import contains_eager, lazyload

from api.exceptions import UnprocessableEntityError
from api.exceptions import ResourceNotFoundError, UnprocessableEntityError
from api.models import StaffWorkRole, StatusEnum, TaskEvent, TaskEventAssignee, WorkPhase, db
from api.models.task_event_responsibility import TaskEventResponsibility
from . import authorisation

from ..utils.roles import Membership
from ..utils.roles import Role as KeycloakRole
from . import authorisation
from .task_template import TaskTemplateService
from ..utils.roles import Membership, Role as KeycloakRole


class TaskService:
Expand Down Expand Up @@ -82,9 +83,7 @@ def update_task_event(cls, data: dict, task_event_id) -> TaskEvent:
)
task_event.update(data, commit=False)
cls._handle_assignees(data.get("assignee_ids"), [task_event.id])
cls._handle_responsibilities(
data.get("responsibility_ids"), [task_event.id]
)
cls._handle_responsibilities(data.get("responsibility_ids"), [task_event.id])
db.session.commit()
return task_event

Expand All @@ -105,9 +104,9 @@ def find_task_events(cls, work_phase_id: int) -> [TaskEvent]:
)

@classmethod
def find_task_event(cls, event_id: int) -> TaskEvent:
def find_task_event(cls, event_id: int, exclude_deleted: bool = False) -> TaskEvent:
"""Get the task event"""
return (
query = (
db.session.query(TaskEvent)
.outerjoin(
TaskEventAssignee,
Expand All @@ -117,9 +116,14 @@ def find_task_event(cls, event_id: int) -> TaskEvent:
),
)
.filter(TaskEvent.id == event_id)
.options(contains_eager(TaskEvent.assignees))
.scalar()
)
if exclude_deleted:
query = query.filter(TaskEvent.is_deleted.is_(False))
task_event = query.options(contains_eager(TaskEvent.assignees)).scalar()
print(f"Task event in service {task_event}")
if task_event:
return task_event
raise ResourceNotFoundError(f"TaskEvent with id '{event_id}' not found.")

@classmethod
def create_task_events_from_template(
Expand Down Expand Up @@ -240,7 +244,7 @@ def _get_task_count(cls, work_phase_id: int):
.filter(
TaskEvent.is_active.is_(True),
TaskEvent.is_deleted.is_(False),
TaskEvent.work_phase_id == work_phase_id
TaskEvent.work_phase_id == work_phase_id,
)
.count()
)
Expand Down
183 changes: 183 additions & 0 deletions epictrack-api/tests/unit/apis/test_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Copyright © 2019 Province of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test suite for task."""

from copy import copy
from datetime import datetime
from http import HTTPStatus
from pathlib import Path
from urllib.parse import urljoin

from werkzeug.datastructures import FileStorage

from api.utils.constants import CANADA_TIMEZONE
from tests.utilities.factory_scenarios import TestTaskEnum, TestTaskTemplateEnum
from tests.utilities.factory_utils import (
factory_staff_model, factory_staff_work_role_model, factory_task_model, factory_work_model,
factory_work_phase_model)


API_BASE_URL = "/api/v1/"


def test_get_tasks(client, auth_header):
"""Test get tasks."""
work_phase = factory_work_phase_model()

# Get tasks for the work phase
url = urljoin(API_BASE_URL, f"tasks/events?work_phase_id={work_phase.id}")
response = client.get(url, headers=auth_header)
assert response.status_code == HTTPStatus.OK


def test_create_task(client, auth_header):
"""Test create task"""
work_phase = factory_work_phase_model()
task_data = TestTaskEnum.task1.value
task_data["work_phase_id"] = work_phase.id

url = urljoin(API_BASE_URL, "tasks/events")
response = client.post(url, json=task_data, headers=auth_header)
assert response.status_code == HTTPStatus.CREATED
response_json = response.json
responsibility_ids_response = [
x["responsibility_id"] for x in response.json["responsibilities"]
]
assert "id" in response_json
assert task_data["name"] == response_json["name"]
assert task_data["work_phase_id"] == response_json["work_phase_id"]
assert task_data["status"] == response_json["status"]
assert task_data["responsibility_ids"] == responsibility_ids_response
start_date = datetime.fromisoformat(response_json["start_date"])
start_date = start_date.astimezone(CANADA_TIMEZONE)
value = datetime.fromisoformat(task_data["start_date"])
assert value.date() == start_date.date()


def test_get_task_details(client, auth_header):
"""Test get tasks by id"""
task = factory_task_model()
# Get tasks
url = urljoin(API_BASE_URL, f"tasks/events/{task.id}")
response = client.get(url, headers=auth_header)
assert response.status_code == HTTPStatus.OK
response_json = response.json
assert task.name == response_json["name"]
assert task.work_phase_id == response_json["work_phase_id"]
assert task.status.value == response_json["status"]
start_date = datetime.fromisoformat(response_json["start_date"])
start_date = start_date.astimezone(CANADA_TIMEZONE)
task_start_date = task.start_date.astimezone(CANADA_TIMEZONE)
assert task_start_date.date() == start_date.date()


def test_bulk_update_tasks(client, auth_header):
"""Test bulk update tasks"""
work = factory_work_model()
task1 = factory_task_model(work_id=work.id)
task2 = factory_task_model(work_id=work.id)
responsibility_ids = TestTaskEnum.task1.value["responsibility_ids"]
assignee = factory_staff_model()
status = TestTaskEnum.task1.value["status"]
payload = {
"task_ids": [task1.id, task2.id],
"responsibility_ids": responsibility_ids,
"assignee_ids": [assignee.id],
"status": status,
}
url = urljoin(API_BASE_URL, "tasks/events")
response = client.patch(url, json=payload, headers=auth_header)
assert response.status_code == HTTPStatus.OK


def test_bulk_delete_tasks(client, auth_header):
"""Test bulk delete tasks"""
work = factory_work_model()
task1 = factory_task_model(work_id=work.id)
task2 = factory_task_model(work_id=work.id)
url = urljoin(API_BASE_URL, "tasks/events")

query = {"task_ids": f"{task1.id},{task2.id}", "work_id": work.id}
response = client.delete(url, query_string=query, headers=auth_header)
assert response.status_code == HTTPStatus.OK

url = urljoin(API_BASE_URL, f"tasks/events/{task1.id}")
response = client.get(url, headers=auth_header)
assert response.status_code == HTTPStatus.NOT_FOUND

url = urljoin(API_BASE_URL, f"tasks/events/{task2.id}")
response = client.get(url, headers=auth_header)
assert response.status_code == HTTPStatus.NOT_FOUND


def test_update_task(client, auth_header):
"""Test update task"""
work = factory_work_model()
task = factory_task_model(work_id=work.id)
work_staff = factory_staff_work_role_model(work_id=work.id)
assignee = work_staff.staff

updated_data = copy(TestTaskEnum.task1.value)
updated_data.update(
{
"name": "New Task Name",
"assignee_ids": [assignee.id],
}
)
url = urljoin(API_BASE_URL, f"tasks/events/{task.id}")
response = client.put(url, headers=auth_header, json=updated_data)
response_json = response.json

assert response.status_code == HTTPStatus.OK
assert response_json["id"] == task.id
assert response_json["name"] == updated_data["name"]
assignee_ids_response = [x["assignee_id"] for x in response_json["assignees"]]
assert assignee.id in assignee_ids_response


def test_import_template_tasks(client, auth_header):
"""Test import template task events"""
# Create a valid template with tasks.
task_template_data = TestTaskTemplateEnum.task_template1.value
file_path = Path("./src/api/templates/task_template/task_template.xlsx")
file_path = file_path.resolve()
file = FileStorage(
stream=open(file_path, "rb"),
filename="task_template.xlsx",
)
task_template_data["template_file"] = file

url = urljoin(API_BASE_URL, "task-templates")
template_response = client.post(
url,
data=task_template_data,
headers=auth_header,
content_type="multipart/form-data",
)
assert template_response.status_code == HTTPStatus.CREATED

# Mark the template as active
data = {"is_active": True}
url = urljoin(API_BASE_URL, f"task-templates/{template_response.json['id']}")
response = client.patch(url, json=data, headers=auth_header)
assert response.status_code == HTTPStatus.OK

work_phase = factory_work_phase_model()
data = {"work_phase_id": work_phase.id}
url = urljoin(
API_BASE_URL, f"tasks/templates/{template_response.json['id']}/events"
)
response = client.post(url, json=data, headers=auth_header)
assert response.status_code == HTTPStatus.CREATED
assert len(response.json) > 0
25 changes: 25 additions & 0 deletions epictrack-api/tests/utilities/factory_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from api.config import get_named_config
from api.models.special_field import EntityEnum
from api.models.task_event import StatusEnum
from api.utils.constants import CANADA_TIMEZONE
from tests.constants import ASSESSMENT_WORK_TYPE
from tests.utilities.providers import FormattedPhoneNumberProvider
Expand Down Expand Up @@ -307,3 +308,27 @@ class TestTaskTemplateEnum(Enum):
"work_type_id": 1,
"phase_id": 1,
}


class WorkPhaseEnum(Enum):
"""Test scenarios for work phase"""

work_phase1 = {
"start_date": (start_date := fake.date_time_this_decade(tzinfo=CANADA_TIMEZONE)).isoformat(),
"end_date": fake.date_time_between_dates(datetime_start=start_date, tzinfo=CANADA_TIMEZONE).isoformat(),
"phase_id": 1,
"sort_order": 1
}


class TestTaskEnum(Enum):
"""Test scenarios for tasks"""

task1 = {
"name": fake.word(),
"responsibility_ids": fake.random_elements(elements=(
1, 2, 3, 4, 5, 6, 7, 8, 9
), unique=True),
"start_date": fake.date_time_this_decade(tzinfo=CANADA_TIMEZONE).isoformat(),
"status": StatusEnum.NOT_STARTED.value
}
36 changes: 34 additions & 2 deletions epictrack-api/tests/utilities/factory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
from api.models.proponent import Proponent
from api.models.special_field import SpecialField
from api.models.staff_work_role import StaffWorkRole
from api.models.task_event import TaskEvent
from api.models.task_template import TaskTemplate
from api.models.work_phase import WorkPhase
from tests.utilities.factory_scenarios import (
TestFirstNation, TestPipOrgType, TestProjectInfo, TestProponent, TestRoleEnum, TestSpecialField, TestStaffInfo,
TestStatus, TestTaskTemplateEnum, TestWorkFirstNationEnum, TestWorkInfo, TestWorkIssuesInfo,
TestWorkIssueUpdatesInfo)
TestStatus, TestTaskEnum, TestTaskTemplateEnum, TestWorkFirstNationEnum, TestWorkInfo, TestWorkIssuesInfo,
TestWorkIssueUpdatesInfo, WorkPhaseEnum)


CONFIG = get_named_config("testing")
Expand Down Expand Up @@ -261,3 +263,33 @@ def factory_task_template_model(data=TestTaskTemplateEnum.task_template1.value):
)
task_template.save()
return task_template


def factory_work_phase_model(data=WorkPhaseEnum.work_phase1.value, work_id=None):
"""Produce a work phase model"""
if work_id is None:
work = factory_work_model()
work_id = work.id
work_phase = WorkPhase(
work_id=work_id,
phase_id=data["phase_id"],
start_date=data["start_date"],
end_date=data["end_date"],
sort_order=data["sort_order"]
)
work_phase.save()
return work_phase


def factory_task_model(data=TestTaskEnum.task1.value, work_id: int = None):
"""Produce a task model"""
work_phase = factory_work_phase_model(work_id=work_id)
data["work_phase_id"] = work_phase.id
task_event = TaskEvent(
work_phase_id=data["work_phase_id"],
start_date=data["start_date"],
status=data["status"],
name=data["name"]
)
task_event.save()
return task_event

0 comments on commit 5ba4608

Please sign in to comment.