Skip to content

Commit 8dcc722

Browse files
committed
feat: use a staging table to avoid undue truncations
1 parent b1d87b8 commit 8dcc722

File tree

5 files changed

+113
-54
lines changed

5 files changed

+113
-54
lines changed

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1513,53 +1513,64 @@ def setJobCommandStatus(self, jobID, command, status):
15131513
def fillJobsHistorySummary(self):
    """Rebuild the JobsHistorySummary table with the summary of the jobs in a final state.

    The summary is computed into a freshly created staging table, which is then
    swapped with the live table by a single ``RENAME TABLE`` statement, so the
    live table is never truncated while it is being refilled.

    :return: the result dictionary of ``self._update``: the one of the first
             failing statement, otherwise the one of the last statement
    """
    defString = "Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus"
    valuesString = "COUNT(JobID), SUM(RescheduleCounter)"
    final_states = "', '".join(JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES)
    final_states = f"'{final_states}'"

    # One statement per _update call, so that nothing depends on the DB driver
    # accepting several ';'-separated statements in one string.
    statements = (
        # Leftovers of an interrupted previous run: a surviving staging table would keep
        # its old rows (duplicating the summary) and a surviving _old table would make
        # the RENAME below fail.
        "DROP TABLE IF EXISTS JobsHistorySummary_staging, JobsHistorySummary_old",
        # Create the staging table
        "CREATE TABLE JobsHistorySummary_staging LIKE JobsHistorySummary",
        # Insert the data into the staging table
        (
            f"INSERT INTO JobsHistorySummary_staging SELECT {defString}, {valuesString} "
            f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime < UTC_DATE() "
            f"GROUP BY {defString}"
        ),
        # Swap: both renames happen in the same statement
        (
            "RENAME TABLE JobsHistorySummary TO JobsHistorySummary_old, "
            "JobsHistorySummary_staging TO JobsHistorySummary"
        ),
        "DROP TABLE JobsHistorySummary_old",
    )

    result = None
    for statement in statements:
        result = self._update(statement)
        if not result["OK"]:
            return result
    return result
15301543

15311544
def getSummarySnapshot(self, requestedFields=False):
    """Get the summary snapshot for a given combination of fields.

    Three sources are combined and summed per combination of the requested fields:
    the jobs that are not in a final state, the jobs in a final state updated
    today, and the cached history kept in JobsHistorySummary.

    :param requestedFields: list of column names to group by; when empty or False
                            a default list is used
    :return: the result dictionary of ``self._query``; each row holds the requested
             fields followed by JobCount and RescheduleSum
    """
    if not requestedFields:
        requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
    defString = ", ".join(requestedFields)
    final_states = "', '".join(JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES)
    final_states = f"'{final_states}'"
    countsString = "COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum"

    subqueries = (
        # All jobs that are NOT in a final state
        (
            f"SELECT {defString}, {countsString} "
            f"FROM Jobs WHERE Status NOT IN ({final_states}) "
            f"GROUP BY {defString}"
        ),
        # Recent jobs only (today) that are in a final state
        (
            f"SELECT {defString}, {countsString} "
            f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime >= UTC_DATE() "
            f"GROUP BY {defString}"
        ),
        # Cached history (of jobs in a final state). Explicit columns instead of "*",
        # so that this branch has the same number of columns as the two above.
        # NOTE(review): assumes the count columns of JobsHistorySummary are named
        # JobCount and RescheduleSum -- confirm against the table definition.
        f"SELECT {defString}, JobCount, RescheduleSum FROM JobsHistorySummary",
    )
    combined = " UNION ALL ".join(subqueries)

    # The same combination can show up in several branches: sum the partial counts
    # rather than grouping by them.
    query = (
        f"SELECT {defString}, SUM(JobCount) AS JobCount, SUM(RescheduleSum) AS RescheduleSum "
        f"FROM ({combined}) AS combined GROUP BY {defString}"
    )

    return self._query(query)
15631574

