Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/qiskit-serverless/charts/gateway/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ application:
corsOrigins: "http://localhost"
dependencies:
dynamicDependencies: "requirements-dynamic-dependencies.txt"
logsMaximumSize: 50 # in MB
logsMaximumSize: 52428800 # 50 MB in bytes

cos:
claimName: gateway-claim
Expand Down
2 changes: 1 addition & 1 deletion client/qiskit_serverless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@
from .exception import QiskitServerlessException
from .core.function import QiskitPattern, QiskitFunction
from .serializers import get_arguments
from .utils import ServerlessRuntimeService
from .utils import ServerlessRuntimeService, get_logger, get_provider_logger
from .version import __version__
1 change: 1 addition & 0 deletions client/qiskit_serverless/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@
from .errors import ErrorCodes
from .formatting import format_provider_name_and_title
from .runtime_service_client import ServerlessRuntimeService
from .loggers import get_logger, get_provider_logger
77 changes: 77 additions & 0 deletions client/qiskit_serverless/utils/loggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# This code is a Qiskit project.
#
# (C) Copyright IBM 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
======================================================================
Loggers utilities (:mod:`qiskit_serverless.utils.loggers`)
======================================================================

.. currentmodule:: qiskit_serverless.utils.loggers
"""
import logging


class PrefixFormatter(logging.Formatter):
    """Formatter that prefixes every output line and expands multi-line messages.

    Each line of a record's message is formatted independently and prefixed
    with ``prefix``, so downstream consumers can match log lines one by one.
    """

    def __init__(self, prefix, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.prefix = prefix

    def format(self, record):
        """Format ``record``, emitting one prefixed line per message line."""
        rendered = []
        for message_line in record.getMessage().splitlines():
            # Re-wrap each line in its own LogRecord so the base formatter
            # applies the configured fmt (levelname, name, ...) to every line.
            line_record = logging.LogRecord(
                name=record.name,
                level=record.levelno,
                pathname=record.pathname,
                lineno=record.lineno,
                msg=message_line,
                args=None,
                exc_info=None,
            )
            rendered.append(self.prefix + " " + super().format(line_record))
        return "\n".join(rendered)


def _create_logger(name, prefix):
    """Return a named logger whose output lines carry ``prefix``.

    The logger is configured at INFO level, does not propagate to the root
    logger, and writes through a single stream handler that uses
    ``PrefixFormatter``. Repeated calls reuse the already-configured logger.
    """
    configured = logging.getLogger(name)
    configured.setLevel(logging.INFO)
    configured.propagate = False

    # getLogger returns a process-wide singleton, so only attach a handler
    # the first time to avoid duplicated output on repeated calls.
    if not configured.handlers:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            PrefixFormatter(prefix=prefix, fmt="%(levelname)s:%(name)s: %(message)s")
        )
        configured.addHandler(stream_handler)

    return configured


def get_logger():
    """Return the logger for user-visible logs; lines are tagged [PUBLIC]."""
    return _create_logger("user", "[PUBLIC]")


def get_provider_logger():
    """Return the logger for provider-only logs; lines are tagged [PRIVATE]."""
    return _create_logger("provider", "[PRIVATE]")
3 changes: 2 additions & 1 deletion gateway/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,5 @@ GitHub.sublime-settings
!.vscode/extensions.json
.history

tests/resources/fake_media/*
tests/resources/fake_media/**/arguments/*
!tests/resources/fake_media/**/logs/*.log
22 changes: 22 additions & 0 deletions gateway/api/access_policies/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,28 @@ def can_read_result(user: type[AbstractUser], job: Job) -> bool:
)
return has_access

@staticmethod
def can_read_logs(user: type[AbstractUser], job: Job) -> bool:
    """
    Checks if the user has permissions to read the logs of a job:

    Args:
        user: Django user from the request
        job: Job instance against to check the permission

    Returns:
        bool: True or False in case the user has permissions
    """

    # Only the job author may read the job logs.
    has_access = user.id == job.author.id
    if not has_access:
        # Fixed copy-paste from can_read_result: the message referred to the
        # "result" and logged the author object instead of the job identifier.
        logger.warning(
            "User [%s] has no access to read the logs of the job [%s].",
            user.username,
            job.id,
        )
    return has_access

@staticmethod
def can_save_result(user: type[AbstractUser], job: Job) -> bool:
"""
Expand Down
2 changes: 1 addition & 1 deletion gateway/api/access_policies/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def can_access(user, provider: Provider) -> bool:
"""

user_groups = set(user.groups.all())
admin_groups = set(provider.admin_groups.all() if provider else [])
admin_groups = set(provider.admin_groups.all())
user_is_admin = bool(user_groups.intersection(admin_groups))
if not user_is_admin:
logger.warning(
Expand Down
35 changes: 17 additions & 18 deletions gateway/api/domain/function/check_logs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""This class contain methods to manage the logs in the application."""

