From 3f077b1a1ea114efaeb5db5330445fb91e0d62b5 Mon Sep 17 00:00:00 2001 From: Richard Darst Date: Thu, 8 Aug 2024 22:40:34 +0300 Subject: [PATCH] Support CSV input option, which also allows tests to work again - CSV input doesn't have to be exactly like `sacct` would be, so that test data doesn't have to be updated for every schema change. - --csv-input allows CSV to be fed in. - Adjust different other parsers so that they can accept missing input fields (mainly the JobID fields), since with CSV various fields may be missing. This may change outputs, mainly giving nulls in places where there may have been empty strings before. --- slurm2sql.py | 207 ++++++++++++++++++++++++++----------------- test.py | 79 +++++++++++++---- tests/test-data3.csv | 2 + 3 files changed, 190 insertions(+), 98 deletions(-) create mode 100644 tests/test-data3.csv diff --git a/slurm2sql.py b/slurm2sql.py index f132d79..57ea050 100644 --- a/slurm2sql.py +++ b/slurm2sql.py @@ -51,6 +51,11 @@ def nullint(x): """int or None""" return int(x) if x else None +@settype('text') +def nullstr(x): + """str or None""" + return str(x) if x else None + @settype('text') def nullstr_strip(x): """str or None""" @@ -389,6 +394,7 @@ class slurmJobIDplain(linefunc): type = 'int' @staticmethod def calc(row): + if 'JobID' not in row: return return int(jobidplain_re.match(row['JobID']).group(0)) class slurmJobIDnostep(linefunc): @@ -396,6 +402,7 @@ class slurmJobIDnostep(linefunc): type = 'str' @staticmethod def calc(row): + if 'JobID' not in row: return return jobidnostep_re.match(row['JobID']).group(0) class slurmJobIDrawplain(linefunc): @@ -403,6 +410,7 @@ class slurmJobIDrawplain(linefunc): type = 'int' @staticmethod def calc(row): + if 'JobIDRaw' not in row: return return int(jobidplain_re.match(row['JobIDRaw']).group(0)) class slurmJobIDRawnostep(linefunc): @@ -410,6 +418,7 @@ class slurmJobIDRawnostep(linefunc): type = 'int' @staticmethod def calc(row): + if 'JobIDRaw' not in row: return return 
int(jobidplain_re.match(row['JobIDRaw']).group(0)) arraytaskid_re = re.compile(r'_([0-9]+)') @@ -418,6 +427,7 @@ class slurmArrayTaskID(linefunc): type = 'int' @staticmethod def calc(row): + if 'JobID' not in row: return if '_' not in row['JobID']: return if '[' in row['JobID']: return return int(arraytaskid_re.search(row['JobID']).group(1)) @@ -426,6 +436,7 @@ class slurmJobStep(linefunc): type = 'text' @staticmethod def calc(row): + if 'JobID' not in row: return if '.' not in row['JobID']: return return row['JobID'].split('.')[-1] # not necessarily an integer @@ -434,6 +445,7 @@ class slurmJobIDslurm(linefunc): type = 'text' @staticmethod def calc(row): + if 'JobID' not in row: return return row['JobID'] # Efficiency stuff @@ -517,16 +529,16 @@ def calc(row): '_JobStep': slurmJobStep, # Part after '.' '_JobIDSlurm': slurmJobIDslurm, # JobID directly as Slurm presents it # (with '_' and '.') - #'JobIDRawSlurm': str, # - 'JobName': str, # Free-form text name of the job - 'User': str, # Username - 'Group': str, # Group - 'Account': str, # Account - 'SubmitLine': str, # SubmitLine (execution command line) + #'JobIDRawSlurm': str, # + 'JobName': nullstr, # Free-form text name of the job + 'User': nullstr, # Username + 'Group': nullstr, # Group + 'Account': nullstr, # Account + 'SubmitLine': nullstr, # SubmitLine (execution command line) '_Billing': slurmBilling, # Billing (from tres) # Times and runtime info - 'State': str, # Job state + 'State': nullstr, # Job state 'Timelimit': slurmtime, # Timelimit specified by user 'Elapsed': slurmtime, # Walltime of the job #'_Time': slurmDefaultTime, # Genalized time, max(Submit, End, (current if started)) @@ -538,11 +550,11 @@ def calc(row): 'Start': slurmStartTS, # unixtime: Start 'End': slurmEndTS, # unixtime: End '_QueueTime': slurmQueueTime, # seconds, difference between submission and start - 'Partition': str, # Partition + 'Partition': nullstr, # Partition '_ExitCodeRaw': slurmExitCodeRaw, # ExitStatus:Signal 'ExitCode': 
slurmExitCode, # ExitStatus from above, int '_ExitSignal': slurmExitSignal, # Signal from above, int - 'NodeList': str, # Node list of jobs + 'NodeList': nullstr, # Node list of jobs 'Priority': nullint, # Slurm priority (higher = will run sooner) '_ConsumedEnergy': slurmConsumedEnergy, @@ -552,16 +564,16 @@ def calc(row): 'AllocNodes': nullint, # Number of nodes (allocated, zero if not running yet) # Miscelaneous requested resources - 'ReqTRES': str, - 'ReqGRES': str, # Raw GRES string + 'ReqTRES': nullstr, + 'ReqGRES': nullstr, # Raw GRES string 'NTasks': nullint, #'AllocGRES' - 'AllocTRES': str, - 'TRESUsageInAve': str, - 'TRESUsageInMax': str, - 'TRESUsageInMin': str, - 'TRESUsageInTot': str, - 'TRESUsageOutTot': str, + 'AllocTRES': nullstr, + 'TRESUsageInAve': nullstr, + 'TRESUsageInMax': nullstr, + 'TRESUsageInMin': nullstr, + 'TRESUsageInTot': nullstr, + 'TRESUsageOutTot': nullstr, # CPU related 'NCPUS': nullint, # === AllocCPUS @@ -573,18 +585,18 @@ def calc(row): 'SystemCPU': slurmtime, # '_CPUEff': slurmCPUEff, # CPU efficiency, should be same as seff 'MinCPU': slurmtime, # Minimum CPU used by any task in the job - 'MinCPUNode': str, - 'MinCPUTask': str, + 'MinCPUNode': nullstr, + 'MinCPUTask': nullstr, # Memory related - 'ReqMem': str, # Requested mem, value from slurm. Has a 'c' on 'n' suffix + 'ReqMem': nullstr, # Requested mem, value from slurm. 
Has a 'c' on 'n' suffix '_ReqMemType': slurmMemType, # 'c' for mem-per-cpu or 'n' for mem-per-node '_ReqMemNode': slurmMemNode, # Mem per node, computed if type 'c' '_ReqMemCPU': slurmMemCPU, # Mem per cpu, computed if type 'n' 'AveRSS': slurmmem, 'MaxRSS': slurmmem, - 'MaxRSSNode': str, - 'MaxRSSTask': str, + 'MaxRSSNode': nullstr, + 'MaxRSSTask': nullstr, 'MaxPages': int_metric, 'MaxVMSize': slurmmem, '_MemEff': slurmMemEff, # Slurm memory efficiency @@ -617,7 +629,7 @@ def calc(row): -def main(argv=sys.argv[1:], db=None, raw_sacct=None): +def main(argv=sys.argv[1:], db=None, raw_sacct=None, csv_input=None): """Parse arguments and use the other API""" parser = argparse.ArgumentParser() parser.add_argument('db', help="Database filename to create or update") @@ -641,6 +653,10 @@ def main(argv=sys.argv[1:], db=None, raw_sacct=None): "of the other history options to have any effect.") parser.add_argument('--jobs-only', action='store_true', help="Don't include job steps but only the man jobs") + parser.add_argument('--csv-input', + help="Don't parse sacct but import this CSV file. It's read with " + "Python's default csv reader (excel format). 
Beware badly " + "formatted inputs, for example line breaks in job names.") parser.add_argument('--quiet', '-q', action='store_true', help="Don't output anything unless errors") parser.add_argument('--verbose', '-v', action='store_true', @@ -674,7 +690,10 @@ def main(argv=sys.argv[1:], db=None, raw_sacct=None): history_start=args.history_start, history_end=args.history_end, jobs_only=args.jobs_only, - raw_sacct=raw_sacct) + raw_sacct=raw_sacct, + # --history real usage doesn't make sense with csv + # (below is just for running tests) + csv_input=csv_input) create_indexes(db) # Normal operation @@ -683,7 +702,8 @@ def main(argv=sys.argv[1:], db=None, raw_sacct=None): update=args.update, jobs_only=args.jobs_only, raw_sacct=raw_sacct, - verbose=args.verbose) + verbose=args.verbose, + csv_input=args.csv_input or csv_input) create_indexes(db) if errors: @@ -695,7 +715,7 @@ def main(argv=sys.argv[1:], db=None, raw_sacct=None): def get_history(db, sacct_filter=['-a'], history=None, history_resume=None, history_days=None, history_start=None, history_end=None, - jobs_only=False, raw_sacct=None): + jobs_only=False, raw_sacct=None, csv_input=None): """Get history for a certain period of days. 
Queries each day and updates the database, so as to avoid @@ -739,7 +759,7 @@ def get_history(db, sacct_filter=['-a'], LOG.debug(new_filter) LOG.info("%s %s", days_ago, start.date() if history_days is not None else start) errors += slurm2sql(db, sacct_filter=new_filter, update=True, jobs_only=jobs_only, - raw_sacct=raw_sacct) + raw_sacct=raw_sacct, csv_input=csv_input) db.commit() update_last_timestamp(db, update_time=end) start = end @@ -768,9 +788,49 @@ def create_indexes(db): db.execute('ANALYZE;') db.commit() +def sacct_iter(slurm_cols, sacct_filter, errors=[0], raw_sacct=None): + """Iterate through sacct, returning rows as dicts""" + # Read data from sacct, or interpret sacct_filter directly as + # testdata if it has the attribute 'testdata' + if raw_sacct: + # Support tests - raw lines can be put in + lines = raw_sacct + else: + # This is a real filter, read data + lines = sacct(slurm_cols, sacct_filter) + + # We don't use the csv module because the csv can be malformed. + # In particular, job name can include newlines(!). TODO: handle job + # names with newlines. + line_continuation = None + for i, rawline in enumerate(lines): + if i == 0: + # header + header = rawline.strip().split(';|;') + continue + # Handle fields that have embedded newline (JobName). If we + # have too few fields, save the line and continue. 
+ if line_continuation: + rawline = line_continuation + rawline + line_continuation = None + line = rawline.strip().split(';|;') + if len(line) < len(slurm_cols): + line_continuation = rawline + continue + # (end) + if len(line) > len(slurm_cols): + LOG.error("Line with wrong number of columns: (want columns=%s, line has=%s)", len(slurm_cols), len(line)) + LOG.error("columns = %s", header) + LOG.error("rawline = %s", rawline) + errors[0] += 1 + continue + line = dict(zip(header, line)) + yield line + def slurm2sql(db, sacct_filter=['-a'], update=False, jobs_only=False, - raw_sacct=None, verbose=False): + raw_sacct=None, verbose=False, + csv_input=None): """Import one call of sacct to a sqlite database. db: @@ -807,7 +867,7 @@ def infer_type(cd): db.execute('CREATE TABLE IF NOT EXISTS meta_slurm_lastupdate (id INTEGER PRIMARY KEY, update_time REAL)') db.execute('CREATE VIEW IF NOT EXISTS allocations AS select * from slurm where JobStep is null;') db.execute('CREATE VIEW IF NOT EXISTS eff AS select ' - 'JobIDnostep, ' + 'JobIDnostep AS JobID, ' 'max(User) AS User, ' 'Partition, ' 'Account, ' @@ -840,62 +900,40 @@ def infer_type(cd): slurm_cols = tuple(c for c in list(columns.keys()) + COLUMNS_EXTRA if not c.startswith('_')) - # Read data from sacct, or interpert sacct_filter directly as - # testdata if it has the attribute 'testdata' - if raw_sacct is None: - # This is a real filter, read data - lines = sacct(slurm_cols, sacct_filter) + errors = [ 0 ] + if csv_input: + import collections + import csv + if not isinstance(csv_input, csv.DictReader): + csv_input = csv.DictReader(open(csv_input, 'r')) + def rows(): + for row in csv_input: + row = collections.defaultdict(str, ((k,v.strip()) for k,v in row.items())) + yield row + rows = rows() # activate the generator else: - # Support tests - raw lines can be put in - lines = raw_sacct + rows = sacct_iter(slurm_cols, sacct_filter, raw_sacct=raw_sacct, errors=errors) - # We don't use the csv module because the csv can be 
malformed. - # In particular, job name can include newlines(!). TODO: handle job - # names with newlines. - errors = 0 - line_continuation = None - for i, rawline in enumerate(lines): - if i == 0: - # header - header = rawline.strip().split(';|;') - continue - # Handle fields that have embedded newline (JobName). If we - # have too few fields, save the line and continue. - if line_continuation: - rawline = line_continuation + rawline - line_continuation = None - line = rawline.strip().split(';|;') - if len(line) < len(slurm_cols): - line_continuation = rawline - continue - # (end) - if len(line) > len(slurm_cols): - LOG.error("Line with wrong number of columns: (want columns=%s, line has=%s)", len(slurm_cols), len(line)) - LOG.error("columns = %s", header) - LOG.error("rawline = %s", rawline) - errors += 1 - continue - line = dict(zip(header, line)) + for i, row in enumerate(rows): # If --jobs-only, then skip all job steps (sacct updates the # mem/cpu usage on the allocation itself already) - step_id = slurmJobStep.calc(line) + step_id = slurmJobStep.calc(row) if jobs_only and step_id is not None: continue - #LOG.debug(line) - processed_line = {k.strip('_'): (columns[k](line[k]) - #if not isinstance(columns[k], type) or not issubclass(columns[k], linefunc) - if not hasattr(columns[k], 'linefunc') - else columns[k].calc(line)) - for k in columns.keys()} + #LOG.debug(row) + processed_row = {k.strip('_'): (columns[k](row[k]) + #if not isinstance(columns[k], type) or not issubclass(columns[k], linefunc) + if not hasattr(columns[k], 'linefunc') + else columns[k].calc(row)) + for k in columns.keys()} c.execute('INSERT %s INTO slurm (%s) VALUES (%s)'%( 'OR REPLACE' if update else '', - ','.join('"'+x+'"' for x in processed_line.keys()), - ','.join(['?']*len(processed_line))), - tuple(processed_line.values())) - + ','.join('"'+x+'"' for x in processed_row.keys()), + ','.join(['?']*len(processed_row))), + tuple(processed_row.values())) # Committing every so often allows 
other queries to succeed if i%10000 == 0: @@ -904,7 +942,7 @@ def infer_type(cd): if verbose: print('... processing row %d'%i) db.commit() - return errors + return errors[0] def update_last_timestamp(db, update_time=None): @@ -929,7 +967,10 @@ def get_last_timestamp(db): def slurm_version(cmd=['sacct', '--version']): """Return the version number of Slurm, as a tuple""" # example output: b'slurm 18.08.8\n' or slurm 19.05.7-Bull.1.0 - slurm_version = subprocess.check_output(cmd).decode() + try: + slurm_version = subprocess.check_output(cmd).decode() + except FileNotFoundError: # no sacct + return (20, 11) # latest with a schema change slurm_version = re.match(r"slurm\s([0-9]+)\.([0-9]+)\.([0-9]+)", slurm_version) slurm_version = tuple(int(x) for x in slurm_version.groups()) return slurm_version @@ -976,6 +1017,10 @@ def sacct_cli(argv=sys.argv[1:]): help="SQL order by (arbitrary SQL expression using column names). NOT safe from SQL injection.") parser.add_argument('--completed', '-c', action='store_true', help=f"Select for completed job states ({COMPLETED_STATES}) You need to specify --starttime (-S) at some point in the past, due to how saccont default works (for example '-S now-1week'). This option automatically sets '-E now'") + parser.add_argument('--csv-input', + help="Don't parse sacct but import this CSV file. It's read with " + "Python's default csv reader (excel format). 
Beware badly " + "formatted inputs, for example line breaks in job names.") parser.add_argument('--quiet', '-q', action='store_true', help="Don't output anything unless errors") parser.add_argument('--verbose', '-v', action='store_true', @@ -996,7 +1041,8 @@ def sacct_cli(argv=sys.argv[1:]): db = sqlite3.connect(args.db) else: db = sqlite3.connect(':memory:') - errors = slurm2sql(db, sacct_filter=unknown_args) + errors = slurm2sql(db, sacct_filter=unknown_args, + csv_input=args.csv_input) from tabulate import tabulate @@ -1049,13 +1095,14 @@ def seff_cli(argv=sys.argv[1:]): # A single argument that looks like a jobID is used. if len(unknown_args) == 1 and re.match(r'[0-9+_]+(.[0-9a-z]+)?', unknown_args[0]): unknown_args = [f'--jobs={unknown_args[0]}'] + # Set for completed jobs. + if args.completed: + unknown_args[:0] = ['--endtime=now', f'--state={COMPLETED_STATES}'] if args.order: order_by = f'ORDER BY {args.order}' else: order_by = '' - if args.completed: - unknown_args[:0] = ['--endtime=now', f'--state={COMPLETED_STATES}'] LOG.debug(f'sacct args: {unknown_args}') if args.db: @@ -1096,7 +1143,7 @@ def seff_cli(argv=sys.argv[1:]): sys.exit() cur = db.execute(f"""select * from ( select - JobIDnostep AS JobID, + JobID, User, round(Elapsed/3600,2) AS hours, diff --git a/test.py b/test.py index 6dec92f..77f998d 100644 --- a/test.py +++ b/test.py @@ -1,5 +1,6 @@ # pylint: disable=redefined-outer-name +import csv import datetime import getpass from io import StringIO @@ -53,8 +54,9 @@ def slurm_version_2011(monkeypatch, slurm_version_number=(20, 11, 1)): @pytest.fixture(scope='function') def data1(slurm_version): """Test data set 1""" - lines = open('tests/test-data1.csv').read().replace('|', ';|;') - yield StringIO(lines) + #lines = open('tests/test-data1.csv').read().replace('|', ';|;') + #yield StringIO(lines) + yield csv.DictReader(open('tests/test-data1.csv'), delimiter='|') @pytest.fixture(scope='function') def data2(slurm_version_2011): @@ -62,22 +64,49 @@ def 
data2(slurm_version_2011): This is the same as data1, but removes the ReqGRES column (for slurm>=20.11) """ - lines = open('tests/test-data2.csv').read().replace('|', ';|;') - yield StringIO(lines) + #lines = open('tests/test-data2.csv').read().replace('|', ';|;') + #yield StringIO(lines) + yield csv.DictReader(open('tests/test-data2.csv'), delimiter='|') + +@pytest.fixture(scope='function') +def data3(slurm_version_2011): + """A CSV dataset + """ + yield 'tests/test-data3.csv' +def csvdata(data): + """Convert string CSV to a reader for s2s""" + reader = csv.DictReader(StringIO(data.strip())) + return reader + +def fetch(db, jobid, field, table='slurm'): + selector = 'JobIDSlurm' + if table == 'eff': + selector = 'JobID' + r = db.execute(f"SELECT {field} FROM {table} WHERE {selector}=?", (jobid,)) + return r.fetchone()[0] + # # Tests # def test_slurm2sql_basic(db, data1): - slurm2sql.slurm2sql(db, sacct_filter=[], raw_sacct=data1) + slurm2sql.slurm2sql(db, sacct_filter=[], csv_input=data1) r = db.execute("SELECT JobName, Start " "FROM slurm WHERE JobID=43974388;").fetchone() assert r[0] == 'spawner-jupyterhub' assert r[1] == 1564601354 +def test_csv(db, data3): + slurm2sql.slurm2sql(db, sacct_filter=[], csv_input=data3) + r = db.execute("SELECT JobName, Start " + "FROM slurm WHERE JobID=1;").fetchone() + print(r) + assert r[0] == 'job1' + assert r[1] == 3600 + def test_main(db, data1): - slurm2sql.main(['dummy'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy'], csv_input=data1, db=db) r = db.execute("SELECT JobName, Start " "FROM slurm WHERE JobID=43974388;").fetchone() assert r[0] == 'spawner-jupyterhub' @@ -86,25 +115,25 @@ def test_main(db, data1): def test_jobs_only(db, data1): """--jobs-only gives two rows""" - slurm2sql.main(['dummy', '--jobs-only'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy', '--jobs-only'], csv_input=data1, db=db) assert db.execute("SELECT count(*) from slurm;").fetchone()[0] == 2 def test_verbose(db, data1, caplog): - 
slurm2sql.main(['dummy', '--history-days=1', '-v'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy', '--history-days=1', '-v'], csv_input=data1, db=db) assert time.strftime("%Y-%m-%d") in caplog.text def test_quiet(db, data1, caplog, capfd): - slurm2sql.main(['dummy', '-q'], raw_sacct=data1, db=db) - slurm2sql.main(['dummy', '--history=1-5', '-q'], raw_sacct=data1, db=db) - slurm2sql.main(['dummy', '--history-days=1', '-q'], raw_sacct=data1, db=db) - slurm2sql.main(['dummy', '--history-start=2019-01-01', '-q'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy', '-q'], csv_input=data1, db=db) + slurm2sql.main(['dummy', '--history=1-5', '-q'], csv_input=data1, db=db) + slurm2sql.main(['dummy', '--history-days=1', '-q'], csv_input=data1, db=db) + slurm2sql.main(['dummy', '--history-start=2019-01-01', '-q'], csv_input=data1, db=db) #assert caplog.text == "" captured = capfd.readouterr() assert captured.out == "" assert captured.err == "" def test_time(db, data1): - slurm2sql.main(['dummy'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy'], csv_input=data1, db=db) r = db.execute("SELECT Time FROM slurm WHERE JobID=43974388;").fetchone()[0] assert r == unixtime('2019-08-01T02:02:39') # Submit defined, Start defined, End='Unknown' --> timestamp should be "now" @@ -115,10 +144,24 @@ def test_time(db, data1): assert r == unixtime('2019-08-01T00:35:27') def test_queuetime(db, data1): - slurm2sql.main(['dummy'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy'], csv_input=data1, db=db) r = db.execute("SELECT QueueTime FROM slurm WHERE JobID=43974388;").fetchone()[0] assert r == 1 +# +# Test different fields +# +def test_cpueff(db): + data = """ + JobID,CPUTime,TotalCPU + 1,50:00,25:00 + """ + slurm2sql.slurm2sql(db, [], csv_input=csvdata(data)) + print(db.execute('select * from eff;').fetchall()) + assert fetch(db, 1, 'CPUTime') == 3000 + assert fetch(db, 1, 'TotalCPU') == 1500 + assert fetch(db, 1, 'CPUeff', table='eff') == 0.5 + # # Test command line # @@ -206,22 
+249,22 @@ def test_history_last_timestamp(db, slurm_version): def test_history_resume_basic(db, data1): """Test --history-resume""" # Run it once. Is the update_time approximately now? - slurm2sql.main(['dummy', '--history-days=1'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy', '--history-days=1'], csv_input=data1, db=db) update_time = slurm2sql.get_last_timestamp(db) assert abs(update_time - time.time()) < 5 # Wait 1s, is update time different? time.sleep(1.1) - slurm2sql.main(['dummy', '--history-resume'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy', '--history-resume'], csv_input=data1, db=db) assert update_time != slurm2sql.get_last_timestamp(db) def test_history_resume_timestamp(db, data1, caplog): """Test --history-resume's exact timestamp""" # Run once to get an update_time - slurm2sql.main(['dummy', '--history-days=1'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy', '--history-days=1'], csv_input=data1, db=db) update_time = slurm2sql.get_last_timestamp(db) caplog.clear() # Run again and make sure that we filter based on that update_time - slurm2sql.main(['dummy', '--history-resume'], raw_sacct=data1, db=db) + slurm2sql.main(['dummy', '--history-resume'], csv_input=data1, db=db) assert slurm2sql.slurm_timestamp(update_time) in caplog.text @pytest.mark.parametrize( diff --git a/tests/test-data3.csv b/tests/test-data3.csv new file mode 100644 index 0000000..cc37bd8 --- /dev/null +++ b/tests/test-data3.csv @@ -0,0 +1,2 @@ +JobIDRaw,JobName,ReqTRES,Start +1,job1,cpu=1,1970-01-01T03:00:00