Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulser gating #241

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions baseband_tasks/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def _get_offsets(self, samples, precision=1.e-3, max_iter=10):
'This should not happen!')

shape = getattr(samples, 'shape', ())
raise ValueError
return offsets.round().astype(int).reshape(shape)

def _read_frame(self, frame_index):
Expand Down
162 changes: 162 additions & 0 deletions baseband_tasks/pulse_gate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Functions to produce pulse gated data(only with the pulse part)
"""
import numpy as np
import warnings
import astropy.units as u
from .base import getattr_if_none, SetAttribute
from .phases import Phase
from .shaping import GetSlice
from .integration import Integrate


class GatePulse:
"""Return the gated pulse data.

Parameters
----------
ih : `baseband` file handle or task stream reader
Input data stream
phase : callable
Should return pulse phases (with or without cycle count) for given
input time(s), passed in as an '~astropy.time.Time' object. The output
can be an `~astropy.units.Quantity` with angular units or a regular
array of float (in which case units of cycles are assumed).

gate : list of two float numbers
The start gate pulse phase and the end gate pulse phase. The order of
the input gate is given by `(start_gate, end_gate)`. Both gates should
be in between [0, 1]. If the `start_gate` is bigger than the `end_gate`,
it assumes that the gate is from (start_gate, end_gate + 1)
tol : `~astropy.units.Quantity`
The tolarence of the pulse phaes gating in the units of cycle. Tolarence
can not be bigger than 10% of the gate size. Default is 5% of the given
gate size.
pulse_period: `~astropy.unit.Quantity` object, optional
Input pulse period. If not given, it will search from the input
callable `phase` object. If given, it has to be in the unit of time.

"""
def __init__(self, ih, phase, gate, tol=None, pulse_period=None, dtype=None):
self.ih = ih
if pulse_period is None: # Get pulse period from phase class
try:
self.pulse_period = 1.0 / phase.apparent_spin_freq(ih.start_time)
except AttributeError:
raise ValueError("Can not find pulse period from the `phase` "
"class. Please provide a valid"
" `pulse_period`.")
else:
self.pulse_period = pulse_period.to(u.s)

self.dtype = getattr_if_none(ih, 'dtype', dtype)
# Use the phase searching code in the Integrate class.
self.phase = phase
self.gate = self.verify_gate(gate)
self.samples_per_period = int(self.pulse_period * self.ih.sample_rate)
self.phase_per_sample = 1.0 * u.cycle / self.samples_per_period
# Check the tolerence range, tolerance has to smaller than 10% of gate size
if tol is None:
tol = (self.gate[1] - self.gate[0]) / 20
assert tol > 0
assert tol < (self.gate[1] - self.gate[0]) / 10
self.tol = tol
# Compute the number of samples in the tolerance phase, which means, the
# Gated data will be accurate to `self.tol_sample` of samples.
self.tol_sample = tol / self.phase_per_sample
if self.tol_sample < 1:
warnings.warn("Tolarence is smaller than one input time sample. "
"Use one time sample as the tolarence and the edge of"
"the gate will not as accurate as requested.")
self.tol_sample = np.ceil(self.tol_sample)
self.tol = self.tol_sample * self.phase_per_sample
self.tol_sample = self.tol_sample.astype(int)
self.pulse_offset = 0
self.gate_offsets = self.get_gate_offsets()

def read(self, gate_index=None):
"""Read the next pulse.

Parameter
---------
pulse_index: int, optional
The index of gate to read. If not provide, it reads the next pulse
from the current pulse_offset.
"""
if gate_index is None:
gate_index = self.pulse_offset

if gate_index >= len(self.gate_offsets[0]):
raise ValueError("The requested gate index is beyond the total data"
"stream.")

gsh = GetSlice(self.ih, slice(self.gate_offsets[0][gate_index],
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make a new get_slice method that returns this slice (everything from the top of this method to here) and then use that for read?

self.gate_offsets[1][gate_index]))
data = gsh.read()
self.pulse_offset = gate_index + 1
return data, gsh
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Override __getitem__ to give output of get_slice if the item is an integer?


def next_nsample_phase(self, n_samples):
"""Compute the pulse phase from the current offset with a resolution of input phase tolerance

