Skip to content

Commit

Permalink
Add tests of {seff,sacct} and other fixes to go along with that
Browse files Browse the repository at this point in the history
  • Loading branch information
rkdarst committed Aug 11, 2024
1 parent 3e681f4 commit 69dbebc
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 33 deletions.
85 changes: 52 additions & 33 deletions slurm2sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,21 +172,24 @@ class linefunc(object):


# Generic extractor-generator function.
def ExtractField(name, columnname, fieldname, type_, wrap=None):
    """Build a linefunc class that extracts one field from a TRES/GRES column.

    Example: ExtractField('GpuUtil', 'TRESUsageInAve', 'gres/gpuutil', float_metric)
    pulls the value of 'gres/gpuutil=<val>' out of the comma-separated
    TRESUsageInAve column.

    name:       class name of the generated extractor.
    columnname: sacct column to search in (e.g. 'AllocTRES').
    fieldname:  key inside the comma-separated column (e.g. 'gres/gpu').
    type_:      converter applied to the matched string; its .type attribute
                becomes the SQL type of the generated column.
    wrap:       optional post-conversion transform, e.g. lambda x: x/100.
                to turn a percentage into a 0.0-1.0 fraction.
    """
    # re.escape: fieldnames like 'gres/gpu' are safe today, but a future
    # fieldname containing a regex metacharacter must still match literally.
    # The \b anchors keep 'gres/gpu=' from matching inside 'gres/gpumem='.
    # NOTE(review): the trailing \b fails on an empty value ('gpu=,') —
    # presumed impossible in sacct output, but unverified here.
    _re = re.compile(rf'\b{re.escape(fieldname)}=([^,]*)\b')
    @staticmethod
    def calc(row):
        # Columns differ between Slurm versions; the column may be absent.
        if columnname not in row: return None
        # Slurm 20.11 uses gres= within ReqTRES (instead of ReqGRES)
        m = _re.search(row[columnname])
        if m:
            val = type_(m.group(1))
            if wrap:
                val = wrap(val)
            return val

    return type(name, (linefunc,), {'calc': calc, 'type': type_.type})

Expand Down Expand Up @@ -467,6 +470,8 @@ class slurmCPUEff(linefunc):
type = 'real'
@staticmethod
def calc(row):
if not ('Elapsed' in row and 'TotalCPU' in row and 'NCPUS' in row):
return
walltime = slurmtime(row['Elapsed'])
if not walltime: return None
try:
Expand Down Expand Up @@ -564,6 +569,8 @@ def calc(row):
'NTasks': nullint,
#'AllocGRES'
'AllocTRES': nullstr,
'TRESUsageInTot': nullstr,
'TRESUsageOutTot': nullstr,

# CPU related
'NCPUS': nullint, # === AllocCPUS
Expand Down Expand Up @@ -606,8 +613,8 @@ def calc(row):
#'_GPUMem': slurmGPUMem, # GPU mem extracted from comment field
#'_GPUEff': slurmGPUEff, # GPU utilization (0.0 to 1.0) extracted from comment field
#'_NGPU': slurmGPUCount, # Number of GPUs, extracted from comment field
'_NGpus': ExtractField('NGpu', 'AllocTRES', 'gres/gpu', float_metric),
'_GpuUtil': ExtractField('GpuUtil', 'TRESUsageInAve', 'gres/gpuutil', float_metric),
'_NGpus': ExtractField('NGpus', 'AllocTRES', 'gres/gpu', float_metric),
'_GpuUtil': ExtractField('GpuUtil', 'TRESUsageInAve', 'gres/gpuutil', float_metric, wrap=lambda x: x/100.),
'_GpuMem': ExtractField('GpuMem2', 'TRESUsageInAve', 'gres/gpumem', float_metric),
'_GpuUtilTot': ExtractField('GpuUtilTot', 'TRESUsageInTot', 'gres/gpuutil', float_metric),
'_GpuMemTot': ExtractField('GpuMemTot', 'TRESUsageInTot', 'gres/gpumem', float_metric),
Expand All @@ -618,20 +625,15 @@ def calc(row):
'JobIDRaw',
'ConsumedEnergyRaw',
'TRESUsageInAve',
'TRESUsageInTot',
'TRESUsageOutTot',
]
]