15641575
def removeInfoFromHeartBeatLogging(self, status, delTime, maxLines):
15651576
"""Remove HeartBeatLoggingInfo from DB.

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,53 +1050,63 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems):
10501050
def fillPilotsHistorySummary(self):
    """Rebuild the PilotsHistorySummary table with the summary of the Pilots in a final state.

    The summary is computed into a freshly created staging table, which is then
    swapped with the live table by a single ``RENAME TABLE`` statement, so the
    live table is never truncated while it is being refilled.

    :return: the result dictionary of ``self._update``: the one of the first
             failing statement, otherwise the one of the last statement
    """
    defString = "GridSite, GridType, Status"
    valuesString = "COUNT(PilotID)"
    final_states = "', '".join(PilotStatus.PILOT_FINAL_STATES)
    final_states = f"'{final_states}'"

    # One statement per _update call, so that nothing depends on the DB driver
    # accepting several ';'-separated statements in one string.
    statements = (
        # Leftovers of an interrupted previous run: a surviving staging table would keep
        # its old rows (duplicating the summary) and a surviving _old table would make
        # the RENAME below fail.
        "DROP TABLE IF EXISTS PilotsHistorySummary_staging, PilotsHistorySummary_old",
        # Create the staging table
        "CREATE TABLE PilotsHistorySummary_staging LIKE PilotsHistorySummary",
        # Insert the data into the staging table
        (
            f"INSERT INTO PilotsHistorySummary_staging SELECT {defString}, {valuesString} "
            f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime < UTC_DATE() "
            f"GROUP BY {defString}"
        ),
        # Swap: both renames happen in the same statement
        (
            "RENAME TABLE PilotsHistorySummary TO PilotsHistorySummary_old, "
            "PilotsHistorySummary_staging TO PilotsHistorySummary"
        ),
        "DROP TABLE PilotsHistorySummary_old",
    )

    result = None
    for statement in statements:
        result = self._update(statement)
        if not result["OK"]:
            return result
    return result
10671081

10681082
def getSummarySnapshot(self, requestedFields=False):
    """Get the summary snapshot for a given combination of fields.

    Three sources are combined and summed per combination of the requested fields:
    the pilots that are not in a final state, the pilots in a final state updated
    today, and the cached history kept in PilotsHistorySummary.

    :param requestedFields: list of column names to group by; when empty or False
                            a default list is used
    :return: the result dictionary of ``self._query``; each row holds the requested
             fields followed by PilotCount
    """
    if not requestedFields:
        requestedFields = ["GridSite", "GridType", "Status"]
    defString = ", ".join(requestedFields)
    valueString = "COUNT(PilotID) AS PilotCount"
    final_states = "', '".join(PilotStatus.PILOT_FINAL_STATES)
    final_states = f"'{final_states}'"

    subqueries = (
        # All Pilots that are NOT in a final state
        (
            f"SELECT {defString}, {valueString} "
            f"FROM PilotAgents WHERE Status NOT IN ({final_states}) "
            f"GROUP BY {defString}"
        ),
        # Recent Pilots only (today) that are in a final state
        (
            f"SELECT {defString}, {valueString} "
            f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime >= UTC_DATE() "
            f"GROUP BY {defString}"
        ),
        # Cached history (of Pilots in a final state). Explicit columns instead of "*",
        # so that this branch has the same number of columns as the two above.
        # NOTE(review): assumes the count column of PilotsHistorySummary is named
        # PilotCount -- confirm against the table definition.
        f"SELECT {defString}, PilotCount FROM PilotsHistorySummary",
    )
    combined = " UNION ALL ".join(subqueries)

    # The same combination can show up in several branches: sum the partial counts
    # rather than grouping by them.
    query = (
        f"SELECT {defString}, SUM(PilotCount) AS PilotCount "
        f"FROM ({combined}) AS combined GROUP BY {defString}"
    )

    return self._query(query)
11001110

11011111

11021112
class PivotedPilotSummaryTable:

tests/Integration/WorkloadManagementSystem/Test_JobDB.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -450,16 +450,13 @@ def test_attributes(jobDB):
450450
def process_data(jobIDs, data):
451451
converted_data = []
452452

453-
print(data[0])
454453
full_data = []
455454

456455
for j, d in zip(jobIDs, data):
457456
row = list(d)
458457
row.insert(0, j) # Insert JobID at the beginning of the row
459458
full_data.append(row)
460459

461-
print(full_data[0])
462-
463460
for row in full_data:
464461
# date fields
465462
date_indices = [8, 9, 10, 11, 12, 13] # Positions of date fields
@@ -483,12 +480,25 @@ def process_data(jobIDs, data):
483480
except ValueError:
484481
# Handle invalid integers
485482
row[i] = 0
486-
# Convert boolean fields
487483
converted_data.append(tuple(row))
488484
return converted_data
489485

490486

491-
def test_summarySnapshot(jobDB: JobDB):
487+
def test_summarySnapshot():
488+
jobDB = JobDB()
489+
for table in [
490+
"InputData",
491+
"JobParameters",
492+
"AtticJobParameters",
493+
"HeartBeatLoggingInfo",
494+
"OptimizerParameters",
495+
"JobCommands",
496+
"Jobs",
497+
"JobJDLs",
498+
]:
499+
sqlCmd = f"DELETE from `{table}`"
500+
jobDB._update(sqlCmd)
501+
492502
# insert some predefined jobs to test the summary snapshot
493503
with open("jobs.csv", newline="", encoding="utf-8") as csvfile:
494504
csvreader = csv.reader(csvfile)

tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# pylint: disable=wrong-import-position
77

88
import csv
9-
9+
from datetime import datetime
1010
from unittest.mock import patch
1111

1212
import DIRAC
@@ -190,14 +190,56 @@ def test_PivotedPilotSummaryTable():
190190
cleanUpPilots(pilotRef)
191191

192192

193+
# Parse date strings into datetime objects
194+
def _convert_field(raw, parser, default):
    """Return ``parser(raw)``, or ``default`` when ``raw`` is empty or cannot be parsed."""
    if not raw:
        return default
    try:
        return parser(raw)
    except ValueError:
        return default


def _parse_timestamp(raw):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` string into a datetime object."""
    return datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")


