diff --git a/docker/spider-query-cronjob/Dockerfile b/docker/spider-query-cronjob/Dockerfile new file mode 100644 index 00000000..e45a72ec --- /dev/null +++ b/docker/spider-query-cronjob/Dockerfile @@ -0,0 +1,34 @@ +FROM python:3.11-slim + +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Application user and directories +RUN useradd -ms /bin/bash cmsjobmon && \ + mkdir -p /opt/spider/scripts && \ + chown -R cmsjobmon:cmsjobmon /opt/spider + +WORKDIR /opt/spider + +COPY ./requirements.txt /tmp/requirements.txt +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r /tmp/requirements.txt + +COPY ./src ./src +COPY ./spider_cms_*.sh ./scripts/ +RUN cp ./src/spider_cms.py ./scripts/spider_cms.py && \ + chmod +x ./scripts/*.sh + +RUN chown -R cmsjobmon:cmsjobmon /home/cmsjobmon /opt/spider + +ENV PYTHONPATH=/opt/spider/src \ + SPIDER_WORKDIR=/opt/spider \ + AFFILIATION_DIR_LOCATION=/opt/spider/.affiliation_dir.json + +USER cmsjobmon diff --git a/docker/spider-query-cronjob/requirements.txt b/docker/spider-query-cronjob/requirements.txt new file mode 100644 index 00000000..23b127e8 --- /dev/null +++ b/docker/spider-query-cronjob/requirements.txt @@ -0,0 +1,15 @@ +# be consistent with: https://gitlab.cern.ch/ai/it-puppet-hostgroup-vocms/-/blob/master/data/fqdns/vocms0240.cern.ch.yaml#L7 +# check breaking changes before any update and ask to HTCondor-users if you see any problem +htcondor==23.0.28 + +# exact version is needed, previous versions include breaking changes; +# installs also stomp.py==7.0.0 +CMSMonitoring==0.6.12 + +# last version for Py v3.9 +requests~=2.31 + +# after any OpenSearch upgrade, it may change +opensearch-py~=2.5 + +click \ No newline at end of file diff --git a/docker/spider-query-cronjob/spider_cms_history.sh b/docker/spider-query-cronjob/spider_cms_history.sh new file mode 100755 index 00000000..a8b9a740 --- /dev/null +++ b/docker/spider-query-cronjob/spider_cms_history.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# Copied from vocms0240 +# +export SPIDER_WORKDIR="/opt/spider" +export AFFILIATION_DIR_LOCATION="$SPIDER_WORKDIR/.affiliation_dir.json" +export PYTHONPATH="$SPIDER_WORKDIR/src/:$PYTHONPATH" +export CMS_HTCONDOR_TOPIC="/topic/cms.jobmon.condor" + +# PROD +export CMS_HTCONDOR_PRODUCER="condor" +export CMS_HTCONDOR_BROKER="cms-mb.cern.ch" +_ES_INDEX_TEMPLATE="cms" + +_LOGDIR=$SPIDER_WORKDIR/log_history/ +_LOG_LEVEL="WARNING" +_ALERT_EMAILS="cms-comp-monit-alerts@cern.ch" +_ES_BUNCH_SIZE=100 +_QUERY_POOL_SIZE=16 +_UPLOAD_POOL_SIZE=8 + +cd $SPIDER_WORKDIR || exit +source "$SPIDER_WORKDIR/venv/bin/activate" + +# ./scripts/cronAffiliation.sh # First run + +python scripts/spider_cms.py \ + --feed_amq \ + --feed_es \ + --log_dir $_LOGDIR \ + --log_level $_LOG_LEVEL \ + --es_bunch_size $_ES_BUNCH_SIZE \ + --query_pool_size $_QUERY_POOL_SIZE \ + --upload_pool_size $_UPLOAD_POOL_SIZE \ + --email_alerts "$_ALERT_EMAILS" \ + --collectors_file $SPIDER_WORKDIR/etc/collectors.json \ + --es_index_template $_ES_INDEX_TEMPLATE + +# crontab entry (to run every 12 min, starting from 5 min past the hour): +# i.e. 
at 5,17,29,41,53 past the hour +# 5-59/12 * * * * /opt/spider/scripts/spider_cms.sh diff --git a/docker/spider-query-cronjob/spider_cms_queues.sh b/docker/spider-query-cronjob/spider_cms_queues.sh new file mode 100755 index 00000000..5ec8be06 --- /dev/null +++ b/docker/spider-query-cronjob/spider_cms_queues.sh @@ -0,0 +1,38 @@ +#!/bin/bash +# Copied from vocms0240 +# +export SPIDER_WORKDIR="/opt/spider" +export AFFILIATION_DIR_LOCATION="$SPIDER_WORKDIR/.affiliation_dir.json" +export PYTHONPATH="$SPIDER_WORKDIR/src/:$PYTHONPATH" +export CMS_HTCONDOR_TOPIC="/topic/cms.jobmon.condor" + +# PROD +export CMS_HTCONDOR_PRODUCER="condor" +export CMS_HTCONDOR_BROKER="cms-mb.cern.ch" +_LOGDIR=$SPIDER_WORKDIR/log/ +_LOG_LEVEL="WARNING" + +_QUERY_QUEUE_BATCH_SIZE=100 +_QUERY_POOL_SIZE=16 +_UPLOAD_POOL_SIZE=8 + +cd $SPIDER_WORKDIR || exit +source "$SPIDER_WORKDIR/venv/bin/activate" + +# ./scripts/cronAffiliation.sh # First run + +python scripts/spider_cms.py \ + --feed_amq \ + --log_dir $_LOGDIR \ + --log_level $_LOG_LEVEL \ + --skip_history \ + --process_queue \ + --query_queue_batch_size $_QUERY_QUEUE_BATCH_SIZE \ + --query_pool_size $_QUERY_POOL_SIZE \ + --upload_pool_size $_UPLOAD_POOL_SIZE \ + --collectors_file $SPIDER_WORKDIR/etc/collectors.json + +#python spider_cms.py --log_dir $LOGDIR --log_level WARNING --feed_amq --email_alerts 'cms-comp-monit-alerts@cern.ch' --skip_history --process_queue --query_queue_batch_size 100 --query_pool_size 16 --upload_pool_size 8 --collectors_file $SPIDER_WORKDIR/etc/collectors.json + +# crontab entry (to run every 12 min): +# */12 * * * * /opt/spider/scripts/spider_cms.sh diff --git a/docker/spider-query-cronjob/src/AffiliationManager.py b/docker/spider-query-cronjob/src/AffiliationManager.py new file mode 100644 index 00000000..2f51c82f --- /dev/null +++ b/docker/spider-query-cronjob/src/AffiliationManager.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Author: Christian Ariza +# pylint: disable=line-too-long +import errno +import json +import logging +import os +from datetime import datetime, timedelta +from logging.handlers import RotatingFileHandler +from pathlib import Path + +import requests + +_DEFAULT_WORKDIR = os.getenv("SPIDER_WORKDIR", "/opt/spider") +AFFILIATION_LOG_DIR = os.path.join(_DEFAULT_WORKDIR, "log_aff") + + +def setup_logging(): + """ + Affiliation cache logger + """ + _logger = logging.getLogger("affiliation_logger") + _logger.setLevel(logging.INFO) + try: + if not os.path.exists(AFFILIATION_LOG_DIR): + os.makedirs(AFFILIATION_LOG_DIR) + except Exception as e: + _logger.warning("AFFILIATION_LOG_DIR does not exist: " + AFFILIATION_LOG_DIR + str(e)) + log_file = os.path.join(AFFILIATION_LOG_DIR, "affiliation.log") + log_handler = RotatingFileHandler(log_file, maxBytes=100000, backupCount=5) + log_handler.setFormatter(logging.Formatter("%(asctime)s : %(name)s:%(levelname)s - %(message)s")) + _logger.addHandler(log_handler) + + +setup_logging() +aff_logger = logging.getLogger("affiliation_logger") + + +class AffiliationManager: + __DEFAULT_DIR_PATH = Path(os.path.join(_DEFAULT_WORKDIR, ".affiliation_dir.json")) + __DEFAULT_URL = "https://cms-cric.cern.ch/api/accounts/user/query/?json" + __DEFAULT_CA_CERT = "/etc/pki/tls/certs/CERN-bundle.pem" + __DEFAULT_ROBOT_CERT = "/home/cmsjobmon/.globus/usercert.pem" + __DEFAULT_ROBOT_KEY = "/home/cmsjobmon/.globus/userkey.pem" + + def __init__( + self, + dir_file=__DEFAULT_DIR_PATH, + recreate=False, + recreate_older_days=None, + service_url=__DEFAULT_URL, + 
robot_cert=__DEFAULT_ROBOT_CERT, + robot_key=__DEFAULT_ROBOT_KEY, + ca_cert=__DEFAULT_CA_CERT, + ): + """ + params: + recreate: boolean + recreate_older_days: int, recreate the dir if is older + than that number of days. + """ + self.path = Path(dir_file) + self.url = service_url + self.path = Path(dir_file) + self.url = service_url + self.robot_cert = robot_cert + self.robot_key = robot_key + self.ca_cert = ca_cert + if not recreate and recreate_older_days: + if self.path.is_file(): + _min_date = datetime.now() - timedelta(days=recreate_older_days) + _dir_time = datetime.fromtimestamp(self.path.stat().st_mtime) + recreate = _dir_time < _min_date + else: + recreate = True + + try: + self.__dir = self.loadOrCreateDirectory(recreate) + self.__dn_dir = { + person["dn"]: person for person in list(self.__dir.values()) + } + except ( + IOError, + requests.RequestException, + requests.HTTPError, + json.JSONDecodeError, + ) as cause: + aff_logger.error("Affiliation instance initialization error: " + str(cause)) + raise AffiliationManagerException from cause + + def loadOrCreateDirectory(self, recreate=False): + """ + Create or load from a json file an inverted + index of instutions by person login. e.g.: + + { + 'valya':{u'country': u'US', + u'institute': u'Cornell University'}, + 'belforte': {u'country': u'IT', + u'institute': u'Universita e INFN Trieste'} + ... + } + raises IOError if the file doesn't exist (of it cannot be read) + RequestException if something happen with the request + HTTPError if the response was something different + to a success + """ + aff_logger.debug("Affiliation load or create args. recreate:" + str(recreate)) + _tmp_dir = None + if recreate: + # response = requests.get(self.url) #no auth + cert = (self.robot_cert, self.robot_key) + response = requests.get(self.url, cert=cert, verify=self.ca_cert) + response.raise_for_status() + + _json = json.loads(response.text) + _tmp_dir = {} + for person in list(_json.values()): + login = None + for profile in person["profiles"]: + if "login" in profile: + login = profile["login"] + break + if login and "institute" in person: + _tmp_dir[login] = { + "institute": person["institute"], + "country": person["institute_country"], + "dn": person["dn"], + } + aff_logger.debug("Temp affiliations before written: " + str(_tmp_dir)) + # Only override the file if the dict is not empty. + if _tmp_dir: + with open(self.path, "w") as _dir_file: + json.dump(_tmp_dir, _dir_file) + aff_logger.info("Successfully recreated: " + str(self.path)) + elif self.path.is_file(): + with open(self.path, "r") as dir_file: + _tmp_dir = json.load(dir_file) + else: + raise IOError(errno.ENOENT, os.strerror(errno.ENOENT), self.path) + return _tmp_dir + + def getAffiliation(self, login=None, dn=None): + """ + Returns a python dictionary with the institute and country + for the given login or dn. + Returns None if not found. + """ + if login: + return self.__dir.get(login) + if dn: + return self.__dn_dir.get(dn) + return None + + +class AffiliationManagerException(Exception): + """ + Exception wrapper for problems that prevents us to obtain the affiliation info. 
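    Typical handling is to fall back to running without affiliation data, as
    convert_to_json.py does (a sketch; the one-day refresh threshold is only
    illustrative, not a project default):

        try:
            mgr = AffiliationManager(recreate_older_days=1)
        except AffiliationManagerException:
            mgr = None  # documents simply won't get AffiliationInstitute/Country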
+ """ + pass diff --git a/docker/spider-query-cronjob/src/convert_to_json.py b/docker/spider-query-cronjob/src/convert_to_json.py new file mode 100644 index 00000000..1d8e8954 --- /dev/null +++ b/docker/spider-query-cronjob/src/convert_to_json.py @@ -0,0 +1,1493 @@ +#!/usr/bin/python + +import os +import re +import json +import time +import classad +import calendar +import logging +import datetime +import zlib +import base64 +from AffiliationManager import ( + AffiliationManager, + AffiliationManagerException, +) + +string_vals = { + "AutoClusterId", + "AffiliationInstitute", + "AffiliationCountry", + "Processor", + "ChirpCMSSWCPUModels", + "CPUModel", + "CPUModelName", + "ChirpCMSSWCPUModels", + "CMSPrimaryPrimaryDataset", + "CMSPrimaryProcessedDataset", + "CMSPrimaryDataTier", + "CMSSWVersion", + "CMSSWMajorVersion", + "CMSSWReleaseSeries", + "CRAB_JobType", + "CRAB_JobSW", + "CRAB_JobArch", + "CRAB_Id", + "CRAB_ISB", + "CRAB_PostJobStatus", + "CRAB_Workflow", + "CRAB_UserRole", + "CMSGroups", + "CRAB_UserHN", + "CRAB_UserGroup", + "CRAB_TaskWorker", + "CRAB_SiteWhitelist", + "CRAB_SiteBlacklist", + "CRAB_SplitAlgo", + "CRAB_PrimaryDataset", + "Args", + "AccountingGroup", + "Cmd", + "CMS_JobType", + "CMS_WMTool", + "DESIRED_Archs", + "DESIRED_CMSDataLocations", + "DESIRED_CMSDataset", + "DESIRED_Sites", + "ExtDESIRED_Sites", + "FormattedCrabId", + "GlobalJobId", + "GlideinClient", + "GlideinEntryName", + "GlideinFactory", + "GlideinFrontendName", + "GlideinName", + "GLIDECLIENT_Name", + "GLIDEIN_Entry_Name", + "GLIDEIN_Factory", + "GlobusRSL", + "GridJobId", + "LastRemoteHost", + "MachineAttrCMSSubSiteName0", + "MATCH_EXP_JOB_GLIDECLIENT_Name", + "MATCH_EXP_JOB_GLIDEIN_ClusterId", + "MATCH_EXP_JOB_GLIDEIN_CMSSite", + "MATCH_EXP_JOB_GLIDEIN_Entry_Name", + "MATCH_EXP_JOB_GLIDEIN_Factory", + "MATCH_EXP_JOB_GLIDEIN_Name", + "MATCH_EXP_JOB_GLIDEIN_Schedd", + "MATCH_EXP_JOB_GLIDEIN_SEs", + "MATCH_EXP_JOB_GLIDEIN_Site", + "MATCH_EXP_JOB_GLIDEIN_SiteWMS", + "MATCH_EXP_JOB_GLIDEIN_SiteWMS_JobId", + "MATCH_EXP_JOB_GLIDEIN_SiteWMS_Queue", + "MATCH_EXP_JOB_GLIDEIN_SiteWMS_Slot", + "MachineAttrCUDACapability0", + "MachineAttrCUDADeviceName0", + "MachineAttrCUDADriverVersion0", + "Owner", + "Rank", + "RemoteHost", + "REQUIRED_OS", + "ShouldTransferFiles", + "StartdIpAddr", + "StartdPrincipal", + "User", + "WhenToTransferOutput", + "WMAgent_AgentName", + "WMAgent_RequestName", + "WMAgent_SubTaskName", + "x509UserProxyEmail", + "x509UserProxyFirstFQAN", + "x509UserProxyFQAN", + "x509userproxysubject", + "x509UserProxyVOName", + "InputData", + "Original_DESIRED_Sites", + "WMAgent_TaskType", + "NordugridRSL", + "Campaign", + "TaskType", + "DataLocations", + "Workflow", + "Site", + "Tier", + "Country", + "Status", + "Universe", + "ExitReason", + "LastHoldReason", + "RemoveReason", + "DESIRED_Overflow_Region", + "DESIRED_OpSysMajorVers", + "DESIRED_CMSDataset", + "DAGNodeName", + "DAGParentNodeNames", + "OverflowType", + "ScheddName", +} + +int_vals = { + "CRAB_Retry", + "BytesRecvd", + "BytesSent", + "ClusterId", + "CommittedSlotTime", + "CumulativeSlotTime", + "CumulativeSuspensionTime", + "CurrentHosts", + "CRAB_JobCount", + "DelegatedProxyExpiration", + "DiskUsage_RAW", + "ExecutableSize_RAW", + "ExitStatus", + "GlobusStatus", + "ImageSize_RAW", + "JobPrio", + "JobRunCount", + "JobStatus", + "JobFailed", + "JobUniverse", + "LastJobStatus", + "LocalSysCpu", + "LocalUserCpu", + "MachineAttrCpus0", + "MachineAttrSlotWeight0", + "MachineAttrCUDAComputeUnits0", + "MachineAttrCUDACoresPerCU0", + 
"MachineAttrCUDAGlobalMemoryMb0", + "MATCH_EXP_JOB_GLIDEIN_Job_Max_Time", + "MATCH_EXP_JOB_GLIDEIN_MaxMemMBs", + "MATCH_EXP_JOB_GLIDEIN_Max_Walltime", + "MATCH_EXP_JOB_GLIDEIN_Memory", + "MATCH_EXP_JOB_GLIDEIN_ProcId", + "MATCH_EXP_JOB_GLIDEIN_ToDie", + "MATCH_EXP_JOB_GLIDEIN_ToRetire", + "MaxHosts", + "MaxWallTimeMins_RAW", + "MemoryUsage", + "MinHosts", + "NumGlobusSubmits" "NumJobMatches", + "NumJobStarts", + "NumRestarts", + "NumShadowStarts", + "NumSystemHolds", + "PilotRestLifeTimeMins", + "PostJobPrio1", + "PostJobPrio2", + "ProcId", + "RecentBlockReadKbytes", + "RecentBlockReads", + "RecentBlockWriteKbytes", + "RecentBlockWrites", + "RemoteSlotID", + "RemoteSysCpu", + "RemoteUserCpu", + "RemoteWallClockTime", + "RequestCpus", + "RequestDisk_RAW", + "RequestMemory_RAW", + "ResidentSetSize_RAW", + "StatsLifetimeStarter", + "TotalSuspensions", + "TransferInputSizeMB", + "WallClockCheckpoint", + "WMAgent_JobID", + "DesiredSiteCount", + "DataLocationsCount", +} + +date_vals = { + "CompletionDate", + "CRAB_TaskCreationDate", + "EnteredCurrentStatus", + "JobCurrentStartDate", + "JobCurrentStartExecutingDate", + "JobCurrentStartTransferOutputDate", + "JobLastStartDate", + "JobStartDate", + "LastMatchTime", + "LastSuspensionTime", + "LastVacateTime_RAW", + "MATCH_GLIDEIN_ToDie", + "MATCH_GLIDEIN_ToRetire", + "QDate", + "ShadowBday", + "StageInFinish", + "StageInStart", + "JobFinishedHookDone", + "LastJobLeaseRenewal", + "LastRemoteStatusUpdate", + "GLIDEIN_ToDie", + "GLIDEIN_ToRetire", + "DataCollectionDate", + "RecordTime", + "ChirpCMSSWLastUpdate", +} + +ignore = { + "Arguments", + "CmdHash", + "CRAB_UserDN", + "CRAB_Destination", + "CRAB_DBSURL", + "CRAB_ASOURL", + "CRAB_ASODB", + "CRAB_AdditionalOutputFiles", + "CRAB_EDMOutputFiles", + "CRAB_TFileOutputFiles", + "CRAB_oneEventMode", + "CRAB_NumAutomJobRetries", + "CRAB_localOutputFiles", + "CRAB_ASOTimeout", + "CRAB_OutTempLFNDir", + "CRAB_PublishDBSURL", + "CRAB_PublishGroupName", + "CRAB_RestURInoAPI", + "CRAB_RestHost", + "CRAB_ReqName", + "CRAB_RetryOnASOFailures", + "CRAB_StageoutPolicy", + "SubmitEventNotes", + "DAGManNodesMask", + "DAGManNodesLog", + "DAGManJobId", + "accounting_group", + "AcctGroup", + "AcctGroupUser", + "AllowOpportunistic", + "AutoClusterAttrs", + "BufferBlockSize", + "BufferSize", + "CondorPlatform", + "CondorVersion", + "DiskUsage", + "Err", + "Environment", + "EnvDelim", + "Env", + "ExecutableSize", + "HasPrioCorrection", + "GlideinCredentialIdentifier", + "GlideinLogNr", + "GlideinSecurityClass", + "GlideinSlotsLayout", + "GlideinWebBase", + "GlideinWorkDir", + "ImageSize", + "In", + "Iwd", + "JobAdInformationAttrs", + "job_ad_information_attrs", + "JOB_GLIDECLIENT_Name", + "JOB_GLIDEIN_ClusterId", + "JOB_GLIDEIN_CMSSite", + "JOBGLIDEIN_CMSSite", + "JOB_GLIDEIN_Entry_Name", + "JOB_GLIDEIN_Factory", + "JOB_GLIDEIN_Job_Max_Time", + "JOB_GLIDEIN_MaxMemMBs", + "JOB_GLIDEIN_Max_Walltime", + "JOB_GLIDEIN_Memory", + "JOB_GLIDEIN_Name", + "JOB_GLIDEIN_ProcId", + "JOB_GLIDEIN_Schedd", + "JOB_GLIDEIN_SEs", + "JOB_GLIDEIN_Site", + "JOB_GLIDEIN_SiteWMS", + "JOB_GLIDEIN_SiteWMS_JobId", + "JOB_GLIDEIN_SiteWMS_Queue", + "JOB_GLIDEIN_SiteWMS_Slot", + "JOB_GLIDEIN_ToDie", + "JOB_GLIDEIN_ToRetire", + "JobLeaseDuration", + "JobNotification", + "JOB_Site", + "Managed", + "MATCH_EXP_JOBGLIDEIN_CMSSite", + "MATCH_EXP_JOB_Site", + "MATCH_GLIDECLIENT_Name", + "MATCH_GLIDEIN_ClusterId", + "MATCH_GLIDEIN_CMSSite", + "MATCH_GLIDEIN_Entry_Name", + "MATCH_GLIDEIN_Factory", + "MATCH_GLIDEIN_Job_Max_Time", + "MATCH_GLIDEIN_MaxMemMBs", + 
"MATCH_GLIDEIN_Max_Walltime", + "MATCH_GLIDEIN_Name", + "MATCH_GLIDEIN_ProcId", + "MATCH_GLIDEIN_Schedd", + "MATCH_GLIDEIN_SEs", + "MATCH_GLIDEIN_Site", + "MATCH_GLIDEIN_SiteWMS", + "MATCH_GLIDEIN_SiteWMS_JobId", + "MATCH_GLIDEIN_SiteWMS_Queue", + "MATCH_GLIDEIN_SiteWMS_Slot", + "MATCH_Memory", + "MyType", + "NiceUser", + "NumCkpts", + "NumCkpts_RAW", + "OnExitHold", + "OnExitRemove", + "OrigMaxHosts", + "Out", + "PeriodicHold", + "PeriodicRelease", + "PeriodicRemove", + "Prev_DESIRED_Sites", + "PublicClaimId", + "RequestDisk", + "RequestMemory", + "ResidentSetSize", + "REQUIRES_LOCAL_DATA", + "RecentBlockReadKbytes", + "RecentBlockReads", + "RecentBlockWriteKbytes", + "RecentBlockWrites", + "RootDir", + "ServerTime", + "SpooledOutputFiles", + "StreamErr", + "StreamOut", + "TargetType", + "TransferIn", + "TransferInput", + "TransferOutput", + "UserLog", + "UserLogUseXML", + "use_x509userproxy", + "x509userproxy", + "x509UserProxyExpiration", + "WantCheckpoint", + "WantRemoteIO", + "WantRemoteSyscalls", + "BlockReadKbytes", + "BlockReads", + "BlockWriteKbytes", + "BlockWrites", + "LocalSysCpu", + "LeaveJobInQueue", + "LocalUserCpu", + "JobMachineAttrs", + "LastRejMatchReason", + "MachineAttrGLIDEIN_CMSSite0", + "CMS_ALLOW_OVERFLOW", + "LastPublicClaimId", + "LastRemotePool", + "Used_Gatekeeper", + "DESIRED_OpSyses", +} + +no_idx = { + "CRAB_OutLFNDir", + "Args", + "Cmd", + "BytesRecvd", + "CoreSize", + "DelegatedProxyExpiration", + "Environment", + "RecentBlockReadKbytes", + "RecentBlockReads", + "RecentBlockWriteKbytes", + "RecentBlockWrites", + "RecentStatsLifetimeStarter", + "CurrentHosts", + "MachineAttrCpus0", + "MachineAttrSlotWeight0", + "LocalSysCpu", + "LocalUserCpu", + "MaxHosts", + "MinHosts", + "StartdIpAddr", + "StartdPrincipal", + "LastRemoteHost", +} + +no_analysis = { + "CRAB_PublishName", + "CRAB_PublishGroupName", + "CondorPlatform", + "CondorVersion", + "CurrentHosts", + "DESIRED_Archs", + "ShouldTransferFiles", + "TotalSuspensions", + "REQUIRED_OS", + "ShouldTransferFiles", + "WhenToTransferOutput", + "DAGParentNodeNames", +} + +bool_vals = { + "CurrentStatusUnknown", + "CRAB_Publish", + "CRAB_SaveLogsFlag", + "CRAB_TransferOutputs", + "GlobusResubmit", + "TransferQueued", + "TransferringInput", + "HasSingularity", + "NiceUser", + "ExitBySignal", + "CMSSWDone", + "HasBeenRouted", + "HasBeenOverflowRouted", + "HasBeenTimingTuned", + "MachineAttrCUDAECCEnabled0", +} + +# Fields to be kept in docs concerning running jobs +running_fields = { + "AccountingGroup", + "AutoClusterId", + "AffiliationInstitute", + "AffiliationCountry", + "BenchmarkJobDB12", + "Campaign", + "CMS_CampaignType", + "CMS_JobType", + "CMS_JobRetryCount", + "CMS_Pool", + "CMSGroups", + "CMSPrimaryDataTier", + "CMSSWKLumis", + "CMSSWWallHrs", + "CMSSWVersion", + "CMSSWMajorVersion", + "CMSSWReleaseSeries", + "CommittedCoreHr", + "CommittedTime", + "CoreHr", + "Country", + "CpuBadput", + "CpuEff", + "CpuEffOutlier", + "CpuEventRate", + "CpuTimeHr", + "CpuTimePerEvent", + "CRAB_AsyncDest", + "CRAB_DataBlock", + "CRAB_Id", + "CRAB_JobCount", + "CRAB_PostJobStatus", + "CRAB_Retry", + "CRAB_TaskCreationDate", + "CRAB_UserHN", + "CRAB_Workflow", + "CRAB_SplitAlgo", + "CMS_SubmissionTool", + "CMS_WMTool", + # "DataLocations", + "DESIRED_CMSDataset", + # "DESIRED_Sites", + "EnteredCurrentStatus", + "EventRate", + "FormattedCrabId", + "GlobalJobId", + "GLIDECLIENT_Name", + "GLIDEIN_ClusterId", + "GLIDEIN_Entry_Name", + "GLIDEIN_Factory", + "GLIDEIN_ProcId", + "HasSingularity", + "InputData", + "InputGB", + 
"JobPrio", + "JobCurrentStartDate", + "JobLastStartDate", + "JobUniverse", + "KEvents", + "MachineAttrCMSSF_ResourceType0", + "MachineAttrCMSSubSiteName0", + "MachineAttrGLIDEIN_OVERLOAD_ENABLED0", + "MaxWallTimeMins", + "MegaEvents", + "MemoryMB", + "OutputGB", + "QueueHrs", + "QDate", + "ReadTimeMins", + "RecordTime", + "RemoteHost", + "RequestCpus", + "RequestMemory", + "RequestMemory_Eval", + "ScheddName", + "Site", + "Status", + "TaskType", + "Tier", + "TimePerEvent", + "Type", + "WallClockHr", + "WMAgent_JobID", + "WMAgent_RequestName", + "WMAgent_SubTaskName", + "Workflow", + "DESIRED_Sites", + "DESIRED_SITES_Diff", + "DESIRED_SITES_Orig", + "EstimatedWallTimeMins", + "EstimatedWallTimeJobCount", + "PilotRestLifeTimeMins", + "LastRouted", + "LastTimingTuned", + "LPCRouted", + "MemoryUsage", + "PeriodicHoldReason", + "RouteType", + "HasBeenOverflowRouted", + "HasBeenRouted", + "HasBeenTimingTuned", +} + +status = { + 0: "Unexpanded", + 1: "Idle", + 2: "Running", + 3: "Removed", + 4: "Completed", + 5: "Held", + 6: "Error", +} + +universe = { + 1: "Standard", + 2: "Pipe", + 3: "Linda", + 4: "PVM", + 5: "Vanilla", + 6: "PVMD", + 7: "Scheduler", + 8: "MPI", + 9: "Grid", + 10: "Java", + 11: "Parallel", + 12: "Local", +} + +postjob_status_decode = { + "NOT RUN": "postProc", + "TRANSFERRING": "transferring", + "COOLOFF": "toRetry", + "FAILED": "failed", + "FINISHED": "finished", +} + +_launch_time = int(time.time()) + +# Initialize aff_mgr +aff_mgr = None +try: + aff_mgr = AffiliationManager( + recreate=False, + dir_file=os.getenv( + "AFFILIATION_DIR_LOCATION", + AffiliationManager._AffiliationManager__DEFAULT_DIR_PATH, + ), + ) +except AffiliationManagerException as e: + # If its not possible to create the affiliation manager + # Log it + logging.error("There were an error creating the affiliation manager, %s", e) + # Continue execution without affiliation. + + +def make_list_from_string_field(ad, key, split_re=r"[\s,]+\s*", default=None): + default = default or ["UNKNOWN"] + try: + return re.split(split_re, ad[key]) + except (TypeError, KeyError): + return default + + +def get_creation_time_from_taskname(ad): + """ + returns the task creation date as a timestamp given the task name. + CRAB task names includes the creation time in format %y%m%d_%H%M%S: + 190309_085131:adeiorio_crab_80xV2_ST_t-channel_top_4f_scaleup_inclusiveDecays_13TeV-powhegV2-madspin-pythia8 + """ + try: + _str_date = ad["CRAB_Workflow"].split(":")[0] + _naive_date = datetime.datetime.strptime(_str_date, "%y%m%d_%H%M%S") + return int(calendar.timegm(_naive_date.timetuple())) + except (KeyError, TypeError, ValueError): + # fallback to recordtime if there is not a CRAB_Workflow value + # or if it hasn't the expected format. 
+ return recordTime(ad) + + +_cream_re = re.compile(r"CPUNumber = (\d+)") +_nordugrid_re = re.compile(r"\(count=(\d+)\)") +_camp_re = re.compile(r"[A-Za-z0-9_]+_[A-Z0-9]+-([A-Za-z0-9]+)-") +_prep_re = re.compile(r"[A-Za-z0-9_]+_([A-Z]+-([A-Za-z0-9]+)-[0-9]+)") +_rval_re = re.compile(r"[A-Za-z0-9]+_(RVCMSSW_[0-9]+_[0-9]+_[0-9]+)") +_prep_prompt_re = re.compile(r"(PromptReco|Repack|Express)_[A-Za-z0-9]+_([A-Za-z0-9]+)") +# Executable error messages in WMCore +_wmcore_exe_exmsg = re.compile(r"^Chirp_WMCore_[A-Za-z0-9]+_Exception_Message$") +# 2016 reRECO; of the form cerminar_Run2016B-v2-JetHT-23Sep2016_8020_160923_164036_4747 +_rereco_re = re.compile(r"[A-Za-z0-9_]+_Run20[A-Za-z0-9-_]+-([A-Za-z0-9]+)") +_generic_site = re.compile(r"^[A-Za-z0-9]+_[A-Za-z0-9]+_(.*)_") +_cms_site = re.compile(r"CMS[A-Za-z]*_(.*)_") +_cmssw_version = re.compile(r"CMSSW_((\d*)_(\d*)_.*)") + + +def convert_to_json( + ad, cms=True, return_dict=False, reduce_data=False, pool_name="Unknown" +): + if ad.get("TaskType") == "ROOT": + return None + result = {} + result["RecordTime"] = recordTime(ad) + result["DataCollection"] = ad.get("CompletionDate", 0) or _launch_time + result["DataCollectionDate"] = result["RecordTime"] + + result["ScheddName"] = ad.get("GlobalJobId", "UNKNOWN").split("#")[0] + result["CMS_Pool"] = pool_name + # Determine type + if cms: + result["Type"] = ad.get("CMS_Type", "unknown").lower() + analysis = isAnalysisJob(ad) + + if "CRAB_Id" in ad: + result["FormattedCrabId"] = get_formatted_CRAB_Id(ad.get("CRAB_Id")) + + if cms: + ad.setdefault( + "MATCH_EXP_JOB_GLIDEIN_CMSSite", + ad.get("MATCH_EXP_JOBGLIDEIN_CMSSite", + ad.get("MATCH_GLIDEIN_CMSSite", + ad.get("MachineAttrGLIDEIN_CMSSite0", + ad.get("MachineAttrCMSProcessingSiteName0", "Unknown")))), + ) + + bulk_convert_ad_data(ad, result) + + # Classify failed jobs + result["JobFailed"] = jobFailed(ad) + result["ErrorType"] = errorType(ad) + result["ErrorClass"] = errorClass(result) + result["ExitCode"] = commonExitCode(ad) + if "ExitCode" in ad: + result["CondorExitCode"] = ad["ExitCode"] + + if cms: + result["task"] = ad.get("WMAgent_SubTaskName") # add "task" field to unify with WMArchive + result["CMS_JobType"] = str( + ad.get("CMS_JobType", "Analysis" if analysis else "Unknown") + ) + result["CRAB_AsyncDest"] = str(ad.get("CRAB_AsyncDest", "Unknown")) + result["WMAgent_TaskType"] = ad.get("WMAgent_SubTaskName", "/UNKNOWN").rsplit( + "/", 1 + )[-1] + result["CMS_CampaignType"] = guess_campaign_type(ad, analysis) + result["Campaign"] = guessCampaign(ad, analysis, result["CMS_CampaignType"]) + task_type = result.get("CMS_extendedJobType") + if task_type == "UNKNOWN" or task_type is None: + task_type = result.get( + "CMS_TaskType", result["CMS_JobType"] if analysis else guessTaskType(ad) + ) + result["TaskType"] = task_type + result["Workflow"] = guessWorkflow(ad, analysis) + now = time.time() + if ad.get("JobStatus") == 2 and (ad.get("EnteredCurrentStatus", now + 1) < now): + ad["RemoteWallClockTime"] = int(now - ad["EnteredCurrentStatus"]) + ad["CommittedTime"] = ad["RemoteWallClockTime"] + result["WallClockHr"] = ad.get("RemoteWallClockTime", 0) / 3600.0 + + result["PilotRestLifeTimeMins"] = -1 + if analysis and ad.get("JobStatus") == 2 and "LastMatchTime" in ad: + try: + result["PilotRestLifeTimeMins"] = int( + (ad["MATCH_GLIDEIN_ToDie"] - ad["EnteredCurrentStatus"]) / 60 + ) + except (KeyError, ValueError, TypeError): + result["PilotRestLifeTimeMins"] = -72 * 60 + + result["HasBeenTimingTuned"] = ad.get("HasBeenTimingTuned", False) + + if 
"RequestCpus" not in ad: + m = _cream_re.search(ad.get("CreamAttributes", "")) + m2 = _nordugrid_re.search(ad.get("NordugridRSL")) + if m: + try: + ad["RequestCpus"] = int(m.groups()[0]) + except: + pass + elif m2: + try: + ad["RequestCpus"] = int(m2.groups()[0]) + except: + pass + elif "xcount" in ad: + ad["RequestCpus"] = ad["xcount"] + ad.setdefault("RequestCpus", 1) + try: + ad["RequestCpus"] = int(ad.eval("RequestCpus")) + except: + ad["RequestCpus"] = 1.0 + result["RequestCpus"] = ad["RequestCpus"] + + result["CoreHr"] = ( + ad.get("RequestCpus", 1.0) * int(ad.get("RemoteWallClockTime", 0)) / 3600.0 + ) + result["CommittedCoreHr"] = ( + ad.get("RequestCpus", 1.0) * ad.get("CommittedTime", 0) / 3600.0 + ) + result["CommittedWallClockHr"] = ad.get("CommittedTime", 0) / 3600.0 + result["CpuTimeHr"] = ( + ad.get("RemoteSysCpu", 0) + ad.get("RemoteUserCpu", 0) + ) / 3600.0 + result["DiskUsageGB"] = ad.get("DiskUsage_RAW", 0) / 1000000.0 + result["MemoryMB"] = ad.get("ResidentSetSize_RAW", 0) / 1024.0 + result["DataLocations"] = make_list_from_string_field( + ad, "DESIRED_CMSDataLocations" + ) + result["DESIRED_Sites"] = make_list_from_string_field(ad, "DESIRED_Sites") + result["Original_DESIRED_Sites"] = make_list_from_string_field( + ad, "ExtDESIRED_Sites" + ) + result["DesiredSiteCount"] = len(result["DESIRED_Sites"]) + result["DataLocationsCount"] = len(result["DataLocations"]) + result["CRAB_TaskCreationDate"] = get_creation_time_from_taskname(ad) + + result["CMSPrimaryPrimaryDataset"] = "Unknown" + result["CMSPrimaryProcessedDataset"] = "Unknown" + result["CMSPrimaryDataTier"] = "Unknown" + if "DESIRED_CMSDataset" in result: + info = str(result["DESIRED_CMSDataset"]).split("/") + if len(info) > 3: + result["CMSPrimaryPrimaryDataset"] = info[1] + result["CMSPrimaryProcessedDataset"] = info[2] + result["CMSPrimaryDataTier"] = info[-1] + + if cms and analysis: + result["OutputFiles"] = ( + len(ad.get("CRAB_AdditionalOutputFiles", [])) + + len(ad.get("CRAB_TFileOutputFiles", [])) + + len(ad.get("CRAB_EDMOutputFiles", [])) + + ad.get("CRAB_SaveLogsFlag", 0) + ) + if "x509UserProxyFQAN" in ad: + result["x509UserProxyFQAN"] = str(ad["x509UserProxyFQAN"]).split(",") + if "x509UserProxyVOName" in ad: + result["VO"] = str(ad["x509UserProxyVOName"]) + if cms: + result["CMSGroups"] = make_list_from_string_field(ad, "CMSGroups") + result["Site"] = ad.get("MATCH_EXP_JOB_GLIDEIN_CMSSite", "UNKNOWN") + if result["Site"].endswith("_Disk"): + result["Site"] = result["Site"].strip("_Disk") + elif ("GlideinEntryName" in ad) and ("MATCH_EXP_JOBGLIDEIN_ResourceName" not in ad): + m = _generic_site.match(ad["GlideinEntryName"]) + m2 = _cms_site.match(ad["GlideinEntryName"]) + if m2: + result["Site"] = m2.groups()[0] + info = result["Site"].split("_", 2) + if len(info) == 3: + result["Tier"] = info[0] + result["Country"] = info[1] + else: + result["Tier"] = "Unknown" + result["Country"] = "Unknown" + elif m: + result["Site"] = m.groups()[0] + else: + result["Site"] = "UNKNOWN" + else: + result["Site"] = ad.get("MATCH_EXP_JOBGLIDEIN_ResourceName", "UNKNOWN") + if cms: + info = result["Site"].split("_", 2) + if len(info) == 3: + result["Tier"] = info[0] + result["Country"] = info[1] + else: + result["Tier"] = "Unknown" + result["Country"] = "Unknown" + if "Site" not in result or "DESIRED_Sites" not in result: + result["InputData"] = "Unknown" + elif ("DESIRED_CMSDataLocations" not in result) or ( + result["DESIRED_CMSDataLocations"] is None + ): # CRAB2 case. 
+ result["InputData"] = "Onsite" + elif result["Site"] in result["DESIRED_CMSDataLocations"]: + result["InputData"] = "Onsite" + elif (result["Site"] != "UNKNOWN") and (ad.get("JobStatus") != 1): + result["InputData"] = "Offsite" + if analysis: + if result["Site"] not in result["DESIRED_Sites"]: + result["OverflowType"] = "FrontendOverflow" + else: + result["OverflowType"] = "IgnoreLocality" + else: + result["OverflowType"] = "Unified" + if result["WallClockHr"] == 0: + result["CpuEff"] = 0 + else: + result["CpuEff"] = ( + 100 + * result["CpuTimeHr"] + / result["WallClockHr"] + / float(ad.get("RequestCpus", 1.0)) + ) + result["Status"] = status.get(ad.get("JobStatus"), "Unknown") + result["Universe"] = universe.get(ad.get("JobUniverse"), "Unknown") + result["QueueHrs"] = ( + ad.get("JobCurrentStartDate", time.time()) - ad["QDate"] + ) / 3600.0 + result["Badput"] = max(result["CoreHr"] - result["CommittedCoreHr"], 0.0) + result["CpuBadput"] = max(result["CoreHr"] - result["CpuTimeHr"], 0.0) + + handle_chirp_info(ad, result) + + # Parse CRAB3 information on CMSSW version + result["CMSSWVersion"] = "Unknown" + result["CMSSWMajorVersion"] = "Unknown" + result["CMSSWReleaseSeries"] = "Unknown" + if "CRAB_JobSW" in result: + match = _cmssw_version.match(result["CRAB_JobSW"]) + if match: + result["CMSSWVersion"] = match.group(1) + subv, ssubv = int(match.group(2)), int(match.group(3)) + result["CMSSWMajorVersion"] = "%d_X_X" % subv + result["CMSSWReleaseSeries"] = "%d_%d_X" % (subv, ssubv) + + # Parse new machine statistics. + try: + cpus = float(result["GLIDEIN_Cpus"]) + result["BenchmarkJobHS06"] = float(ad["MachineAttrMJF_JOB_HS06_JOB0"]) / cpus + if result.get("EventRate", 0) > 0: + result["HS06EventRate"] = result["EventRate"] / result["BenchmarkJobHS06"] + if result.get("CpuEventRate", 0) > 0: + result["HS06CpuEventRate"] = ( + result["CpuEventRate"] / result["BenchmarkJobHS06"] + ) + if result.get("CpuTimePerEvent", 0) > 0: + result["HS06CpuTimePerEvent"] = ( + result["CpuTimePerEvent"] * result["BenchmarkJobHS06"] + ) + if result.get("TimePerEvent", 0) > 0: + result["HS06TimePerEvent"] = ( + result["TimePerEvent"] * result["BenchmarkJobHS06"] + ) + result["HS06CoreHr"] = result["CoreHr"] * result["BenchmarkJobHS06"] + result["HS06CommittedCoreHr"] = ( + result["CommittedCoreHr"] * result["BenchmarkJobHS06"] + ) + result["HS06CpuTimeHr"] = result["CpuTimeHr"] * result["BenchmarkJobHS06"] + except: + result.pop("MachineAttrMJF_JOB_HS06_JOB0", None) + if ("MachineAttrDIRACBenchmark0" in ad) and classad.ExprTree( + "MachineAttrDIRACBenchmark0 isnt undefined" + ).eval(ad): + result["BenchmarkJobDB12"] = float(ad["MachineAttrDIRACBenchmark0"]) + if result.get("EventRate", 0) > 0: + result["DB12EventRate"] = result["EventRate"] / result["BenchmarkJobDB12"] + if result.get("CpuEventRate", 0) > 0: + result["DB12CpuEventRate"] = ( + result["CpuEventRate"] / result["BenchmarkJobDB12"] + ) + if result.get("CpuTimePerEvent", 0) > 0: + result["DB12CpuTimePerEvent"] = ( + result["CpuTimePerEvent"] * result["BenchmarkJobDB12"] + ) + if result.get("TimePerEvent", 0) > 0: + result["DB12TimePerEvent"] = ( + result["TimePerEvent"] * result["BenchmarkJobDB12"] + ) + result["DB12CoreHr"] = result["CoreHr"] * result["BenchmarkJobDB12"] + result["DB12CommittedCoreHr"] = ( + result["CommittedCoreHr"] * result["BenchmarkJobDB12"] + ) + result["DB12CpuTimeHr"] = result["CpuTimeHr"] * result["BenchmarkJobDB12"] + + result["HasSingularity"] = classad.ExprTree( + "MachineAttrHAS_SINGULARITY0 is true" + ).eval(ad) + if 
"ChirpCMSSWCPUModels" in ad and not isinstance( + ad["ChirpCMSSWCPUModels"], classad.ExprTree + ): + result["CPUModel"] = str(ad["ChirpCMSSWCPUModels"]) + result["CPUModelName"] = str(ad["ChirpCMSSWCPUModels"]) + result["Processor"] = str(ad["ChirpCMSSWCPUModels"]) + elif "MachineAttrCPUModel0" in ad: + result["CPUModel"] = str(ad["MachineAttrCPUModel0"]) + result["CPUModelName"] = str(ad["MachineAttrCPUModel0"]) + result["Processor"] = str(ad["MachineAttrCPUModel0"]) + + # Affiliation data: + if aff_mgr: + _aff = None + if "CRAB_UserHN" in result: + _aff = aff_mgr.getAffiliation(login=result["CRAB_UserHN"]) + elif "x509userproxysubject" in result: + _aff = aff_mgr.getAffiliation(dn=result["x509userproxysubject"]) + + if _aff is not None: + result["AffiliationInstitute"] = _aff["institute"] + result["AffiliationCountry"] = _aff["country"] + + # We will use the CRAB_PostJobStatus as the actual status. + # If is an analysis task and is not completed + # (or removed if the postjob status is not "NOT RUN"), + # its status is defined by Status, else it will be defined by + # CRAB_PostJobStatus. + # + # We will use the postjob_status_decode dict to decode + # the status. If there is an unknown value it will set to it. + # Note: if the completed task has not a committed time + # or completion date, we will set it using RemoteWallClockTime + # and EnteredCurrentStatus. + _pjst = result.get("CRAB_PostJobStatus", None) + # Sometimes there are some inconsistences in the value from CRAB + # to avoid this we can remove the spaces and make it uppercase. + _pjst = _pjst.strip().upper() if _pjst else None + _status = result["Status"] + if _pjst and ( + (_status == "Removed" and _pjst != "NOT RUN") or (_status == "Completed") + ): + result["CRAB_PostJobStatus"] = postjob_status_decode.get(_pjst, _pjst) + if "CompletionDate" not in result: + result["CompletionDate"] = result.get("EnteredCurrentStatus") + if "CommittedTime" not in result or result.get("CommittedTime", 0) == 0: + result["CommittedTime"] = result.get("RemoteWallClockTime") + elif "CRAB_Id" in result: # If is an analysis or HC test task. 
+ result["CRAB_PostJobStatus"] = _status + # Normalize wmtool value, this is a temporary change + # Not to be merged (The value was fixed upstream, + # this change is only needed while old tasks + # are still being processed + _wmtool = result.get( + "CMS_WMTool", + "UNKNOWN" + if result.get("CMS_SubmissionTool") != "InstitutionalSchedd" + else "User", + ) + result["CMS_WMTool"] = "User" if _wmtool.lower() == "user" else _wmtool + + if reduce_data: + result = drop_fields_for_running_jobs(result) + + # Set outliers + result = set_outliers(result) + + if return_dict: + return result + else: + return json.dumps(result) + + +def set_outliers(result): + """Filter and set appropriate flags for outliers""" + if ("CpuEff" in result) and (result["CpuEff"] >= 100.0): + result["CpuEffOutlier"] = 1 + else: + result["CpuEffOutlier"] = 0 + return result + + +def recordTime(ad): + """ + RecordTime falls back to launch time as last-resort and for jobs in the queue + + For Completed/Removed/Error jobs, try to update it: + - to CompletionDate if present (only available for completed jobs) + - else to JobFinishedHookDone if present (available for all completed and removed jobs) + - else fall back to launch time + """ + if ad["JobStatus"] in [3, 4, 6]: + if ad.get("CompletionDate", 0): + return ad["CompletionDate"] + + elif ad.get("JobFinishedHookDone", 0): + return ad["JobFinishedHookDone"] + + return _launch_time + + +def guessTaskType(ad): + """Guess the TaskType from the WMAgent subtask name""" + jobType = ad.get("CMS_JobType", "UNKNOWN") + + if jobType == "Processing": + return "DataProcessing" + elif jobType == "Production": + ttype = ad.get("WMAgent_SubTaskName", "/UNKNOWN").rsplit("/", 1)[-1] + # Guess an alternate campaign name from the subtask + camp2_info = ttype.split("-") + if len(camp2_info) > 1: + camp2 = camp2_info[1] + else: + camp2 = ttype + + if "CleanupUnmerged" in ttype: + return "Cleanup" + elif "Merge" in ttype: + return "Merge" + elif "LogCollect" in ttype: + return "LogCollect" + elif ("MiniAOD" in ad.get("WMAgent_RequestName", "UNKNOWN")) and ( + ttype == "StepOneProc" + ): + return "MINIAOD" + elif "MiniAOD" in ttype: + return "MINIAOD" + elif ttype == "StepOneProc" and ( + re.search("[1-9][0-9]DR", camp2) + ): + return "DIGIRECO" + elif ( + re.search("[1-9][0-9]GS", camp2) + ) and ttype.endswith("_0"): + return "GENSIM" + elif ttype.endswith("_0"): + return "DIGI" + elif ttype.endswith("_1") or ttype.lower() == "reco": + return "RECO" + elif ttype == "MonteCarloFromGEN": + return "GENSIM" + else: + return "UNKNOWN" + else: + return jobType + + +def guessCampaign(ad, analysis, cms_campaign_type): + # Guess the campaign from the request name. + camp = ad.get("WMAgent_RequestName", "UNKNOWN") + if ad.get("CMS_CampaignName"): + return ad.get("CMS_CampaignName") + if analysis: + return "crab_" + ad.get("CRAB_UserHN", "UNKNOWN") + if camp.startswith("PromptReco"): + return "PromptReco" + if camp.startswith("Repack"): + return "Repack" + if camp.startswith("Express"): + return "Express" + if "RVCMSSW" in camp: + return "RelVal" + m = _camp_re.match(camp) + if m: + return m.groups()[0] + m = _rereco_re.match(camp) + if m and ("DataProcessing" in ad.get("WMAgent_SubTaskName", "")): + return m.groups()[0] + "Reprocessing" + # [Temp solution] If Campaign not found, return CMS_CampaignType + logging.info("Campaign will be CMS_CampaignType. camp:{}".format(camp)) + return cms_campaign_type + + +def guess_campaign_type(ad, analysis): + """ + Based on the request name return a campaign type. 
+ The campaign type is based on the classification defined at + https://its.cern.ch/jira/browse/CMSMONIT-174#comment-3050384 + """ + camp = ad.get("WMAgent_RequestName", "UNKNOWN") + if analysis: + return "Analysis" + elif re.match(r".*(RunII(Summer|Spring)([12])[0-9]UL|_UL[0-9]+).*", camp): + return "MC Ultralegacy" + elif re.match(r".*UltraLegacy.*", camp): + return "Data Ultralegacy" + elif re.match(r".*Phase2.*", camp): + return "Phase2 requests" + elif re.match(r".*(Run3|RunIII).*", camp): + return "Run3 requests" + elif "RVCMSSW" in camp: + return "RelVal" + elif re.match(r".*(RunII|(Summer|Fall|Autumn|Winter|Spring)(1[5-9]|20)).*", camp): # [!] Should be after UL + return "Run2 requests" + elif "SnowmassWinter21" in camp: + # Example WMAgent_RequestName: pdmvserv_task_TSG-SnowmassWinter21wmLHEGEN-00229__v1_T_211208_125036_3179 + return "SnowmassWinter21" + else: + return "UNKNOWN" + + +def guessWorkflow(ad, analysis): + prep = ad.get("WMAgent_RequestName", "UNKNOWN") + m = _prep_re.match(prep) + if analysis: + return ad.get("CRAB_Workflow", "UNKNOWN").split(":", 1)[-1] + elif m: + return m.groups()[0] + else: + m = _prep_prompt_re.match(prep) + if m: + return m.groups()[0] + "_" + m.groups()[1] + else: + m = _rval_re.match(prep) + if m: + return m.groups()[0] + + return prep + + +def chirpCMSSWIOSiteName(key): + """Extract site name from ChirpCMSS_IOSite key""" + iosite_match = re.match(r"ChirpCMSSW(.*?)IOSite_(.*)_(ReadBytes|ReadTimeMS)", key) + return iosite_match.group(2), iosite_match.group(1).strip("_") + + +def jobFailed(ad): + """ + Returns 0 when none of the exitcode fields has a non-zero value + otherwise returns 1 + """ + if commonExitCode(ad) == 0: + return 0 + else: + return 1 + +def isAnalysisJob(ad): + """ + Check if this is an analysis job, based on + the CMS_Type/CMS_JobType classads in the job. + """ + if ad.get("CMS_Type", "unknown").lower() == "analysis" \ + or ad.get("CMS_JobType", "unknown") == "Analysis": + return True + + return False + + +def commonExitCode(ad): + """ + Consolidate the exit code values of JobExitCode, + the chirped CRAB and WMCore values, and + the original condor exit code, according to + the workflow type: production or analysis. + """ + # If the raw ExitCode in the ad is not present, + # the job was removed and its executable did not finish, + # hence we return 50666 for any workflow type. + if not "ExitCode" in ad: + return 50666 + + condorExitCode = ad.get("ExitCode") + + if isAnalysisJob(ad): # CRAB or CMS Connect job + return ad.get( + "JobExitCode", + ad.get("Chirp_CRAB3_Job_ExitCode", condorExitCode) + ) + else: # production job + # If cmsRun exit code exists and was not 0, we consider the job failed + # even if the wrapper reported a sucess status in condor. + # Also, if cmsRunExitCode was 0, but not wrapper exit code, + # we stick with the wrapper exit code and consider the job failed. + if ad.get("Chirp_WMCore_cmsRun_ExitCode", 0) > 0: + return ad["Chirp_WMCore_cmsRun_ExitCode"] + return condorExitCode + +def errorType(ad): + """ + Categorization of exit codes into a handful of readable error types. + + Allowed values are: + 'Success', 'Environment' 'Executable', 'Stageout', 'Publication', + 'JobWrapper', 'FileOpen', 'FileRead', 'OutOfBounds', 'Other' + + This currently only works for CRAB jobs. 
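    Sample mappings, using codes handled in the body: 50513 or 10000-19999 ->
    'Environment'; 8020/8028 -> 'FileOpen'; 8021 -> 'FileRead'; 60000-68999 ->
    'StageOut'; 80000-89999 -> 'JobWrapper'.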
Production jobs will always + fall into 'Other' as they don't have the Chirp_CRAB3_Job_ExitCode + """ + if not jobFailed(ad): + return "Success" + + exitcode = commonExitCode(ad) + + if (10000 <= exitcode <= 19999) or exitcode == 50513: + return "Environment" + + if 60000 <= exitcode <= 69999: + if exitcode >= 69000: # Not yet in classads? + return "Publication" + else: + return "StageOut" + + if 80000 <= exitcode <= 89999: + return "JobWrapper" + + if exitcode in [8020, 8028]: + return "FileOpen" + + if exitcode == 8021: + return "FileRead" + + if exitcode in [8030, 8031, 8032, 9000] or ( + 50660 <= exitcode <= 50669 + ): + return "OutOfBounds" + + if (7000 <= exitcode <= 9000) or exitcode == 139: + return "Executable" + + return "Other" + + +def errorClass(result): + """ + Further classify error types into even broader failure classes + """ + if result["ErrorType"] in [ + "Environment", + "Publication", + "StageOut", + "AsyncStageOut", + ]: + return "System" + + elif result["ErrorType"] in ["FileOpen", "FileRead"]: + return "DataAccess" + + elif result["ErrorType"] in ["JobWrapper", "OutOfBounds", "Executable"]: + return "Application" + + elif result["JobFailed"]: + return "Other" + + return "Success" + + +def handle_chirp_info(ad, result): + """ + Process any data present from the Chirp ads. + + Chirp statistics should be available in CMSSW_8_0_0 and later. + """ + for key, val in list(result.items()): + if key.startswith("ChirpCMSSW") and "IOSite" in key: + sitename, chirpstring = chirpCMSSWIOSiteName(key) + keybase = key.rsplit("_", 1)[0] + try: + readbytes = result.pop(keybase + "_ReadBytes") + readtimems = result.pop(keybase + "_ReadTimeMS") + siteio = {} + siteio["SiteName"] = sitename + siteio["ChirpString"] = chirpstring + siteio["ReadBytes"] = readbytes + siteio["ReadTimeMS"] = readtimems + result.setdefault("ChirpCMSSW_SiteIO", []).append(siteio) + + except KeyError: + # First hit will pop both ReadBytes and ReadTimeMS fields hence + # second hit will throw a KeyError that we want to ignore + pass + + continue + + if key.startswith("ChirpCMSSW_"): + cmssw_key = "ChirpCMSSW" + key.split("_", 2)[-1] + if cmssw_key not in result: + result[cmssw_key] = val + elif ( + cmssw_key.endswith("LastUpdate") + or cmssw_key.endswith("Events") + or cmssw_key.endswith("MaxLumis") + or cmssw_key.endswith("MaxFiles") + ): + result[cmssw_key] = max(result[cmssw_key], val) + else: + result[cmssw_key] += val + + if "ChirpCMSSWFiles" in result: + result["CompletedFiles"] = result["ChirpCMSSWFiles"] + if result.get("ChirpCMSSWMaxFiles", -1) > 0: + result["MaxFiles"] = result["ChirpCMSSWMaxFiles"] + if "ChirpCMSSWDone" in result: + result["CMSSWDone"] = bool(result["ChirpCMSSWDone"]) + result["ChirpCMSSWDone"] = int(result["ChirpCMSSWDone"]) + if "ChirpCMSSWElapsed" in result: + result["CMSSWWallHrs"] = result["ChirpCMSSWElapsed"] / 3600.0 + if "ChirpCMSSWEvents" in result: + result["KEvents"] = result["ChirpCMSSWEvents"] / 1000.0 + result["MegaEvents"] = result["ChirpCMSSWEvents"] / 1e6 + if "ChirpCMSSWLastUpdate" in result: + # Report time since last update - this is likely stageout time for completed jobs + result["SinceLastCMSSWUpdateHrs"] = ( + max(result["RecordTime"] - result["ChirpCMSSWLastUpdate"], 0) / 3600.0 + ) + if result["Status"] == "Completed": + result["StageOutHrs"] = result["SinceLastCMSSWUpdateHrs"] + if "ChirpCMSSWLumis" in result: + result["CMSSWKLumis"] = result["ChirpCMSSWLumis"] / 1000.0 + if "ChirpCMSSWReadBytes" in result: + result["InputGB"] = result["ChirpCMSSWReadBytes"] 
/ 1e9 + if "ChirpCMSSWReadTimeMsecs" in result: + result["ReadTimeHrs"] = result["ChirpCMSSWReadTimeMsecs"] / 3600000.0 + result["ReadTimeMins"] = result["ChirpCMSSWReadTimeMsecs"] / 60000.0 + if "ChirpCMSSWWriteBytes" in result: + result["OutputGB"] = result["ChirpCMSSWWriteBytes"] / 1e9 + if "ChirpCMSSWWriteTimeMsecs" in result: + result["WriteTimeHrs"] = result["ChirpCMSSWWriteTimeMsecs"] / 3600000.0 + result["WriteTimeMins"] = result["ChirpCMSSWWriteTimeMsecs"] / 60000.0 + if result.get("CMSSWDone") and (result.get("ChirpCMSSWElapsed", 0) > 0): + result["CMSSWEventRate"] = result.get("ChirpCMSSWEvents", 0) / float( + result["ChirpCMSSWElapsed"] * ad.get("RequestCpus", 1.0) + ) + if result["CMSSWEventRate"] > 0: + result["CMSSWTimePerEvent"] = 1.0 / result["CMSSWEventRate"] + if result["CoreHr"] > 0: + result["EventRate"] = result.get("ChirpCMSSWEvents", 0) / float( + result["CoreHr"] * 3600.0 + ) + if result["EventRate"] > 0: + result["TimePerEvent"] = 1.0 / result["EventRate"] + if ("ChirpCMSSWReadOps" in result) and ("ChirpCMSSWReadSegments" in result): + ops = result["ChirpCMSSWReadSegments"] + result["ChirpCMSSWReadOps"] + if ops: + result["ReadOpSegmentPercent"] = ( + result["ChirpCMSSWReadOps"] / float(ops) * 100 + ) + if ("ChirpCMSSWReadOps" in result) and ("ChirpCMSSWReadVOps" in result): + ops = result["ChirpCMSSWReadOps"] + result["ChirpCMSSWReadVOps"] + if ops: + result["ReadOpsPercent"] = result["ChirpCMSSWReadOps"] / float(ops) * 100 + + +_CONVERT_COUNT = 0 +_CONVERT_CPU = 0 + + +def bulk_convert_ad_data(ad, result): + """ + Given a ClassAd, bulk convert to a python dictionary. + """ + _keys = set(ad.keys()) - ignore + for key in _keys: + if key.startswith("HasBeen") and key not in bool_vals: + continue + if key == "DESIRED_SITES": + key = "DESIRED_Sites" + try: + value = ad.eval(key) + except: + continue + if isinstance(value, classad.Value): + _is_err_cond = value is classad.Value.Error + if _is_err_cond: + continue + else: + value = None + elif key in bool_vals: + value = bool(value) + elif key in int_vals: + try: + value = int(value) + except ValueError: + if value == "Unknown": + value = None + elif (key == "MATCH_EXP_JOB_GLIDEIN_MaxMemMBs") and (value == "GLIDEIN_MaxMemMBs"): + # FIXME after SI/WMA/CRAB teams solve this upstream. 
This key should be convertible to int + continue + else: + logging.warning( + "Failed to convert key %s with value %s to int" + % (key, repr(value)) + ) + continue + elif key in string_vals: + value = str(value) + elif key in date_vals: + if value == 0 or (isinstance(value, str) and value.lower() == "unknown"): + value = None + else: + try: + value = int(value) + except ValueError: + logging.warning( + "Failed to convert key %s with value %s to int for a date field" + % (key, repr(value)) + ) + value = None + # elif key in date_vals: + # value = datetime.datetime.fromtimestamp(value).strftime("%Y-%m-%d %H:%M:%S") + if key.startswith("MATCH_EXP_JOB_"): + key = key[len("MATCH_EXP_JOB_"):] + if key.endswith("_RAW"): + key = key[: -len("_RAW")] + if _wmcore_exe_exmsg.match(key): + value = str(decode_and_decompress(value)) + result[key] = value + evaluate_fields(result, ad) + + +def evaluate_fields(result, ad): + """Evaluates RequestMemory expression in ClassAd""" + if "RequestMemory" in ad: + try: + result["RequestMemory_Eval"] = ad.eval("RequestMemory") + except Exception as eval_exc: + logging.error("Could not evaluate RequestMemory exp, error: %s" % (str(eval_exc))) + + +def decode_and_decompress(value): + try: + value = str(zlib.decompress(base64.b64decode(value))) + except (TypeError, zlib.error): + logging.warning("Failed to decode and decompress value: %s" % (repr(value))) + + return value + + +def convert_dates_to_millisecs(record): + for date_field in date_vals: + try: + record[date_field] *= 1000 + except (KeyError, TypeError): + continue + + return record + + +def drop_fields_for_running_jobs(record): + """ + Check if the job is running or pending + and prune it if it is. + """ + if "Status" in record and record["Status"] not in ["Running", "Idle", "Held"]: + return record + _fields = running_fields.intersection(set(record.keys())) + skimmed_record = {field: record[field] for field in _fields} + return skimmed_record + + +def unique_doc_id(doc): + """ + Return a string of format "#" + To uniquely identify documents (not jobs) + + Note that this uniqueness breaks if the same jobs are submitted + with the same RecordTime + """ + return "%s#%d" % (doc["GlobalJobId"], doc["RecordTime"]) + + +def get_formatted_CRAB_Id(CRAB_Id): + # FormattedCrabId + # In this field, we want to format the crab_id (if exists) + # to ensure that the lexicographical order is the desired. + # Currently, there are two CRAB_Id formats: + # a positive integer or an integer tuple formated as X-N + # + # The desired order is start with the 0-N then the integer values then the + # 1-N...X-N + # To do that we will use leading zeros to ensure the lexicographical order, + # e.g: + # 0.000001,0.000002 ...0.000012, 000001,000002,....009999,010000, + # 1.000001....3,009999. + # Falls back to '000000' + # args: CRAB_Id + _cid = CRAB_Id + formatted = "000000" + try: + if "-" in _cid: + formatted = "{}.{:06d}".format(*[int(x) for x in _cid.split("-")]) + elif _cid.isdigit(): + formatted = "{:06d}".format(int(_cid)) + except TypeError: + pass + return formatted diff --git a/docker/spider-query-cronjob/src/history.py b/docker/spider-query-cronjob/src/history.py new file mode 100755 index 00000000..657052ed --- /dev/null +++ b/docker/spider-query-cronjob/src/history.py @@ -0,0 +1,302 @@ +""" +Methods for processing the history in a schedd queue. 
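In outline (matching the functions defined below): each schedd's history is
queried since its per-schedd checkpoint, every ad is converted with
convert_to_json() and published to NATS JetStream via publish_job_to_nats(),
and the newest completion time is written back to checkpoint.json, e.g.

    last_completion = checkpoint.get(schedd_name, now - 12 * 3600)
    for job_ad in schedd.history(history_query, [], match=-1):
        doc = convert_to_json(job_ad, return_dict=True, pool_name=pool_name)
        publish_job_to_nats(jetstream, history_subject, unique_doc_id(doc), doc, args)
    checkpoint_queue.put((schedd_ad["Name"], latest_completion))

(a condensed sketch of process_schedd(), with error handling and timeouts omitted)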
+""" + +import datetime +import json +import logging +import multiprocessing +import os +import time +import traceback + +import classad +import htcondor + +from utils import send_email_alert, time_remaining, TIMEOUT_MINS +from convert_to_json import convert_to_json, unique_doc_id +from nats import get_nats_connection, publish_job_to_nats + +# Main query time, should be same with cron schedule. +QUERY_TIME_PERIOD = 720 # 12 minutes + +# Even in checkpoint.json last query time is older than this, older than "now()-12h" results will be ignored. +CRAB_MAX_QUERY_TIME_SPAN = 12 * 3600 # 12 hours + +# If last query time in checkpoint.json is too old, but not from crab, results older +# than "now()-RETENTION_POLICY" will be ignored. +RETENTION_POLICY = 39 * 24 * 3600 # 39 days + +_WORKDIR = os.getenv("SPIDER_WORKDIR", "/opt/spider") +_CHECKPOINT_JSON = os.path.join(_WORKDIR, "checkpoint.json") + + +def process_schedd( + starttime, last_completion, checkpoint_queue, schedd_ad, args, metadata=None +): + """ + Given a schedd, process its entire set of history since last checkpoint. + """ + my_start = time.time() + pool_name = schedd_ad.get("CMS_Pool", "Unknown") + if time_remaining(starttime) < 10: + message = ( + "No time remaining to process %s history; exiting." % schedd_ad["Name"] + ) + logging.error(message) + send_email_alert( + args.email_alerts, "spider_cms history timeout warning", message + ) + return last_completion + + metadata = metadata or {} + schedd = htcondor.Schedd(schedd_ad) + _q = """ + (JobUniverse == 5) && (CMS_Type != "DONOTMONIT") + && + ( + EnteredCurrentStatus >= %(last_completion)d + || CRAB_PostJobLastUpdate >= %(last_completion)d + ) + """ + history_query = classad.ExprTree(_q % {"last_completion": last_completion - QUERY_TIME_PERIOD}) + logging.info( + "Querying %s for history: %s. 
" "%.1f minutes of ads", + schedd_ad["Name"], + history_query, + (time.time() - last_completion) / 60.0, + ) + count = 0 + published_count = 0 + sent_warnings = False + timed_out = False + error = False + latest_completion = last_completion + + # Get NATS JetStream connection + try: + _, jetstream = get_nats_connection(args) + # Use a different subject for history if specified + history_subject = getattr(args, 'nats_history_subject', None) or os.getenv('NATS_HISTORY_SUBJECT', 'cms.htcondor.history.job') + except Exception as e: + logging.error("Failed to get NATS connection: %s", str(e)) + error = True + jetstream = None + history_subject = None + + try: + if not args.dry_run: + history_iter = schedd.history(history_query, [], match=-1) + else: + history_iter = [] + except RuntimeError: + message = "Failed to query schedd for job history: %s" % schedd_ad["Name"] + exc = traceback.format_exc() + message += "\n{}".format(exc) + logging.error(message) + error = True + history_iter = [] + + # Process each job in history and publish to NATS + if not error and jetstream is not None: + try: + for job_ad in history_iter: + if time_remaining(starttime) < 10: + message = ( + "History crawler on %s has been running for " + "more than %d minutes; exiting" + % (schedd_ad["Name"], TIMEOUT_MINS) + ) + logging.error(message) + send_email_alert( + args.email_alerts, "spider_cms history timeout warning", message + ) + timed_out = True + break + + dict_ad = None + try: + dict_ad = convert_to_json( + job_ad, + return_dict=True, + reduce_data=not args.keep_full_queue_data, + pool_name=pool_name, + ) + except Exception as e: + message = "Failure when converting document on %s history: %s" % ( + schedd_ad["Name"], + str(e), + ) + logging.warning(message) + if not sent_warnings: + send_email_alert( + args.email_alerts, + "spider_cms history document conversion error", + message, + ) + sent_warnings = True + continue + + if not dict_ad: + continue + + job_id = unique_doc_id(dict_ad) + count += 1 + + # Update latest completion time based on job's completion date + job_completion = dict_ad.get("CompletionDate") or dict_ad.get("EnteredCurrentStatus") or dict_ad.get("RecordTime") + if job_completion and job_completion > latest_completion: + latest_completion = job_completion + + # Publish each job to NATS JetStream + if not args.dry_run: + if publish_job_to_nats(jetstream, history_subject, job_id, dict_ad, args): + published_count += 1 + else: + logging.debug("DRY RUN: Would publish history job %s", job_id) + + except Exception as e: + message = "Failure when processing schedd history query on %s: %s" % ( + schedd_ad["Name"], + str(e), + ) + exc = traceback.format_exc() + message += "\n{}".format(exc) + logging.error(message) + send_email_alert( + args.email_alerts, "spider_cms schedd history query error", message + ) + error = True + + # If we got to this point without a timeout and processed jobs, update the checkpoint + if not timed_out and not error and count > 0: + checkpoint_queue.put((schedd_ad["Name"], latest_completion)) + + total_time = (time.time() - my_start) / 60.0 + last_formatted = datetime.datetime.fromtimestamp(last_completion).strftime( + "%Y-%m-%d %H:%M:%S" + ) + logging.warning( + "Schedd %-25s history: queried %5d jobs, published %5d jobs; last completion %s; " + "query time %.2f min", + schedd_ad["Name"], + count, + published_count, + last_formatted, + total_time, + ) + + return latest_completion + + +def update_checkpoint(name, completion_date): + try: + with open(_CHECKPOINT_JSON, "r") as fd: + 
checkpoint = json.load(fd) + except Exception as e: + logging.warning("!!! checkpoint.json is not found or not readable. " + "It will be created and fresh results will be written. " + str(e)) + checkpoint = {} + + checkpoint[name] = completion_date + + with open(_CHECKPOINT_JSON, "w") as fd: + json.dump(checkpoint, fd) + + +def process_histories(schedd_ads, starttime, pool, args, metadata=None): + """ + Process history files for each schedd listed in a given + multiprocessing pool + """ + try: + checkpoint = json.load(open(_CHECKPOINT_JSON)) + except Exception as e: + # Exception should be general + logging.warning("!!! checkpoint.json is not found or not readable. Empty dict will be used. " + str(e)) + checkpoint = {} + + futures = [] + metadata = metadata or {} + metadata["spider_source"] = "condor_history" + + manager = multiprocessing.Manager() + checkpoint_queue = manager.Queue() + + for schedd_ad in schedd_ads: + name = schedd_ad["Name"] + + # Check for last completion time + # If there was no previous completion, get last 12 h + history_query_max_n_minutes = args.history_query_max_n_minutes # Default 12 * 60 + last_completion = checkpoint.get(name, time.time() - history_query_max_n_minutes * 60) + last_completion = max(last_completion, time.time() - RETENTION_POLICY) + + # For CRAB, only ever get a maximum of 12 h + if name.startswith("crab") and last_completion < time.time() - CRAB_MAX_QUERY_TIME_SPAN: + last_completion = time.time() - history_query_max_n_minutes * 60 + + future = pool.apply_async( + process_schedd, + (starttime, last_completion, checkpoint_queue, schedd_ad, args, metadata), + ) + futures.append((name, future)) + + def _chkp_updater(): + while True: + try: + job = checkpoint_queue.get() + if job is None: # Swallow poison pill + break + except EOFError as error: + logging.warning( + "EOFError - Nothing to consume left in the queue %s", error + ) + break + update_checkpoint(*job) + + chkp_updater = multiprocessing.Process(target=_chkp_updater) + chkp_updater.start() + + # Check whether one of the processes timed out and reset their last + # completion checkpoint in case + timed_out = False + for name, future in futures: + if time_remaining(starttime, positive=False) > -20: + try: + future.get(time_remaining(starttime) + 10) + except multiprocessing.TimeoutError: + # This implies that the checkpoint hasn't been updated + message = "Schedd %s history timed out; ignoring progress." % name + exc = traceback.format_exc() + message += "\n{}".format(exc) + logging.error(message) + send_email_alert( + args.email_alerts, "spider_cms history timeout warning", message + ) + except Exception as e: + # Catch any other exceptions from process_schedd + message = ( + "Error while processing history data of %s: %s" + % (name, str(e)) + ) + exc = traceback.format_exc() + message += "\n{}".format(exc) + logging.error(message) + send_email_alert( + args.email_alerts, + "spider_cms history processing error warning", + message, + ) + else: + timed_out = True + break + if timed_out: + pool.terminate() + + checkpoint_queue.put(None) # Send a poison pill + chkp_updater.join() + + logging.warning( + "Processing time for history: %.2f mins", ((time.time() - starttime) / 60.0) + ) diff --git a/docker/spider-query-cronjob/src/nats.py b/docker/spider-query-cronjob/src/nats.py new file mode 100644 index 00000000..391ac07d --- /dev/null +++ b/docker/spider-query-cronjob/src/nats.py @@ -0,0 +1,87 @@ +""" +NATS JetStream connection and publishing utilities. 
+""" + +import os +import json +import logging +import asyncio + +import nats + +# Global NATS JetStream connection +_nats_connection = None +_nats_jetstream = None + + +def get_nats_connection(args): + """ + Get or create a NATS JetStream connection. + """ + global _nats_connection, _nats_jetstream + + if _nats_connection is None or _nats_connection.is_closed: + nats_servers = getattr(args, 'nats_servers', None) or os.getenv('NATS_SERVERS', 'nats://localhost:4222') + nats_servers = [s.strip() for s in nats_servers.split(',')] + + try: + # Get or create event loop + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + raise RuntimeError("Event loop is closed") + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + _nats_connection = loop.run_until_complete(nats.connect(servers=nats_servers)) + _nats_jetstream = _nats_connection.jetstream() + + # Ensure the stream exists + stream_name = getattr(args, 'nats_stream_name', None) or os.getenv('NATS_STREAM_NAME', 'CMS_HTCONDOR_QUEUE') + subject = getattr(args, 'nats_subject', None) or os.getenv('NATS_SUBJECT', 'cms.htcondor.queue.job') + + try: + loop.run_until_complete(_nats_jetstream.stream_info(stream_name)) + except Exception: + # Stream doesn't exist, create it + logging.info("Creating NATS JetStream stream: %s", stream_name) + loop.run_until_complete(_nats_jetstream.add_stream( + name=stream_name, + subjects=[subject] + )) + + logging.info("Connected to NATS JetStream at %s", nats_servers) + except Exception as e: + logging.error("Failed to connect to NATS JetStream: %s", str(e)) + raise + + return _nats_connection, _nats_jetstream + + +def publish_job_to_nats(jetstream, subject, job_id, job_doc, args): + """ + Publish a single job to NATS JetStream. + """ + try: + message_data = { + "id": job_id, + "doc": job_doc + } + message_json = json.dumps(message_data).encode('utf-8') + + # Get or create event loop + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + raise RuntimeError("Event loop is closed") + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + loop.run_until_complete(jetstream.publish(subject, message_json)) + return True + except Exception as e: + logging.error("Failed to publish job %s to NATS: %s", job_id, str(e)) + return False + diff --git a/docker/spider-query-cronjob/src/queues.py b/docker/spider-query-cronjob/src/queues.py new file mode 100755 index 00000000..75b7f9b0 --- /dev/null +++ b/docker/spider-query-cronjob/src/queues.py @@ -0,0 +1,165 @@ +""" +Query the jobs in queue for given set of schedds and publish to NATS JetStream. +""" + +import os +import time +import logging +import resource +import traceback + +import classad +import htcondor + +from utils import send_email_alert, time_remaining, TIMEOUT_MINS +from convert_to_json import convert_to_json, unique_doc_id +from nats import get_nats_connection, publish_job_to_nats + + +def query_schedd_queue(starttime, schedd_ad, args): + """ + Query a schedd for jobs and publish each job to NATS JetStream. + """ + my_start = time.time() + pool_name = schedd_ad.get("CMS_Pool", "Unknown") + logging.info("Querying %s queue for jobs.", schedd_ad["Name"]) + + if time_remaining(starttime) < 10: + message = ( + "No time remaining to run queue crawler on %s; " + "exiting." 
% schedd_ad["Name"] + ) + logging.error(message) + send_email_alert(args.email_alerts, "spider_cms queue timeout warning", message) + return 0 + + # Get NATS JetStream connection + try: + nats_connection, jetstream = get_nats_connection(args) + subject = getattr(args, 'nats_subject', None) or os.getenv('NATS_SUBJECT', 'cms.htcondor.queue.job') + except Exception as e: + logging.error("Failed to get NATS connection: %s", str(e)) + return 0 + + count_since_last_report = 0 + count = 0 + published_count = 0 + cpu_usage = resource.getrusage(resource.RUSAGE_SELF).ru_utime + sent_warnings = False + + schedd = htcondor.Schedd(schedd_ad) + # Query for a snapshot of the jobs running/idle/held, + # but only the completed that had changed in the last period of time. + _completed_since = starttime - (TIMEOUT_MINS + 1) * 60 + query = """ + (JobUniverse == 5) && (CMS_Type != "DONOTMONIT") + && + ( + JobStatus < 3 || JobStatus > 4 + || EnteredCurrentStatus >= %(completed_since)d + || CRAB_PostJobLastUpdate >= %(completed_since)d + ) + """ % { + "completed_since": _completed_since + } + + try: + query_iter = schedd.xquery(constraint=query) if not args.dry_run else [] + for job_ad in query_iter: + if time_remaining(starttime) < 10: + message = ( + "Queue crawler on %s has been running for " + "more than %d minutes; exiting" + % (schedd_ad["Name"], TIMEOUT_MINS) + ) + logging.error(message) + send_email_alert( + args.email_alerts, "spider_cms queue timeout warning", message + ) + break + + dict_ad = None + try: + dict_ad = convert_to_json( + job_ad, + return_dict=True, + reduce_data=not args.keep_full_queue_data, + pool_name=pool_name, + ) + except Exception as e: + message = "Failure when converting document on %s queue: %s" % ( + schedd_ad["Name"], + str(e), + ) + logging.warning(message) + if not sent_warnings: + send_email_alert( + args.email_alerts, + "spider_cms queue document conversion error", + message, + ) + sent_warnings = True + continue + + if not dict_ad: + continue + + job_id = unique_doc_id(dict_ad) + count += 1 + count_since_last_report += 1 + + # Publish each job to NATS JetStream + if not args.dry_run: + if publish_job_to_nats(jetstream, subject, job_id, dict_ad, args): + published_count += 1 + else: + logging.debug("DRY RUN: Would publish job %s", job_id) + + # Report progress periodically + if count_since_last_report >= 1000: + cpu_usage_now = resource.getrusage(resource.RUSAGE_SELF).ru_utime + cpu_usage = cpu_usage_now - cpu_usage + processing_rate = count_since_last_report / cpu_usage if cpu_usage > 0 else 0 + cpu_usage = cpu_usage_now + logging.info( + "Query for %s: processed %d jobs, published %d jobs " + "(%.1f jobs per CPU-second)", + schedd_ad["Name"], + count, + published_count, + processing_rate, + ) + count_since_last_report = 0 + + if args.max_documents_to_process and count > args.max_documents_to_process: + logging.warning( + "Aborting after %d documents (--max_documents_to_process option)" + % args.max_documents_to_process + ) + break + + except RuntimeError as e: + logging.error( + "Failed to query schedd %s for jobs: %s", schedd_ad["Name"], str(e) + ) + except Exception as e: + message = "Failure when querying schedd queue on %s: %s" % ( + schedd_ad["Name"], + str(e), + ) + logging.error(message) + send_email_alert( + args.email_alerts, "spider_cms schedd queue query error", message + ) + traceback.print_exc() + + total_time = (time.time() - my_start) / 60.0 + logging.warning( + "Schedd %-25s queue: queried %5d jobs, published %5d jobs; query time %.2f min", + 
schedd_ad["Name"], + count, + published_count, + total_time, + ) + + return count diff --git a/docker/spider-query-cronjob/src/spider_cms.py b/docker/spider-query-cronjob/src/spider_cms.py new file mode 100644 index 00000000..c9435f97 --- /dev/null +++ b/docker/spider-query-cronjob/src/spider_cms.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python +""" +Script for processing the contents of the CMS pool. +""" + +import os +import sys +import time +import signal +import logging +import argparse +import multiprocessing + +from utils import ( + get_schedds, + get_schedds_from_file, + set_up_logging, + collect_metadata, + TIMEOUT_MINS +) + +from history import process_histories +from queues import query_schedd_queue + + +def main_driver(args): + """ + Driver method for the spider script. + """ + starttime = time.time() + + signal.alarm(TIMEOUT_MINS * 60 + 60) + + # Get all the schedd ads + if args.collectors_file: + schedd_ads = get_schedds_from_file(args, collectors_file=args.collectors_file) + del args.collectors_file # sending a file through postprocessing will cause problems. + else: + schedd_ads = get_schedds(args, collectors=args.collectors) + logging.warning("&&& There are %d schedds to query.", len(schedd_ads)) + + pool = multiprocessing.Pool(processes=args.query_pool_size) + + metadata = collect_metadata() + + if not args.skip_history: + process_histories( + schedd_ads=schedd_ads, + starttime=starttime, + pool=pool, + args=args, + metadata=metadata, + ) + + # Now that we have the fresh history, process the queues themselves. + if args.process_queue: + query_schedd_queue( + schedd_ads=schedd_ads, + starttime=starttime, + pool=pool, + args=args, + metadata=metadata, + ) + + pool.close() + pool.join() + + logging.warning( + "@@@ Total processing time: %.2f mins", ((time.time() - starttime) / 60.0) + ) + + return 0 + + +def main(): + """ + Main method for the spider_cms script. + + Parses arguments and invokes main_driver + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--process_queue", action="store_true", dest="process_queue", + help="Process also schedd queue (Running/Idle/Pending jobs)", + ) + parser.add_argument( + "--feed_es", action="store_true", dest="feed_es", help="Feed to Elasticsearch" + ) + parser.add_argument( + "--feed_es_for_queues", action="store_true", dest="feed_es_for_queues", + help="Feed queue data also to Elasticsearch", + ) + parser.add_argument( + "--feed_amq", action="store_true", dest="feed_amq", help="Feed to CERN AMQ" + ) + parser.add_argument( + "--schedd_filter", default="", type=str, dest="schedd_filter", + help="Comma separated list of schedd names to process [default is to process all]", + ) + parser.add_argument( + "--skip_history", action="store_true", dest="skip_history", + help="Skip processing the history. (Only do queues.)", + ) + parser.add_argument( + "--read_only", action="store_true", dest="read_only", help="Only read the info, don't submit it.", + ) + parser.add_argument( + "--dry_run", action="store_true", dest="dry_run", + help="Don't even read info, just pretend to. (Still query the collector for the schedd's though.)", + ) + parser.add_argument( + "--max_documents_to_process", default=0, type=int, dest="max_documents_to_process", + help="Abort after this many documents (per schedd). 
[default: %(default)d (process all)]", + ) + parser.add_argument( + "--keep_full_queue_data", action="store_true", dest="keep_full_queue_data", + help="Drop all but some fields for running jobs.", + ) + parser.add_argument( + "--amq_bunch_size", default=5000, type=int, dest="amq_bunch_size", + help="Send docs to AMQ in bunches of this number " "[default: %(default)d]", + ) + parser.add_argument( + "--es_bunch_size", default=250, type=int, dest="es_bunch_size", + help="Send docs to ES in bunches of this number " "[default: %(default)d]", + ) + parser.add_argument( + "--query_queue_batch_size", default=50, type=int, dest="query_queue_batch_size", + help="Send docs to listener in batches of this number " "[default: %(default)d]", + ) + parser.add_argument( + "--upload_pool_size", default=8, type=int, dest="upload_pool_size", + help="Number of parallel processes for uploading. SHOULD BE LESS THAN #CPUS or StompAMQ fails! Suggested: #cpus/2 . [default: %(default)d]", + ) + parser.add_argument( + "--query_pool_size", default=8, type=int, dest="query_pool_size", + help="Number of parallel processes for querying. SHOULD BE LESS THAN #CPUS! [default: %(default)d]", + ) + parser.add_argument( + "--es_index_template", type=str, dest="es_index_template", default=None, + help="Trunk of index pattern. Needs to start with 'cms' [default: %(default)s]", + ) + parser.add_argument( + "--log_dir", default="log/", type=str, dest="log_dir", + help="Directory for logging information [default: %(default)s]", + ) + parser.add_argument( + "--log_level", default="WARNING", type=str, dest="log_level", + help="Log level (CRITICAL/ERROR/WARNING/INFO/DEBUG) " "[default: %(default)s]", + ) + parser.add_argument( + "--email_alerts", default=[], action="append", dest="email_alerts", + help="Email addresses for alerts [default: none]", + ) + parser.add_argument( + "--collectors", + default=[ + "cmssrv623.fnal.gov:9620", + "cmsgwms-collector-tier0.cern.ch:9620", + "cmssrv276.fnal.gov", + "cmsgwms-collector-itb.cern.ch", + "vocms0840.cern.ch", + ], + action="append", + dest="collectors", + help="Collectors' addresses", + ) + parser.add_argument( + "--collectors_file", default=None, action="store", type=argparse.FileType("r"), dest="collectors_file", + help="FIle defining the pools and collectors", + ) + parser.add_argument( + "--history_query_max_n_minutes", default=12 * 60, type=int, dest="history_query_max_n_minutes", + help="If no checkpoint provided, query max N minutes from now of completed job results in history shcedds. " + "It gives elasticity to test jobs. [default: %(default)d]", + ) + parser.add_argument( + "--mock_cern_domain", action="store_true", dest="mock_cern_domain", + help="ElasticsearchInterface forces to be in CERN domain. 
In dev tests, mock that using this var", + ) + args = parser.parse_args() + # ============ Checks ============== + # Check AMQ environment variables + if (not os.getenv("CMS_HTCONDOR_PRODUCER")) or (not os.getenv("CMS_HTCONDOR_BROKER")): + logging.error("Please set required environment variables for AMQ") + sys.exit(1) + + # Check ES params are defined for feed_es(history schedds) + if args.feed_es: + _workdir = os.getenv("SPIDER_WORKDIR", "/opt/spider") + _es_creds_file = os.path.join(_workdir, "etc/es_conf.json") + if (not args.es_index_template) or (not os.path.exists(_es_creds_file)): + logging.error( + "Please [set es_index_template param] AND [provide etc/es_conf.json], because feed_es is set.") + sys.exit(1) + + if args.feed_amq: + _workdir = os.getenv("SPIDER_WORKDIR", "/opt/spider") + if (not os.path.exists(os.path.join(_workdir, "etc/amq_username"))) or \ + (not os.path.exists(os.path.join(_workdir, "etc/amq_password"))): + logging.error("Please provide [etc/amq_username] and [etc/amq_password], because feed_amq is set.") + sys.exit(1) + + set_up_logging(args) + + # --dry_run implies read_only + args.read_only = args.read_only or args.dry_run + + main_driver(args) + + +if __name__ == "__main__": + main() diff --git a/docker/spider-query-cronjob/src/utils.py b/docker/spider-query-cronjob/src/utils.py new file mode 100644 index 00000000..ce413882 --- /dev/null +++ b/docker/spider-query-cronjob/src/utils.py @@ -0,0 +1,176 @@ +""" +Various helper utilities for the HTCondor-ES integration +""" + +import os +import pwd +import sys +import time +import errno +import shlex +import socket +import random +import logging +import smtplib +import subprocess +import email.mime.text +import logging.handlers +import json + +import classad +import htcondor + +TIMEOUT_MINS = 60 + + +def get_schedds_from_file(args=None, collectors_file=None): + schedds = [] + names = set() + try: + pools = json.load(collectors_file) + for pool in pools: + _pool_schedds = get_schedds(args, collectors=pools[pool], pool_name=pool) + schedds.extend([s for s in _pool_schedds if s.get("Name") not in names]) + names.update([s.get("Name") for s in _pool_schedds]) + + except (IOError, json.JSONDecodeError): + schedds = get_schedds(args) + return schedds + + +def get_schedds(args=None, collectors=None, pool_name="Unknown"): + """ + Return a list of schedd ads representing all the schedds in the pool. + """ + collectors = collectors or [] + schedd_query = classad.ExprTree("!isUndefined(CMSGWMS_Type)") + + schedd_ads = {} + for host in collectors: + coll = htcondor.Collector(host) + try: + schedds = coll.query( + htcondor.AdTypes.Schedd, + schedd_query, + projection=["MyAddress", "ScheddIpAddr", "Name"], + ) + except IOError as e: + logging.warning(str(e)) + continue + + for schedd in schedds: + try: + schedd["CMS_Pool"] = pool_name + schedd_ads[schedd["Name"]] = schedd + except KeyError: + pass + + schedd_ads = list(schedd_ads.values()) + random.shuffle(schedd_ads) + + if args and args.schedd_filter: + return [s for s in schedd_ads if s["Name"] in args.schedd_filter.split(",")] + + return schedd_ads + + +def send_email_alert(recipients, subject, message): + """ + Send a simple email alert (typically of failure). 
+ """ + # TMP: somehow send_email_alert still sending alerts + if not recipients: + return + msg = email.mime.text.MIMEText(message) + msg["Subject"] = "%s - %sh: %s" % ( + socket.gethostname(), + time.strftime("%b %d, %H:%M"), + subject, + ) + + domain = socket.getfqdn() + uid = os.geteuid() + pw_info = pwd.getpwuid(uid) + if "cern.ch" not in domain: + domain = "%s.unl.edu" % socket.gethostname() + msg["From"] = "%s@%s" % (pw_info.pw_name, domain) + msg["To"] = recipients[0] + + try: + sess = smtplib.SMTP("localhost") + sess.sendmail(msg["From"], recipients, msg.as_string()) + sess.quit() + except Exception as exn: # pylint: disable=broad-except + logging.warning("Email notification failed: %s", str(exn)) + + +def time_remaining(starttime, timeout=TIMEOUT_MINS * 60, positive=True): + """ + Return the remaining time (in seconds) until starttime + timeout + Returns 0 if there is no time remaining + """ + elapsed = time.time() - starttime + if positive: + return max(0, timeout - elapsed) + return timeout - elapsed + + +def set_up_logging(args): + """Configure root logger with rotating file handler""" + logger = logging.getLogger() + + log_level = getattr(logging, args.log_level.upper(), None) + if not isinstance(log_level, int): + raise ValueError("Invalid log level: %s" % log_level) + logger.setLevel(log_level) + + if log_level <= logging.INFO: + logging.getLogger("CMSMonitoring.StompAMQ").setLevel(log_level + 10) + logging.getLogger("stomp.py").setLevel(log_level + 10) + + try: + os.makedirs(args.log_dir) + except OSError as oserr: + if oserr.errno != errno.EEXIST: + raise + + log_file = os.path.join(args.log_dir, "spider_cms.log") + filehandler = logging.handlers.RotatingFileHandler(log_file, maxBytes=100000) + filehandler.setFormatter( + logging.Formatter("%(asctime)s : %(name)s:%(levelname)s - %(message)s") + ) + logger.addHandler(filehandler) + + if os.isatty(sys.stdout.fileno()): + streamhandler = logging.StreamHandler(stream=sys.stdout) + logger.addHandler(streamhandler) + + +def collect_metadata(): + """ + Return a dictionary with: + - hostname + - username + - current time (in epoch millisec) + - hash of current git commit + """ + result = {} + result["spider_git_hash"] = get_githash() + result["spider_hostname"] = socket.gethostname() + result["spider_username"] = pwd.getpwuid(os.geteuid()).pw_name + result["spider_runtime"] = int(time.time() * 1000) + return result + + +def get_githash(): + """Returns the git hash of the current commit in the scripts repository""" + gitwd = os.path.dirname(os.path.realpath(__file__)) + cmd = r"git rev-parse --verify HEAD" + try: + call = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, cwd=gitwd) + out, err = call.communicate() + return str(out.strip()) + + except Exception as e: + logging.warning(str(e)) + return "unknown"