def main(argv=sys.argv[1:], db=None, raw_sacct=None, csv_input=None):
"""Parse arguments and use the other API"""
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(usage='slurm2sql DBNAME [other args] [SACCT_FILTER]')
parser.add_argument('db', help="Database filename to create or update")
parser.add_argument('sacct_filter', nargs='*',
help="sacct options to filter jobs. For example, one "
"would usually give '-a' or '-S 2019-08-01' "
"here, for example")
parser.add_argument('--update', '-u', action='store_true',
help="If given, don't delete existing database and "
"instead insert or update rows")
Expand All @@ -652,27 +654,29 @@ def main(argv=sys.argv[1:], db=None, raw_sacct=None, csv_input=None):
help="Don't parse sacct but import this CSV file. It's read with "
"Python's default csv reader (excel format). Beware badly "
"formatted inputs, for example line breaks in job names.")
parser.add_argument('--completed', '-c', action='store_true',
help=f"Select for completed job states ({COMPLETED_STATES}) You need to specify --starttime (-S) at some point in the past, due to how saccont default works (for example '-S now-1week'). This option automatically sets '-E now'")
parser.add_argument('--quiet', '-q', action='store_true',
help="Don't output anything unless errors")
parser.add_argument('--verbose', '-v', action='store_true',
help="Output more logging info")
args = parser.parse_args(argv)
args, sacct_filter = parser.parse_known_args(argv)

if args.verbose:
logging.lastResort.setLevel(logging.DEBUG)
if args.quiet:
logging.lastResort.setLevel(logging.WARN)
LOG.debug(args)

sacct_filter = process_sacct_filter(args, sacct_filter)

# db is only given as an argument in tests (normally)
if db is None:
# Delete existing database unless --update/-u is given
if not (args.update or args.history_resume) and os.path.exists(args.db):
os.unlink(args.db)
db = sqlite3.connect(args.db)

sacct_filter = args.sacct_filter

