Skip to content

Commit fb260d8

Browse files
committed
fix: pre-select the fill_time instead of using UTC_DATE
1 parent 07aba0e commit fb260d8

File tree

5 files changed

+26
-18
lines changed

5 files changed

+26
-18
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,15 @@ def initialize(self):
6868
if "Monitoring" in self.pilotMonitoringOption:
6969
self.pilotReporter = MonitoringReporter(monitoringType="PilotsHistory", failoverQueueName=messageQueue)
7070

71-
threadJobDB = threading.Thread(target=JobDB().fillJobsHistorySummary)
71+
# self.fill_time: now -1h
72+
now = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
73+
self.fill_time = now.strftime("%Y-%m-%d %H:%M:%S")
74+
75+
threadJobDB = threading.Thread(target=lambda: JobDB().fillJobsHistorySummary(self.fill_time))
7276
threadJobDB.daemon = True
7377
threadJobDB.start()
7478

75-
threadPilotDB = threading.Thread(target=PilotAgentsDB().fillPilotsHistorySummary)
79+
threadPilotDB = threading.Thread(target=lambda: PilotAgentsDB().fillPilotsHistorySummary(self.fill_time))
7680
threadPilotDB.daemon = True
7781
threadPilotDB.start()
7882

@@ -96,7 +100,7 @@ def execute(self):
96100
# PilotsHistory to Monitoring
97101
if "Monitoring" in self.pilotMonitoringOption:
98102
self.log.info("Committing PilotsHistory to Monitoring")
99-
result = PilotAgentsDB().getSummarySnapshot()
103+
result = PilotAgentsDB().getSummarySnapshot(fill_time=self.fill_time)
100104
now = datetime.datetime.utcnow()
101105
if not result["OK"]:
102106
self.log.error(
@@ -119,7 +123,7 @@ def execute(self):
119123

120124
# WMSHistory to Monitoring or Accounting
121125
self.log.info(f"Committing WMSHistory to {'and '.join(self.jobMonitoringOption)} backend")
122-
result = JobDB().getSummarySnapshot(self.__jobDBFields)
126+
result = JobDB().getSummarySnapshot(fill_time=self.fill_time, requestedFields=self.__jobDBFields)
123127
now = datetime.datetime.utcnow()
124128
if not result["OK"]:
125129
self.log.error("Can't get the JobDB summary", f"{result['Message']}: won't commit WMSHistory at this cycle")

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,7 +1510,7 @@ def setJobCommandStatus(self, jobID, command, status):
15101510
return self._update(f"UPDATE JobCommands SET Status={status} WHERE JobID={jobID} AND Command={command}")
15111511

15121512
#####################################################################################
1513-
def fillJobsHistorySummary(self):
1513+
def fillJobsHistorySummary(self, fill_time: str):
15141514
"""Fill the JobsHistorySummary table with the summary of the jobs in a final state"""
15151515

15161516
# Create the staging table
@@ -1525,7 +1525,7 @@ def fillJobsHistorySummary(self):
15251525
final_states = f"'{final_states}'"
15261526
query = (
15271527
f"INSERT INTO JobsHistorySummary_staging SELECT {defString}, {valuesString} "
1528-
f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime < UTC_DATE() "
1528+
f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime < '{fill_time}' "
15291529
f"GROUP BY {defString}"
15301530
)
15311531
if not (result := self._update(query))["OK"]:
@@ -1539,7 +1539,7 @@ def fillJobsHistorySummary(self):
15391539
)
15401540
return self._update(sql)
15411541