import logging
import sys
from typing import Union

from django.conf import settings
Expand All @@ -27,35 +26,35 @@ def check_logs(logs: Union[str, None], job: Job) -> str:
logs with error message and metadata.
"""

max_mb = int(settings.FUNCTIONS_LOGS_SIZE_LIMIT)
max_bytes = max_mb * 1024**2

if job.status == Job.FAILED and logs in ["", None]:
if job.status == Job.FAILED and not logs:
logs = f"Job {job.id} failed due to an internal error."
logger.warning("Job %s failed due to an internal error.", job.id)

return logs

logs_size = sys.getsizeof(logs)
if logs_size == 0:
return logs
if not logs:
return ""

max_bytes = int(settings.FUNCTIONS_LOGS_SIZE_LIMIT)

logs_size = len(logs)

if logs_size > max_bytes:
logger.warning(
"Job %s is exceeding the maximum size for logs %s MB > %s MB.",
job.id,
logs_size,
max_mb,
max_bytes,
)
ratio = max_bytes / logs_size
new_length = max(1, int(len(logs) * ratio))

# truncate logs depending of the ratio
logs = logs[:new_length]
logs += (
"\nLogs exceeded maximum allowed size ("
+ str(max_mb)
+ " MB) and could not be stored."

# truncate logs discarding older
logs = logs[-max_bytes:]

logs = (
"[Logs exceeded maximum allowed size ("
+ str(max_bytes / (1024**2))
+ " MB). Logs have been truncated, discarding the oldest entries first.]\n"
+ logs
)

return logs
17 changes: 17 additions & 0 deletions gateway/api/domain/function/filter_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Methods for filtering job logs."""
import re


def extract_public_logs(text: str) -> str:
    """
    Filter job logs, keeping only the lines marked as public.

    A line is public when it starts with the ``[PUBLIC]`` tag
    (case-insensitive); the tag and its separator space are stripped
    from the returned lines.

    Args:
        text: full logs, one entry per line

    Return:
        str -> public lines joined by newlines with a trailing newline,
        or an empty string when no public line is present
    """
    public_tag = re.compile(r"^\[PUBLIC\]", re.IGNORECASE)
    # Length of the tag plus its separator space (replaces the magic number 9).
    tag_length = len("[PUBLIC] ")
    lines = [
        line[tag_length:] for line in text.splitlines() if public_tag.match(line)
    ]
    return "\n".join(lines) + "\n" if lines else ""
84 changes: 82 additions & 2 deletions gateway/api/management/commands/free_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
from django.conf import settings
from django.core.management.base import BaseCommand

from api.domain.function import check_logs
from api.domain.function.filter_logs import extract_public_logs
from api.models import ComputeResource, Job
from api.ray import kill_ray_cluster
from api.ray import kill_ray_cluster, get_job_handler
from api.repositories.users import UserRepository
from api.services.storage.enums.working_dir import WorkingDir
from api.services.storage.logs_storage import LogsStorage


logger = logging.getLogger("commands")
Expand Down Expand Up @@ -45,7 +50,7 @@ def handle(self, *args, **options):
if not there_are_alive_jobs:
self.remove_compute_resource(compute_resource)