# If --history-days, get just this many days history
if (args.history is not None
or args.history_resume
Expand Down Expand Up @@ -784,6 +788,7 @@ def create_indexes(db):
db.execute('ANALYZE;')
db.commit()


def sacct_iter(slurm_cols, sacct_filter, errors=[0], raw_sacct=None):
"""Iterate through sacct, returning rows as dicts"""
# Read data from sacct, or interpert sacct_filter directly as
Expand Down Expand Up @@ -877,7 +882,7 @@ def infer_type(cd):
'max(MaxRSS) / max(ReqMemNode) AS MemEff, '
'max(NGpus) AS NGpus, '
'max(NGpus)*max(Elapsed) AS gpu_s_reserved, '
'max(GPUutil)/100. AS GPUeff, ' # Individual job with highest use (check this)
'max(GPUutil) AS GPUeff, ' # Individual job with highest use (check this)
'max(GPUMem) AS GPUMem, '
'MaxDiskRead, '
'MaxDiskWrite, '
Expand Down Expand Up @@ -935,6 +940,21 @@ def rows():
return errors[0]


def process_sacct_filter(args, sacct_filter):
    """Normalize user-supplied sacct filter arguments in a standard way.

    - A single positional argument that looks like a job ID (e.g. '12345',
      '123_4', '12345.batch') is rewritten to '--jobs=<id>'.
    - If args.completed is set (checked with getattr, so any CLI namespace
      works), prepend '--endtime=now' and a --state filter for the
      completed states.

    Returns the resulting argument list; the caller's list is not mutated.
    """
    # A single argument that looks like a jobID is used.
    # fullmatch + escaped dot: '123x456' must NOT be mistaken for a job ID
    # (re.match with an unescaped '.' and no end anchor accepted it).
    if len(sacct_filter) == 1 and re.fullmatch(r'[0-9+_]+(\.[0-9a-z]+)?', sacct_filter[0]):
        sacct_filter = [f'--jobs={sacct_filter[0]}']
    # Set for completed jobs.
    if getattr(args, 'completed', None):
        sacct_filter = ['--endtime=now', f'--state={COMPLETED_STATES}'] + list(sacct_filter)
    return sacct_filter


def update_last_timestamp(db, update_time=None):
"""Update the last update time in the database, for resuming.
Expand Down Expand Up @@ -987,7 +1007,7 @@ def compact_table():

SACCT_DEFAULT_FIELDS = 'JobID,User,State,Start,End,Partition,ExitCodeRaw,NodeList,NCPUS,CPUtime,CPUEff,ReqMem,MaxRSS,ReqGPUS,GPUUtil,TotDiskRead,TotDiskWrite,ReqTRES,AllocTRES,TRESUsageInTot,TRESUsageOutTot'
COMPLETED_STATES = 'CA,CD,DL,F,NF,OOM,PR,RV,TO'
def sacct_cli(argv=sys.argv[1:]):
def sacct_cli(argv=sys.argv[1:], csv_input=None):
"""A command line that uses slurm2sql to give an sacct-like interface."""
parser = argparse.ArgumentParser(description=
"All unknown arguments get passed to sacct to fetch data."
Expand Down Expand Up @@ -1015,24 +1035,23 @@ def sacct_cli(argv=sys.argv[1:]):
help="Don't output anything unless errors")
parser.add_argument('--verbose', '-v', action='store_true',
help="Output more logging info")
args, unknown_args = parser.parse_known_args(argv)
args, sacct_filter = parser.parse_known_args(argv)

if args.verbose:
logging.lastResort.setLevel(logging.DEBUG)
if args.quiet:
logging.lastResort.setLevel(logging.WARN)
LOG.debug(args)

if args.completed:
unknown_args[:0] = ['--endtime=now', f'--state={COMPLETED_STATES}']
sacct_filter = process_sacct_filter(args, sacct_filter)

LOG.debug(f'sacct args: {unknown_args}')
LOG.debug(f'sacct args: {sacct_filter}')
if args.db:
db = sqlite3.connect(args.db)
else:
db = sqlite3.connect(':memory:')
errors = slurm2sql(db, sacct_filter=unknown_args,
csv_input=args.csv_input)
errors = slurm2sql(db, sacct_filter=sacct_filter,
csv_input=args.csv_input or csv_input)

from tabulate import tabulate

Expand All @@ -1041,7 +1060,7 @@ def sacct_cli(argv=sys.argv[1:]):
print(tabulate(cur, headers=headers, tablefmt=args.format))


def seff_cli(argv=sys.argv[1:]):
def seff_cli(argv=sys.argv[1:], csv_input=None):
parser = argparse.ArgumentParser(usage=
"slurm2sql-seff [-h] [--order ORDER] [--completed --starttime TIME] SACCT_ARGS",
description=
Expand Down Expand Up @@ -1072,36 +1091,36 @@ def seff_cli(argv=sys.argv[1:]):
help="SQL order by (arbitrary SQL expression using column names). NOT safe from SQL injection.")
parser.add_argument('--completed', '-c', action='store_true',
help=f"Select for completed job states ({COMPLETED_STATES}) You need to specify --starttime (-S) at some point in the past, due to how saccont default works (for example '-S now-1week'). This option automatically sets '-E now'.")
parser.add_argument('--csv-input',
help="Don't parse sacct but import this CSV file. It's read with "
"Python's default csv reader (excel format). Beware badly "
"formatted inputs, for example line breaks in job names.")
parser.add_argument('--quiet', '-q', action='store_true',
help="Don't output anything unless errors")
parser.add_argument('--verbose', '-v', action='store_true',
help="Output more logging info")
args, unknown_args = parser.parse_known_args(argv)
args, sacct_filter = parser.parse_known_args(argv)

if args.verbose:
logging.lastResort.setLevel(logging.DEBUG)
if args.quiet:
logging.lastResort.setLevel(logging.WARN)
LOG.debug(args)

# A single argument that looks like a jobID is used.
if len(unknown_args) == 1 and re.match(r'[0-9+_]+(.[0-9a-z]+)?', unknown_args[0]):
unknown_args = [f'--jobs={unknown_args[0]}']
# Set for completed jobs.
if args.completed:
unknown_args[:0] = ['--endtime=now', f'--state={COMPLETED_STATES}']
sacct_filter = process_sacct_filter(args, sacct_filter)

if args.order:
order_by = f'ORDER BY {args.order}'
else:
order_by = ''

LOG.debug(f'sacct args: {unknown_args}')
LOG.debug(f'sacct args: {sacct_filter}')
if args.db:
db = sqlite3.connect(args.db)
else:
db = sqlite3.connect(':memory:')
errors = slurm2sql(db, sacct_filter=unknown_args)
errors = slurm2sql(db, sacct_filter=sacct_filter,
csv_input=args.csv_input or csv_input)

from tabulate import tabulate

Expand Down
60 changes: 60 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ def test_cpueff(db):
assert fetch(db, 1, 'TotalCPU') == 1500
assert fetch(db, 1, 'CPUeff', table='eff') == 0.5

def test_gpueff(db):
    """gres/gpuutil=23 in TRESUsageInAve is stored as a 0-1 GPUEff fraction."""
    csv_text = """
JobID,TRESUsageInAve
1,gres/gpuutil=23
"""
    slurm2sql.slurm2sql(db, [], csv_input=csvdata(csv_text))
    eff_rows = db.execute('select * from eff;').fetchall()
    print(eff_rows)
    assert fetch(db, 1, 'GPUEff', table='eff') == 0.23


#
# Test command line
#
Expand Down Expand Up @@ -191,6 +201,56 @@ def test_cmdline_history(dbfile):
os.system('python3 slurm2sql.py --history=2-10 %s --'%dbfile)
sqlite3.connect(dbfile).execute('SELECT JobName from slurm;')

#
# slurm2sql-sacct
#
def test_sacct(db, capsys):
    """The sacct-like CLI prints the job row with CPUTime in seconds."""
    csv_text = """
JobID,CPUTime,TotalCPU
111,50:00,25:00
"""
    slurm2sql.sacct_cli(argv=[], csv_input=csvdata(csv_text))
    out = capsys.readouterr().out
    assert '111' in out
    assert str(50 * 60) in out  # CPUTime 50:00, converted to seconds

#
# slurm2sql-seff
#
def test_seff(db, capsys):
    """seff output shows the job ID and its CPU efficiency (25:00/50:00 = 50%)."""
    csv_text = """
JobID,End,CPUTime,TotalCPU
111,1970-01-01T00:00:00,50:00,25:00
111.2,,,25:00
"""
    slurm2sql.seff_cli(argv=[], csv_input=csvdata(csv_text))
    out = capsys.readouterr().out
    assert '111' in out
    assert '50%' in out

def test_seff_mem(db, capsys):
    """seff memory efficiency: 8G MaxRSS on the step vs 10G requested -> 80%."""
    csv_text = """
JobID,End,ReqMem,MaxRSS
111,1970-01-01T00:00:00,10G,
111.2,,,8G
"""
    slurm2sql.seff_cli(argv=[], csv_input=csvdata(csv_text))
    out = capsys.readouterr().out
    assert '111' in out
    assert '80%' in out

def test_seff_gpu(db, capsys):
    """seff GPU efficiency comes from gres/gpuutil on the job step (23%)."""
    csv_text = """
JobID,End,Elapsed,TotalCPU,NCPUS,AllocTRES,TRESUsageInAve
111,1970-01-01T00:00:00,,1,1,,
111.2,1970-01-01T00:00:00,100,1,1,gres/gpu=1,gres/gpuutil=23
"""
    slurm2sql.seff_cli(argv=[], csv_input=csvdata(csv_text))
    result = capsys.readouterr()
    print(result)
    assert '111' in result.out
    assert '23%' in result.out


#
# Misc function tests
Expand Down

0 comments on commit 69dbebc

Please sign in to comment.