1542-
def getSummarySnapshot(self, requestedFields=False):
1542+
def getSummarySnapshot(self, fill_time: str, requestedFields=False):
15431543
"""Get the summary snapshot for a given combination"""
15441544
if not requestedFields:
15451545
requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
@@ -1555,10 +1555,10 @@ def getSummarySnapshot(self, requestedFields=False):
15551555
f"GROUP BY {defString} "
15561556
)
15571557
query += "UNION ALL "
1558-
# Recent jobs only (today) that are in a final state
1558+
# Recent jobs only (after fill_time) that are in a final state
15591559
query += (
15601560
f"SELECT {defString}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum "
1561-
f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime >= UTC_DATE() "
1561+
f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime >= '{fill_time}' "
15621562
f"GROUP BY {defString} "
15631563
)
15641564
query += "UNION ALL "

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,7 +1047,7 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems):
10471047

10481048
return S_OK(resultDict)
10491049

1050-
def fillPilotsHistorySummary(self):
1050+
def fillPilotsHistorySummary(self, fill_time: str):
10511051
"""Fill the PilotsHistorySummary table with the summary of the Pilots in a final state"""
10521052

10531053
# Create the staging table
@@ -1063,7 +1063,7 @@ def fillPilotsHistorySummary(self):
10631063

10641064
query = (
10651065
f"INSERT INTO PilotsHistorySummary_staging SELECT {defString}, {valuesString} "
1066-
f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime < UTC_DATE() "
1066+
f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime < '{fill_time}' "
10671067
f"GROUP BY {defString}"
10681068
)
10691069
if not (result := self._update(query))["OK"]:
@@ -1077,7 +1077,7 @@ def fillPilotsHistorySummary(self):
10771077
)
10781078
return self._update(sql)
10791079

1080-
def getSummarySnapshot(self, requestedFields=False):
1080+
def getSummarySnapshot(self, fill_time: str, requestedFields=False):
10811081
"""Get the summary snapshot for a given combination"""
10821082
if not requestedFields:
10831083
requestedFields = ["GridSite", "GridType", "Status"]
@@ -1097,7 +1097,7 @@ def getSummarySnapshot(self, requestedFields=False):
10971097
# Recent Pilots only (today) that are in a final state
10981098
query += (
10991099
f"SELECT {defString}, {valueString} "
1100-
f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime >= UTC_DATE() "
1100+
f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime >= '{fill_time}' "
11011101
f"GROUP BY {defString} "
11021102
)
11031103
query += "UNION ALL "

tests/Integration/WorkloadManagementSystem/Test_JobDB.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,9 +520,11 @@ def test_summarySnapshot():
520520
res = jobDB._updatemany(sql, processed_data)
521521
assert res["OK"], res["Message"]
522522
# Act
523-
res = jobDB.fillJobsHistorySummary()
523+
now = datetime.utcnow() - timedelta(hours=1)
524+
fill_time = now.strftime("%Y-%m-%d %H:%M:%S")
525+
res = jobDB.fillJobsHistorySummary(fill_time)
524526
assert res["OK"], res["Message"]
525-
res = jobDB.getSummarySnapshot()
527+
res = jobDB.getSummarySnapshot(fill_time)
526528
assert res["OK"], res["Message"]
527529
requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
528530
defString = ", ".join(requestedFields)

tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# pylint: disable=wrong-import-position
77

88
import csv
9-
from datetime import datetime
9+
from datetime import datetime, timedelta
1010
from unittest.mock import patch
1111

1212
import DIRAC
@@ -242,9 +242,11 @@ def test_summarySnapshot():
242242
res = paDB._updatemany(sql, processed_data)
243243
assert res["OK"], res["Message"]
244244
# Act
245-
res = paDB.fillPilotsHistorySummary()
245+
now = datetime.utcnow() - timedelta(hours=1)
246+
fill_time = now.strftime("%Y-%m-%d %H:%M:%S")
247+
res = paDB.fillPilotsHistorySummary(fill_time)
246248
assert res["OK"], res["Message"]
247-
res = paDB.getSummarySnapshot()
249+
res = paDB.getSummarySnapshot(fill_time)
248250
assert res["OK"], res["Message"]
249251
requestedFields = ["GridSite", "GridType", "Status"]
250252
defString = ", ".join(requestedFields)

0 commit comments

Comments
 (0)