def process_data(data):
    """Convert rows of strings (as read from the CSV file) into typed tuples.

    Positions 0 and 1 become integers, position 9 a float and positions 10 and 11
    datetime objects; all other positions are passed through untouched. An empty
    or unparsable value becomes 0 for the numeric positions and None for the dates.

    The input rows are left unmodified.

    :param data: iterable of rows, each a sequence with at least 12 items
    :return: list of tuples, one per input row
    """
    # position -> (parser, value used when the field is empty or invalid)
    converters = {
        0: (int, 0),
        1: (int, 0),
        9: (float, 0),
        10: (_parse_timestamp, None),
        11: (_parse_timestamp, None),
    }

    converted_data = []
    for row in data:
        values = list(row)  # work on a copy: the caller's row must not change
        for index, (parser, default) in converters.items():
            values[index] = _convert_field(values[index], parser, default)
        converted_data.append(tuple(values))
    return converted_data
232+
233+
193234
def test_summarySnapshot():
194235
# insert some predefined jobs to test the summary snapshot
195236
with open("pilots.csv", newline="", encoding="utf-8") as csvfile:
196237
csvreader = csv.reader(csvfile)
197238
data = list(csvreader)
198-
placeholders = ",".join(["%s"] * len(data[0]))
199-
sql = f"INSERT INTO PilotAgents VALUES ({placeholders})"
200-
res = paDB._updatemany(sql, data)
239+
processed_data = process_data(data)
240+
placeholders = ",".join(["%s"] * len(processed_data[0]))
241+
sql = f"INSERT INTO PilotAgents (InitialJobID, CurrentJobID, PilotJobReference, PilotStamp, DestinationSite, Queue, GridSite, VO, GridType, BenchMark, SubmissionTime, LastUpdateTime, Status, StatusReason, AccountingSent) VALUES ({placeholders})"
242+
res = paDB._updatemany(sql, processed_data)
201243
assert res["OK"], res["Message"]
202244
# Act
203245
res = paDB.fillPilotsHistorySummary()

