From f706599df2b8dc8bc615ec513a7eb41d883b2150 Mon Sep 17 00:00:00 2001
From: "Marcel R."
Date: Mon, 13 Jan 2025 08:52:22 +0100
Subject: [PATCH] Adapt to master changes.

---
 law/contrib/cms/job.py                      | 59 +++++++++++++------
 law/contrib/cms/scripts/delegate_myproxy.py | 65 +++++++++++++++------
 law/contrib/cms/util.py                     | 36 ++++++++----
 law/contrib/cms/workflow.py                 | 14 +++--
 law/contrib/wlcg/util.py                    | 30 +++++-----
 law/patches.py                              | 18 +++++-
 law/task/base.py                            |  2 +-
 law/util.py                                 |  6 +-
 law/workflow/local.py                       | 16 +++--
 9 files changed, 170 insertions(+), 76 deletions(-)

diff --git a/law/contrib/cms/job.py b/law/contrib/cms/job.py
index 787e2fd7..af6d6b85 100644
--- a/law/contrib/cms/job.py
+++ b/law/contrib/cms/job.py
@@ -48,6 +48,7 @@ class CrabJobManager(BaseJobManager):
     submission_task_name_cre = re.compile(r"^Task\s+name\s*\:\s+([^\s]+)\s*$")
     submission_log_file_cre = re.compile(r"^Log\s+file\s+is\s+([^\s]+\.log)\s*$")
     query_server_status_cre = re.compile(r"^Status\s+on\s+the\s+CRAB\s+server\s*\:\s+([^\s].*)$")
+    query_server_failure_cre = re.compile(r"^Failure\s+message\s+from\s+server\s*\:\s+([^\s].*)$")
     query_user_cre = re.compile(r"^Task\s+name\s*:\s+\d+_\d+\:([^_]+)_.+$")
     query_scheduler_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+([^\s]+).+$")
     query_scheduler_id_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+crab.*\@.+[^\d](\d+)\..+$")  # noqa
@@ -70,7 +71,8 @@ class CrabJobManager(BaseJobManager):
     def __init__(
         self,
         sandbox_name: str | None = None,
-        proxy: str | None = None,
+        proxy_file: str | None = None,
+        myproxy_username: str | None = None,
         instance: str | None = None,
         threads: int = 1,
     ) -> None:
@@ -89,7 +91,8 @@ def __init__(
         )

         # store attributes
-        self.proxy = proxy
+        self.proxy_file = proxy_file
+        self.myproxy_username = myproxy_username
         self.instance = instance
         self.threads = threads

@@ -177,17 +180,19 @@ def submit(  # type: ignore[override]
         job_file: str | pathlib.Path,
         *,
         job_files: Sequence[str | pathlib.Path] | None = None,
-        proxy: str | None = None,
-        instance: str | None = None,
+        proxy_file: str | None = None,
         myproxy_username: str | None = None,
+        instance: str | None = None,
         retries: int = 0,
         retry_delay: int | float = 3,
         silent: bool = False,
         _processes: list | None = None,
     ) -> list[JobId] | None:
         # default arguments
-        if proxy is None:
-            proxy = self.proxy
+        if proxy_file is None:
+            proxy_file = self.proxy_file
+        if myproxy_username is None:
+            myproxy_username = self.myproxy_username
         if instance is None:
             instance = self.instance

@@ -198,8 +203,8 @@ def submit(  # type: ignore[override]
         while True:
             # build the command
             cmd = ["crab", "submit", "--config", job_file_name]
-            if proxy:
-                cmd += ["--proxy", proxy]
+            if proxy_file:
+                cmd += ["--proxy", proxy_file]
             if instance:
                 cmd += ["--instance", instance]
             cmd_str = quote_cmd(cmd)
@@ -280,21 +285,28 @@ def cancel(  # type: ignore[override]
         proj_dir: str | pathlib.Path,
         *,
         job_ids: list[JobId] | None = None,
-        proxy: str | None = None,
-        instance: str | None = None,
+        proxy_file: str | None = None,
         myproxy_username: str | None = None,
+        instance: str | None = None,
         silent: bool = False,
         _processes: list | None = None,
     ) -> dict[JobId, None]:
         self._check_proj_dir(proj_dir)

+        # default arguments
         if job_ids is None:
             job_ids = self._job_ids_from_proj_dir(proj_dir)
+        if proxy_file is None:
+            proxy_file = self.proxy_file
+        if myproxy_username is None:
+            myproxy_username = self.myproxy_username
+        if instance is None:
+            instance = self.instance

         # build the command
         cmd = ["crab", "kill", "--dir", str(proj_dir)]
"--dir", str(proj_dir)] - if proxy: - cmd += ["--proxy", proxy] + if proxy_file: + cmd += ["--proxy", proxy_file] if instance: cmd += ["--instance", instance] cmd_str = quote_cmd(cmd) @@ -326,9 +338,9 @@ def cleanup( # type: ignore[override] proj_dir: str | pathlib.Path, *, job_ids: list[JobId] | None = None, - proxy: str | None = None, - instance: str | None = None, + proxy_file: str | None = None, myproxy_username: str | None = None, + instance: str | None = None, silent: bool = False, _processes: list | None = None, ) -> dict[JobId, None]: @@ -347,19 +359,26 @@ def query( # type: ignore[override] proj_dir: str | pathlib.Path, *, job_ids: list[JobId] | None = None, - proxy: str | None = None, - instance: str | None = None, + proxy_file: str | None = None, myproxy_username: str | None = None, + instance: str | None = None, skip_transfers: bool | None = None, silent: bool = False, _processes: list | None = None, ) -> dict[JobId, dict[str, Any]] | None: self._check_proj_dir(proj_dir) + # default arguments proj_dir = str(proj_dir) log_data = self._parse_log_file(os.path.join(proj_dir, "crab.log")) if job_ids is None: job_ids = self._job_ids_from_proj_dir(proj_dir, log_data=log_data) + if proxy_file is None: + proxy_file = self.proxy_file + if myproxy_username is None: + myproxy_username = self.myproxy_username + if instance is None: + instance = self.instance # when output collection is disabled, we can consider all "transferring" states as finished if skip_transfers is None: @@ -367,8 +386,8 @@ def query( # type: ignore[override] # build the command cmd = ["crab", "status", "--dir", proj_dir, "--json"] - if proxy: - cmd += ["--proxy", proxy] + if proxy_file: + cmd += ["--proxy", proxy_file] if instance: cmd += ["--instance", instance] cmd_str = quote_cmd(cmd) @@ -427,6 +446,7 @@ def parse_query_output( cls.query_scheduler_status_cre, cls.query_json_line_cre, cls.query_monitoring_url_cre, + cls.query_server_failure_cre, ] values: list[str | None] = len(cres) * [None] # type: ignore[assignment] for line in out.replace("\r", "").split("\n"): @@ -446,6 +466,7 @@ def parse_query_output( scheduler_status, json_line, monitoring_url, + server_failure, ) = values # helper to build extra info @@ -482,7 +503,7 @@ def extra( status = cls.PENDING elif server_status in failed_server_states: status = cls.FAILED - error = "submission failed" + error = server_failure or "submission failed" else: s = ",".join(map("'{}'".format, pending_server_states | failed_server_states)) raise Exception( diff --git a/law/contrib/cms/scripts/delegate_myproxy.py b/law/contrib/cms/scripts/delegate_myproxy.py index dfe3b623..4893e0d7 100755 --- a/law/contrib/cms/scripts/delegate_myproxy.py +++ b/law/contrib/cms/scripts/delegate_myproxy.py @@ -4,43 +4,68 @@ Script to trigger a myproxy delegation, e.g. for crab submission. 
""" - -def delegate(renew, endpoint, username, password_file, vo, voms_proxy): +from __future__ import annotations + + +def delegate( + renew: bool, + endpoint: str, + username: str | None, + password_file: str | None, + vo: str | None, + voms_proxy: bool, + crab: bool, + /, +) -> None: import law law.contrib.load("cms", "wlcg") + # settings + encode_username = False + retrievers: list[str] | None = None + + # crab mode + if crab: + encode_username = True + voms_proxy = True + username = None + retrievers = law.cms.util._default_crab_receivers # type: ignore[attr-defined] + # when not renewing, check if a previous delegation exists if not renew: - info = law.wlcg.get_myproxy_info( + info = law.wlcg.get_myproxy_info( # type: ignore[attr-defined] endpoint=endpoint, username=username, + encode_username=encode_username, silent=True, ) if info: - print("existing myproxy delegation found for username {}".format(info["username"])) + print(f"existing myproxy delegation found for username {info['username']}") return - if voms_proxy and (renew or not law.wlcg.check_vomsproxy_validity()): + if voms_proxy and (renew or not law.wlcg.check_vomsproxy_validity()): # type: ignore[attr-defined] # noqa print("\nrenewing voms-proxy") - law.cms.renew_vomsproxy(vo=vo, password_file=password_file) + law.cms.renew_vomsproxy(vo=vo, password_file=password_file) # type: ignore[attr-defined] # create a new delegation - print("\ndelegating to {}".format(endpoint)) - law.cms.delegate_myproxy( + print(f"\ndelegating to {endpoint}") + law.cms.delegate_myproxy( # type: ignore[attr-defined] endpoint=endpoint, username=username, + encode_username=encode_username, password_file=password_file, vo=vo, + retrievers=retrievers, ) -def main(): +def main() -> int: from argparse import ArgumentParser import law - default_pf = law.config.get_expanded("job", "crab_password_file") + cfg = law.config.Config.instance() parser = ArgumentParser( prog="law_cms_delegate_myproxy", @@ -56,15 +81,13 @@ def main(): "--endpoint", "-e", default="myproxy.cern.ch", - help="the server endpoint; default: myproxy.cern.ch", + help="the server endpoint; default: %(default)s", ) parser.add_argument( "--password-file", "-p", - help="a file containing the certificate password" + ( - "; default: {}".format(default_pf) if default_pf else "" - ), - default=default_pf, + default=(pf := cfg.get_expanded("job", "crab_password_file")), + help="a file containing the certificate password" + (f"; default: {pf}" if pf else ""), ) parser.add_argument( "--username", @@ -74,8 +97,8 @@ def main(): parser.add_argument( "--vo", "-m", - default="cms", - help="virtual organization to use; default: cms", + default=law.cms.util._default_vo(), # type: ignore[attr-defined] + help="virtual organization to use; default: %(default)s", ) parser.add_argument( "--voms-proxy", @@ -83,6 +106,11 @@ def main(): action="store_true", help="create a voms-proxy prior to the delegation", ) + parser.add_argument( + "--crab", + action="store_true", + help="adds crab-specific defaults", + ) args = parser.parse_args() delegate( @@ -92,8 +120,11 @@ def main(): args.password_file, args.vo, args.voms_proxy, + args.crab, ) + return 0 + # entry hook if __name__ == "__main__": diff --git a/law/contrib/cms/util.py b/law/contrib/cms/util.py index 796dc758..f2f5808e 100644 --- a/law/contrib/cms/util.py +++ b/law/contrib/cms/util.py @@ -12,10 +12,19 @@ import law - law.contrib.load("wlcg") +# obtained via _get_crab_receivers below +_default_crab_receivers = [ + 
"/DC=ch/DC=cern/OU=computers/CN=crab-(preprod|prod)-tw(01|02).cern.ch|/DC=ch/DC=cern/OU=computers/CN=crab-dev-tw(01|02|03|04).cern.ch|/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=cmscrab/CN=(817881|373708)/CN=Robot: cms crab|/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=crabint1/CN=373708/CN=Robot: CMS CRAB Integration 1", # noqa +] + + +def _default_vo() -> str: + return os.getenv("LAW_CMS_VO", "cms") + + class Site(object): """ Helper class that provides site-related data, mostly via simple properties. When *name* is @@ -135,21 +144,28 @@ def lfn_to_pfn(lfn: str, redirector: str = "global") -> str: def renew_vomsproxy(**kwargs) -> str | None: """ Renews a VOMS proxy in the exact same way that :py:func:`law.wlcg.renew_vomsproxy` does, but - with the *vo* attribute set to ``"cms"`` by default. + with the *vo* argument default to the environment variable LAW_CMS_VO or ``"cms"`` when empty. """ - kwargs.setdefault("vo", "cms") + if "vo" not in kwargs: + kwargs["vo"] = _default_vo() return law.wlcg.renew_vomsproxy(**kwargs) # type: ignore[attr-defined] def delegate_myproxy(**kwargs) -> str | None: """ Delegates a X509 proxy to a myproxy server in the exact same way that - :py:func:`law.wlcg.delegate_myproxy` does, but with the *retrievers* argument set to a value - that is usually expected for crab submissions and the vo set to "cms". + :py:func:`law.wlcg.delegate_myproxy` does, but with the *vo* argument default to the environment + variable LAW_CMS_VO or ``"cms"`` when empty. """ - kwargs.setdefault( - "retrievers", - "/DC=ch/DC=cern/OU=computers/CN=crab-(preprod|prod|dev)-tw(01|02|03).cern.ch|/DC=ch/DC=cern/OU=computers/CN=stefanov(m|m2).cern.ch|/DC=ch/DC=cern/OU=computers/CN=dciangot-tw.cern.ch", # noqa - ) - kwargs.setdefault("vo", "cms") + if "vo" not in kwargs: + kwargs["vo"] = _default_vo() return law.wlcg.delegate_myproxy(**kwargs) # type: ignore[attr-defined] + + +def _get_crab_receivers() -> None: + from CRABClient.ClientUtilities import initLoggers, server_info # type: ignore[import-not-found] # noqa + from CRABClient.Commands.createmyproxy import createmyproxy # type: ignore[import-not-found] + + cmd = createmyproxy(logger=initLoggers()[1]) + alldns = server_info(crabserver=cmd.crabserver, subresource="delegatedn") + print(alldns.get("services")) diff --git a/law/contrib/cms/workflow.py b/law/contrib/cms/workflow.py index 36729f83..eb95ca72 100644 --- a/law/contrib/cms/workflow.py +++ b/law/contrib/cms/workflow.py @@ -54,7 +54,7 @@ def setup_job_manager(self) -> dict[str, Any]: renew_vomsproxy(proxy_file=proxy_file, password_file=password_file) # ensure that it has been delegated to the myproxy server - info = get_myproxy_info(proxy_file=proxy_file, silent=True) + info = get_myproxy_info(proxy_file=proxy_file, encode_username=True, silent=True) delegate = False if not info: delegate = True @@ -64,18 +64,22 @@ def setup_job_manager(self) -> dict[str, Any]: elif "timeleft" not in info: logger.warning("field 'timeleft' not in myproxy info") delegate = True - elif info["timeleft"] < 86400: # type: ignore[operator] + elif info["timeleft"] < 5 * 86400: # type: ignore[operator] timeleft = human_duration(seconds=info["timeleft"]) - logger.warning(f"myproxy lifetime below 24h ({timeleft})") + logger.warning(f"myproxy lifetime below 5 days ({timeleft})") delegate = True # actual delegation if delegate: - myproxy_username = delegate_myproxy(proxy_file=proxy_file, password_file=password_file) + myproxy_username = delegate_myproxy( + proxy_file=proxy_file, + password_file=password_file, + 
+                encode_username=True,
+            )
         else:
             myproxy_username = info["username"]  # type: ignore[index, assignment]

-        return {"proxy": proxy_file, "myproxy_username": myproxy_username}
+        return {"proxy_file": proxy_file, "myproxy_username": myproxy_username}

     def create_job_file_factory(self, **kwargs) -> CrabJobFileFactory:
         return self.task.crab_create_job_file_factory(**kwargs)  # type: ignore[attr-defined]
diff --git a/law/contrib/wlcg/util.py b/law/contrib/wlcg/util.py
index 09affe4f..368da680 100644
--- a/law/contrib/wlcg/util.py
+++ b/law/contrib/wlcg/util.py
@@ -25,10 +25,10 @@
 import functools

 from law.util import (
-    interruptable_popen, create_hash, human_duration, parse_duration, quote_cmd,
+    interruptable_popen, create_hash, human_duration, parse_duration, quote_cmd, make_list,
 )
 from law.logger import get_logger
-from law._types import TextIO
+from law._types import TextIO, Sequence

 logger = get_logger(__name__)

@@ -359,10 +359,10 @@ def delegate_myproxy(
     usercert: str | pathlib.Path | None = None,
     username: str | None = None,
     proxy_file: str | pathlib.Path | None = None,
-    encode_username: bool = True,
+    encode_username: bool = False,
     cred_lifetime: int = 720,
     proxy_lifetime: int = 168,
-    retrievers: str | None = None,
+    retrievers: str | Sequence[str] | None = None,
     rfc: bool = True,
     vo: str | None = None,
     create_local: bool = False,
@@ -379,7 +379,7 @@
     *encode_username* is set, *username* is the sha1 encoded.

     The credential and proxy lifetimes can be defined in hours by *cred_lifetime* and
-    *proxy_lifetime*. When *retrievers* is given, it is passed as both ``--renewable_by`` and
+    *proxy_lifetime*. When *retrievers* are given, they are passed as both ``--renewable_by`` and
     ``--retrievable_by_cert`` to the underlying ``myproxy-init`` command.

     When *rfc* is *True*, the delegated proxy will be RFC compliant. To pass VOMS attributes to the
@@ -418,10 +418,8 @@
         "-c", str(cred_lifetime),
     ]
     if retrievers:
-        cmd.extend([
-            "-x", "-R", retrievers,
-            "-x", "-Z", retrievers,
-        ])
+        for r in make_list(retrievers):
+            cmd.extend(["-x", "-R", r, "-x", "-Z", r])
     if vo:
         cmd.extend(["-m", vo])
     if create_local:
@@ -454,20 +452,20 @@
         stdin_delay=0.2,
     )[0]

-    if code == 0:
-        return username
-
-    if silent:
-        return None
+    # stop in case of an error
+    if code != 0:
+        if silent:
+            return None
+        raise Exception(f"myproxy-init failed with code {code}")

-    raise Exception(f"myproxy-init failed with code {code}")
+    return username


 def get_myproxy_info(
     endpoint: str = "myproxy.cern.ch",
     username: str | None = None,
     proxy_file: str | pathlib.Path | None = None,
-    encode_username: bool = True,
+    encode_username: bool = False,
     silent: bool = False,
 ) -> dict[str, str | int] | None:
     """
diff --git a/law/patches.py b/law/patches.py
index 48b2929c..7c585106 100644
--- a/law/patches.py
+++ b/law/patches.py
@@ -17,6 +17,7 @@
 import luigi  # type: ignore[import-untyped]

 import law
+from law.util import patch_object
 from law.logger import get_logger
 from law._types import Callable

@@ -163,7 +164,8 @@ def patch_worker_add_task() -> None:
     """
     Patches the ``luigi.worker.Worker._add_task`` method to skip dependencies of the triggered task
     when running in a sandbox, as dependencies are already controlled from outside the sandbox.
-    Also, the severity of luigi's interface logging is increased when running in a sandbox.
+    To reduce redundant logs, the severity of luigi's interface logging is increased when running in
+    a sandbox. In addition, info logs about repeatedly added tasks are suppressed.
     """

     _add_task_orig = luigi.worker.Worker._add_task

@@ -171,8 +173,17 @@ def patch_worker_add_task() -> None:

     @functools.wraps(_add_task_orig)
     def _add_task(self, *args, **kwargs):
+        # identify if the task was added before with the same status and if so, patch the info log
+        task = self._scheduled_tasks.get(kwargs["task_id"])
+        is_dup = (task, kwargs["status"], kwargs["runnable"]) in self._add_task_history
+        info_orig = interface_logger.info
+        def info(msg, *args, **kwargs):
+            if is_dup and msg.startswith("Informed scheduler"):
+                return
+            return info_orig(msg, *args, **kwargs)
+
+        # check if sandboxed and adjust level
         previous_level = interface_logger.level
-
         if law.sandbox.base._sandbox_switched:
             # increase the log level
             interface_logger.setLevel(logging.WARNING)
@@ -182,7 +193,8 @@ def _add_task(self, *args, **kwargs):
             kwargs["deps"] = None

         try:
-            return _add_task_orig(self, *args, **kwargs)
+            with patch_object(interface_logger, "info", info):
+                return _add_task_orig(self, *args, **kwargs)
         finally:
             interface_logger.setLevel(previous_level)
diff --git a/law/task/base.py b/law/task/base.py
index e656ea82..7f526a7f 100644
--- a/law/task/base.py
+++ b/law/task/base.py
@@ -681,7 +681,7 @@ def _repr_params(self, all_params: bool = False) -> dict[str, Any]:
             include = (
                 param.significant and
                 not multi_match(name, exclude) and
-                (value not in (None, "NO_STR") or name not in self.exclude_params_repr_empty)
+                (value not in (None, "NO_STR", ()) or name not in self.exclude_params_repr_empty)
             )
             if include:
                 params[name] = value
diff --git a/law/util.py b/law/util.py
index 3e7d66d7..652c832d 100644
--- a/law/util.py
+++ b/law/util.py
@@ -8,7 +8,7 @@
 __all__ = [
     # singleton values
-    "default_lock", "io_lock", "console_lock", "no_value",
+    "default_lock", "io_lock", "console_lock", "mp_manager", "no_value",
     # path and task helpers
     "rel_path", "law_src_path", "law_home_path", "law_run", "common_task_params",
     "increment_path",
     # generic helpers
@@ -52,6 +52,7 @@
 import datetime
 import random
 import threading
+import multiprocessing
 import shlex
 import inspect
 import logging
@@ -82,6 +83,9 @@
 io_lock = threading.Lock()
 console_lock = threading.Lock()

+# globally usable manager for mp objects
+mp_manager = multiprocessing.Manager()
+

 class NoValue(object):
diff --git a/law/workflow/local.py b/law/workflow/local.py
index c27ce981..62c7fb22 100644
--- a/law/workflow/local.py
+++ b/law/workflow/local.py
@@ -16,12 +16,14 @@
 from law.workflow.base import BaseWorkflow, BaseWorkflowProxy
 from law.target.collection import SiblingFileCollectionBase
 from law.logger import get_logger
-from law.util import DotDict
+from law.util import mp_manager, DotDict
 from law._types import Any, Iterator, Callable


 logger = get_logger(__name__)

+_tasks_yielded = mp_manager.dict()
+

 class LocalWorkflowProxy(BaseWorkflowProxy):
     """
@@ -30,10 +32,16 @@ class LocalWorkflowProxy(BaseWorkflowProxy):

     workflow_type = "local"

-    def __init__(self, *args, **kwargs) -> None:
-        super().__init__(*args, **kwargs)
+    @property
+    def _local_workflow_has_yielded(self) -> bool:
+        return self.live_task_id in _tasks_yielded

-        self._local_workflow_has_yielded = False
+    @_local_workflow_has_yielded.setter
+    def _local_workflow_has_yielded(self, value: bool) -> None:
+        if value:
+            _tasks_yielded[self.live_task_id] = True
+        else:
+            _tasks_yielded.pop(self.live_task_id, None)

     def requires(self) -> Any:
         reqs = super().requires()
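
For reference, a minimal usage sketch of the renamed delegation and job-manager arguments introduced by this patch. This is not part of the patch itself; the CRAB config file name, the "prod" instance, and the bare invocation outside of a law task are illustrative assumptions.

    # sketch: assumes law with the cms/wlcg contribs, a valid grid certificate,
    # and an existing CRAB job file called "crab_config.py" (hypothetical name)
    import law

    law.contrib.load("cms", "wlcg")

    # delegate with crab-specific defaults, mirroring `law_cms_delegate_myproxy --crab`
    username = law.cms.delegate_myproxy(
        encode_username=True,
        retrievers=law.cms.util._default_crab_receivers,
    )

    # the job manager and its methods now expect `proxy_file` and `myproxy_username`
    # instead of the former `proxy` argument
    manager = law.cms.CrabJobManager(instance="prod")
    job_ids = manager.submit(
        "crab_config.py",
        proxy_file=law.wlcg.get_vomsproxy_file(),
        myproxy_username=username,
    )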