From b51de966ebb1b42677724dd55e30a73ddbddaffb Mon Sep 17 00:00:00 2001 From: Joshita Date: Tue, 8 Apr 2025 01:59:57 -0400 Subject: [PATCH] added type hints in powerspectrum.py --- stingray/powerspectrum.py | 279 ++++++++++++++++++++++---------------- 1 file changed, 165 insertions(+), 114 deletions(-) diff --git a/stingray/powerspectrum.py b/stingray/powerspectrum.py index 947b81fc2..2afed6865 100755 --- a/stingray/powerspectrum.py +++ b/stingray/powerspectrum.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import warnings from collections.abc import Generator, Iterable @@ -5,6 +7,7 @@ import scipy import scipy.optimize import scipy.stats +from typing import TYPE_CHECKING, Optional, Union, Tuple, List from stingray.crossspectrum import AveragedCrossspectrum, Crossspectrum, DynamicalCrossspectrum from stingray.stats import pds_probability, amplitude_upper_limit @@ -19,6 +22,15 @@ from .fourier import poisson_level from .fourier import get_rms_from_unnorm_periodogram +if TYPE_CHECKING: + from astropy.table import Table + import numpy.typing as npt + from stingray.timeseries import Timeseries + from .lightcurve import Lightcurve + from .events import EventList + + TData = Union[Lightcurve, EventList, List[Lightcurve], None] + __all__ = ["Powerspectrum", "AveragedPowerspectrum", "DynamicalPowerspectrum"] @@ -97,7 +109,15 @@ class Powerspectrum(Crossspectrum): """ - def __init__(self, data=None, norm="frac", gti=None, dt=None, lc=None, skip_checks=False): + def __init__( + self, + data: Optional[TData] = None, + norm: str = "frac", + gti: Optional[np.ndarray] = None, + dt: Optional[float] = None, + lc: Optional[Lightcurve] = None, + skip_checks: bool = False, + ) -> None: self._type = None if lc is not None: warnings.warn( @@ -121,7 +141,9 @@ def __init__(self, data=None, norm="frac", gti=None, dt=None, lc=None, skip_chec return self._initialize_from_any_input(data, dt=dt, norm=norm) - def rebin(self, df=None, f=None, method="mean"): + def rebin( + self, df: Optional[float] = None, f: Optional[float] = None, method: str = "mean" + ) -> Powerspectrum: """ Rebin the power spectrum. @@ -146,7 +168,9 @@ def rebin(self, df=None, f=None, method="mean"): return bin_ps - def compute_rms(self, min_freq, max_freq, poisson_noise_level=None): + def compute_rms( + self, min_freq: float, max_freq: float, poisson_noise_level: Optional[float] = None + ) -> Tuple[float, float]: """ Compute the fractional rms amplitude in the power spectrum between two frequencies. @@ -212,7 +236,7 @@ def compute_rms(self, min_freq, max_freq, poisson_noise_level=None): return rms, rmse - def _rms_error(self, powers): + def _rms_error(self, powers: Union[np.ndarray, Iterable[float]]) -> float: r""" Compute the error on the fractional rms amplitude using error propagation. @@ -250,7 +274,9 @@ def _rms_error(self, powers): delta_rms = sq_sum_err * drms_dp return delta_rms - def classical_significances(self, threshold=1, trial_correction=False): + def classical_significances( + self, threshold: float = 1, trial_correction: bool = False + ) -> np.ndarray: """ Compute the classical significances for the powers in the power spectrum, assuming an underlying noise distribution that follows a @@ -329,7 +355,9 @@ def classical_significances(self, threshold=1, trial_correction=False): return pvals - def modulation_upper_limit(self, fmin=None, fmax=None, c=0.95): + def modulation_upper_limit( + self, fmin: Optional[float] = None, fmax: Optional[float] = None, c: float = 0.95 + ) -> float: r""" Upper limit on a sinusoidal modulation. @@ -414,8 +442,14 @@ def modulation_upper_limit(self, fmin=None, fmax=None, c=0.95): @staticmethod def from_time_array( - times, dt, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True - ): + times: np.ndarray, + dt: float, + segment_size: Optional[float] = None, + gti: Optional[np.ndarray] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + ) -> Powerspectrum: """ Calculate an average power spectrum from an array of event times. @@ -464,15 +498,15 @@ def from_time_array( @staticmethod def from_events( - events, - dt, - segment_size=None, - gti=None, - norm="frac", - silent=False, - use_common_mean=True, - save_all=False, - ): + events: EventList, + dt: float, + segment_size: Optional[float] = None, + gti: Optional[np.ndarray] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + save_all: bool = False, + ) -> Powerspectrum: """ Calculate an average power spectrum from an event list. @@ -525,16 +559,16 @@ def from_events( @staticmethod def from_stingray_timeseries( - ts, - flux_attr, - error_flux_attr=None, - segment_size=None, - norm="none", - silent=False, - use_common_mean=True, - gti=None, - save_all=False, - ): + ts: Timeseries, + flux_attr: str, + error_flux_attr: Optional[str] = None, + segment_size: Optional[float] = None, + norm: str = "none", + silent: bool = False, + use_common_mean: bool = True, + gti: Optional[np.ndarray] = None, + save_all: bool = False, + ) -> Powerspectrum: """Calculate AveragedPowerspectrum from a time series. Parameters @@ -586,14 +620,14 @@ def from_stingray_timeseries( @staticmethod def from_lightcurve( - lc, - segment_size=None, - gti=None, - norm="frac", - silent=False, - use_common_mean=True, - save_all=False, - ): + lc: Lightcurve, + segment_size: Optional[float] = None, + gti: Optional[np.ndarray] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + save_all: bool = False, + ) -> Powerspectrum: """ Calculate a power spectrum from a light curve. @@ -645,15 +679,15 @@ def from_lightcurve( @staticmethod def from_lc_iterable( - iter_lc, - dt, - segment_size=None, - gti=None, - norm="frac", - silent=False, - use_common_mean=True, - save_all=False, - ): + iter_lc: Union[Iterable[Lightcurve], Iterable[np.ndarray]], + dt: float, + segment_size: Optional[float] = None, + gti: Optional[np.ndarray] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + save_all: bool = False, + ) -> Powerspectrum: """ Calculate the average power spectrum of an iterable collection of light curves. @@ -706,15 +740,15 @@ def from_lc_iterable( def _initialize_from_any_input( self, - data, - dt=None, - segment_size=None, - gti=None, - norm="frac", - silent=False, - use_common_mean=True, - save_all=False, - ): + data: TData, + dt: Optional[float] = None, + segment_size: Optional[float] = None, + gti: Optional[np.ndarray] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + save_all: bool = False, + ) -> None: """ Initialize the class, trying to understand the input types. @@ -767,7 +801,7 @@ def _initialize_from_any_input( setattr(self, key, val) return - def _initialize_empty(self): + def _initialize_empty(self) -> None: """Set all attributes to None.""" self.freq = None self.power = None @@ -874,18 +908,18 @@ class AveragedPowerspectrum(AveragedCrossspectrum, Powerspectrum): def __init__( self, - data=None, - segment_size=None, - norm="frac", - gti=None, - silent=False, - dt=None, - lc=None, - large_data=False, - save_all=False, - skip_checks=False, - use_common_mean=True, - ): + data: Optional[Union[TData, Generator]] = None, + segment_size: Optional[float] = None, + norm: str = "frac", + gti: Optional[np.ndarray] = None, + silent: bool = False, + dt: Optional[float] = None, + lc: Optional[Lightcurve] = None, + large_data: bool = False, + save_all: bool = False, + skip_checks: bool = False, + use_common_mean: bool = True, + ) -> None: self._type = None if lc is not None: warnings.warn( @@ -940,7 +974,7 @@ def __init__( save_all=save_all, ) - def initial_checks(self, *args, **kwargs): + def initial_checks(self, *args, **kwargs) -> bool: return AveragedCrossspectrum.initial_checks(self, *args, **kwargs) @@ -1020,7 +1054,14 @@ class DynamicalPowerspectrum(DynamicalCrossspectrum): The number of averaged cross spectra. """ - def __init__(self, lc=None, segment_size=None, norm="frac", gti=None, sample_time=None): + def __init__( + self, + lc: Optional[Union[Lightcurve, EventList]] = None, + segment_size: Optional[float] = None, + norm: str = "frac", + gti: Optional[np.ndarray] = None, + sample_time: Optional[float] = None, + ) -> None: self.segment_size = segment_size self.sample_time = sample_time self.gti = gti @@ -1048,7 +1089,9 @@ def __init__(self, lc=None, segment_size=None, norm="frac", gti=None, sample_tim self._make_matrix(lc) - def shift_and_add(self, f0_list, nbins=100, rebin=None): + def shift_and_add( + self, f0_list: List[float], nbins: int = 100, rebin: Optional[int] = None + ) -> AveragedPowerspectrum: """Shift-and-add the dynamical power spectrum. This is the basic operation for the shift-and-add operation used to track @@ -1099,7 +1142,7 @@ def shift_and_add(self, f0_list, nbins=100, rebin=None): f0_list, nbins=nbins, output_obj_type=AveragedPowerspectrum, rebin=rebin ) - def _make_matrix(self, lc): + def _make_matrix(self, lc: Union[Lightcurve, EventList]) -> None: """ Create a matrix of powers for each time step and each frequency step. @@ -1136,10 +1179,10 @@ def _make_matrix(self, lc): def power_colors( self, - freq_edges=[1 / 256, 1 / 32, 0.25, 2.0, 16.0], - freqs_to_exclude=None, - poisson_power=None, - ): + freq_edges: Iterable[float] = [1 / 256, 1 / 32, 0.25, 2.0, 16.0], + freqs_to_exclude: Optional[Union[List[float], List[List[float]]]] = None, + poisson_power: Optional[Union[float, Iterable[float]]] = None, + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Return the power colors of the dynamical power spectrum. @@ -1184,15 +1227,15 @@ def power_colors( def powerspectrum_from_time_array( - times, - dt, - segment_size=None, - gti=None, - norm="frac", - silent=False, - use_common_mean=True, - save_all=False, -): + times: npt.NDArray[np.floating], + dt: float, + segment_size: Optional[float] = None, + gti: Optional[npt.NDArray[np.floating]] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + save_all: bool = False, +) -> Union[Powerspectrum, AveragedPowerspectrum]: """ Calculate a power spectrum from an array of event times. @@ -1254,15 +1297,15 @@ def powerspectrum_from_time_array( def powerspectrum_from_events( - events, - dt, - segment_size=None, - gti=None, - norm="frac", - silent=False, - use_common_mean=True, - save_all=False, -): + events: EventList, + dt: float, + segment_size: Optional[float] = None, + gti: Optional[npt.NDArray[np.floating]] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + save_all: bool = False, +) -> Union[Powerspectrum, AveragedPowerspectrum]: """ Calculate a power spectrum from an event list. @@ -1324,8 +1367,14 @@ def powerspectrum_from_events( def powerspectrum_from_lightcurve( - lc, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True, save_all=False -): + lc: Lightcurve, + segment_size: Optional[float] = None, + gti: Optional[npt.NDArray[np.floating]] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + save_all: bool = False, +) -> Union[Powerspectrum, AveragedPowerspectrum]: """ Calculate a power spectrum from a light curve @@ -1392,16 +1441,16 @@ def powerspectrum_from_lightcurve( def powerspectrum_from_timeseries( - ts, - flux_attr, - error_flux_attr=None, - segment_size=None, - norm="none", - silent=False, - use_common_mean=True, - gti=None, - save_all=False, -): + ts: Timeseries, + flux_attr: str, + error_flux_attr: Optional[str] = None, + segment_size: Optional[float] = None, + norm: str = "none", + silent: bool = False, + use_common_mean: bool = True, + gti: Optional[npt.NDArray[np.floating]] = None, + save_all: bool = False, +) -> Union[Powerspectrum, AveragedPowerspectrum]: """Calculate power spectrum from a time series Parameters @@ -1472,15 +1521,15 @@ def powerspectrum_from_timeseries( def powerspectrum_from_lc_iterable( - iter_lc, - dt, - segment_size=None, - gti=None, - norm="frac", - silent=False, - use_common_mean=True, - save_all=False, -): + iter_lc: Iterable[Union[Lightcurve, npt.NDArray[np.floating]]], + dt: float, + segment_size: Optional[float] = None, + gti: Optional[npt.NDArray[np.floating]] = None, + norm: str = "frac", + silent: bool = False, + use_common_mean: bool = True, + save_all: bool = False, +) -> Union[Powerspectrum, AveragedPowerspectrum]: """ Calculate an average power spectrum from an iterable collection of light curves. @@ -1566,7 +1615,9 @@ def iterate_lc_counts(iter_lc): return _create_powerspectrum_from_result_table(table, force_averaged=force_averaged) -def _create_powerspectrum_from_result_table(table, force_averaged=False): +def _create_powerspectrum_from_result_table( + table: Table, force_averaged: bool = False +) -> Union[Powerspectrum, AveragedPowerspectrum]: """ Copy the columns and metadata from the results of ``stingray.fourier.avg_pds_from_XX`` functions into