Skip to content

Commit ad6c1c8

Browse files
committed
Option to bypass data missing check, ReducedSpectra class
1 parent ef991cd commit ad6c1c8

File tree

4 files changed

+296
-49
lines changed

4 files changed

+296
-49
lines changed

nenupy/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
__copyright__ = "Copyright 2023, nenupy"
66
__credits__ = ["Alan Loh"]
77
__license__ = "MIT"
8-
__version__ = "2.6.10"
8+
__version__ = "2.6.11"
99
__maintainer__ = "Alan Loh"
1010
__email__ = "alan.loh@obspm.fr"
1111

nenupy/io/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#! /usr/bin/python3
22
# -*- coding: utf-8 -*-
33

4-
from .tf import Spectra
4+
from .tf import Spectra, TFTask
5+
from .tf_utils import ReducedSpectra

nenupy/io/tf.py

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,21 @@ def _func_call(self) -> Callable:
115115

116116
@classmethod
117117
def correct_bandpass(cls):
118-
""" :class:`~nenupy.io.tf.TFTask` calling :func:`~nenupy.io.tf_utils.correct_bandpass` to correct the polyphase-filter bandpass reponse.
118+
""" :class:`~nenupy.io.tf.TFTask` to correct for the sub-band bandpass response.
119+
120+
A Poly-Phase Filter is involved in the NenuFAR data acquisition pipeline to split the data stream into sub-bands.
121+
The combination of the filter shape and a Fourier transform results in a non-flat response across each sub-band.
122+
This :class:`~nenupy.io.tf.TFTask` calls the :func:`~nenupy.io.tf_utils.correct_bandpass` function.
123+
124+
Example
125+
-------
126+
.. code-block:: python
127+
:emphasize-lines: 3
128+
129+
>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
130+
>>> sp = Spectra("/my/file.spectra")
131+
>>> sp.pipeline = TFPipeline(sp, TFTask.correct_bandpass())
132+
>>> data = sp.get(...)
119133
120134
.. figure:: ./_images/io_images/tf_bandpass_correction.png
121135
:width: 650
@@ -129,16 +143,28 @@ def wrapper_task(data, channels):
129143

130144
@classmethod
131145
def flatten_subband(cls):
132-
"""_summary_
146+
""" :class:`~nenupy.io.tf.TFTask` to flatten each sub-band bandpass.
147+
Based on the temporal median over each suband, a linear correction is applied to flatten the signal.
148+
This :class:`~nenupy.io.tf.TFTask` calls the :func:`~nenupy.io.tf_utils.flatten_subband` function.
149+
150+
Example
151+
-------
152+
.. code-block:: python
153+
:emphasize-lines: 3
154+
155+
>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
156+
>>> sp = Spectra("/my/file.spectra")
157+
>>> sp.pipeline = TFPipeline(sp, TFTask.flatten_subband())
158+
>>> data = sp.get(...)
133159
134160
.. figure:: ./_images/io_images/tf_sb_flatten.png
135161
:width: 650
136162
:align: center
137163
138164
Warning
139165
-------
140-
This is a warning
141-
166+
This correction assumes that the signal's spectrum could be considered flat at the sub-band resolution.
167+
The method is not recommended for data other than Stokes I.
142168
143169
"""
144170
def wrapper_task(data, channels):
@@ -169,6 +195,11 @@ def wrapper_task(data, channels, remove_channels):
169195

170196
@classmethod
171197
def correct_polarization(cls):
198+
"""_summary_
199+
200+
warning : needs to be done at the beginning.
201+
202+
"""
172203
def wrapper_task(
173204
time_unix,
174205
frequency_hz,
@@ -273,7 +304,32 @@ def wrapper_task(
273304

274305
@classmethod
275306
def time_rebin(cls):
276-
"""_summary_
307+
""" :class:`~nenupy.io.tf.TFTask` to re-bin the data in time.
308+
The targetted time resolution is defined by the ``'rebin_dt'`` argument, set in :attr:`~nenupy.io.tf.TFPipeline.parameters`.
309+
This :class:`~nenupy.io.tf.TFTask` calls the :func:`~nenupy.io.tf_utils.rebin_along_dimension` function.
310+
311+
Example
312+
-------
313+
.. code-block:: python
314+
315+
>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
316+
>>> import astropy.units as u
317+
>>> sp = Spectra("/my/file.spectra")
318+
>>> sp.pipeline = TFPipeline(sp, TFTask.time_rebin())
319+
320+
Then, either perform a one_time application of the `rebin_dt` parameter (that is forgotten after the :meth:`~nenupy.io.tf.Spectra.get` call):
321+
322+
.. code-block:: python
323+
324+
>>> data = sp.get(..., rebin_dt=0.2*u.s,...)
325+
326+
Or, set it for further usage:
327+
328+
.. code-block:: python
329+
330+
>>> sp.pipeline.parameters["rebin_dt"] = 0.2*u.s
331+
>>> data = sp.get(...)
332+
277333
278334
.. figure:: ./_images/io_images/tf_time_rebin.png
279335
:width: 650
@@ -652,7 +708,7 @@ class Spectra:
652708
653709
"""
654710

