diff --git a/ph5/core/pmonitor.py b/ph5/core/pmonitor.py index 873c3568..dde12d11 100755 --- a/ph5/core/pmonitor.py +++ b/ph5/core/pmonitor.py @@ -562,13 +562,20 @@ def get_len(f): return num_lines + def init_fio(f, d): from multiprocessing import cpu_count fio = pforma_io.FormaIO(infile=f, outdir=d) - if cpu_count() > 3: - fio.set_nmini(cpu_count() + 1) + + cores = cpu_count() + # Use at most half the cores worth of families; + # real clamp happens in set_nmini + if cores > 1: + requested = max(1, cores // 2) else: - fio.set_nmini(cpu_count()) + requested = 1 + + fio.set_nmini(requested) fio.initialize_ph5() diff --git a/ph5/utilities/pforma_io.py b/ph5/utilities/pforma_io.py index 1605e53d..de48bb91 100755 --- a/ph5/utilities/pforma_io.py +++ b/ph5/utilities/pforma_io.py @@ -14,6 +14,11 @@ from zlib import adler32 import re +try: + import psutil as _psutil +except Exception: + _psutil = None + from ph5.core import segdreader_smartsolo PROG_VERSION = '2024.249' @@ -30,6 +35,9 @@ 'nodal': 'segdtoph5', 'seg2': 'seg2toph5'} ON_POSIX = 'posix' in sys.builtin_module_names +MAX_INDEX = 999 + +# hard upper bound on how many PH5 famillies can be processed in parallel class FormaIOError(exceptions.Exception): @@ -52,8 +60,10 @@ class FormaIO(): 'WW', 'XX', 'YY', 'ZZ', 'AAA', 'BBB', 'CCC', 'DDD', 'EEE', 'FFF', 'GGG', 'HHH', 'III', 'JJJ', 'LLL', 'MMM') - def __init__(self, infile=None, outdir=None, main_window=None): + def __init__(self, infile=None, outdir=None, main_window=None, + max_families=None): self.main_window = main_window + self.max_families = max_families # limit max parallel families self.infile = infile # Input file (list of raw files) self.infh = None # File handle for infile self.raw_files = {} # Raw files organized by type @@ -71,7 +81,7 @@ def __init__(self, infile=None, outdir=None, main_window=None): self.M = int(self.cfg['M']) if self.cfg and 'N' in self.cfg: - self.nmini = FormaIO.MINIS[0:self.cfg['N']] + self.nmini = FormaIO.MINIS[0:self.cfg['N'] - 1] else: self.nmini = FormaIO.MINIS[0:4] @@ -118,9 +128,56 @@ def set_nmini(self, n): ''' Set the self.nmini list of PH5 families from n and FormaIO.MINIS. Use value for N from pforma.cfg if it exists. + + Clamps n using: + - self.max_families (from GUI), if provided + - CPU core count (don't oversubcribe) ''' - if 'N' not in self.cfg: - self.nmini = FormaIO.MINIS[0:n] + # Respect project config if already fixed: + if 'N' in self.cfg: + return + + try: + n = int(n) + except Exception: + n = 1 + + # 1) GUI override, if provided: + gui_limit = getattr(self, 'max_families', None) + if gui_limit is not None: + try: + gui_limit = int(gui_limit) + except Exception: + gui_limit = None + + # 3) Core-based limit: + cores = None + if _psutil is not None: + try: + cores = _psutil.cpu_count(logical=False) or \ + _psutil.cpu_count() + except Exception: + cores = None + + safe_max = None + if cores: + safe_max = max(1, cores - 1) + + # If GUI set a limit, it wins save_max: + if gui_limit is not None: + safe_max = gui_limit if safe_max is None else min(safe_max, + gui_limit) + # If everything failed, just use requested n: + if safe_max is None: + safe_max = n + # not exceed requested n: + safe_n = max(1, min(n, safe_max)) + + if safe_n != n: + LOGGER.info("Requested {0} PH5 families, limiting to {1} " + "(GUI / env / cores).".format(n, safe_n)) + + self.nmini = FormaIO.MINIS[0:safe_n] def initialize_ph5(self): ''' Set up processing directory structure and set M from @@ -158,16 +215,6 @@ def initialize_ph5(self): os.chdir(self.whereami) - def set_M(self, m): - ''' - Set self.M, the number of mini files in each family. - ''' - try: - self.M = int(m) - except Exception as e: - raise FormaIOError( - errno=10, msg="Failed to set M: {0}".format(e.message)) - def run_simple(self, cmds, x, family): ''' Run a single command in a subprocess. Line buffer output. @@ -229,179 +276,138 @@ def run_cmds(self, cmds, x=0, ems=None): # # Should this be implemented as a closure? # + def build_cmd(self, tp, clprog, list_path, mini_index): + # One mini per DAS, at mini_index + if tp == 'nodal': + if self.UTM: + cmd = ( + "{prog} -n master.ph5 -f {lst} " + "-M 1 -U {utm} -S {S} -c {combine} 2>&1" + ).format( + prog=clprog, + lst=list_path, + utm=self.UTM, + S=mini_index, + combine=self.COMBINE + ) + elif self.TSPF: + cmd = ( + "{prog} -n master.ph5 -f {lst} " + "-M 1 -T -S {S} -c {combine} 2>&1" + ).format( + prog=clprog, + lst=list_path, + S=mini_index, + combine=self.COMBINE + ) + else: + cmd = ( + "{prog} -n master.ph5 -f {lst} " + "-M 1 -S {S} -c {combine} 2>&1" + ).format( + prog=clprog, + lst=list_path, + S=mini_index, + combine=self.COMBINE + ) + else: + # texan, rt-130, seg2 + extra = "" + if tp == 'texan': + extra = " --overide" + cmd = ( + "{prog} -n master.ph5 -f {lst} " + "-M 1 -S {S}{extra} 2>&1" + ).format( + prog=clprog, + lst=list_path, + S=mini_index, + extra=extra + ) + return cmd def run(self, runit=True): - ''' Run processes to convert raw files to ph5 - runit -> If true, execute processes otherwise only return list of - commands to execute. + '''Run processes to convert raw files to ph5. + + Logic: + - A mini (index) can contain multiple DAS. + - A DAS is written to exactly one mini (no splitting). + - max_index (self.M) is the maximum mini index we use. + - DAS are assigned to families round-robin over self.nmini. + - Minis are assigned round-robin 1..max_index, then wrap. ''' - def split(ts): - ''' Split up lists of raw files for processing. - Key by mini ph5 family name. - ''' - ret = {} # Files to load - tot = {} # Total raw size per family - # Initialize - for m in self.nmini: - ret[m] = {} - tot[m] = 0 - - # Check to see if any of these are already loaded (data from a das - # must all be in the same family). - dass = self.resolved.keys() - for d in dass: - raws = self.resolved[d] - for r in raws: - if r['mini'] and r['mini'] in ret: - if d not in ret[r['mini']]: - ret[r['mini']][d] = [] - - tot[r['mini']] += r['size'] - ret[r['mini']][d].append(r) - - # Go through remaining dass and assign them to a family - dass.sort() - i = 0 - for d in dass: - raws = self.resolved[d] - if tot[self.nmini[i]] >= ts: - i += 1 - if i > len(self.nmini) - 1: - i -= 1 - - for r in raws: - if r['mini']: - continue + # DAS -> list of file_info (path, size, type, etc.) + dass = sorted(self.resolved.keys()) + if not dass: + if runit: + return {}, {}, None + else: + return {}, {}, None - if d not in ret[self.nmini[i]]: - ret[self.nmini[i]][d] = [] + families = list(self.nmini) + if not families: + raise FormaIOError(11, "No PH5 families (nmini) defined.") - r['mini'] = self.nmini[i] - tot[self.nmini[i]] += r['size'] - ret[self.nmini[i]][d].append(r) + self.M = MAX_INDEX - return ret + # Prepare return structures + cmds = {} + info = {} + for fam in families: + cmds[fam] = [] + info[fam] = {'lists': [], 'instruments': []} - def setup(tl): - ''' Write sub-lists of raw files to each mini - family directory ''' - ret = {} - for m in self.nmini: - ret[m] = {} - for typ in ('texan', 'rt-130', 'nodal', 'seg2'): - of = None - outfile = "{0}_{1}{2}.lst".format( - typ, str(int(time.time())), m) - os.chdir(os.path.join(self.home, m)) - dass = tl[m] - keys = sorted(dass.keys()) - wrote = False - for d in keys: - files = dass[d] - for f in files: - if f['type'] == typ: - if not of: - of = open(outfile, 'w+') - of.write(f['path'] + '\n') - wrote = True + # Independent counters for family and mini index + family_counter = 0 + mini_counter = 0 - try: - of.close() - except BaseException: - pass - if wrote: - ret[m][typ] = os.path.join(self.home, m, outfile) + for das in dass: + files = self.resolved[das] + if not files: + continue - os.chdir(self.whereami) + # Assume a DAS has a single type from guess_instrument_type + tp = files[0]['type'] + if tp not in INST2PROG: + raise FormaIOError( + 12, + "Unsupported instrument type {0} for DAS {1}".format( + tp, das) + ) + + # Round-robin family: A, B, C, D, A, B, ... + family = families[family_counter % len(families)] + family_counter += 1 + + # Round-robin mini index: 1..MAX_INDEX, then wrap + mini_index = (mini_counter % MAX_INDEX) + 1 + mini_counter += 1 + + # Create list file for this DAS in its family directory + os.chdir(os.path.join(self.home, family)) + outfile = "{0}_{1}.lst".format(tp, das) + with open(outfile, 'w') as of: + for f in files: + of.write(f['path'] + '\n') + + list_path = os.path.join(self.home, family, outfile) + clprog = INST2PROG[tp] + + cmd = self.build_cmd(tp, clprog, list_path, mini_index) + cmds[family].append(cmd) + info[family]['lists'].append(list_path) + info[family]['instruments'].append(tp) - return ret + os.chdir(self.whereami) - def build_cmds(lsts): - ''' Make commands to do the conversion from raw to ph5 - for each mini ph5 family ''' - ret = {} - info = {} + # Save commands JSON (for debugging) + write_json( + cmds, + os.path.join(self.home, + "commands{0}.json".format(str(int(time.time())))) + ) - i = 0 - for m in self.nmini: - cmd = [] - lists = [] - instruments = [] - lst = lsts[m] - if not self.M: - self.M = 1 - ess = i * self.M + 1 - if 'texan' in lst: - lists.append(lst['texan']) - instruments.append('texan') - clprog = INST2PROG['texan'] - cmd.append( - "{3} -n master.ph5 -f {0} -M {1} -S {2}\ - --overide 2>&1".format(lst['texan'], - self.M, ess, clprog)) - if 'rt-130' in lst: - lists.append(lst['rt-130']) - instruments.append('rt-130') - clprog = INST2PROG['rt-130'] - cmd.append( - "{3} -n master.ph5 -f {0} -M {1} -S {2} 2>&1".format( - lst['rt-130'], self.M, ess, clprog)) - - if 'seg2' in lst: - lists.append(lst['seg2']) - instruments.append('seg2') - clprog = INST2PROG['seg2'] - cmd.append( - "{3} -n master.ph5 -f {0} -M {1} -S {2} 2>&1".format( - lst['seg2'], self.M, ess, clprog)) - - if 'nodal' in lst: - lists.append(lst['nodal']) - instruments.append('nodal') - clprog = INST2PROG['nodal'] - if self.UTM: - cmd.append( - "{5} -n master.ph5 -f {0} -M {1} -U {3} -S {2} -c\ - {4} 2>&1".format(lst['nodal'], self.M, - ess, self.UTM, self.COMBINE, - clprog)) - elif self.TSPF: - cmd.append( - "{4} -n master.ph5 -f {0} -M {1} -T -S {2} -c\ - {3} 2>&1".format( - lst['nodal'], self.M, ess, self.COMBINE, - clprog)) - else: - cmd.append( - "{4} -n master.ph5 -f {0} -M {1} -S {2} -c\ - {3} 2>&1".format( - lst['nodal'], self.M, ess, self.COMBINE, - clprog)) - ret[m] = cmd - if m not in info: - info[m] = {} - - info[m]['lists'] = lists - info[m]['instruments'] = instruments - - i += 1 - - return ret, info - - def save_cmds(cmds): - ''' Save commands ''' - write_json(cmds, os.path.join( - self.home, "commands{0}.json".format(str(int(time.time()))))) - - # - # Main - # - target_size = self.total_raw / len(self.nmini) - toload = split(target_size) - lsts = setup(toload) - cmds, info = build_cmds(lsts) - save_cmds(cmds) if runit is True: pees, i = self.run_cmds(cmds) return cmds, pees, i @@ -504,13 +510,6 @@ def read(self): self.average_raw = int(self.total_raw / n) self.number_raw = n self.infh.close() - # Estimate M so each mini file is about 12GB - if self.M is None: - self.M = int( - (((self.total_raw / len( - self.nmini)) / 1024 / 1024 / 1024) / 12) + 0.5) - if self.M == 0: - self.M = 1 def readDB(self): ''' Read JSON file containing files loaded so far. Same format as @@ -567,7 +566,6 @@ def resolveDB(self): # Have not seen this DAS yet else: ret[nk] = new_dass - self.resolved = ret def unite(self, TO='A'): diff --git a/ph5/utilities/pformagui.py b/ph5/utilities/pformagui.py index dd99ce83..49f7f5aa 100755 --- a/ph5/utilities/pformagui.py +++ b/ph5/utilities/pformagui.py @@ -23,6 +23,38 @@ UTMZone = '13N' +class MaxFamiliesDialog(QtWidgets.QDialog): + """ + custom spinning box because the spinning box in python 2 has bug that let + value go over max set + """ + def __init__(self, current_value=0, max_value=32, parent=None): + super(MaxFamiliesDialog, self).__init__(parent) + self.setWindowTitle("Set Max Parallel Families") + + layout = QtWidgets.QVBoxLayout(self) + + label = QtWidgets.QLabel( + "Max number of PH5 families to process in parallel (0 = auto):") + layout.addWidget(label) + + self.spin = QtWidgets.QSpinBox() + self.spin.setMinimum(0) + self.spin.setMaximum(max_value) # THIS MAX IS ENFORCED CORRECTLY + self.spin.setValue(current_value) + layout.addWidget(self.spin) + + buttons = QtWidgets.QDialogButtonBox( + QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel) + + buttons.accepted.connect(self.accept) + buttons.rejected.connect(self.reject) + layout.addWidget(buttons) + + def value(self): + return self.spin.value() + + class GetInputs(QtWidgets.QWidget): ''' Widget to set name of lst file, processing directory, and to start run @@ -169,6 +201,8 @@ def __init__(self): self.timeout = 2000 self.UTMZone = UTMZone self.combine = 1 + # max number of PH5 families processed in parallel + self.max_families = 4 self.readSettings() @@ -358,6 +392,14 @@ def createActions(self): statusTip="Reset all family processes.", triggered=self.resetIt) + # NEW: set max parallel families + self.maxFamiliesAct = QtWidgets.QAction( + "Set &Max Parallel Families...", + self, + statusTip=("Limit how many PH5 families (A, B, C, ...) are " + "processed in parallel."), + triggered=self.setMaxFamilies) + self.exitAct = QtWidgets.QAction( "E&xit", self, @@ -380,6 +422,8 @@ def createMenus(self): self.fileMenu.addAction(self.timeoutAct) self.fileMenu.addAction(self.utmAct) self.fileMenu.addAction(self.combineAct) + # to set max number of families at the same time + self.fileMenu.addAction(self.maxFamiliesAct) self.fileMenu.addSeparator() self.fileMenu.addAction(self.resetAct) self.fileMenu.addAction(self.exitAct) @@ -403,6 +447,39 @@ def createStatusBar(self): type=QtCore.Qt.QueuedConnection) self.statsig.emit("Ready") + def setMaxFamilies(self): + """ + Dialog to set max number of PH5 families processed in parallel. + + None / 0 means "auto" (no explicit GUI limit). + """ + phys = cpu_count(logical=False) or cpu_count(logical=True) + logi = cpu_count(logical=True) + + if phys and phys > 0: + max_family = max(1, phys // 2) + else: + max_family = max(1, logi // 2) + + current = self.max_families if self.max_families is not None else 0 + + dlg = MaxFamiliesDialog( + current_value=current if current < max_family else max_family, + max_value=max_family, + parent=self + ) + if dlg.exec_() == QtWidgets.QDialog.Accepted: + val = dlg.value() + if val <= 0: + self.max_families = None + self.statsig.emit("Max parallel families: auto") + else: + self.max_families = val + self.statsig.emit( + "Max parallel families set to {0}".format(val)) + # Persist in settings: + self.writeSettings() + def readSettings(self): ''' Read position and size from QSettings @@ -413,6 +490,20 @@ def readSettings(self): self.move(pos) self.resize(size) + # restore max_families + maxfam = settings.value('max_families', 4) + # QSettings may give a QString / QVariant: + try: + if maxfam is not None and maxfam != '': + maxfam_int = int(maxfam) + if maxfam_int > 0: + self.max_families = maxfam_int + else: + self.max_families = None + except Exception: + self.max_families = None + + def writeSettings(self): ''' Save QSettings @@ -421,6 +512,10 @@ def writeSettings(self): settings.setValue('pos', self.pos()) settings.setValue('size', self.size()) + settings.setValue('max_families', + self.max_families if self.max_families is not None + else 0) + def activeMdiChild(self): activeSubWindow = self.mdiArea.activeSubWindow() if activeSubWindow: @@ -515,15 +610,33 @@ def init_fio(f, d, utm=None, combine=None, main_window=None): cmds -> list of conversion commands lsts -> info about processing sub-lists and types of instruments ''' - fio = pforma_io.FormaIO(infile=f, outdir=d, main_window=main_window) + # pass max_families from GUI (if available) + max_families = None + if main_window is not None: + max_families = getattr(main_window, 'max_families', None) + + fio = pforma_io.FormaIO( + infile=f, + outdir=d, + main_window=main_window, + max_families=max_families + ) + if utm: fio.set_utm(utm) if combine: fio.set_combine(combine) - if cpu_count(logical=False) > 3: - fio.set_nmini(cpu_count(logical=True) + 1) + + # limit families to around half logical cores. + phys = cpu_count(logical=False) or cpu_count(logical=True) + logi = cpu_count(logical=True) + + if phys and phys > 0: + requested = max(1, phys // 2) else: - fio.set_nmini(cpu_count(logical=True)) + requested = max(1, logi // 2) + + fio.set_nmini(requested) fio.initialize_ph5()