Parameters
----------
n_sample: int
The phase for next n upstream samples.

"""
# Compute the data time axis
#
time_axis = (np.arange(0, n_samples,
self.tol_sample) / self.ih.sample_rate)
time_axis = self.ih.time + time_axis
pulse_phase = self.phase(time_axis)
# Search the gate
return time_axis, pulse_phase

def get_gate_offsets(self):
"""Get the offsets for the gate time.

Phase is assumed to increase monotonously with time.
"""
n_sample = self.ih.shape[0] - self.ih.offset
times, phase = self.next_nsample_phase(n_sample)
n_cycles = phase.int[-1] - phase.int[0]
# Find gates for each period
start_phase_int = np.modf(phase[0].value)[1]
# The data's start phase does not cover the whole gate, ignore, and go
# to the next phase.
if start_phase_int * u.cycle + self.gate[0] < phase[0]:
start_phase_int += 1
end_phase_int = np.modf(phase[-1].value)[1]
search_gate = np.arange(start_phase_int, end_phase_int + 1)
search_gate = (Phase(np.broadcast_to(search_gate,
(2, len(search_gate)))) +
Phase(self.gate).reshape(2,1))
gate_idx = np.searchsorted(phase.value, search_gate.value)
# Cut off the gates that is beyond the total samples which is created
# by search gate.
cut_off = min(np.searchsorted(gate_idx[0], len(times)),
np.searchsorted(gate_idx[1], len(times)))
# if self.gate[0].value == 0.8:
# raise ValueError
gate_idx = gate_idx[:, 0:cut_off]
gate_times = times[gate_idx]
# Map the gate_times to the upstream samples
gate_offsets = (((gate_times - self.ih.time) *
self.ih.sample_rate).to(u.one)).astype(int)
return gate_offsets

def verify_gate(self, gate):
assert len(gate) == 2
if gate[0] > gate[1]: #
gate[1] += 1
assert gate[1] - gate[0] < 1
# Normalize gate to 0 to 1
result = np.array(gate) - np.modf(gate[0])[1]
if not hasattr(result, 'unit'):
return result * u.cycle
else:
return result.to(u.cycle)

def close(self):
self.ih.close()
124 changes: 124 additions & 0 deletions baseband_tasks/tests/test_pulse_gating.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Test script for pulsar gating
"""
import os
import pytest
import numpy as np
import astropy.units as u
from astropy.time import Time

from baseband_tasks.pulse_gate import GatePulse
from baseband_tasks.phases import PolycoPhase, Phase
from baseband_tasks.generators import StreamGenerator


test_data = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')


class TestPulseGating:
def setup(self):
self.start_time = Time('2018-05-06T23:00:00', format='isot',
scale='utc')
self.polyco_file = os.path.join(test_data, 'B1937_polyco.dat')
self.polyco = PolycoPhase(self.polyco_file)
self.sample_rate = 128. * u.kHz
self.shape = (164000, 2)
#self.gp_sample = 64000
self.P0 = 31.4 * u.ms
self.P1 = 2.01 * u.us / u.s
self.pulse_phase_location = 0.21 * u.cycle
self.ps = StreamGenerator(self.make_pulses,
shape=self.shape, start_time=self.start_time,
sample_rate=self.sample_rate,
samples_per_frame=1000, dtype=np.float32,
frequency=[299.936, 300.064]*u.MHz,
sideband=np.array((1, -1)))

# @classmethod
# def make_giant_pulse(self, sh):
# data = np.empty((sh.samples_per_frame,) + sh.shape[1:], sh.dtype)
# do_gp = sh.tell() + np.arange(sh.samples_per_frame) == self.gp_sample
# data[...] = do_gp[:, np.newaxis]
# return data

def simple_phase(self, t):
F0 = 1 / self.P0
F1 = -self.P1 * F0 ** 2
dt = (t - self.start_time).to(u.s)
phase = F0 * dt + 0.5 * F1 * dt ** 2
return Phase(phase.to_value(u.one))

