Skip to content

Commit 46e2c90

Browse files
committed
feat: JobDB snapshot: use a summary table for jobs in final state
1 parent 10ef53a commit 46e2c90

File tree

3 files changed

+64
-2
lines changed

3 files changed

+64
-2
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ def initialize(self):
4949
"""Standard initialization"""
5050
# This agent will always loop every 15 minutes
5151
self.am_setOption("PollingTime", 900)
52+
# This agent will restart every 24 hours
53+
self.am_setOption("MaxCycles", 96)
5254

5355
# Check whether to send to Monitoring or Accounting or both
5456
self.jobMonitoringOption = Operations().getMonitoringBackends(monitoringType="WMSHistory")
@@ -65,6 +67,11 @@ def initialize(self):
6567
if "Monitoring" in self.pilotMonitoringOption:
6668
self.pilotReporter = MonitoringReporter(monitoringType="PilotsHistory", failoverQueueName=messageQueue)
6769

70+
res = JobDB().fillJobsHistorySummary()
71+
if not res["OK"]:
72+
self.log.error("Could not fill the JobDB summary", res["Message"])
73+
return S_ERROR()
74+
6875
self.__jobDBFields = []
6976
for field in self.__summaryKeyFieldsMapping:
7077
if field == "User":

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2121
from DIRAC.Core.Utilities.Decorators import deprecated
2222
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
23-
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise, SErrorException
23+
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, SErrorException, convertToReturnValue, returnValueOrRaise
2424
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
2525
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
2626
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
@@ -1510,14 +1510,53 @@ def setJobCommandStatus(self, jobID, command, status):
15101510
return self._update(f"UPDATE JobCommands SET Status={status} WHERE JobID={jobID} AND Command={command}")
15111511

15121512
#####################################################################################
1513+
def fillJobsHistorySummary(self):
1514+
"""Fill the JobsHistorySummary table with the summary of the jobs in a final state"""
1515+
1516+
defString = "Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus"
1517+
valuesString = "COUNT(JobID), SUM(RescheduleCounter)"
1518+
final_states = "', '".join(JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES)
1519+
final_states = f"'{final_states}'"
1520+
1521+
query = (
1522+
f"INSERT INTO JobsHistorySummary SELECT {defString}, {valuesString} "
1523+
f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime < UTC_DATE() "
1524+
f"GROUP BY {defString}"
1525+
)
1526+
result = self._update(query)
1527+
if not result["OK"]:
1528+
return result
1529+
return S_OK(result["Value"])
1530+
15131531
def getSummarySnapshot(self, requestedFields=False):
15141532
"""Get the summary snapshot for a given combination"""
15151533
if not requestedFields:
15161534
requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
15171535
valueFields = ["COUNT(JobID)", "SUM(RescheduleCounter)"]
15181536
defString = ", ".join(requestedFields)
15191537
valueString = ", ".join(valueFields)
1520-
result = self._query(f"SELECT {defString}, {valueString} FROM Jobs GROUP BY {defString}")
1538+
final_states = "', '".join(JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES)
1539+
final_states = f"'{final_states}'"
1540+
1541+
query = f"SELECT {defString}, {valueString} FROM ("
1542+
# All jobs that are NOT in a final state
1543+
query += (
1544+
f"SELECT {defString}, {valueString}, COUNT(JobID), SUM(RescheduleCounter) "
1545+
f"FROM Jobs WHERE STATUS NOT IN ({final_states}) "
1546+
f"GROUP BY {defString}, {valueString} "
1547+
)
1548+
query += "UNION ALL "
1549+
# Recent jobs only (today) that are in a final state
1550+
query += (
1551+
f"SELECT {defString}, {valueString}, COUNT(JobID), SUM(RescheduleCounter) "
1552+
f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime >= UTC_DATE() "
1553+
f"GROUP BY {defString}, {valueString} "
1554+
)
1555+
query += "UNION ALL "
1556+
# Cached history (of jobs in a final state)
1557+
query += f"SELECT * FROM JobsHistorySummary) AS combined GROUP BY {defString}, {valueString}"
1558+
1559+
result = self._query(query)
15211560
if not result["OK"]:
15221561
return result
15231562
return S_OK(((requestedFields + valueFields), result["Value"]))

src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,19 @@ CREATE TABLE `JobCommands` (
132132
PRIMARY KEY (`JobID`,`Arguments`,`ReceptionTime`),
133133
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
134134
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
135+
136+
-- ------------------------------------------------------------------------------
137+
DROP TABLE IF EXISTS `JobsHistorySummary`;
138+
CREATE TABLE `JobsHistorySummary` (
139+
`Status` VARCHAR(32),
140+
`Site` VARCHAR(100),
141+
`Owner` VARCHAR(32),
142+
`OwnerGroup` VARCHAR(128),
143+
`JobGroup` VARCHAR(32),
144+
`JobType` VARCHAR(32),
145+
`ApplicationStatus` VARCHAR(255),
146+
`MinorStatus` VARCHAR(128),
147+
`JobCount` INT,
148+
`RescheduleSum` INT,
149+
PRIMARY KEY (Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus)
150+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

0 commit comments

Comments
 (0)