def remove_compute_resource(self, compute_resource):
def remove_compute_resource(self, compute_resource: ComputeResource):
"""
This method removes a Compute Resource if it's
available in the cluster.
Expand All @@ -68,6 +73,9 @@ def remove_compute_resource(self, compute_resource):
)
return

self.save_logs_to_storage(job=terminated_job, compute_resource=compute_resource)
terminated_job.logs = ""

is_gpu = terminated_job.gpu
should_remove_as_classical = remove_classical_jobs and not is_gpu
should_remove_as_gpu = remove_gpu_jobs and is_gpu
Expand All @@ -83,3 +91,75 @@ def remove_compute_resource(self, compute_resource):
compute_resource.title,
compute_resource.owner,
)

def save_logs_to_storage(self, job: Job, compute_resource: ComputeResource):
    """
    Fetch the final logs of the job and save them in the
    corresponding storages.

    Args:
        job: Job whose logs are being persisted
        compute_resource: ComputeResource hosting the Ray cluster
    """
    job_handler = get_job_handler(compute_resource.host)
    if not job_handler:
        # The cluster may already be unreachable (it is about to be
        # removed); fall back to the logs stored on the job so they are
        # not lost. Mirrors the handler guard in update_jobs_statuses.
        logger.warning(
            "Ray job handler is not available for compute resource [%s]; "
            "falling back to the stored logs of job [%s].",
            compute_resource.title,
            job.id,
        )
        full_logs = job.logs
    else:
        full_logs = job_handler.logs(job.ray_job_id)
    full_logs = check_logs(full_logs, job)

    user_repository = UserRepository()
    # NOTE(review): get_or_create_by_id is annotated to take a user id (str)
    # but receives the author object here — confirm it accepts both.
    author = user_repository.get_or_create_by_id(job.author)
    username = author.username

    # Provider functions keep the full logs private and expose only the
    # [PUBLIC]-tagged lines to the user.
    if job.program.provider is not None:
        self._save_logs_with_provider(full_logs, username, job)
    else:
        self._save_logs_only_user(full_logs, username, job)

def _save_logs_only_user(self, full_logs: str, username: str, job: Job):
    """
    Persist the full logs into the author's user storage.

    Args:
        full_logs: complete logs of the job
        username: name of the job author
        job: Job the logs belong to
    """
    storage = LogsStorage(
        username=username,
        working_dir=WorkingDir.USER_STORAGE,
        function_title=job.program.title,
        provider_name=None,
    )
    storage.save(job.id, full_logs)

def _save_logs_with_provider(self, full_logs: str, username: str, job: Job):
    """
    Persist the full logs into the provider storage, and only the
    public lines into the author's user storage.

    Args:
        full_logs: complete logs of the job
        username: name of the job author
        job: Job the logs belong to
    """
    function_title = job.program.title

    # Users only ever see the lines tagged as public.
    user_storage = LogsStorage(
        username=username,
        working_dir=WorkingDir.USER_STORAGE,
        function_title=function_title,
        provider_name=None,
    )
    user_storage.save(job.id, extract_public_logs(full_logs))

    # The provider gets the unfiltered logs, private entries included.
    provider_storage = LogsStorage(
        username=username,
        working_dir=WorkingDir.PROVIDER_STORAGE,
        function_title=function_title,
        provider_name=job.program.provider.name,
    )
    provider_storage.save(job.id, full_logs)
4 changes: 1 addition & 3 deletions gateway/api/management/commands/update_jobs_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from django.conf import settings
from django.core.management.base import BaseCommand

from api.domain.function import check_logs
from api.models import Job
from api.ray import get_job_handler
from api.schedule import (
Expand Down Expand Up @@ -61,10 +60,9 @@ def update_job_status(job: Job):

if job_handler:
logs = job_handler.logs(job.ray_job_id)
job.logs = check_logs(logs, job)
# check if job is resource constrained
no_resources_log = "No available node types can fulfill resource request"
if no_resources_log in job.logs:
if no_resources_log in logs:
job_new_status = fail_job_insufficient_resources(job)
job.status = job_new_status
# cleanup env vars
Expand Down
2 changes: 1 addition & 1 deletion gateway/api/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from api.models import ComputeResource, Job, JobConfig, DEFAULT_PROGRAM_ENTRYPOINT
from api.services.file_storage import FileStorage, WorkingDir
from api.services.storage import FileStorage, WorkingDir
from api.utils import (
retry_function,
decrypt_env_vars,
Expand Down
2 changes: 1 addition & 1 deletion gateway/api/repositories/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class UserRepository:
The main objective of this class is to manage the access to the model
"""

def get_or_create_by_id(self, user_id: str) -> type[AbstractUser]:
def get_or_create_by_id(self, user_id: str) -> AbstractUser:
"""
This method returns a user by its id. If the user does not
exist its created.
Expand Down
2 changes: 1 addition & 1 deletion gateway/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from typing import Tuple, Union
from django.conf import settings
from rest_framework import serializers
from api.services.arguments_storage import ArgumentsStorage
from api.services.storage import ArgumentsStorage

from api.repositories.functions import FunctionRepository
from api.repositories.users import UserRepository
Expand Down
11 changes: 11 additions & 0 deletions gateway/api/services/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
Qiskit Serverless storage services classes

Storage classes are used to manage the content in Object Storage
"""

from .arguments_storage import ArgumentsStorage
from .file_storage import FileStorage
from .logs_storage import LogsStorage
from .result_storage import ResultStorage
from .enums.working_dir import WorkingDir
Loading
Loading