def make_pulses(self, sh):
data = np.zeros((sh.samples_per_frame,) + sh.shape[1:], sh.dtype)
times = sh.time + np.arange(sh.samples_per_frame) / sh.sample_rate
phase = self.simple_phase(times)
# Search for the phase where pulse happens
offser_from_pulse = np.abs(phase.frac - self.pulse_phase_location)
tol = 1.0 / (self.P0 * sh.sample_rate) * u.cycle
pulse_sample = np.where(offser_from_pulse < tol)[0]
data[pulse_sample, ...] = 1.0
return data

@pytest.mark.parametrize('gate', [[0.2, 0.5], [1.3, 1.4], [0.8, 0.2],
[3.8, 3.2]])
@pytest.mark.parametrize('tol', [None, 0.001 * u.cycle])
@pytest.mark.parametrize('pulse_period', [None, 1.6 * u.ms, 2*u.min])
@pytest.mark.filterwarnings("ignore:Tolarence is smaller")
def test_pulse_gate_build(self, gate, tol, pulse_period):
gate_pulse = GatePulse(self.ps, self.polyco, gate, tol, pulse_period)
assert gate_pulse.gate[0] == (gate[0] - np.modf(gate[0])[1]) * u.cycle
if gate[0] > gate[1]:
assert gate_pulse.gate[1] == (gete[1] + 1 - np.modf(gate[0])[1]) * u.cycle

@pytest.mark.parametrize('tol', [None, 0.001 * u.cycle])
@pytest.mark.parametrize('pulse_period', [None, 1.6 * u.ms])
@pytest.mark.filterwarnings("ignore:Tolarence is smaller")
def test_computing_phase(self, tol, pulse_period):
gate = [0.34, 0.48]
gate_pulse = GatePulse(self.ps, self.polyco, gate, tol, pulse_period)
phase = gate_pulse.next_nsample_phase(self.shape[0])[1]
phase_diff = np.diff(phase.frac)
#assert np.all(np.isclose(phase_diff[phase_diff > 0],
# gate_pulse.tol * u.cycle))
@pytest.mark.filterwarnings("ignore:Tolarence is smaller")
@pytest.mark.parametrize('gate', [[0.2, 0.5], [1.3, 1.4], [0.8, 0.2],
[3.8, 3.2]])
def test_get_offset(self, gate):
gate_pulse = GatePulse(self.ps, self.polyco, gate)
gate_offsets = gate_pulse.get_gate_offsets()
# Compute the gate time
gate_times = (gate_pulse.ih.time + gate_pulse.ih.offset + gate_offsets /
gate_pulse.ih.sample_rate)
gate_phase = gate_pulse.phase(gate_times)
# Tolerance is higher than the phase resolution
if gate_pulse.tol_sample == 1:
# Since the phase per each sample is averaged, the one sample of
# accurate will not be possible. We relax our tolerance 50%.
tol = gate_pulse.tol * 1.5
else:
tol = gate_pulse.tol
# If the fraction part is negative add 1
gate_phase.frac.value[gate_phase.frac.value < 0] += 1
assert np.all(np.isclose(gate_phase[0].frac.value,
gate_pulse.gate[0].value,
atol=tol.value))
compare_gate_end = gate_phase[1].frac.value
if gate_pulse.gate[1].value > 1:
compare_gate_end += 1
assert np.all(np.isclose(compare_gate_end,
gate_pulse.gate[1].value,
atol=tol.value))

@pytest.mark.parametrize('pulse_number', [None, 3, 5, 20, -4, -1])
def test_read_gated_pulse(self, pulse_number):
gate_pulse = GatePulse(self.ps, self.simple_phase, [0.2, 0.25],
pulse_period=self.P0)
pulse, gsh = gate_pulse.read(pulse_number)
pulse_pos = np.where(pulse == 1.0)
time_axis = gsh.time - np.arange(pulse.shape[0])[::-1] / gsh.sample_rate
phase = self.simple_phase(time_axis)
assert np.all(np.isclose(phase[0].frac, gate_pulse.gate[0],
atol=gate_pulse.tol))
assert np.all(np.isclose(phase[-1].frac, gate_pulse.gate[1],
atol=gate_pulse.tol))
assert np.all(np.isclose(phase[pulse_pos[0]].frac,
self.pulse_phase_location, atol=gate_pulse.tol))