diff --git a/omicron/cli/archive.py b/omicron/cli/archive.py index cb93859..efda155 100644 --- a/omicron/cli/archive.py +++ b/omicron/cli/archive.py @@ -53,9 +53,10 @@ import os import re +from .. import __version__ + __author__ = 'joseph areeda' __email__ = 'joseph.areeda@ligo.org' -__version__ = '0.0.1' __process_name__ = 'archive' # example channel indir: L1:SUS-PR3_M1_DAMP_T_IN1_DQ diff --git a/omicron/cli/hdf5_merge.py b/omicron/cli/hdf5_merge.py index da9ce25..6bd6598 100644 --- a/omicron/cli/hdf5_merge.py +++ b/omicron/cli/hdf5_merge.py @@ -23,7 +23,7 @@ import os.path import argparse -from omicron import (io, __version__) +from .. import (io, __version__) __author__ = 'Duncan Macleod ' diff --git a/omicron/cli/merge_with_gaps.py b/omicron/cli/merge_with_gaps.py index 2e9793d..3861d74 100644 --- a/omicron/cli/merge_with_gaps.py +++ b/omicron/cli/merge_with_gaps.py @@ -28,7 +28,9 @@ prog_start_time = time.time() import argparse import glob +import gzip import logging +from .. import __version__ from pathlib import Path import re import shutil @@ -37,11 +39,10 @@ __author__ = 'joseph areeda' __email__ = 'joseph.areeda@ligo.org' -__version__ = '0.0.1' __process_name__ = 'omicron_merge_with_gaps' # global logger -log_file_format = "%(asctime)s - %(levelname)s - %(funcName)s %(lineno)d: %(message)s" +log_file_format = "%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s" log_file_date_format = '%m-%d %H:%M:%S' logging.basicConfig(format=log_file_format, datefmt=log_file_date_format) logger = logging.getLogger(__process_name__) @@ -67,6 +68,22 @@ def get_merge_cmd(ext): return ret_path +def is_old_ligolw(path): + flag = "ilwd:char" + if 'gz' in path.name: + with gzip.open(str(path.absolute()), 'r') as gz: + for line in gz: + if flag in str(line): + return True + return False + else: + with path.open('r') as fp: + for line in fp: + if flag in line: + return True + return False + + def do_merge(opath, curfiles, chan, stime, etime, ext, skip_gzip): """ Given the list of trigger files merge them all into a single file @@ -94,6 +111,9 @@ def do_merge(opath, curfiles, chan, stime, etime, ext, skip_gzip): if 'xml' in ext: # also accept xml.gz outfile_path = Path(str(outfile_path.absolute()).replace('.xml.gz', '.xml')) cmd.append(f'--output={outfile_path}') + if is_old_ligolw(curfiles[0]): + cmd.append('--ilwdchar-compat') + logger.debug('Working with old ligolw format') for cur in curfiles: cmd.append(str(cur.absolute())) if 'xml' not in ext: @@ -117,7 +137,7 @@ def do_merge(opath, curfiles, chan, stime, etime, ext, skip_gzip): else: logger.error(f'Return code:{returncode}, stderr:\n{result.stderr.decode("UTF-8")}') - if 'xml' in ext and returncode == 0 and not skip_gzip: + if 'xml' in ext and returncode == 0 and not skip_gzip and outfile_path.suffix != '.gz': logger.info(f'Compressing {outfile_path} with gzip') res2 = subprocess.run(['gzip', '-9', '--force', outfile_path], capture_output=True) if res2.returncode == 0: @@ -161,7 +181,7 @@ def valid_file(path, uint_bug): os.remove(path) else: ret = True - logger.debug(f'valid_file: {ret} {path.name} ({ntrig}), {ret} took {time.time()-vf_strt:.2f}') + logger.debug(f'valid_file: {ret} {path.name} ({ntrig}), took {time.time()-vf_strt:.2f}') return ret @@ -208,6 +228,11 @@ def main(): log_file_handler.setFormatter(log_formatter) logger.addHandler(log_file_handler) + # debugging? + logger.debug('{} called with arguments:'.format(__process_name__)) + for k, v in args.__dict__.items(): + logger.debug(' {} = {}'.format(k, v)) + fpat = '^(.+)-(\\d+)-(\\d+).(.+)$' fmatch = re.compile(fpat) diff --git a/omicron/cli/process.py b/omicron/cli/process.py index 3268d98..13d1054 100644 --- a/omicron/cli/process.py +++ b/omicron/cli/process.py @@ -83,8 +83,7 @@ from gwpy.io.cache import read_cache from gwpy.time import to_gps, tconvert -from omicron import (const, segments, log, data, parameters, utils, condor, io, - __version__) +from .. import (const, segments, log, data, parameters, utils, condor, io, __version__) __author__ = 'Duncan Macleod ' @@ -465,7 +464,13 @@ def main(args=None): args.verbose = max(5 - args.verbose, 0) logger.setLevel(args.verbose * 10) if args.log_file: - logger.add_file_handler(args.log_file) + log_file = Path(args.log_file) + else: + # if not specified default to the output directory + log_file = Path(args.output_dir) / 'omicron-process.log' + log_file.parent.mkdir(mode=0o755, exist_ok=True, parents=True) + logger.add_file_handler(log_file) + logger.debug("Command line args:") for arg in vars(args): logger.debug(f'{arg} = {str(getattr(args, arg))}') @@ -850,7 +855,8 @@ def main(args=None): # segment, or long enough to process safely) if truncate and abs(lastseg) < chunkdur * 2: logger.info( - "The final segment is too short, but ends at the limit of " + "The final segment is too short, " f'Minimum length is {int(chunkdur*2)} ' + "but ends at the limit of " "available data, presumably this is an active segment. It " "will be removed so that it can be processed properly later", ) @@ -942,11 +948,8 @@ def main(args=None): if newdag and len(segs) == 0 and online and alldata: logger.info( "No analysable segments found, but up-to-date data are " - "available. A segments.txt file will be written so we don't " - "have to search these data again", + "available. " ) - segments.write_segments(cachesegs, segfile) - logger.info("Segments written to\n%s" % segfile) clean_dirs(run_dir_list) clean_exit(0, tempfiles) @@ -1013,10 +1016,13 @@ def main(args=None): **condorcmds ) # This allows us to start with a memory request that works maybe 80%, but bumps it if we go over + # we also limit individual jobs to a max runtime to cause them to be vacates to deal with NFS hanging reqmem = condorcmds.pop('request_memory', 1024) ojob.add_condor_cmd('+InitialRequestMemory', f'{reqmem}') ojob.add_condor_cmd('request_memory', f'ifthenelse(isUndefined(MemoryUsage), {reqmem}, int(3*MemoryUsage))') - ojob.add_condor_cmd('periodic_release', '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34) && (JobStatus == 5)') + ojob.add_condor_cmd('periodic_release', '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34 ' + '|| HoldReasonCode =?= 46) && (JobStatus == 5)') + ojob.add_condor_cmd('allowed_job_duration', 3 * 3600) ojob.add_condor_cmd('periodic_remove', '(JobStatus == 1) && MemoryUsage >= 7G') ojob.add_condor_cmd('+OmicronProcess', f'"{group}"') @@ -1030,8 +1036,11 @@ def main(args=None): ppjob.add_condor_cmd('+InitialRequestMemory', f'{ppmem}') ppjob.add_condor_cmd('request_memory', f'ifthenelse(isUndefined(MemoryUsage), {ppmem}, int(3*MemoryUsage))') + ojob.add_condor_cmd('allowed_job_duration', 3 * 3600) ppjob.add_condor_cmd('periodic_release', - '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34) && (JobStatus == 5)') + '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34 ' + '|| HoldReasonCode =?= 46) && (JobStatus == 5)') + ppjob.add_condor_cmd('periodic_remove', '(JobStatus == 1) && MemoryUsage >= 7G') ppjob.add_condor_cmd('environment', '"HDF5_USE_FILE_LOCKING=FALSE"') @@ -1159,7 +1168,7 @@ def main(args=None): hdf5files = ' '.join(omicronfiles[c]['hdf5']) for f in omicronfiles[c]['hdf5']: ppnode.add_input_file(f) - no_merge = '--no-merge' if args.skip_root_merge else '' + no_merge = '--no-merge' if args.skip_hdf5_merge else '' operations.append( f' {prog_path["omicron-merge"]} {no_merge} ' diff --git a/omicron/cli/root_merge.py b/omicron/cli/root_merge.py index 47f3c82..011e3cd 100644 --- a/omicron/cli/root_merge.py +++ b/omicron/cli/root_merge.py @@ -24,7 +24,7 @@ import os.path import argparse -from omicron import (io, __version__) +from .. import (io, __version__) __author__ = 'Duncan Macleod ' diff --git a/omicron/cli/show.py b/omicron/cli/show.py index 07fac45..301acc8 100644 --- a/omicron/cli/show.py +++ b/omicron/cli/show.py @@ -31,7 +31,7 @@ from gwpy.table.filters import in_segmentlist from gwpy.time import to_gps -from omicron import (io, const, __version__) +from .. import (io, const, __version__) from omicron.data import write_cache from omicron.segments import (Segment, SegmentList, cache_segments) diff --git a/omicron/cli/status.py b/omicron/cli/status.py index 4742f5a..7702ca4 100644 --- a/omicron/cli/status.py +++ b/omicron/cli/status.py @@ -51,8 +51,8 @@ from gwpy.plot import Plot from gwpy.plot.segments import SegmentRectangle -from omicron import (condor, const, io, log, segments, __version__) -from omicron.utils import get_omicron_version +from .. import (condor, const, io, log, segments, __version__) +from ..utils import get_omicron_version __author__ = "Duncan Macleod " diff --git a/omicron/utils.py b/omicron/utils.py index 80095d9..97dd701 100644 --- a/omicron/utils.py +++ b/omicron/utils.py @@ -102,10 +102,8 @@ def get_omicron_version(executable=None): "version", ]).decode("utf-8").rsplit(maxsplit=1)[-1], ) - except subprocess.CalledProcessError: - raise RuntimeError( - "failed to determine omicron version from executable" - ) + except subprocess.CalledProcessError as ex: + raise RuntimeError(f"failed to determine omicron version from executable: {str(ex)}") def astropy_config_path(parent, update_environ=True):