@@ -6,6 +6,7 @@
 
 # pylint: disable=wrong-import-position, missing-docstring
 
+import csv
 from datetime import datetime, timedelta
 from unittest.mock import MagicMock, patch
 
@@ -443,3 +444,79 @@ def test_attributes(jobDB):
     res = jobDB.getJobsAttributes([jobID_1, jobID_2], ["Status"])
     assert res["OK"], res["Message"]
     assert res["Value"] == {jobID_1: {"Status": JobStatus.DONE}, jobID_2: {"Status": JobStatus.RUNNING}}
+
+
+def process_data(jobIDs, data):
+    """Prepend each JobID to its CSV row and convert date and integer strings to native Python types."""
+    converted_data = []
+    full_data = []
+
+    for j, d in zip(jobIDs, data):
+        row = list(d)
+        row.insert(0, j)  # Insert the JobID at the beginning of the row
+        full_data.append(row)
+
+    for row in full_data:
+        # Date fields: empty or unparsable values become None
+        date_indices = [8, 9, 10, 11, 12, 13]  # Positions of date fields
+        for i in date_indices:
+            if not row[i]:
+                row[i] = None
+            else:
+                try:
+                    row[i] = datetime.strptime(row[i], "%Y-%m-%d %H:%M:%S")
+                except ValueError:
+                    # Treat unparsable dates as missing
+                    row[i] = None
+        # Integer fields: empty or invalid values become 0
+        int_indices = [17, 18]  # Positions of integer fields
+        for i in int_indices:
+            if not row[i]:
+                row[i] = 0
+            else:
+                try:
+                    row[i] = int(row[i])
+                except ValueError:
+                    # Treat invalid integers as 0
+                    row[i] = 0
+        # The flag fields (indices 19-20) are left as strings
+        converted_data.append(tuple(row))
+    return converted_data
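+
+# For illustration (hypothetical values, not taken from jobs.csv): given JobID 7,
+# process_data places 7 at index 0, parses a SubmissionTime string such as
+# "2024-01-01 12:00:00" at index 8 into datetime(2024, 1, 1, 12, 0),
+# turns an empty RescheduleCounter at index 18 into 0, and passes the
+# VerifiedFlag/AccountedFlag strings at indices 19-20 through unchanged.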
+
+
+def test_summarySnapshot(jobDB: JobDB):
+    # Insert some predefined jobs to exercise the summary snapshot
+    with open("jobs.csv", newline="", encoding="utf-8") as csvfile:
+        csvreader = csv.reader(csvfile)
+        data = list(csvreader)
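+    # jobs.csv is expected to provide, per job, the 20 Jobs-table columns named
+    # in the INSERT below (all of them except JobID, which process_data prepends)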
+
+    # First insert the JDLs ("jdl" is the module-level sample JDL)
+    jdlData = [(jdl, "", "")] * len(data)
+    res = jobDB._updatemany("INSERT INTO JobJDLs (JDL, JobRequirements, OriginalJDL) VALUES (%s,%s,%s)", jdlData)
+    assert res["OK"], res["Message"]
+    # Retrieve the JobIDs that were auto-generated for the inserted JDLs
+    res = jobDB._query("SELECT JobID FROM JobJDLs")
+    assert res["OK"], res["Message"]
+    jobIDs = [row[0] for row in res["Value"]][0 : len(data)]
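+    # NB: taking the first len(data) JobIDs assumes JobJDLs held no rows
+    # before this test ran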
+
+    # Now insert the jobs themselves, one tuple per row
+    processed_data = process_data(jobIDs, data)
+    placeholders = ",".join(["%s"] * len(processed_data[0]))
+    sql = f"INSERT INTO Jobs (JobID, JobType, JobGroup, Site, JobName, Owner, OwnerGroup, VO, SubmissionTime, RescheduleTime, LastUpdateTime, StartExecTime, HeartBeatTime, EndExecTime, Status, MinorStatus, ApplicationStatus, UserPriority, RescheduleCounter, VerifiedFlag, AccountedFlag) VALUES ({placeholders})"
+    res = jobDB._updatemany(sql, processed_data)
+    assert res["OK"], res["Message"]
+    # Act: build the history summary, then fetch the snapshot
+    res = jobDB.fillJobsHistorySummary()
+    assert res["OK"], res["Message"]
+    res = jobDB.getSummarySnapshot()
+    assert res["OK"], res["Message"]