diff --git a/slurm2sql.py b/slurm2sql.py index 31a87e1..11917ef 100644 --- a/slurm2sql.py +++ b/slurm2sql.py @@ -172,13 +172,13 @@ class linefunc(object): # Generic extractor-generator function. -def ExtractField(name, columnname, fieldname, type_): +def ExtractField(name, columnname, fieldname, type_, wrap=None): """Extract a field out of a column, in the TRES/GRSE column formats. Example: 'gres/gpuutil' """ - _re = re.compile(rf'{fieldname}[=]([^,]*)') + _re = re.compile(rf'\b{fieldname}=([^,]*)\b') @staticmethod def calc(row): if columnname not in row: return None @@ -186,7 +186,10 @@ def calc(row): #print(row[columnname]) m = _re.search(row[columnname]) if m: - return type_(m.group(1)) + val = type_(m.group(1)) + if wrap: + val = wrap(val) + return val return type(name, (linefunc,), {'calc': calc, 'type': type_.type}) @@ -467,6 +470,8 @@ class slurmCPUEff(linefunc): type = 'real' @staticmethod def calc(row): + if not ('Elapsed' in row and 'TotalCPU' in row and 'NCPUS' in row): + return walltime = slurmtime(row['Elapsed']) if not walltime: return None try: @@ -564,6 +569,8 @@ def calc(row): 'NTasks': nullint, #'AllocGRES' 'AllocTRES': nullstr, + 'TRESUsageInTot': nullstr, + 'TRESUsageOutTot': nullstr, # CPU related 'NCPUS': nullint, # === AllocCPUS @@ -606,8 +613,8 @@ def calc(row): #'_GPUMem': slurmGPUMem, # GPU mem extracted from comment field #'_GPUEff': slurmGPUEff, # GPU utilization (0.0 to 1.0) extracted from comment field #'_NGPU': slurmGPUCount, # Number of GPUs, extracted from comment field - '_NGpus': ExtractField('NGpu', 'AllocTRES', 'gres/gpu', float_metric), - '_GpuUtil': ExtractField('GpuUtil', 'TRESUsageInAve', 'gres/gpuutil', float_metric), + '_NGpus': ExtractField('NGpus', 'AllocTRES', 'gres/gpu', float_metric), + '_GpuUtil': ExtractField('GpuUtil', 'TRESUsageInAve', 'gres/gpuutil', float_metric, wrap=lambda x: x/100.), '_GpuMem': ExtractField('GpuMem2', 'TRESUsageInAve', 'gres/gpumem', float_metric), '_GpuUtilTot': ExtractField('GpuUtilTot', 'TRESUsageInTot', 'gres/gpuutil', float_metric), '_GpuMemTot': ExtractField('GpuMemTot', 'TRESUsageInTot', 'gres/gpumem', float_metric), @@ -618,20 +625,15 @@ def calc(row): 'JobIDRaw', 'ConsumedEnergyRaw', 'TRESUsageInAve', - 'TRESUsageInTot', 'TRESUsageOutTot', - ] + ] def main(argv=sys.argv[1:], db=None, raw_sacct=None, csv_input=None): """Parse arguments and use the other API""" - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser(usage='slurm2sql DBNAME [other args] [SACCT_FILTER]') parser.add_argument('db', help="Database filename to create or update") - parser.add_argument('sacct_filter', nargs='*', - help="sacct options to filter jobs. For example, one " - "would usually give '-a' or '-S 2019-08-01' " - "here, for example") parser.add_argument('--update', '-u', action='store_true', help="If given, don't delete existing database and " "instead insert or update rows") @@ -652,11 +654,13 @@ def main(argv=sys.argv[1:], db=None, raw_sacct=None, csv_input=None): help="Don't parse sacct but import this CSV file. It's read with " "Python's default csv reader (excel format). Beware badly " "formatted inputs, for example line breaks in job names.") + parser.add_argument('--completed', '-c', action='store_true', + help=f"Select for completed job states ({COMPLETED_STATES}) You need to specify --starttime (-S) at some point in the past, due to how saccont default works (for example '-S now-1week'). This option automatically sets '-E now'") parser.add_argument('--quiet', '-q', action='store_true', help="Don't output anything unless errors") parser.add_argument('--verbose', '-v', action='store_true', help="Output more logging info") - args = parser.parse_args(argv) + args, sacct_filter = parser.parse_known_args(argv) if args.verbose: logging.lastResort.setLevel(logging.DEBUG) @@ -664,6 +668,8 @@ def main(argv=sys.argv[1:], db=None, raw_sacct=None, csv_input=None): logging.lastResort.setLevel(logging.WARN) LOG.debug(args) + sacct_filter = process_sacct_filter(args, sacct_filter) + # db is only given as an argument in tests (normally) if db is None: # Delete existing database unless --update/-u is given @@ -671,8 +677,6 @@ def main(argv=sys.argv[1:], db=None, raw_sacct=None, csv_input=None): os.unlink(args.db) db = sqlite3.connect(args.db) - sacct_filter = args.sacct_filter - # If --history-days, get just this many days history if (args.history is not None or args.history_resume @@ -784,6 +788,7 @@ def create_indexes(db): db.execute('ANALYZE;') db.commit() + def sacct_iter(slurm_cols, sacct_filter, errors=[0], raw_sacct=None): """Iterate through sacct, returning rows as dicts""" # Read data from sacct, or interpert sacct_filter directly as @@ -877,7 +882,7 @@ def infer_type(cd): 'max(MaxRSS) / max(ReqMemNode) AS MemEff, ' 'max(NGpus) AS NGpus, ' 'max(NGpus)*max(Elapsed) AS gpu_s_reserved, ' - 'max(GPUutil)/100. AS GPUeff, ' # Individual job with highest use (check this) + 'max(GPUutil) AS GPUeff, ' # Individual job with highest use (check this) 'max(GPUMem) AS GPUMem, ' 'MaxDiskRead, ' 'MaxDiskWrite, ' @@ -935,6 +940,21 @@ def rows(): return errors[0] +def process_sacct_filter(args, sacct_filter): + """Generate sacct filter args in a standard way + + For example adding a --completed argument that translates into + different sacct arguments. + """ + # A single argument that looks like a jobID is used. + if len(sacct_filter) == 1 and re.match(r'[0-9+_]+(.[0-9a-z]+)?', sacct_filter[0]): + sacct_filter = [f'--jobs={sacct_filter[0]}'] + # Set for completed jobs. + if getattr(args, 'completed', None): + sacct_filter[:0] = ['--endtime=now', f'--state={COMPLETED_STATES}'] + return sacct_filter + + def update_last_timestamp(db, update_time=None): """Update the last update time in the database, for resuming. @@ -987,7 +1007,7 @@ def compact_table(): SACCT_DEFAULT_FIELDS = 'JobID,User,State,Start,End,Partition,ExitCodeRaw,NodeList,NCPUS,CPUtime,CPUEff,ReqMem,MaxRSS,ReqGPUS,GPUUtil,TotDiskRead,TotDiskWrite,ReqTRES,AllocTRES,TRESUsageInTot,TRESUsageOutTot' COMPLETED_STATES = 'CA,CD,DL,F,NF,OOM,PR,RV,TO' -def sacct_cli(argv=sys.argv[1:]): +def sacct_cli(argv=sys.argv[1:], csv_input=None): """A command line that uses slurm2sql to give an sacct-like interface.""" parser = argparse.ArgumentParser(description= "All unknown arguments get passed to sacct to fetch data." @@ -1015,7 +1035,7 @@ def sacct_cli(argv=sys.argv[1:]): help="Don't output anything unless errors") parser.add_argument('--verbose', '-v', action='store_true', help="Output more logging info") - args, unknown_args = parser.parse_known_args(argv) + args, sacct_filter = parser.parse_known_args(argv) if args.verbose: logging.lastResort.setLevel(logging.DEBUG) @@ -1023,16 +1043,15 @@ def sacct_cli(argv=sys.argv[1:]): logging.lastResort.setLevel(logging.WARN) LOG.debug(args) - if args.completed: - unknown_args[:0] = ['--endtime=now', f'--state={COMPLETED_STATES}'] + sacct_filter = process_sacct_filter(args, sacct_filter) - LOG.debug(f'sacct args: {unknown_args}') + LOG.debug(f'sacct args: {sacct_filter}') if args.db: db = sqlite3.connect(args.db) else: db = sqlite3.connect(':memory:') - errors = slurm2sql(db, sacct_filter=unknown_args, - csv_input=args.csv_input) + errors = slurm2sql(db, sacct_filter=sacct_filter, + csv_input=args.csv_input or csv_input) from tabulate import tabulate @@ -1041,7 +1060,7 @@ def sacct_cli(argv=sys.argv[1:]): print(tabulate(cur, headers=headers, tablefmt=args.format)) -def seff_cli(argv=sys.argv[1:]): +def seff_cli(argv=sys.argv[1:], csv_input=None): parser = argparse.ArgumentParser(usage= "slurm2sql-seff [-h] [--order ORDER] [--completed --starttime TIME] SACCT_ARGS", description= @@ -1072,11 +1091,15 @@ def seff_cli(argv=sys.argv[1:]): help="SQL order by (arbitrary SQL expression using column names). NOT safe from SQL injection.") parser.add_argument('--completed', '-c', action='store_true', help=f"Select for completed job states ({COMPLETED_STATES}) You need to specify --starttime (-S) at some point in the past, due to how saccont default works (for example '-S now-1week'). This option automatically sets '-E now'.") + parser.add_argument('--csv-input', + help="Don't parse sacct but import this CSV file. It's read with " + "Python's default csv reader (excel format). Beware badly " + "formatted inputs, for example line breaks in job names.") parser.add_argument('--quiet', '-q', action='store_true', help="Don't output anything unless errors") parser.add_argument('--verbose', '-v', action='store_true', help="Output more logging info") - args, unknown_args = parser.parse_known_args(argv) + args, sacct_filter = parser.parse_known_args(argv) if args.verbose: logging.lastResort.setLevel(logging.DEBUG) @@ -1084,24 +1107,20 @@ def seff_cli(argv=sys.argv[1:]): logging.lastResort.setLevel(logging.WARN) LOG.debug(args) - # A single argument that looks like a jobID is used. - if len(unknown_args) == 1 and re.match(r'[0-9+_]+(.[0-9a-z]+)?', unknown_args[0]): - unknown_args = [f'--jobs={unknown_args[0]}'] - # Set for completed jobs. - if args.completed: - unknown_args[:0] = ['--endtime=now', f'--state={COMPLETED_STATES}'] + sacct_filter = process_sacct_filter(args, sacct_filter) if args.order: order_by = f'ORDER BY {args.order}' else: order_by = '' - LOG.debug(f'sacct args: {unknown_args}') + LOG.debug(f'sacct args: {sacct_filter}') if args.db: db = sqlite3.connect(args.db) else: db = sqlite3.connect(':memory:') - errors = slurm2sql(db, sacct_filter=unknown_args) + errors = slurm2sql(db, sacct_filter=sacct_filter, + csv_input=args.csv_input or csv_input) from tabulate import tabulate diff --git a/test.py b/test.py index 83fe2e4..9081f27 100644 --- a/test.py +++ b/test.py @@ -162,6 +162,16 @@ def test_cpueff(db): assert fetch(db, 1, 'TotalCPU') == 1500 assert fetch(db, 1, 'CPUeff', table='eff') == 0.5 +def test_gpueff(db): + data = """ + JobID,TRESUsageInAve + 1,gres/gpuutil=23 + """ + slurm2sql.slurm2sql(db, [], csv_input=csvdata(data)) + print(db.execute('select * from eff;').fetchall()) + assert fetch(db, 1, 'GPUEff', table='eff') == 0.23 + + # # Test command line # @@ -191,6 +201,56 @@ def test_cmdline_history(dbfile): os.system('python3 slurm2sql.py --history=2-10 %s --'%dbfile) sqlite3.connect(dbfile).execute('SELECT JobName from slurm;') +# +# slurm2sql-sacct +# +def test_sacct(db, capsys): + data = """ + JobID,CPUTime,TotalCPU + 111,50:00,25:00 + """ + slurm2sql.sacct_cli(argv=[], csv_input=csvdata(data)) + captured = capsys.readouterr() + assert '111' in captured.out + assert str(50*60) in captured.out # cputime + +# +# slurm2sql-seff +# +def test_seff(db, capsys): + data = """ + JobID,End,CPUTime,TotalCPU + 111,1970-01-01T00:00:00,50:00,25:00 + 111.2,,,25:00 + """ + slurm2sql.seff_cli(argv=[], csv_input=csvdata(data)) + captured = capsys.readouterr() + assert '111' in captured.out + assert '50%' in captured.out + +def test_seff_mem(db, capsys): + data = """ + JobID,End,ReqMem,MaxRSS + 111,1970-01-01T00:00:00,10G, + 111.2,,,8G + """ + slurm2sql.seff_cli(argv=[], csv_input=csvdata(data)) + captured = capsys.readouterr() + assert '111' in captured.out + assert '80%' in captured.out + +def test_seff_gpu(db, capsys): + data = """ + JobID,End,Elapsed,TotalCPU,NCPUS,AllocTRES,TRESUsageInAve + 111,1970-01-01T00:00:00,,1,1,, + 111.2,1970-01-01T00:00:00,100,1,1,gres/gpu=1,gres/gpuutil=23 + """ + slurm2sql.seff_cli(argv=[], csv_input=csvdata(data)) + captured = capsys.readouterr() + print(captured) + assert '111' in captured.out + assert '23%' in captured.out + # # Misc function tests