Skip to content

Commit

Permalink
Adapt to master changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Jan 13, 2025
1 parent 9c02f95 commit f706599
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 76 deletions.
59 changes: 40 additions & 19 deletions law/contrib/cms/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class CrabJobManager(BaseJobManager):
submission_task_name_cre = re.compile(r"^Task\s+name\s*\:\s+([^\s]+)\s*$")
submission_log_file_cre = re.compile(r"^Log\s+file\s+is\s+([^\s]+\.log)\s*$")
query_server_status_cre = re.compile(r"^Status\s+on\s+the\s+CRAB\s+server\s*\:\s+([^\s].*)$")
query_server_failure_cre = re.compile(r"^Failure\s+message\s+from\s+server\s*\:\s+([^\s].*)$")
query_user_cre = re.compile(r"^Task\s+name\s*:\s+\d+_\d+\:([^_]+)_.+$")
query_scheduler_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+([^\s]+).+$")
query_scheduler_id_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+crab.*\@.+[^\d](\d+)\..+$") # noqa
Expand All @@ -70,7 +71,8 @@ class CrabJobManager(BaseJobManager):
def __init__(
self,
sandbox_name: str | None = None,
proxy: str | None = None,
proxy_file: str | None = None,
myproxy_username: str | None = None,
instance: str | None = None,
threads: int = 1,
) -> None:
Expand All @@ -89,7 +91,8 @@ def __init__(
)

# store attributes
self.proxy = proxy
self.proxy_file = proxy_file
self.myproxy_username = myproxy_username
self.instance = instance
self.threads = threads

Expand Down Expand Up @@ -177,17 +180,19 @@ def submit( # type: ignore[override]
job_file: str | pathlib.Path,
*,
job_files: Sequence[str | pathlib.Path] | None = None,
proxy: str | None = None,
instance: str | None = None,
proxy_file: str | None = None,
myproxy_username: str | None = None,
instance: str | None = None,
retries: int = 0,
retry_delay: int | float = 3,
silent: bool = False,
_processes: list | None = None,
) -> list[JobId] | None:
# default arguments
if proxy is None:
proxy = self.proxy
if proxy_file is None:
proxy_file = self.proxy_file
if myproxy_username is None:
myproxy_username = self.myproxy_username
if instance is None:
instance = self.instance