655-
def __init__(self, filename: str):
711+
def __init__(self, filename: str, check_missing_data: bool = True):
656712
self.filename = filename
657713

658714
# Decode the main header and lazy load the data
@@ -664,7 +720,7 @@ def __init__(self, filename: str):
664720
data = self._lazy_load_data()
665721

666722
# Compute the boolean mask of bad blocks
667-
bad_block_mask = self._get_bad_data_mask(data)
723+
bad_block_mask = self._get_bad_data_mask(data, bypass_verification=~check_missing_data)
668724

669725
# Compute the main data block descriptors (time / frequency / beam)
670726
subband_width_hz = SUBBAND_WIDTH.to_value(u.Hz)
@@ -915,12 +971,15 @@ def get(self, file_name: str = None, **pipeline_kwargs) -> SData:
915971
else:
916972
# Save the result of the pipeline in a file
917973
# No security on the resulting data volume
974+
log.info(f"Estimated data volume to store: {(data.nbytes * u.byte).to(u.Gibyte):.3f}...")
918975
utils.store_dask_tf_data(
919976
file_name=file_name,
920977
data=data,
921978
time=time,
922979
frequency=frequency,
923-
polarization=["XX", "XY", "YX", "YY"] if not self.pipeline.contains("Compute Stokes parameters") else self.pipeline.parameters["stokes"]
980+
polarization=["XX", "XY", "YX", "YY"] if not self.pipeline.contains("Compute Stokes parameters") else self.pipeline.parameters["stokes"],
981+
mode="w" if self.pipeline.parameters["overwrite"] else "auto",
982+
beam=self.pipeline.parameters["beam"]
924983
)
925984
self.pipeline.parameters = parameters_copy # Reset the parameters
926985
return
@@ -1159,9 +1218,13 @@ def _lazy_load_data(self) -> np.ndarray:
11591218
return tmp.view(np.dtype(global_struct))
11601219

11611220
@staticmethod
1162-
def _get_bad_data_mask(data: np.ndarray) -> np.ndarray:
1221+
def _get_bad_data_mask(data: np.ndarray, bypass_verification: bool = False) -> np.ndarray:
11631222
""" """
11641223

1224+
if bypass_verification:
1225+
log.info("Skipping missing data verification...")
1226+
return np.zeros(data["TIMESTAMP"].size, dtype=bool)
1227+
11651228
log.info("Checking for missing data (can take up to 1 min)...")
11661229

11671230
# Either the TIMESTAMP is set to 0, the first idx, or the SB number is negative
@@ -1278,4 +1341,3 @@ def _to_sdata(self, data: np.ndarray, time: Time, frequency: u.Quantity) -> SDat
12781341
freq=frequency,
12791342
polar=self.pipeline.parameters["stokes"],
12801343
)
1281-

0 commit comments

Comments
 (0)