Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions gateway/api/domain/function/filter_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Methods for filtering job logs."""
import re


def extract_public_logs(text: str) -> str:
    """
    Filter the logs to keep only the public lines.

    A line is public when it starts with the ``[PUBLIC]`` tag
    (case-insensitive). The tag and the single space that follows it
    (when present) are stripped from every kept line.

    Args:
        text: the full, unfiltered log text.

    Returns:
        str: the public lines joined by newlines with a trailing
        newline, or an empty string when no public line exists.
    """
    # Match the tag plus an optional trailing space and slice at the
    # match end, so a line like "[PUBLIC]x" does not lose its first
    # content character (the old fixed `line[9:]` slice did).
    pattern = re.compile(r"^\[PUBLIC\] ?", re.IGNORECASE)
    public_lines = [
        line[match.end():]
        for line in text.splitlines()
        if (match := pattern.match(line))
    ]
    return "\n".join(public_lines) + "\n" if public_lines else ""
84 changes: 82 additions & 2 deletions gateway/api/management/commands/free_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
from django.conf import settings
from django.core.management.base import BaseCommand

from api.domain.function import check_logs
from api.domain.function.filter_logs import extract_public_logs
from api.models import ComputeResource, Job
from api.ray import kill_ray_cluster
from api.ray import kill_ray_cluster, get_job_handler
from api.repositories.users import UserRepository
from api.services.storage.enums.working_dir import WorkingDir
from api.services.storage.logs_storage import LogsStorage


logger = logging.getLogger("commands")
Expand Down Expand Up @@ -45,7 +50,7 @@ def handle(self, *args, **options):
if not there_are_alive_jobs:
self.remove_compute_resource(compute_resource)

def remove_compute_resource(self, compute_resource):
def remove_compute_resource(self, compute_resource: ComputeResource):
"""
This method removes a Compute Resource if it's
available in the cluster.
Expand All @@ -68,6 +73,9 @@ def remove_compute_resource(self, compute_resource):
)
return

self.save_logs_to_storage(job=terminated_job, compute_resource=compute_resource)
terminated_job.logs = ""

is_gpu = terminated_job.gpu
should_remove_as_classical = remove_classical_jobs and not is_gpu
should_remove_as_gpu = remove_gpu_jobs and is_gpu
Expand All @@ -83,3 +91,75 @@ def remove_compute_resource(self, compute_resource):
compute_resource.title,
compute_resource.owner,
)

def save_logs_to_storage(self, job: Job, compute_resource: ComputeResource):
    """
    Persist the logs of a terminated job into the proper storages.

    Args:
        job: the Job whose logs are being saved.
        compute_resource: the ComputeResource the job ran on.
    """
    # Pull the raw logs from the Ray cluster and sanitize them.
    handler = get_job_handler(compute_resource.host)
    logs = check_logs(handler.logs(job.ray_job_id), job)

    author = UserRepository().get_or_create_by_id(job.author)

    # Provider-backed functions split logs between provider and user
    # storages; plain user functions keep everything in one place.
    if job.program.provider is not None:
        self._save_logs_with_provider(logs, author.username, job)
    else:
        self._save_logs_only_user(logs, author.username, job)

def _save_logs_only_user(self, full_logs: str, username: str, job: Job):
    """
    Write the complete logs into the user's own storage.

    Args:
        full_logs: the complete log text to store.
        username: owner of the storage the logs go to.
        job: the Job the logs belong to.
    """
    storage = LogsStorage(
        username=username,
        working_dir=WorkingDir.USER_STORAGE,
        function_title=job.program.title,
        provider_name=None,
    )
    storage.save(job.id, full_logs)