Expand All @@ -198,8 +203,8 @@ def submit( # type: ignore[override]
while True:
# build the command
cmd = ["crab", "submit", "--config", job_file_name]
if proxy:
cmd += ["--proxy", proxy]
if proxy_file:
cmd += ["--proxy", proxy_file]
if instance:
cmd += ["--instance", instance]
cmd_str = quote_cmd(cmd)
Expand Down Expand Up @@ -280,21 +285,28 @@ def cancel( # type: ignore[override]
proj_dir: str | pathlib.Path,
*,
job_ids: list[JobId] | None = None,
proxy: str | None = None,
instance: str | None = None,
proxy_file: str | None = None,
myproxy_username: str | None = None,
instance: str | None = None,
silent: bool = False,
_processes: list | None = None,
) -> dict[JobId, None]:
self._check_proj_dir(proj_dir)

# default arguments
if job_ids is None:
job_ids = self._job_ids_from_proj_dir(proj_dir)
if proxy_file is None:
proxy_file = self.proxy_file
if myproxy_username is None:
myproxy_username = self.myproxy_username
if instance is None:
instance = self.instance

# build the command
cmd = ["crab", "kill", "--dir", str(proj_dir)]
if proxy:
cmd += ["--proxy", proxy]
if proxy_file:
cmd += ["--proxy", proxy_file]
if instance:
cmd += ["--instance", instance]
cmd_str = quote_cmd(cmd)
Expand Down Expand Up @@ -326,9 +338,9 @@ def cleanup( # type: ignore[override]
proj_dir: str | pathlib.Path,
*,
job_ids: list[JobId] | None = None,
proxy: str | None = None,
instance: str | None = None,
proxy_file: str | None = None,
myproxy_username: str | None = None,
instance: str | None = None,
silent: bool = False,
_processes: list | None = None,
) -> dict[JobId, None]:
Expand All @@ -347,28 +359,35 @@ def query( # type: ignore[override]
proj_dir: str | pathlib.Path,
*,
job_ids: list[JobId] | None = None,
proxy: str | None = None,
instance: str | None = None,
proxy_file: str | None = None,
myproxy_username: str | None = None,
instance: str | None = None,
skip_transfers: bool | None = None,
silent: bool = False,
_processes: list | None = None,
) -> dict[JobId, dict[str, Any]] | None:
self._check_proj_dir(proj_dir)

# default arguments
proj_dir = str(proj_dir)
log_data = self._parse_log_file(os.path.join(proj_dir, "crab.log"))
if job_ids is None:
job_ids = self._job_ids_from_proj_dir(proj_dir, log_data=log_data)
if proxy_file is None:
proxy_file = self.proxy_file
if myproxy_username is None:
myproxy_username = self.myproxy_username
if instance is None:
instance = self.instance

# when output collection is disabled, we can consider all "transferring" states as finished
if skip_transfers is None:
skip_transfers = str(log_data.get("disable_output_collection")).lower() == "true"

# build the command
cmd = ["crab", "status", "--dir", proj_dir, "--json"]
if proxy:
cmd += ["--proxy", proxy]
if proxy_file:
cmd += ["--proxy", proxy_file]
if instance:
cmd += ["--instance", instance]
cmd_str = quote_cmd(cmd)
Expand Down Expand Up @@ -427,6 +446,7 @@ def parse_query_output(
cls.query_scheduler_status_cre,
cls.query_json_line_cre,
cls.query_monitoring_url_cre,
cls.query_server_failure_cre,
]
values: list[str | None] = len(cres) * [None] # type: ignore[assignment]
for line in out.replace("\r", "").split("\n"):
Expand All @@ -446,6 +466,7 @@ def parse_query_output(
scheduler_status,
json_line,
monitoring_url,
server_failure,
) = values

# helper to build extra info
Expand Down Expand Up @@ -482,7 +503,7 @@ def extra(
status = cls.PENDING
elif server_status in failed_server_states:
status = cls.FAILED
error = "submission failed"
error = server_failure or "submission failed"
else:
s = ",".join(map("'{}'".format, pending_server_states | failed_server_states))
raise Exception(
Expand Down
65 changes: 48 additions & 17 deletions law/contrib/cms/scripts/delegate_myproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,68 @@
Script to trigger a myproxy delegation, e.g. for crab submission.
"""


def delegate(renew, endpoint, username, password_file, vo, voms_proxy):
from __future__ import annotations


def delegate(
renew: bool,
endpoint: str,
username: str | None,
password_file: str | None,
vo: str | None,
voms_proxy: bool,
crab: bool,
/,
) -> None:
import law

law.contrib.load("cms", "wlcg")

# settings
encode_username = False
retrievers: list[str] | None = None

# crab mode
if crab:
encode_username = True
voms_proxy = True
username = None
retrievers = law.cms.util._default_crab_receivers # type: ignore[attr-defined]

# when not renewing, check if a previous delegation exists
if not renew:
info = law.wlcg.get_myproxy_info(
info = law.wlcg.get_myproxy_info( # type: ignore[attr-defined]
endpoint=endpoint,
username=username,
encode_username=encode_username,
silent=True,
)
if info:
print("existing myproxy delegation found for username {}".format(info["username"]))
print(f"existing myproxy delegation found for username {info['username']}")
return

if voms_proxy and (renew or not law.wlcg.check_vomsproxy_validity()):
if voms_proxy and (renew or not law.wlcg.check_vomsproxy_validity()): # type: ignore[attr-defined] # noqa
print("\nrenewing voms-proxy")
law.cms.renew_vomsproxy(vo=vo, password_file=password_file)
law.cms.renew_vomsproxy(vo=vo, password_file=password_file) # type: ignore[attr-defined]

# create a new delegation
print("\ndelegating to {}".format(endpoint))
law.cms.delegate_myproxy(
print(f"\ndelegating to {endpoint}")
law.cms.delegate_myproxy( # type: ignore[attr-defined]
endpoint=endpoint,
username=username,
encode_username=encode_username,
password_file=password_file,
vo=vo,
retrievers=retrievers,
)


def main():
def main() -> int:
from argparse import ArgumentParser

import law

default_pf = law.config.get_expanded("job", "crab_password_file")
cfg = law.config.Config.instance()

parser = ArgumentParser(
prog="law_cms_delegate_myproxy",
Expand All @@ -56,15 +81,13 @@ def main():
"--endpoint",
"-e",
default="myproxy.cern.ch",
help="the server endpoint; default: myproxy.cern.ch",
help="the server endpoint; default: %(default)s",
)
parser.add_argument(
"--password-file",
"-p",
help="a file containing the certificate password" + (
"; default: {}".format(default_pf) if default_pf else ""
),
default=default_pf,
default=(pf := cfg.get_expanded("job", "crab_password_file")),
help="a file containing the certificate password" + (f"; default: {pf}" if pf else ""),
)
parser.add_argument(
"--username",
Expand All @@ -74,15 +97,20 @@ def main():
parser.add_argument(
"--vo",
"-m",
default="cms",
help="virtual organization to use; default: cms",
default=law.cms.util._default_vo(), # type: ignore[attr-defined]
help="virtual organization to use; default: %(default)s",
)
parser.add_argument(
"--voms-proxy",
"-v",
action="store_true",
help="create a voms-proxy prior to the delegation",
)
parser.add_argument(
"--crab",
action="store_true",
help="adds crab-specific defaults",
)
args = parser.parse_args()

delegate(
Expand All @@ -92,8 +120,11 @@ def main():
args.password_file,
args.vo,
args.voms_proxy,
args.crab,
)

return 0


# entry hook
if __name__ == "__main__":
Expand Down
36 changes: 26 additions & 10 deletions law/contrib/cms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@

import law


law.contrib.load("wlcg")


# obtained via _get_crab_receivers below
_default_crab_receivers = [
"/DC=ch/DC=cern/OU=computers/CN=crab-(preprod|prod)-tw(01|02).cern.ch|/DC=ch/DC=cern/OU=computers/CN=crab-dev-tw(01|02|03|04).cern.ch|/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=cmscrab/CN=(817881|373708)/CN=Robot: cms crab|/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=crabint1/CN=373708/CN=Robot: CMS CRAB Integration 1", # noqa
]


def _default_vo() -> str:
return os.getenv("LAW_CMS_VO", "cms")


class Site(object):
"""
Helper class that provides site-related data, mostly via simple properties. When *name* is
Expand Down Expand Up @@ -135,21 +144,28 @@ def lfn_to_pfn(lfn: str, redirector: str = "global") -> str:
def renew_vomsproxy(**kwargs) -> str | None:
"""
Renews a VOMS proxy in the exact same way that :py:func:`law.wlcg.renew_vomsproxy` does, but
with the *vo* attribute set to ``"cms"`` by default.
with the *vo* argument default to the environment variable LAW_CMS_VO or ``"cms"`` when empty.
"""
kwargs.setdefault("vo", "cms")
if "vo" not in kwargs:
kwargs["vo"] = _default_vo()
return law.wlcg.renew_vomsproxy(**kwargs) # type: ignore[attr-defined]


def delegate_myproxy(**kwargs) -> str | None:
"""
Delegates a X509 proxy to a myproxy server in the exact same way that
:py:func:`law.wlcg.delegate_myproxy` does, but with the *retrievers* argument set to a value
that is usually expected for crab submissions and the vo set to "cms".
:py:func:`law.wlcg.delegate_myproxy` does, but with the *vo* argument default to the environment
variable LAW_CMS_VO or ``"cms"`` when empty.
"""
kwargs.setdefault(
"retrievers",
"/DC=ch/DC=cern/OU=computers/CN=crab-(preprod|prod|dev)-tw(01|02|03).cern.ch|/DC=ch/DC=cern/OU=computers/CN=stefanov(m|m2).cern.ch|/DC=ch/DC=cern/OU=computers/CN=dciangot-tw.cern.ch", # noqa
)
kwargs.setdefault("vo", "cms")
if "vo" not in kwargs:
kwargs["vo"] = _default_vo()
return law.wlcg.delegate_myproxy(**kwargs) # type: ignore[attr-defined]


def _get_crab_receivers() -> None:
from CRABClient.ClientUtilities import initLoggers, server_info # type: ignore[import-not-found] # noqa
from CRABClient.Commands.createmyproxy import createmyproxy # type: ignore[import-not-found]

cmd = createmyproxy(logger=initLoggers()[1])
alldns = server_info(crabserver=cmd.crabserver, subresource="delegatedn")
print(alldns.get("services"))
Loading

0 comments on commit f706599

Please sign in to comment.