Skip to content

Commit

Permalink
Add a module and class for handling sky grids for `pycbc_multi_inspir…
Browse files Browse the repository at this point in the history
…al` (#4741)

* Add basic sky grid module and class

* Use SkyGrid in multi_inspiral

* Run black

* More cleanup

* Fix errors

* Make detectors and ref time properties of the instance (to be continued…)

* Use --instruments and --trigger-time

* CLI improvements

* Fix error

* Add angular units and convention
  • Loading branch information
titodalcanton authored Sep 12, 2024
1 parent 1641227 commit fc4c210
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 78 deletions.
115 changes: 37 additions & 78 deletions bin/pycbc_multi_inspiral
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import argparse
import numpy as np

from pycbc import (
detector,
fft,
init_logging,
inject,
Expand All @@ -49,49 +48,9 @@ from pycbc.io.hdf import HFile
from pycbc.filter import MatchedFilterControl
from pycbc.types import zeros, float32, complex64
from pycbc.vetoes import sgchisq
from pycbc.tmpltbank.sky_grid import SkyGrid


def sky_grid_from_cli(parser, args):
"""Read the sky grid or the single sky position given the CLI arguments."""
if args.sky_grid is not None and (
args.ra is not None or args.dec is not None
):
parser.error(
'Please provide either a sky grid via --sky-grid or a '
'single sky position via --ra and --dec, not both'
)
if args.sky_grid is not None:
with HFile(args.sky_grid, 'r') as sky_grid_file:
ra = sky_grid_file['ra'][:]
dec = sky_grid_file['dec'][:]
elif args.ra is not None and args.dec is not None:
ra = np.array([args.ra])
dec = np.array([args.dec])
else:
parser.error(
'Please specify a sky grid via --sky-grid '
'or a single position via --ra and --dec'
)
return np.array([ra, dec])


def calculate_antenna_pattern(args, sky_pos_indices):
"""Calculate the antenna pattern functions for all detectors and sky
positions.
"""
antenna_pattern = {}
for ifo in args.instruments:
curr_det = detector.Detector(ifo)
antenna_pattern[ifo] = [None] * len(sky_pos_indices)
for index in sky_pos_indices:
antenna_pattern[ifo][index] = curr_det.antenna_pattern(
sky_positions[0][index],
sky_positions[1][index],
polarization=0,
t_gps=t_gps,
)
return antenna_pattern

def slide_limiter(args):
'''
This function computes the number of shortslides used by the coherent
Expand Down Expand Up @@ -120,18 +79,29 @@ def slide_limiter(args):
time_init = time.time()
parser = argparse.ArgumentParser(description=__doc__)
add_common_pycbc_options(parser)
parser.add_argument("--output", type=str)
parser.add_argument(
"--output",
type=str,
required=True,
help="Path to the output file."
)
parser.add_argument(
"--instruments",
nargs="+",
type=str,
required=True,
help="List of instruments to analyze.",
)
parser.add_argument("--bank-file", type=str)
parser.add_argument(
"--bank-file",
type=str,
required=True,
help="Path to the template bank file."
)
parser.add_argument(
"--low-frequency-cutoff",
type=float,
required=True,
help="The low frequency cutoff to use for filtering (Hz).",
)
# Add approximant arg
Expand Down Expand Up @@ -244,16 +214,16 @@ parser.add_argument(
parser.add_argument(
"--ra",
type=float,
help="Right ascension of sky point to search (radians),",
help="Right ascension of sky point to search (radians).",
)
parser.add_argument(
"--dec", type=float, help="Declination of sky point to search (radians),"
"--dec", type=float, help="Declination of sky point to search (radians)."
)
parser.add_argument(
"--sky-grid",
type=str,
help="Sky-grid to search, an hdf file containing two datasets: "
"ra and dec, both in radians",
help="Path to an HDF file containing the sky grid to search over, "
"with two datasets, ra and dec, both in radians.",
)
parser.add_argument(
"--coinc-threshold",
Expand Down Expand Up @@ -338,8 +308,8 @@ parser.add_argument(
help="Size of each time slide shift.",
)
parser.add_argument(
"--do-shortslides",
action="store_true"
"--do-shortslides",
action="store_true"
)
# Add options groups
strain.insert_strain_option_group_multi_ifo(parser)
Expand Down Expand Up @@ -392,8 +362,7 @@ with ctx:
nifo = len(args.instruments[:])
flow = args.low_frequency_cutoff
t_gps = args.trigger_time
sky_positions = sky_grid_from_cli(parser, args)
sky_pos_indices = np.arange(sky_positions.shape[1])
sky_positions = SkyGrid.from_cli(parser, args)
# The following for loop is used to check whether
# the sampling rate, flen and tlen agree for all detectors
# taking the zeroth detector in the list as a reference.
Expand Down Expand Up @@ -448,36 +417,24 @@ with ctx:
# Given the time delays wrt to IFO 0 in time_slides, create a dictionary
# for time delay indices evaluated wrt the geocenter, in units of samples,
# i.e. (time delay from geocenter + time slide)*sampling_rate
time_delay_idx_zerolag = {
position_index: {
ifo: detector.Detector(ifo).time_delay_from_earth_center(
sky_positions[0][position_index],
sky_positions[1][position_index],
t_gps,
)
for ifo in args.instruments
}
for position_index in sky_pos_indices
}
time_delays_zerolag = sky_positions.calculate_time_delays()
time_delay_idx = {
slide: {
position_index: {
ifo: int(
round(
(
time_delay_idx_zerolag[position_index][ifo]
+ time_slides[ifo][slide]
)
* sample_rate
ifo: round(
(
time_delays_zerolag[ifo][position_index]
+ time_slides[ifo][slide]
)
* sample_rate
)
for ifo in args.instruments
}
for position_index in sky_pos_indices
for position_index in range(len(sky_positions))
}
for slide in slide_ids
}
del time_delay_idx_zerolag
del time_delays_zerolag

logging.info("Setting up MatchedFilterControl at each IFO")
# Prototype container for the output of MatchedFilterControl and
Expand Down Expand Up @@ -629,7 +586,7 @@ with ctx:
logging.info("Template bank size after thinning: %d", n_bank)

logging.info("Calculating antenna pattern functions at every sky position")
antenna_pattern = calculate_antenna_pattern(args, sky_pos_indices)
antenna_pattern = sky_positions.calculate_antenna_patterns()

logging.info("Starting the filtering...")
# Loop over templates
Expand Down Expand Up @@ -698,7 +655,9 @@ with ctx:
snr_dict[ifo] = (
snr_ts[matched_filter[ifo].segments[s_num].analyze] * norm
)
assert len(snr_dict[ifo]) > 0, f'SNR time series for {ifo} is empty'
assert (
len(snr_dict[ifo]) > 0
), f'SNR time series for {ifo} is empty'
norm_dict[ifo] = norm
corr_dict[ifo] = corr.copy()
idx[ifo] = ind.copy()
Expand All @@ -713,11 +672,11 @@ with ctx:
for slide in range(num_slides):
logging.info("Analyzing slide %d/%d", slide, num_slides)
# Loop over sky positions
for position_index in sky_pos_indices:
for position_index in range(len(sky_positions)):
logging.info(
"Analyzing sky position %d/%d",
position_index + 1,
len(sky_pos_indices),
len(sky_positions),
)
# Adjust the indices of triggers (if there are any)
# and store trigger indices list in a dictionary;
Expand Down Expand Up @@ -1084,10 +1043,10 @@ with ctx:
)
network_out_vals['nifo'] = [nifo] * num_events
network_out_vals['dec'] = [
sky_positions[1][position_index]
sky_positions.decs[position_index]
] * num_events
network_out_vals['ra'] = [
sky_positions[0][position_index]
sky_positions.ras[position_index]
] * num_events
network_out_vals['slide_id'] = [slide] * num_events
event_mgr.add_template_events_to_network(
Expand Down
136 changes: 136 additions & 0 deletions pycbc/tmpltbank/sky_grid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Functionality for handling grids of points in the sky for coherent SNR
calculation via `pycbc_multi_inspiral`. The main operation to be performed on
these points is calculating the antenna pattern functions and time delays from
the Earth center for a network of detectors.
"""

import numpy as np
import h5py

from pycbc.detector import Detector


class SkyGrid:
def __init__(self, ra, dec, detectors, ref_gps_time):
"""Initialize a sky grid from a list of RA/dec coordinates.
Parameters
----------
ra: iterable of floats
Right ascensions for each point in radians, in the interval [0,2π).
dec: iterable of floats
Declination for each point in radians, where π/2 is the North pole,
-π/2 is the South pole, and 0 is the celestial equator.
detectors: iterable of str
List of detector names associated with the sky grid, typically the
detectors for which the grid has been placed or is to be placed.
The detectors will be used when calculating the antenna pattern
functions and time delays from Earth center.
ref_gps_time: float
Reference GPS time associated with the sky grid. This will be used
when calculating the antenna pattern functions and time delays from
Earth center.
"""
# We store the points in a 2D array internally, first dimension runs
# over the list of points, second dimension is RA/dec.
# Question: should we use Astropy sky positions instead?
self.positions = np.vstack([ra, dec]).T
self.detectors = sorted(detectors)
self.ref_gps_time = ref_gps_time

def __len__(self):
"""Returns the number of points in the sky grid."""
return self.positions.shape[0]

def __getitem__(self, index):
"""Returns the coordinates of a single point in the grid."""
return self.positions[index]

@property
def ras(self):
"""Returns all right ascensions in radians, in the interval [0,2π)."""
return self.positions[:, 0]

@property
def decs(self):
"""Returns all declinations in radians, where π/2 is the North pole,
-π/2 is the South pole, and 0 is the celestial equator."""
return self.positions[:, 1]

@classmethod
def from_cli(cls, cli_parser, cli_args):
"""Initialize a sky grid from command-line interface, via argparse
objects.
"""
if cli_args.sky_grid is not None:
if cli_args.ra is not None or cli_args.dec is not None:
cli_parser.error(
'Please provide either a sky grid via --sky-grid or a '
'single sky position via --ra and --dec, not both'
)
return cls.read_from_file(cli_args.sky_grid)
if cli_args.ra is not None and cli_args.dec is not None:
return cls(
[cli_args.ra],
[cli_args.dec],
cli_args.instruments,
cli_args.trigger_time
)
cli_parser.error(
'Please specify a sky grid via --sky-grid or a single sky '
'position via --ra and --dec'
)

@classmethod
def read_from_file(cls, path):
"""Initialize a sky grid from a given HDF5 file."""
with h5py.File(path, 'r') as hf:
ra = hf['ra'][:]
dec = hf['dec'][:]
detectors = hf.attrs['detectors']
ref_gps_time = hf.attrs['ref_gps_time']
return cls(ra, dec, detectors, ref_gps_time)

def write_to_file(self, path):
"""Writes a sky grid to an HDF5 file."""
with h5py.File(path, 'w') as hf:
hf['ra'] = self.ras
hf['dec'] = self.decs
hf.attrs['detectors'] = self.detectors
hf.attrs['ref_gps_time'] = self.ref_gps_time

def calculate_antenna_patterns(self):
"""Calculate the antenna pattern functions at each point in the grid
for the list of GW detectors specified at instantiation. Return a dict,
keyed by detector name, whose items are 2-dimensional Numpy arrays.
The first dimension of these arrays runs over the sky grid, and the
second dimension runs over the plus and cross polarizations.
"""
result = {}
for det_name in self.detectors:
det = Detector(det_name)
result[det_name] = np.empty((len(self), 2))
for i, (ra, dec) in enumerate(self):
result[det_name][i] = det.antenna_pattern(
ra, dec, 0, t_gps=self.ref_gps_time
)
return result

def calculate_time_delays(self):
"""Calculate the time delays from the Earth center to each GW detector
specified at instantiation, for each point in the grid. Return a dict,
keyed by detector name, whose items are 1-dimensional Numpy arrays
containing the time delays for each sky point.
"""
result = {}
for det_name in self.detectors:
det = Detector(det_name)
result[det_name] = np.empty(len(self))
for i, (ra, dec) in enumerate(self):
result[det_name][i] = det.time_delay_from_earth_center(
ra, dec, t_gps=self.ref_gps_time
)
return result


__all__ = ['SkyGrid']

0 comments on commit fc4c210

Please sign in to comment.