def _save_logs_with_provider(self, full_logs: str, username: str, job: Job):
    """
    Store the complete logs in the provider storage, and only the
    public subset of them in the user storage.

    Args:
        full_logs: the complete log text to store.
        username: owner of the storages the logs go to.
        job: the Job the logs belong to.
    """
    function_title = job.program.title

    # The user only receives the lines explicitly tagged as public.
    public_logs = extract_public_logs(full_logs)
    LogsStorage(
        username=username,
        working_dir=WorkingDir.USER_STORAGE,
        function_title=function_title,
        provider_name=None,
    ).save(job.id, public_logs)

    # The provider receives everything, unfiltered.
    LogsStorage(
        username=username,
        working_dir=WorkingDir.PROVIDER_STORAGE,
        function_title=function_title,
        provider_name=job.program.provider.name,
    ).save(job.id, full_logs)
4 changes: 1 addition & 3 deletions gateway/api/management/commands/update_jobs_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from django.conf import settings
from django.core.management.base import BaseCommand

from api.domain.function import check_logs
from api.models import Job
from api.ray import get_job_handler
from api.schedule import (
Expand Down Expand Up @@ -61,10 +60,9 @@ def update_job_status(job: Job):

if job_handler:
logs = job_handler.logs(job.ray_job_id)
job.logs = check_logs(logs, job)
Copy link
Contributor

@avilches avilches Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch @korgan00 ! If we trim the logs, how will we know later whether the no_resources_log message is in the logs? :)

# check if job is resource constrained
no_resources_log = "No available node types can fulfill resource request"
if no_resources_log in job.logs:
if no_resources_log in logs:
job_new_status = fail_job_insufficient_resources(job)
job.status = job_new_status
# cleanup env vars
Expand Down
1 change: 0 additions & 1 deletion gateway/api/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from api.services.storage import ArgumentsStorage
from api.models import ComputeResource, Job, JobConfig, DEFAULT_PROGRAM_ENTRYPOINT
from api.services.storage import FileStorage, WorkingDir
from api.utils import (
Expand Down
2 changes: 1 addition & 1 deletion gateway/api/repositories/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class UserRepository:
The main objective of this class is to manage the access to the model
"""

def get_or_create_by_id(self, user_id: str) -> type[AbstractUser]:
def get_or_create_by_id(self, user_id: str) -> AbstractUser:
"""
This method returns a user by its id. If the user does not
exist its created.
Expand Down
20 changes: 20 additions & 0 deletions gateway/api/services/storage/logs_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,23 @@ def get(self, job_id: str) -> Optional[str]:
str(e),
)
return None

def save(self, job_id: str, logs: str):
    """
    Creates (or overwrites) the log file for the given job id.

    Errors are logged rather than raised, mirroring the best-effort
    behavior of `get`.

    Args:
        job_id (str): the id for the job to save the logs
        logs (str): the logs to be saved
    """
    log_path = self._get_logs_path(job_id)

    try:
        # "w" truncates any previous file; the read access of the
        # original "w+" mode was never used.
        with open(log_path, "w", encoding=self.ENCODING) as log_file:
            log_file.write(logs)
    except (UnicodeError, OSError) as e:
        # Writing text raises UnicodeEncodeError (not DecodeError as
        # previously caught); UnicodeError covers both directions, and
        # IOError is just an alias of OSError.
        logger.error(
            "Failed to write log file for job ID '%s': %s",
            job_id,
            str(e),
        )
25 changes: 20 additions & 5 deletions gateway/api/use_cases/jobs/get_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
from api.access_policies.jobs import JobAccessPolicies
from api.domain.exceptions.not_found_error import NotFoundError
from api.domain.exceptions.forbidden_error import ForbiddenError
from api.domain.function import check_logs
from api.domain.function.filter_logs import extract_public_logs
from api.ray import get_job_handler
from api.repositories.jobs import JobsRepository
from api.access_policies.providers import ProviderAccessPolicy
from api.services.storage.enums.working_dir import WorkingDir
from api.services.storage.logs_storage import LogsStorage


