Skip to content

Commit

Permalink
Merge pull request #2256 from fractal-analytics-platform/improve-loggers-in-task-collection
Browse files Browse the repository at this point in the history

Add missing `reset_logger_handlers` to task lifecycle background tasks
  • Loading branch information
tcompa authored Feb 6, 2025
2 parents 92a114f + cedd17a commit 429d326
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 22 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
* Review configuration variables for email-sending (\#2241).
* Database:
* Remove `run_migrations_offline` from `env.py` and make `run_migrations_online` sync (\#2239).
* Task lifecycle:
* Reset logger handlers upon success of a background lifecycle operation, to avoid open file descriptors (\#2256).
* Runner
* Sudo/SLURM executor checks the fractal-server version using `FRACTAL_SLURM_WORKER_PYTHON` config variable, if set (\#2240).
* Add `uname -n` to SLURM submission scripts (\#2247).
Expand Down
17 changes: 9 additions & 8 deletions fractal_server/tasks/v2/local/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from fractal_server.app.schemas.v2 import TaskGroupActivityStatusV2
from fractal_server.app.schemas.v2 import WheelFile
from fractal_server.app.schemas.v2.manifest import ManifestV2
from fractal_server.logger import reset_logger_handlers
from fractal_server.logger import set_logger
from fractal_server.tasks.utils import get_log_path
from fractal_server.tasks.v2.local._utils import check_task_files_exist
Expand Down Expand Up @@ -80,7 +81,7 @@ def collect_local(
return

# Log some info
logger.debug("START")
logger.info("START")
for key, value in task_group.model_dump().items():
logger.debug(f"task_group.{key}: {value}")

Expand All @@ -101,17 +102,15 @@ def collect_local(
try:
# Create task_group.path folder
Path(task_group.path).mkdir(parents=True)
logger.debug(f"Created {task_group.path}")
logger.info(f"Created {task_group.path}")

# Write wheel file and set task_group.wheel_path
if wheel_file is not None:

wheel_path = (
Path(task_group.path) / wheel_file.filename
).as_posix()
logger.debug(
f"Write wheel-file contents into {wheel_path}"
)
logger.info(f"Write wheel-file contents into {wheel_path}")
with open(wheel_path, "wb") as f:
f.write(wheel_file.contents)
task_group.wheel_path = wheel_path
Expand Down Expand Up @@ -256,12 +255,14 @@ def collect_local(
)

# Finalize (write metadata to DB)
logger.debug("finalising - START")
logger.info("finalising - START")
activity.status = TaskGroupActivityStatusV2.OK
activity.timestamp_ended = get_timestamp()
activity = add_commit_refresh(obj=activity, db=db)
logger.debug("finalising - END")
logger.debug("END")
logger.info("finalising - END")
logger.info("END")

reset_logger_handlers(logger)

except Exception as collection_e:
# Delete corrupted package dir
Expand Down
3 changes: 3 additions & 0 deletions fractal_server/tasks/v2/local/deactivate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from fractal_server.app.schemas.v2 import TaskGroupActivityActionV2
from fractal_server.app.schemas.v2 import TaskGroupV2OriginEnum
from fractal_server.app.schemas.v2.task_group import TaskGroupActivityStatusV2
from fractal_server.logger import reset_logger_handlers
from fractal_server.logger import set_logger
from fractal_server.tasks.utils import FORBIDDEN_DEPENDENCY_STRINGS
from fractal_server.tasks.utils import get_log_path
Expand Down Expand Up @@ -217,6 +218,8 @@ def deactivate_local(
activity.timestamp_ended = get_timestamp()
activity = add_commit_refresh(obj=activity, db=db)

reset_logger_handlers(logger)

except Exception as e:
fail_and_cleanup(
task_group=task_group,
Expand Down
3 changes: 3 additions & 0 deletions fractal_server/tasks/v2/local/reactivate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from fractal_server.app.models.v2 import TaskGroupV2
from fractal_server.app.schemas.v2 import TaskGroupActivityActionV2
from fractal_server.app.schemas.v2.task_group import TaskGroupActivityStatusV2
from fractal_server.logger import reset_logger_handlers
from fractal_server.logger import set_logger
from fractal_server.tasks.utils import get_log_path
from fractal_server.tasks.v2.utils_background import get_current_log
Expand Down Expand Up @@ -134,6 +135,8 @@ def reactivate_local(
task_group = add_commit_refresh(obj=task_group, db=db)
logger.debug("END")

reset_logger_handlers(logger)

except Exception as reactivate_e:
# Delete corrupted venv_path
try:
Expand Down
16 changes: 8 additions & 8 deletions fractal_server/tasks/v2/ssh/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from fractal_server.app.schemas.v2 import TaskGroupActivityStatusV2
from fractal_server.app.schemas.v2 import WheelFile
from fractal_server.app.schemas.v2.manifest import ManifestV2
from fractal_server.logger import reset_logger_handlers
from fractal_server.logger import set_logger
from fractal_server.ssh._fabric import FractalSSH
from fractal_server.tasks.v2.ssh._utils import _customize_and_run_template
Expand Down Expand Up @@ -85,7 +86,7 @@ def collect_ssh(
return

# Log some info
logger.debug("START")
logger.info("START")
for key, value in task_group.model_dump().items():
logger.debug(f"task_group.{key}: {value}")

Expand Down Expand Up @@ -137,7 +138,7 @@ def collect_ssh(
Path(task_group.path) / wheel_filename
).as_posix()
tmp_wheel_path = (Path(tmpdir) / wheel_filename).as_posix()
logger.debug(
logger.info(
f"Write wheel-file contents into {tmp_wheel_path}"
)
with open(tmp_wheel_path, "wb") as f:
Expand Down Expand Up @@ -171,7 +172,7 @@ def collect_ssh(
logger_name=LOGGER_NAME,
)

logger.debug("installing - START")
logger.info("installing - START")

# Set status to ONGOING and refresh logs
activity.status = TaskGroupActivityStatusV2.ONGOING
Expand Down Expand Up @@ -286,14 +287,13 @@ def collect_ssh(
)

# Finalize (write metadata to DB)
logger.debug("finalising - START")
logger.info("finalising - START")
activity.status = TaskGroupActivityStatusV2.OK
activity.timestamp_ended = get_timestamp()
activity = add_commit_refresh(obj=activity, db=db)
logger.debug("finalising - END")
logger.debug("END")

logger.debug("END")
logger.info("finalising - END")
logger.info("END")
reset_logger_handlers(logger)

except Exception as collection_e:
# Delete corrupted package dir
Expand Down
3 changes: 3 additions & 0 deletions fractal_server/tasks/v2/ssh/deactivate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from fractal_server.app.schemas.v2 import TaskGroupActivityActionV2
from fractal_server.app.schemas.v2 import TaskGroupV2OriginEnum
from fractal_server.app.schemas.v2.task_group import TaskGroupActivityStatusV2
from fractal_server.logger import reset_logger_handlers
from fractal_server.logger import set_logger
from fractal_server.ssh._fabric import FractalSSH
from fractal_server.tasks.utils import FORBIDDEN_DEPENDENCY_STRINGS
Expand Down Expand Up @@ -252,6 +253,8 @@ def deactivate_ssh(
activity.timestamp_ended = get_timestamp()
activity = add_commit_refresh(obj=activity, db=db)

reset_logger_handlers(logger)

except Exception as e:
fail_and_cleanup(
task_group=task_group,
Expand Down
15 changes: 9 additions & 6 deletions fractal_server/tasks/v2/ssh/reactivate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from fractal_server.app.models.v2 import TaskGroupV2
from fractal_server.app.schemas.v2 import TaskGroupActivityActionV2
from fractal_server.app.schemas.v2.task_group import TaskGroupActivityStatusV2
from fractal_server.logger import reset_logger_handlers
from fractal_server.logger import set_logger
from fractal_server.ssh._fabric import FractalSSH
from fractal_server.tasks.utils import get_log_path
Expand Down Expand Up @@ -69,7 +70,7 @@ def reactivate_ssh(
return

# Log some info
logger.debug("START")
logger.info("START")
for key, value in task_group.model_dump().items():
logger.debug(f"task_group.{key}: {value}")

Expand Down Expand Up @@ -152,28 +153,30 @@ def reactivate_ssh(
# Create remote directory for scripts
fractal_ssh.mkdir(folder=script_dir_remote)

logger.debug("start - create venv")
logger.info("start - create venv")
_customize_and_run_template(
template_filename="1_create_venv.sh",
**common_args,
)
logger.debug("end - create venv")
logger.info("end - create venv")
activity.log = get_current_log(log_file_path)
activity = add_commit_refresh(obj=activity, db=db)

logger.debug("start - install from pip freeze")
logger.info("start - install from pip freeze")
_customize_and_run_template(
template_filename="6_pip_install_from_freeze.sh",
**common_args,
)
logger.debug("end - install from pip freeze")
logger.info("end - install from pip freeze")
activity.log = get_current_log(log_file_path)
activity.status = TaskGroupActivityStatusV2.OK
activity.timestamp_ended = get_timestamp()
activity = add_commit_refresh(obj=activity, db=db)
task_group.active = True
task_group = add_commit_refresh(obj=task_group, db=db)
logger.debug("END")
logger.info("END")

reset_logger_handlers(logger)

except Exception as reactivate_e:
# Delete corrupted venv_path
Expand Down

0 comments on commit 429d326

Please sign in to comment.