Skip to content

Commit

Permalink
Fix a double gzip of ligolw files (#151)
Browse files Browse the repository at this point in the history
* Do not cross metric day boundaries.

* Merge day boundary (#146)

* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this

* Do not merge files if they overlap "metric days"

* Do not cross metric day boundaries.

Co-authored-by: Joseph Areeda <joseph.areeda@ligo.org>

* Check point merge (#147)

* Do not cross metric day boundaries.

* add log file arg delete empty directories when done

* Tweak remove empty dir removal

* Tweak remove empty dir removal again

* Merge day boundary (#146)

* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this

* Do not merge files if they overlap "metric days"

* Do not cross metric day boundaries.

Co-authored-by: Joseph Areeda <joseph.areeda@ligo.org>

* rebase against last approved PR

* rebase against last approved PR

* rebase against last approved PR again, fix flake8

* Fix a bug in remove empty directories.

Co-authored-by: Joseph Areeda <joseph.areeda@ligo.org>

* Merge day boundary (#146)

* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this

* Do not merge files if they overlap "metric days"

* Do not cross metric day boundaries.

Co-authored-by: Joseph Areeda <joseph.areeda@ligo.org>

* minor doc changes

* Fix a bug where an xml.gz file could get compressed again in merge-with-gaps

* Implement a periodic vacate to address permanent D-state (uninterruptible wait) causing jobs to fail to complete

* Always create a log file. If not specified put one in the output directory

* Fix a problem with periodic vacate.

* Up the periodic vacate time to 3 hrs

* Found a job killing typo

* Add time limits to post processing also

* Don't save segments.txt file if no segments are found, because we don't know whether that reflects a failure to find them or a valid, not-analyzable state.

* disable periodic vacate to demo the problem.

* Fix reported version in some utilities. Only update segments.txt if omicron is actually run.

* Clarify relative imports. and add details to a few log messages

* Resolve flake8 issues

---------

Co-authored-by: Joseph Areeda <joseph.areeda@ligo.org>
  • Loading branch information
areeda and Joseph Areeda authored Mar 28, 2023
1 parent acb2921 commit a7ea6f5
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 25 deletions.
3 changes: 2 additions & 1 deletion omicron/cli/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@
import os
import re

from .. import __version__

__author__ = 'joseph areeda'
__email__ = 'joseph.areeda@ligo.org'
__version__ = '0.0.1'
__process_name__ = 'archive'

# example channel indir: L1:SUS-PR3_M1_DAMP_T_IN1_DQ
Expand Down
2 changes: 1 addition & 1 deletion omicron/cli/hdf5_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import os.path
import argparse

from omicron import (io, __version__)
from .. import (io, __version__)

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

Expand Down
33 changes: 29 additions & 4 deletions omicron/cli/merge_with_gaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
prog_start_time = time.time()
import argparse
import glob
import gzip
import logging
from .. import __version__
from pathlib import Path
import re
import shutil
Expand All @@ -37,11 +39,10 @@

__author__ = 'joseph areeda'
__email__ = 'joseph.areeda@ligo.org'
__version__ = '0.0.1'
__process_name__ = 'omicron_merge_with_gaps'

# global logger
log_file_format = "%(asctime)s - %(levelname)s - %(funcName)s %(lineno)d: %(message)s"
log_file_format = "%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s"
log_file_date_format = '%m-%d %H:%M:%S'
logging.basicConfig(format=log_file_format, datefmt=log_file_date_format)
logger = logging.getLogger(__process_name__)
Expand All @@ -67,6 +68,22 @@ def get_merge_cmd(ext):
return ret_path


def is_old_ligolw(path):
    """Check whether a LIGO_LW XML trigger file uses the old ilwd:char format.

    Old-format files contain the ``ilwd:char`` type string and must be
    merged with the ``--ilwdchar-compat`` flag (see ``do_merge``).

    Parameters
    ----------
    path : pathlib.Path
        Path to the trigger file, optionally gzip-compressed (``.gz``).

    Returns
    -------
    bool
        True if the ``ilwd:char`` marker appears anywhere in the file.
    """
    flag = "ilwd:char"
    # Test the suffix, not a substring: a filename that merely *contains*
    # "gz" (e.g. a channel name) must not be treated as compressed.
    opener = gzip.open if path.name.endswith('.gz') else open
    # Open in text mode so both branches compare str against str
    # (the old code compared against the repr of a bytes line).
    with opener(str(path.absolute()), 'rt') as fp:
        for line in fp:
            if flag in line:
                return True
    return False


def do_merge(opath, curfiles, chan, stime, etime, ext, skip_gzip):
"""
Given the list of trigger files merge them all into a single file
Expand Down Expand Up @@ -94,6 +111,9 @@ def do_merge(opath, curfiles, chan, stime, etime, ext, skip_gzip):
if 'xml' in ext: # also accept xml.gz
outfile_path = Path(str(outfile_path.absolute()).replace('.xml.gz', '.xml'))
cmd.append(f'--output={outfile_path}')
if is_old_ligolw(curfiles[0]):
cmd.append('--ilwdchar-compat')
logger.debug('Working with old ligolw format')
for cur in curfiles:
cmd.append(str(cur.absolute()))
if 'xml' not in ext:
Expand All @@ -117,7 +137,7 @@ def do_merge(opath, curfiles, chan, stime, etime, ext, skip_gzip):
else:
logger.error(f'Return code:{returncode}, stderr:\n{result.stderr.decode("UTF-8")}')

if 'xml' in ext and returncode == 0 and not skip_gzip:
if 'xml' in ext and returncode == 0 and not skip_gzip and outfile_path.suffix != '.gz':
logger.info(f'Compressing {outfile_path} with gzip')
res2 = subprocess.run(['gzip', '-9', '--force', outfile_path], capture_output=True)
if res2.returncode == 0:
Expand Down Expand Up @@ -161,7 +181,7 @@ def valid_file(path, uint_bug):
os.remove(path)
else:
ret = True
logger.debug(f'valid_file: {ret} {path.name} ({ntrig}), {ret} took {time.time()-vf_strt:.2f}')
logger.debug(f'valid_file: {ret} {path.name} ({ntrig}), took {time.time()-vf_strt:.2f}')
return ret


Expand Down Expand Up @@ -208,6 +228,11 @@ def main():
log_file_handler.setFormatter(log_formatter)
logger.addHandler(log_file_handler)

# debugging?
logger.debug('{} called with arguments:'.format(__process_name__))
for k, v in args.__dict__.items():
logger.debug(' {} = {}'.format(k, v))

fpat = '^(.+)-(\\d+)-(\\d+).(.+)$'
fmatch = re.compile(fpat)

Expand Down
31 changes: 20 additions & 11 deletions omicron/cli/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@
from gwpy.io.cache import read_cache
from gwpy.time import to_gps, tconvert

from omicron import (const, segments, log, data, parameters, utils, condor, io,
__version__)
from .. import (const, segments, log, data, parameters, utils, condor, io, __version__)

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

Expand Down Expand Up @@ -465,7 +464,13 @@ def main(args=None):
args.verbose = max(5 - args.verbose, 0)
logger.setLevel(args.verbose * 10)
if args.log_file:
logger.add_file_handler(args.log_file)
log_file = Path(args.log_file)
else:
# if not specified default to the output directory
log_file = Path(args.output_dir) / 'omicron-process.log'
log_file.parent.mkdir(mode=0o755, exist_ok=True, parents=True)
logger.add_file_handler(log_file)

logger.debug("Command line args:")
for arg in vars(args):
logger.debug(f'{arg} = {str(getattr(args, arg))}')
Expand Down Expand Up @@ -850,7 +855,8 @@ def main(args=None):
# segment, or long enough to process safely)
if truncate and abs(lastseg) < chunkdur * 2:
logger.info(
"The final segment is too short, but ends at the limit of "
"The final segment is too short, " f'Minimum length is {int(chunkdur*2)} '
"but ends at the limit of "
"available data, presumably this is an active segment. It "
"will be removed so that it can be processed properly later",
)
Expand Down Expand Up @@ -942,11 +948,8 @@ def main(args=None):
if newdag and len(segs) == 0 and online and alldata:
logger.info(
"No analysable segments found, but up-to-date data are "
"available. A segments.txt file will be written so we don't "
"have to search these data again",
"available. "
)
segments.write_segments(cachesegs, segfile)

This comment has been minimized.

Copy link
@duncanmmacleod

duncanmmacleod Apr 20, 2023

Member

@areeda, why was this line removed? This (I think) means that in times where the data are available, but the IFO is down, the segments will not be recorded as have been searched already, so the time to search will just grow and grow.

This comment has been minimized.

Copy link
@areeda

areeda Apr 20, 2023

Author Collaborator

@duncanmmacleod
The issue is that we do not distinguish between segment not available and segment not active. So when segdb had a latency of ~15 min we were marking segments as done but gap filling the next day processed much more active time. (that was fun tracking down)

We still use the segments.txt file but only update it when we actually run omicron on something.

logger.info("Segments written to\n%s" % segfile)
clean_dirs(run_dir_list)
clean_exit(0, tempfiles)

Expand Down Expand Up @@ -1013,10 +1016,13 @@ def main(args=None):
**condorcmds
)
# This allows us to start with a memory request that works maybe 80%, but bumps it if we go over
# we also limit individual jobs to a max runtime to cause them to be vacates to deal with NFS hanging
reqmem = condorcmds.pop('request_memory', 1024)
ojob.add_condor_cmd('+InitialRequestMemory', f'{reqmem}')
ojob.add_condor_cmd('request_memory', f'ifthenelse(isUndefined(MemoryUsage), {reqmem}, int(3*MemoryUsage))')
ojob.add_condor_cmd('periodic_release', '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34) && (JobStatus == 5)')
ojob.add_condor_cmd('periodic_release', '(HoldReasonCode =?= 26 || HoldReasonCode =?= 34 '
'|| HoldReasonCode =?= 46) && (JobStatus == 5)')
ojob.add_condor_cmd('allowed_job_duration', 3 * 3600)
ojob.add_condor_cmd('periodic_remove', '(JobStatus == 1) && MemoryUsage >= 7G')

ojob.add_condor_cmd('+OmicronProcess', f'"{group}"')
Expand All @@ -1030,8 +1036,11 @@ def main(args=None):
ppjob.add_condor_cmd('+InitialRequestMemory', f'{ppmem}')
ppjob.add_condor_cmd('request_memory',
f'ifthenelse(isUndefined(MemoryUsage), {ppmem}, int(3*MemoryUsage))')
ojob.add_condor_cmd('allowed_job_duration', 3 * 3600)
ppjob.add_condor_cmd('periodic_release',
'(HoldReasonCode =?= 26 || HoldReasonCode =?= 34) && (JobStatus == 5)')
'(HoldReasonCode =?= 26 || HoldReasonCode =?= 34 '
'|| HoldReasonCode =?= 46) && (JobStatus == 5)')

ppjob.add_condor_cmd('periodic_remove', '(JobStatus == 1) && MemoryUsage >= 7G')

ppjob.add_condor_cmd('environment', '"HDF5_USE_FILE_LOCKING=FALSE"')
Expand Down Expand Up @@ -1159,7 +1168,7 @@ def main(args=None):
hdf5files = ' '.join(omicronfiles[c]['hdf5'])
for f in omicronfiles[c]['hdf5']:
ppnode.add_input_file(f)
no_merge = '--no-merge' if args.skip_root_merge else ''
no_merge = '--no-merge' if args.skip_hdf5_merge else ''

operations.append(
f' {prog_path["omicron-merge"]} {no_merge} '
Expand Down
2 changes: 1 addition & 1 deletion omicron/cli/root_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import os.path
import argparse

from omicron import (io, __version__)
from .. import (io, __version__)

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

Expand Down
2 changes: 1 addition & 1 deletion omicron/cli/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from gwpy.table.filters import in_segmentlist
from gwpy.time import to_gps

from omicron import (io, const, __version__)
from .. import (io, const, __version__)
from omicron.data import write_cache
from omicron.segments import (Segment, SegmentList, cache_segments)

Expand Down
4 changes: 2 additions & 2 deletions omicron/cli/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
from gwpy.plot import Plot
from gwpy.plot.segments import SegmentRectangle

from omicron import (condor, const, io, log, segments, __version__)
from omicron.utils import get_omicron_version
from .. import (condor, const, io, log, segments, __version__)
from ..utils import get_omicron_version

__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"

Expand Down
6 changes: 2 additions & 4 deletions omicron/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,8 @@ def get_omicron_version(executable=None):
"version",
]).decode("utf-8").rsplit(maxsplit=1)[-1],
)
except subprocess.CalledProcessError:
raise RuntimeError(
"failed to determine omicron version from executable"
)
except subprocess.CalledProcessError as ex:
raise RuntimeError(f"failed to determine omicron version from executable: {str(ex)}")


def astropy_config_path(parent, update_environ=True):
Expand Down

0 comments on commit a7ea6f5

Please sign in to comment.