tests/Integration/WorkloadManagementSystem/jobs.csv

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -232,20 +232,6 @@ MCSimulation,00100015,LCG.CERN.cern,00100015_00000045,cburr,lhcb_mc,Unknown,2021
232232
MCSimulation,00100015,LCG.CERN.cern,00100015_00000046,cburr,lhcb_mc,Unknown,2021-05-21 13:21:49,,2021-05-22 09:46:42,2021-05-21 13:22:00,2021-05-22 09:46:41,2021-05-22 09:46:42,Done,Execution Complete,Job Finished Successfully,0,0,True,False
233233
MCSimulation,00100015,LCG.CERN.cern,00100015_00000047,cburr,lhcb_mc,Unknown,2021-05-21 13:21:49,,2021-05-22 03:53:36,2021-05-21 13:22:12,2021-05-22 03:53:31,2021-05-22 03:53:36,Done,Execution Complete,Job Finished Successfully,0,0,True,False
234234
MCSimulation,00100015,LCG.Manchester.uk,00100015_00000048,cburr,lhcb_mc,Unknown,2021-05-21 13:21:49,,2021-05-22 12:19:26,2021-05-21 13:22:13,2021-05-22 10:53:22,2021-05-22 12:19:26,Failed,Failed sending requests,Failed to upload output data,0,0,True,False
235-
MCSimulation,00100015,LCG.RAL-HEP.uk,00100015_00000050,cburr,lhcb_mc,2021-05-21 13:21:49,,2021-05-22 15:51:42,2021-05-21 13:22:18,2021-05-22 14:25:38,2021-05-22 15:51:43,Failed,Failed sending requests,Failed to upload output data,0,0,True,False,Unknown
236-
MCSimulation,00100015,LCG.CERN.cern,00100015_00000051,cburr,lhcb_mc,2021-05-21 13:21:49,,2021-05-22 16:47:54,2021-05-21 13:22:18,2021-05-22 16:47:53,2021-05-22 16:47:54,Done,Execution Complete,Job Finished Successfully,0,0,True,False,Unknown
237-
MCSimulation,00100015,LCG.CERN.cern,00100015_00000052,cburr,lhcb_mc,2021-05-21 13:21:49,,2021-05-22 18:02:54,2021-05-21 13:22:16,2021-05-22 18:02:52,2021-05-22 18:02:54,Done,Execution Complete,Job Finished Successfully,0,0,True,False,Unknown
238-
MCSimulation,00100015,LCG.Manchester.uk,00100015_00000053,cburr,lhcb_mc,2021-05-21 13:21:49,,2021-05-22 11:24:41,2021-05-21 13:22:45,2021-05-22 09:58:25,2021-05-22 11:24:41,Failed,Failed sending requests,Failed to upload output data,0,0,True,False,Unknown
239-
MCSimulation,00100015,LCG.CERN.cern,00100015_00000054,cburr,lhcb_mc,2021-05-21 13:21:49,,2021-05-22 20:15:57,2021-05-21 13:22:19,2021-05-22 20:15:55,2021-05-22 20:15:57,Done,Execution Complete,Job Finished Successfully,0,0,True,False,Unknown
240-
MCSimulation,00100015,LCG.RAL.uk,00100015_00000055,cburr,lhcb_mc,2021-05-21 13:21:49,,2021-05-22 08:03:14,2021-05-21 13:22:31,2021-05-22 06:37:22,2021-05-22 08:03:15,Failed,Failed sending requests,Failed to upload output data,0,0,True,False,Unknown
241-
MCSimulation,00100015,LCG.CERN.cern,00100015_00000056,cburr,lhcb_mc,2021-05-21 13:21:50,,2021-05-22 18:42:18,2021-05-21 13:22:22,2021-05-22 18:42:16,2021-05-22 18:42:18,Done,Execution Complete,Job Finished Successfully,0,0,True,False,Unknown
242-
MCSimulation,00100015,LCG.CERN.cern,00100015_00000057,cburr,lhcb_mc,2021-05-21 13:21:50,,2021-05-22 21:35:07,2021-05-21 13:22:28,2021-05-22 21:35:06,2021-05-22 21:35:07,Done,Execution Complete,Job Finished Successfully,0,0,True,False,Unknown
243-
MCSimulation,00100015,LCG.RAL.uk,00100015_00000058,cburr,lhcb_mc,2021-05-21 13:21:50,,2021-05-22 09:47:09,2021-05-21 13:22:49,2021-05-22 08:21:13,2021-05-22 09:47:09,Failed,Failed sending requests,Failed to upload output data,0,0,True,False,Unknown
244-
MCSimulation,00100015,LCG.CERN.cern,00100015_00000059,cburr,lhcb_mc,2021-05-21 13:21:50,,2021-05-22 20:18:15,2021-05-21 13:22:16,2021-05-22 20:18:14,2021-05-22 20:18:16,Done,Execution Complete,Job Finished Successfully,0,0,True,False,Unknown
245-
MCSimulation,00100015,LCG.CERN.cern,00100015_00000060,cburr,lhcb_mc,2021-05-21 13:21:50,,2021-05-22 04:27:25,2021-05-21 13:22:14,2021-05-22 02:22:30,,Failed,Job stalled: pilot not running,Gauss step 1,0,0,True,True,Unknown
246-
MCSimulation,00100015,LCG.CERN.cern,00100015_00000061,cburr,lhcb_mc,2021-05-21 13:21:50,,2021-05-22 07:01:32,2021-05-21 13:22:19,2021-05-22 07:01:25,2021-05-22 07:01:31,Done,Execution Complete,Job Finished Successfully,0,0,True,False,Unknown
247-
MCSimulation,00100015,LCG.RAL-HEP.uk,00100015_00000062,cburr,lhcb_mc,2021-05-21 13:21:50,,2021-05-22 14:11:33,2021-05-21 13:23:19,2021-05-22 12:45:45,2021-05-22 14:11:33,Failed,Failed sending requests,Failed to upload output data,0,0,True,False,Unknown
248-
MCReconstruction,00100016,LCG.CERN.cern,00100016_00000019,cburr,lhcb_data,2021-05-22 18:05:34,,2021-05-22 20:22:37,2021-05-22 20:16:20,2021-05-22 20:22:37,2021-05-22 20:22:38,Failed,Application Finished With Errors,Boole v30r4 exited with status 2,8,0,True,False,Unknown
249235
MCReconstruction,00100016,LCG.CERN.cern,00100016_00000021,cburr,lhcb_data,Unknown,2021-05-22 22:33:54,,2021-05-23 01:26:34,2021-05-23 01:23:07,2021-05-23 01:26:33,2021-05-23 01:26:34,Failed,Application Finished With Errors,Boole v30r4 exited with status 2,8,0,True,False
250236
MCReconstruction,00100016,LCG.CERN.cern,00100016_00000022,cburr,lhcb_data,Unknown,2021-05-23 01:26:56,,2021-05-23 02:02:13,2021-05-23 01:27:37,2021-05-23 02:02:10,2021-05-23 02:02:13,Failed,Application Finished With Errors,Boole v30r4 exited with status 2,8,0,True,False
251237
MCReconstruction,00100016,LCG.CERN.cern,00100016_00000023,cburr,lhcb_data,Unknown,2021-05-23 02:02:57,,2021-05-23 02:07:09,2021-05-23 02:03:51,2021-05-23 02:07:08,2021-05-23 02:07:09,Failed,Application Finished With Errors,Boole v30r4 exited with status 2,8,0,True,False

0 commit comments

Comments
 (0)