From 13ea0ffca525dc0f24a30b8b0174229bcd4c1b7c Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Mon, 30 Oct 2023 10:35:34 +0100 Subject: [PATCH] Add option single_ob to HDF5Merger for merging chunks of the same ob --- src/ctapipe/io/hdf5merger.py | 44 ++++++++++++++++++++++++++---------- src/ctapipe/tools/merge.py | 4 ++++ 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/ctapipe/io/hdf5merger.py b/src/ctapipe/io/hdf5merger.py index c8d9e8136d5..e51342fbcea 100644 --- a/src/ctapipe/io/hdf5merger.py +++ b/src/ctapipe/io/hdf5merger.py @@ -158,6 +158,15 @@ class HDF5Merger(Component): True, help="Whether to include processing statistics in merged output" ).tag(config=True) + single_ob = traits.Bool( + False, + help=( + "If true, input files are assumed to be multiple chunks from the same" + " observation block and the ob / sb blocks will only be copied from" + " the first input file" + ), + ).tag(config=True) + def __init__(self, output_path=None, **kwargs): # enable using output_path as posarg if output_path not in {None, traits.Undefined}: @@ -207,6 +216,8 @@ def __init__(self, output_path=None, **kwargs): # this will update _merged_obs_ids from existing input file self._check_obs_ids(self.h5file) + self._n_merged = 0 + def __call__(self, other: str | Path | tables.File): """ Append file ``other`` to the output file @@ -217,7 +228,7 @@ def __call__(self, other: str | Path | tables.File): with exit_stack: # first file to be merged - if self.meta is None: + if self._n_merged == 0: self.meta = self._read_meta(other) self.data_model_version = self.meta.product.data_model_version metadata.write_to_hdf5(self.meta.to_dict(), self.h5file) @@ -233,6 +244,7 @@ def __call__(self, other: str | Path | tables.File): self.log.info( "Updated required nodes to %s", sorted(self.required_nodes) ) + self._n_merged += 1 finally: self._update_meta() @@ -288,10 +300,16 @@ def _check_obs_ids(self, other): f" check for duplicated obs_ids. 
Tried: {keys}" ) - duplicated = self._merged_obs_ids.intersection(obs_ids) - if len(duplicated) > 0: - msg = f"Input file {other.filename} contains obs_ids already included in output file: {duplicated}" - raise CannotMerge(msg) + if self.single_ob and len(self._merged_obs_ids) > 0: + different = self._merged_obs_ids.symmetric_difference(obs_ids) + if len(different) > 0: + msg = f"Input file {other.filename} contains different obs_ids than already merged ({self._merged_obs_ids}) for single_ob=True: {different}" + raise CannotMerge(msg) + else: + duplicated = self._merged_obs_ids.intersection(obs_ids) + if len(duplicated) > 0: + msg = f"Input file {other.filename} contains obs_ids already included in output file: {duplicated}" + raise CannotMerge(msg) self._merged_obs_ids.update(obs_ids) @@ -301,13 +319,15 @@ def _append(self, other): # Configuration self._append_subarray(other) - config_keys = [ - "/configuration/observation/scheduling_block", - "/configuration/observation/observation_block", - ] - for key in config_keys: - if key in other.root: - self._append_table(other, other.root[key]) + # in case of "single_ob", we only copy sb/ob blocks for the first file + if not self.single_ob or self._n_merged == 0: + config_keys = [ + "/configuration/observation/scheduling_block", + "/configuration/observation/observation_block", + ] + for key in config_keys: + if key in other.root: + self._append_table(other, other.root[key]) key = "/configuration/telescope/pointing" if key in other.root: diff --git a/src/ctapipe/tools/merge.py b/src/ctapipe/tools/merge.py index 11285ad9caf..694c208943c 100644 --- a/src/ctapipe/tools/merge.py +++ b/src/ctapipe/tools/merge.py @@ -78,6 +78,10 @@ class MergeTool(Tool): } flags = { + "single-ob": ( + {"HDF5Merger": {"single_ob": True}}, + "Only copy observation config of first file to be merged.", + ), "progress": ( {"MergeTool": {"progress_bar": True}}, "Show a progress bar for all given input files",