diff --git a/gateway/api/domain/function/filter_logs.py b/gateway/api/domain/function/filter_logs.py new file mode 100644 index 000000000..dbd2f8503 --- /dev/null +++ b/gateway/api/domain/function/filter_logs.py @@ -0,0 +1,17 @@ +"""Methods for filtering job logs.""" +import re + + +def extract_public_logs(text: str) -> str: + """ + This filters the logs to keep only the public ones. + + Args: + text: str + + Return: + str -> The filtered public logs + """ + pattern = re.compile(r"^\[PUBLIC\]", re.IGNORECASE) + lines = [line[9:] for line in text.splitlines() if pattern.match(line)] + return "\n".join(lines) + "\n" if lines else "" diff --git a/gateway/api/management/commands/free_resources.py b/gateway/api/management/commands/free_resources.py index 9d1de4e5c..57b3c8980 100644 --- a/gateway/api/management/commands/free_resources.py +++ b/gateway/api/management/commands/free_resources.py @@ -5,8 +5,13 @@ from django.conf import settings from django.core.management.base import BaseCommand +from api.domain.function import check_logs +from api.domain.function.filter_logs import extract_public_logs from api.models import ComputeResource, Job -from api.ray import kill_ray_cluster +from api.ray import kill_ray_cluster, get_job_handler +from api.repositories.users import UserRepository +from api.services.storage.enums.working_dir import WorkingDir +from api.services.storage.logs_storage import LogsStorage logger = logging.getLogger("commands") @@ -45,7 +50,7 @@ def handle(self, *args, **options): if not there_are_alive_jobs: self.remove_compute_resource(compute_resource) - def remove_compute_resource(self, compute_resource): + def remove_compute_resource(self, compute_resource: ComputeResource): """ This method removes a Compute Resource if it's available in the cluster. 
@@ -68,6 +73,9 @@ def remove_compute_resource(self, compute_resource): ) return + self.save_logs_to_storage(job=terminated_job, compute_resource=compute_resource) + terminated_job.logs = "" + is_gpu = terminated_job.gpu should_remove_as_classical = remove_classical_jobs and not is_gpu should_remove_as_gpu = remove_gpu_jobs and is_gpu @@ -83,3 +91,75 @@ def remove_compute_resource(self, compute_resource): compute_resource.title, compute_resource.owner, ) + + def save_logs_to_storage(self, job: Job, compute_resource: ComputeResource): + """ + Save the logs in the corresponding storages. + + Args: + compute_resource: ComputeResource + job: Job + """ + job_handler = get_job_handler(compute_resource.host) + full_logs = job_handler.logs(job.ray_job_id) + full_logs = check_logs(full_logs, job) + + user_repository = UserRepository() + author = user_repository.get_or_create_by_id(job.author) + username = author.username + + has_provider = job.program.provider is not None + + if has_provider: + self._save_logs_with_provider(full_logs, username, job) + else: + self._save_logs_only_user(full_logs, username, job) + + def _save_logs_only_user(self, full_logs: str, username: str, job: Job): + """ + Save the logs in the user storage. + + Args: + full_logs: str + username: str + job: Job + """ + + user_logs_storage = LogsStorage( + username=username, + working_dir=WorkingDir.USER_STORAGE, + function_title=job.program.title, + provider_name=None, + ) + + user_logs_storage.save(job.id, full_logs) + + def _save_logs_with_provider(self, full_logs: str, username: str, job: Job): + """ + Save the logs in the provider storage and filter + for public logs only to save them into the user storage. 
+ + Args: + full_logs: str + username: str + job: Job + """ + + user_logs_storage = LogsStorage( + username=username, + working_dir=WorkingDir.USER_STORAGE, + function_title=job.program.title, + provider_name=None, + ) + + provider_logs_storage = LogsStorage( + username=username, + working_dir=WorkingDir.PROVIDER_STORAGE, + function_title=job.program.title, + provider_name=job.program.provider.name, + ) + + user_logs = extract_public_logs(full_logs) + user_logs_storage.save(job.id, user_logs) + + provider_logs_storage.save(job.id, full_logs) diff --git a/gateway/api/management/commands/update_jobs_statuses.py b/gateway/api/management/commands/update_jobs_statuses.py index b55a4e0f0..1ad61591d 100644 --- a/gateway/api/management/commands/update_jobs_statuses.py +++ b/gateway/api/management/commands/update_jobs_statuses.py @@ -6,7 +6,6 @@ from django.conf import settings from django.core.management.base import BaseCommand -from api.domain.function import check_logs from api.models import Job from api.ray import get_job_handler from api.schedule import ( @@ -61,10 +60,9 @@ def update_job_status(job: Job): if job_handler: logs = job_handler.logs(job.ray_job_id) - job.logs = check_logs(logs, job) # check if job is resource constrained no_resources_log = "No available node types can fulfill resource request" - if no_resources_log in job.logs: + if no_resources_log in logs: job_new_status = fail_job_insufficient_resources(job) job.status = job_new_status # cleanup env vars diff --git a/gateway/api/ray.py b/gateway/api/ray.py index fdca3e245..157227ff0 100644 --- a/gateway/api/ray.py +++ b/gateway/api/ray.py @@ -21,7 +21,6 @@ from opentelemetry import trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from api.services.storage import ArgumentsStorage from api.models import ComputeResource, Job, JobConfig, DEFAULT_PROGRAM_ENTRYPOINT from api.services.storage import FileStorage, WorkingDir from api.utils import ( diff --git 
a/gateway/api/repositories/users.py b/gateway/api/repositories/users.py index 5a3f682be..2d2f1ecfc 100644 --- a/gateway/api/repositories/users.py +++ b/gateway/api/repositories/users.py @@ -21,7 +21,7 @@ class UserRepository: The main objective of this class is to manage the access to the model """ - def get_or_create_by_id(self, user_id: str) -> type[AbstractUser]: + def get_or_create_by_id(self, user_id: str) -> AbstractUser: """ This method returns a user by its id. If the user does not exist its created. diff --git a/gateway/api/services/storage/logs_storage.py b/gateway/api/services/storage/logs_storage.py index 07973d4ac..3db574a53 100644 --- a/gateway/api/services/storage/logs_storage.py +++ b/gateway/api/services/storage/logs_storage.py @@ -90,3 +90,23 @@ def get(self, job_id: str) -> Optional[str]: str(e), ) return None + + def save(self, job_id: str, logs: str): + """ + Creates and writes a log file for the given job id + + Args: + job_id (str): the id for the job to save the logs + logs (str): the logs to be saved + """ + log_path = self._get_logs_path(job_id) + + try: + with open(log_path, "w+", encoding=self.ENCODING) as log_file: + log_file.write(logs) + except (UnicodeDecodeError, IOError) as e: + logger.error( + "Failed to write log file for job ID '%s': %s", + job_id, + str(e), + ) diff --git a/gateway/api/use_cases/jobs/get_logs.py b/gateway/api/use_cases/jobs/get_logs.py index 21cccc0a7..3742c2321 100644 --- a/gateway/api/use_cases/jobs/get_logs.py +++ b/gateway/api/use_cases/jobs/get_logs.py @@ -9,13 +9,16 @@ from api.access_policies.jobs import JobAccessPolicies from api.domain.exceptions.not_found_error import NotFoundError from api.domain.exceptions.forbidden_error import ForbiddenError +from api.domain.function import check_logs +from api.domain.function.filter_logs import extract_public_logs +from api.ray import get_job_handler from api.repositories.jobs import JobsRepository -from api.access_policies.providers import ProviderAccessPolicy 
from api.services.storage.enums.working_dir import WorkingDir from api.services.storage.logs_storage import LogsStorage NO_LOGS_MSG: Final[str] = "No available logs" +NO_LOGS_MSG_2: Final[str] = "No logs yet." class GetJobLogsUseCase: @@ -52,10 +55,22 @@ def execute(self, job_id: UUID, user: AbstractUser) -> str: logs = logs_storage.get(job_id) - if logs is None: - raise NotFoundError(f"Logs for job[{job_id}] are not found") + # No logs stored. + if logs: + return logs + + # Get from Ray if it is already running. + if job.compute_resource and job.compute_resource.active: + job_handler = get_job_handler(job.compute_resource.host) + logs = job_handler.logs(job.ray_job_id) + logs = check_logs(logs, job) + logs = extract_public_logs(logs) + return logs - if len(logs) == 0: - return "No logs available" + logs = job.logs + + if not logs or logs == NO_LOGS_MSG or logs == NO_LOGS_MSG_2: + raise NotFoundError(f"Logs for job[{job_id}] are not found") + # Legacy: Get from db. return logs diff --git a/gateway/api/use_cases/jobs/provider_logs.py b/gateway/api/use_cases/jobs/provider_logs.py index 6f84bc3b6..edfd03af7 100644 --- a/gateway/api/use_cases/jobs/provider_logs.py +++ b/gateway/api/use_cases/jobs/provider_logs.py @@ -6,9 +6,10 @@ from django.contrib.auth.models import AbstractUser -from api.access_policies.jobs import JobAccessPolicies from api.domain.exceptions.not_found_error import NotFoundError from api.domain.exceptions.forbidden_error import ForbiddenError +from api.domain.function import check_logs +from api.ray import get_job_handler from api.repositories.jobs import JobsRepository from api.access_policies.providers import ProviderAccessPolicy from api.services.storage.enums.working_dir import WorkingDir @@ -55,10 +56,21 @@ def execute(self, job_id: UUID, user: AbstractUser) -> str: logs = logs_storage.get(job_id) - if logs is None: - logs = job.logs + # No logs stored. + if logs: + return logs + + # Get from Ray if it is already running. 
+ if job.compute_resource and job.compute_resource.active: + job_handler = get_job_handler(job.compute_resource.host) + logs = job_handler.logs(job.ray_job_id) + logs = check_logs(logs, job) + return logs + + logs = job.logs if not logs or logs == NO_LOGS_MSG or logs == NO_LOGS_MSG_2: raise NotFoundError(f"Logs for job[{job_id}] are not found") + # Legacy: Get from db. return logs diff --git a/gateway/tests/api/authentication/__init__.py b/gateway/tests/api/domain/__init__.py similarity index 100% rename from gateway/tests/api/authentication/__init__.py rename to gateway/tests/api/domain/__init__.py diff --git a/gateway/tests/api/authentication/custom_token/__init__.py b/gateway/tests/api/domain/functions/__init__.py similarity index 100% rename from gateway/tests/api/authentication/custom_token/__init__.py rename to gateway/tests/api/domain/functions/__init__.py diff --git a/gateway/tests/api/domain/functions/test_filter_logs.py b/gateway/tests/api/domain/functions/test_filter_logs.py new file mode 100644 index 000000000..9d27337da --- /dev/null +++ b/gateway/tests/api/domain/functions/test_filter_logs.py @@ -0,0 +1,49 @@ +"""Tests for filter_logs.""" + +from rest_framework.test import APITestCase + +from api.domain.function.filter_logs import extract_public_logs + + +class TestFilterLogs(APITestCase): + """Tests for filter_logs.""" + + def test_extract_public_logs(self): + """Tests that only [PUBLIC]-prefixed log lines are extracted.""" + + log = """ + +third_party.run_function:INFO:2024-11-15 11:30:32,124: third party setting up... +system 1 up... +system 2 up... +Setup complete! 
+[PUBLIC] sim_entrypoint.run_function:INFO:2024-10-15 11:30:32,123: Starting application +[private] sim_entrypoint.run_function:INFO:2024-10-15 11:30:32,123: Mapping +[PUBLIC] sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Backend = { +[PUBLIC] "name": "ibm_123" +[PUBLIC] } +third_party.run_function:INFO:2024-11-15 11:30:32,124: running_options = { + "options_group": { + "important": true, + "very_important": true, + "more_options": { + "gigabytes": 512, + "also_gigabytes": 512 + } + }, +} +[puBLic] sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Starting +[PRIVATE] sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Private information + +""" + + expected_output = """sim_entrypoint.run_function:INFO:2024-10-15 11:30:32,123: Starting application +sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Backend = { + "name": "ibm_123" +} +sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Starting +""" + + output_log = extract_public_logs(log) + + self.assertEquals(output_log, expected_output)