Skip to content

Commit

Permalink
Merge pull request #238 from luojing1211/new_disperse_shift
Browse files Browse the repository at this point in the history
add disperse shift
  • Loading branch information
mhvk authored Dec 10, 2021
2 parents b8b139d + e8b1759 commit 41534cd
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ New Features
- Add a simpler ``ShiftSamples`` task that just shifts channels by integer
number of samples. [#226, #235, #239]

- Add ability for incoherent dedispersion with ``DisperseSamples`` and
``Dedispersamples``. [#238]

- Streams can now carry meta-data in a ``meta`` attribute. This includes
information on ``frequency``, ``sideband``, and ``polarization``, all
of which are stored in ``meta['__attributes__']`` entry (like astropy's
Expand Down
118 changes: 115 additions & 3 deletions baseband_tasks/dispersion.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
from astropy import units as u
from astropy.utils import lazyproperty

from .base import PaddedTaskBase, getattr_if_none
from .base import PaddedTaskBase, getattr_if_none, SetAttribute
from .fourier import fft_maker
from .dm import DispersionMeasure
from .sampling import ShiftSamples


__all__ = ['Disperse', 'Dedisperse']
__all__ = ['Disperse', 'Dedisperse', 'DisperseSamples', 'DedisperseSamples']


class Disperse(PaddedTaskBase):
Expand Down Expand Up @@ -40,6 +41,8 @@ class Disperse(PaddedTaskBase):
See Also
--------
baseband_tasks.fourier.fft_maker : to select the FFT package used.
baseband_tasks.dispersion.Dedisperse : for coherent dedispersion
baseband_tasks.dispersion.DisperseSamples : for incoherent dispersion
"""

def __init__(self, ih, dm, *, reference_frequency=None,
Expand Down Expand Up @@ -170,10 +173,119 @@ class Dedisperse(Disperse):
See Also
--------
baseband_tasks.fourier.fft_maker : to select the FFT package used.
baseband_tasks.dispersion.Disperse : for coherent dispersion
baseband_tasks.dispersion.DedisperseSamples : for incoherent dedispersion
"""

def __init__(self, ih, dm, *, reference_frequency=None,
samples_per_frame=None, frequency=None, sideband=None):
super().__init__(ih, -dm, reference_frequency=reference_frequency,
samples_per_frame=samples_per_frame,
frequency=frequency, sideband=sideband)

@property
def dm(self):
return -self._dm


class DisperseSamples(ShiftSamples):
"""Incoherently shift a time stream to give it a dispersive time delay.
This task does not handle any in-channel dispersive smearing, but only
shifts the samples according to the mid-channel frequency.
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis.
dm : float or `~baseband_tasks.dm.DispersionMeasure` quantity
Dispersion measure to disperse with. If negative, will dedisperse,
but clearer to use `~baseband_tasks.dispersion.DedisperseSamples`.
reference_frequency : `~astropy.units.Quantity`
Frequency to which the data should be dispersed. Can be an array.
By default, the mean frequency.
samples_per_frame : int, optional
Number of dispersed samples which should be produced in one go.
The number of input samples used will be larger to avoid wrapping.
If not given, as produced by the minimum power of 2 of input
samples that yields at least 75% efficiency.
frequency : `~astropy.units.Quantity`, optional
Frequencies for each channel in ``ih``.
Default: taken from ``ih`` (if available).
sideband : array, optional
Whether frequencies in ``ih`` are upper (+1) or lower (-1) sideband.
Default: taken from ``ih`` (if available). Note that while this is
only used if the data is real (to calculate the mid-channel
frequency), it should always be passed in together with ``frequency``,
since otherwise other tasks cannot interpret frequency correctly.
See Also
--------
baseband_tasks.dispersion.DedisperseSamples : for incoherent dedispersion
baseband_tasks.dispersion.Disperse : for coherent dispersion
"""

def __init__(self, ih, dm, *, reference_frequency=None,
samples_per_frame=None, frequency=None, sideband=None):
# Set possible missing frequency/sideband attributes
if frequency is not None or sideband is not None:
ih = SetAttribute(ih, frequency=frequency, sideband=sideband)
frequency = ih.frequency
if not ih.complex_data:
# Calculate mid-channel frequency for real data (for complex,
# the frequencies are already mid-channel).
frequency = frequency + ih.sideband * ih.sample_rate / 2.

if reference_frequency is None:
reference_frequency = frequency.mean()

# Compute the time shift and use it to set up ShiftSamples.
dm = DispersionMeasure(dm)
time_delay = dm.time_delay(frequency, reference_frequency)
super().__init__(ih, time_delay, samples_per_frame=samples_per_frame)

self.reference_frequency = reference_frequency
self._dm = dm


class DedisperseSamples(DisperseSamples):
"""Incoherently shift a time stream to correct for a dispersive time delay.
This task does not handle any in-channel dispersive smearing, but only
shifts the samples according to the mid-channel frequency.
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis.
dm : float or `~baseband_tasks.dm.DispersionMeasure` quantity
Dispersion measure to correct for. If negative, will disperse,
but clearer to use `~baseband_tasks.dispersion.DisperseSamples`.
reference_frequency : `~astropy.units.Quantity`
Frequency to which the data should be dispersed. Can be an array.
By default, the mean frequency.
samples_per_frame : int, optional
Number of dedispersed samples which should be produced in one go.
The number of input samples used will be larger to avoid wrapping.
If not given, as produced by the minimum power of 2 of input
samples that yields at least 75% efficiency.
frequency : `~astropy.units.Quantity`, optional
Frequencies for each channel in ``ih``.
Default: taken from ``ih`` (if available).
sideband : array, optional
Whether frequencies in ``ih`` are upper (+1) or lower (-1) sideband.
Default: taken from ``ih`` (if available). Note that while this is
only used if the data is real (to calculate the mid-channel
frequency), it should always be passed in together with ``frequency``,
since otherwise other tasks cannot interpret frequency correctly.
See Also
--------
baseband_tasks.dispersion.DisperseSamples : for incoherent dispersion
baseband_tasks.dispersion.Dedisperse : for coherent dedispersion
"""

def __init__(self, ih, dm, reference_frequency=None,
def __init__(self, ih, dm, *, reference_frequency=None,
samples_per_frame=None, frequency=None, sideband=None):
super().__init__(ih, -dm, reference_frequency=reference_frequency,
samples_per_frame=samples_per_frame,
Expand Down
70 changes: 64 additions & 6 deletions baseband_tasks/tests/test_dispersion.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from astropy.tests.helper import assert_quantity_allclose

from baseband_tasks.fourier import fft_maker
from baseband_tasks.dispersion import Disperse, Dedisperse, DispersionMeasure
from baseband_tasks.dispersion import (Disperse, Dedisperse, DispersionMeasure,
DisperseSamples, DedisperseSamples)
from baseband_tasks.generators import StreamGenerator


Expand All @@ -21,9 +22,8 @@
299.872 * u.MHz) # Below lower edge


class TestDispersion:

def setup(self):
class GiantPulseSetup:
def setup_class(self):
self.start_time = Time('2010-11-12T13:14:15')
self.sample_rate = 128. * u.kHz
self.shape = (164000, 2)
Expand All @@ -38,12 +38,16 @@ def setup(self):
# Time delay of 0.05 s over 128 kHz band.
self.dm = DispersionMeasure(1000.*0.05/0.039342251)

@classmethod
def make_giant_pulse(self, sh):
data = np.empty((sh.samples_per_frame,) + sh.shape[1:], sh.dtype)
do_gp = sh.tell() + np.arange(sh.samples_per_frame) == self.gp_sample
data[...] = do_gp[:, np.newaxis]
return data


class TestDispersion(GiantPulseSetup):

def test_time_delay(self):
time_delay = self.dm.time_delay(
self.gp.frequency - self.sample_rate / 2.,
Expand Down Expand Up @@ -189,8 +193,8 @@ def test_disperse_closing(self):
assert 'phase_factor' not in disperse.__dict__


class TestDispersionReal(TestDispersion):
def setup(self):
class GiantPulseSetupReal(GiantPulseSetup):
def setup_class(self):
self.start_time = Time('2010-11-12T13:14:15')
self.sample_rate = 256. * u.kHz
self.shape = (328000, 2)
Expand All @@ -207,6 +211,8 @@ def setup(self):
# Time delay of 0.05 s over 128 kHz band.
self.dm = DispersionMeasure(1000.*0.05/0.039342251)


class TestDispersionReal(TestDispersion, GiantPulseSetupReal):
# Override tests that do not simply work for the real data,
# since the sample rate is twice as high.
def test_time_delay(self):
Expand Down Expand Up @@ -287,3 +293,55 @@ def test_disperse_negative_dm(self):
# Lower sideband [1] is dedispersed to earlier.
assert p[10, 0] > 0.99 and p[9, 0] < 0.006
assert p[9, 1] > 0.99 and p[10, 1] < 0.006


class TestDispersSample(GiantPulseSetupReal):

@pytest.mark.parametrize('reference_frequency', REFERENCE_FREQUENCIES)
@pytest.mark.parametrize('frequency, sideband', [
(None, None),
([199.936, 200.064]*u.MHz, np.array((1, -1))), # Far off from normal.
([200.064, 199.936]*u.MHz, np.array((-1, -1)))])
def test_disperse_sample(self, reference_frequency, frequency, sideband):
disperse = DisperseSamples(self.gp, self.dm, frequency=frequency,
reference_frequency=reference_frequency,
sideband=sideband)

# Seek input time of the giant pulse, corrected to the reference
# frequency, and read around it.
center_frequency = (disperse.frequency
+ disperse.sideband * disperse.sample_rate / 2.)
time_delay = self.dm.time_delay(center_frequency,
disperse.reference_frequency)
t_gp = (self.start_time + self.gp_sample / self.sample_rate
+ time_delay)

disperse.seek(t_gp.min())
around_gp = disperse.read(self.gp_sample)

sample_shift_diff = ((time_delay.max() - time_delay.min())
* self.sample_rate)
sample_shift_diff = np.round(sample_shift_diff.to(u.one)).astype(int)
expected = np.zeros_like(around_gp)
expected[0, t_gp.argmin()] = 1.0
expected[sample_shift_diff, t_gp.argmax()] = 1.0
assert np.all(around_gp == expected)

@pytest.mark.parametrize('reference_frequency', [200 * u.MHz, 300 * u.MHz])
def test_disperse_roundtrip1(self, reference_frequency):
self.gp.seek(self.start_time + self.gp_sample / self.sample_rate)
self.gp.seek(-1024, 1)
gp = self.gp.read(2048)
# Set up dispersion as above, and check that one can invert
disperse = DisperseSamples(self.gp, self.dm,
reference_frequency=reference_frequency)
dedisperse = DedisperseSamples(disperse, self.dm,
reference_frequency=reference_frequency)
assert dedisperse.dm == self.dm
assert dedisperse._dm == -self.dm
dedisperse.seek(self.start_time + self.gp_sample / self.sample_rate)
dedisperse.seek(-1024, 1)
gp_dd = dedisperse.read(2048)
# Note: rounding errors mean this doesn't work perfectly.
assert np.any(gp_dd == 1.0)
assert np.all(gp_dd == gp)

0 comments on commit 41534cd

Please sign in to comment.