NO_LOGS_MSG: Final[str] = "No available logs"
NO_LOGS_MSG_2: Final[str] = "No logs yet."


class GetJobLogsUseCase:
Expand Down Expand Up @@ -52,10 +55,22 @@ def execute(self, job_id: UUID, user: AbstractUser) -> str:

logs = logs_storage.get(job_id)

if logs is None:
raise NotFoundError(f"Logs for job[{job_id}] are not found")
# No logs stored.
if logs:
return logs

# Get from Ray if it is already running.
if job.compute_resource and job.compute_resource.active:
job_handler = get_job_handler(job.compute_resource.host)
logs = job_handler.logs(job.ray_job_id)
logs = check_logs(logs, job)
logs = extract_public_logs(logs)
return logs

if len(logs) == 0:
return "No logs available"
logs = job.logs

if not logs or logs == NO_LOGS_MSG or logs == NO_LOGS_MSG_2:
raise NotFoundError(f"Logs for job[{job_id}] are not found")

# Legacy: Get from db.
return logs
18 changes: 15 additions & 3 deletions gateway/api/use_cases/jobs/provider_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

from django.contrib.auth.models import AbstractUser

from api.access_policies.jobs import JobAccessPolicies
from api.domain.exceptions.not_found_error import NotFoundError
from api.domain.exceptions.forbidden_error import ForbiddenError
from api.domain.function import check_logs
from api.ray import get_job_handler
from api.repositories.jobs import JobsRepository
from api.access_policies.providers import ProviderAccessPolicy
from api.services.storage.enums.working_dir import WorkingDir
Expand Down Expand Up @@ -55,10 +56,21 @@ def execute(self, job_id: UUID, user: AbstractUser) -> str:

logs = logs_storage.get(job_id)

if logs is None:
logs = job.logs
# No logs stored.
if logs:
return logs

# Get from Ray if it is already running.
if job.compute_resource and job.compute_resource.active:
job_handler = get_job_handler(job.compute_resource.host)
logs = job_handler.logs(job.ray_job_id)
logs = check_logs(logs, job)
return logs

logs = job.logs

if not logs or logs == NO_LOGS_MSG or logs == NO_LOGS_MSG_2:
raise NotFoundError(f"Logs for job[{job_id}] are not found")

# Legacy: Get from db.
return logs
49 changes: 49 additions & 0 deletions gateway/tests/api/domain/functions/test_filter_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Tests for filter_logs."""

from rest_framework.test import APITestCase

from api.domain.function.filter_logs import extract_public_logs


class TestFilterLogs(APITestCase):
    """Tests for filter_logs."""

    def test_extract_public_logs(self):
        """Tests that only [PUBLIC]-tagged lines are kept and the tag is stripped."""

        log = """

third_party.run_function:INFO:2024-11-15 11:30:32,124: third party setting up...
system 1 up...
system 2 up...
Setup complete!
[PUBLIC] sim_entrypoint.run_function:INFO:2024-10-15 11:30:32,123: Starting application
[private] sim_entrypoint.run_function:INFO:2024-10-15 11:30:32,123: Mapping
[PUBLIC] sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Backend = {
[PUBLIC] "name": "ibm_123"
[PUBLIC] }
third_party.run_function:INFO:2024-11-15 11:30:32,124: running_options = {
"options_group": {
"important": true,
"very_important": true,
"more_options": {
"gigabytes": 512,
"also_gigabytes": 512
}
},
}
[puBLic] sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Starting
[PRIVATE] sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Private information

"""

        expected_output = """sim_entrypoint.run_function:INFO:2024-10-15 11:30:32,123: Starting application
sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Backend = {
"name": "ibm_123"
}
sim_entrypoint.run_function:INFO:2024-11-15 11:30:32,124: Starting
"""

        output_log = extract_public_logs(log)

        # assertEqual: assertEquals is a deprecated alias removed in Python 3.12.
        self.assertEqual(output_log, expected_output)
Loading