Skip to content

Commit

Permalink
Update the spin functions (#37)
Browse files Browse the repository at this point in the history
* Update the spin functions

* PEP8

* Update the function estimating the number of oppositions, and add convergence flag for the least square

* PEP8

* documentation

* Bump versiob
  • Loading branch information
JulienPeloton authored Mar 30, 2023
1 parent 4891807 commit c4d23cd
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 33 deletions.
2 changes: 1 addition & 1 deletion fink_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "0.10.1"
__version__ = "0.11.0"
179 changes: 147 additions & 32 deletions fink_utils/sso/spins.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from scipy.optimize import least_squares
from scipy import linalg

from fink_utils.test.tester import regular_unit_tests

def func_hg(ph, h, g):
""" Return f(H, G) part of the lightcurve in mag space
Expand Down Expand Up @@ -186,6 +188,43 @@ def estimate_sso_params(
Error estimates on popt elements
chi2_red: float
Reduced chi2
Example
---------
>>> import io
>>> import requests
>>> import pandas as pd
>>> r = requests.post(
... 'https://fink-portal.org/api/v1/sso',
... json={
... 'n_or_d': '1465',
... 'withEphem': True,
... 'output-format': 'json'
... }
... )
# Extract relevant information
>>> pdf = pd.read_json(io.BytesIO(r.content))
# Add color correction in the DataFrame
>>> pdf = add_ztf_color_correction(pdf, combined=True)
>>> bounds = ([0, 0, 0, 1e-1, 0, -np.pi/2], [30, 1, 1, 1, 2*np.pi, np.pi/2])
>>> popt, perr, chisq_red = estimate_sso_params(pdf, func=func_hg1g2_with_spin, bounds=bounds)
>>> assert popt is not None, "Fit failed!"
>>> bounds = ([0, 0, 0], [30, 1, 1])
>>> popt, perr, chisq_red = estimate_sso_params(pdf, func=func_hg1g2, bounds=bounds)
>>> assert popt is not None, "Fit failed!"
>>> bounds = ([0, 0], [30, 1])
>>> popt, perr, chisq_red = estimate_sso_params(pdf, func=func_hg12, bounds=bounds)
>>> assert popt is not None, "Fit failed!"
>>> bounds = ([0, 0], [30, 1])
>>> popt, perr, chisq_red = estimate_sso_params(pdf, func=func_hg, bounds=bounds)
>>> assert popt is not None, "Fit failed!"
"""
ydata = pdf['i:magpsf_red'] + pdf['color_corr']

Expand All @@ -197,15 +236,19 @@ def estimate_sso_params(

if func.__name__ == 'func_hg1g2_with_spin':
nparams = 6
assert len(bounds[0]) == nparams, "You need to specify bounds on all (H, G1, G2, R, alpha, beta) parameters"
x = pha
elif func.__name__ == 'func_hg1g2':
nparams = 3
assert len(bounds[0]) == nparams, "You need to specify bounds on all (H, G1, G2) parameters"
x = alpha
elif func.__name__ == 'func_hg12':
nparams = 2
assert len(bounds[0]) == nparams, "You need to specify bounds on all (H, G12) parameters"
x = alpha
elif func.__name__ == 'func_hg':
nparams = 2
assert len(bounds[0]) == nparams, "You need to specify bounds on all (H, G) parameters"
x = alpha

if not np.alltrue([i == i for i in ydata.values]):
Expand All @@ -215,6 +258,7 @@ def estimate_sso_params(
return popt, perr, chisq_red

try:
# TODO: incorporate jacobian and error estimates
popt, pcov = curve_fit(
func,
x,
Expand Down Expand Up @@ -298,16 +342,45 @@ def build_eqs_for_spins(x, filters=[], ph=[], ra=[], dec=[], rhs=[]):

return np.ravel(eqs)

def estimate_combined_sso_params(
pdf,
def estimate_hybrid_sso_params(
magpsf_red, sigmapsf, phase, ra, dec, filters,
p0=[15.0, 0.0, 0.0, 0.5, np.pi, 0.0],
bounds=([0, 0, 0, 1e-6, 0, -np.pi / 2], [30, 1, 1, 1, 2 * np.pi, np.pi / 2])):
""" Fit for phase curve parameters
""" Fit for phase curve parameters (R, alpha, delta, H^b, G_1^b, G_2^b)
Code for quality `fit`:
0: success
1: bad_vals
2: MiriadeFail
3: RunTimError
4: LinalgError
Code for quality `status` (least square convergence):
-2: failure
-1 : improper input parameters status returned from MINPACK.
0 : the maximum number of function evaluations is exceeded.
1 : gtol termination condition is satisfied.
2 : ftol termination condition is satisfied.
3 : xtol termination condition is satisfied.
4 : Both ftol and xtol termination conditions are satisfied.
Typically, one would only trust status = 2 and 4.
Parameters
----------
pdf: pd.DataFrame
Contain Fink data + ephemerides
magpsf_red: array
Reduced magnitude, that is m_obs - 5 * np.log10('Dobs' * 'Dhelio')
sigmapsf: array
Error estimates on magpsf_red
phase: array
Phase angle [rad]
ra: array
Right ascension [rad]
dec: array
Declination [rad]
filters: array
Filter name for each measurement
p0: list
Initial guess for [H, G1, G2, R, alpha, beta]. Note that even if
there is several bands `b`, we take the same initial guess for all (H^b, G1^b, G2^b).
Expand All @@ -321,34 +394,56 @@ def estimate_combined_sso_params(
outdic: dict
Dictionary containing reduced chi2, and estimated parameters and
error on each parameters.
"""
ydata = pdf['i:magpsf_red']
alpha = np.deg2rad(pdf['Phase'].values)
ra = np.deg2rad(pdf['i:ra'].values)
dec = np.deg2rad(pdf['i:dec'].values)
filters = np.unique(pdf['i:fid'])
Example
---------
>>> import io
>>> import requests
>>> import pandas as pd
>>> r = requests.post(
... 'https://fink-portal.org/api/v1/sso',
... json={
... 'n_or_d': '1465',
... 'withEphem': True,
... 'output-format': 'json'
... }
... )
# Extract relevant information
>>> pdf = pd.read_json(io.BytesIO(r.content))
>>> magpsf_red = pdf['i:magpsf_red'].values
>>> sigmapsf = pdf['i:sigmapsf'].values
>>> phase = np.deg2rad(pdf['Phase'].values)
>>> ra = np.deg2rad(pdf['i:ra'].values)
>>> dec = np.deg2rad(pdf['i:dec'].values)
>>> filters = pdf['i:fid'].values
>>> outdic = estimate_hybrid_sso_params(magpsf_red, sigmapsf, phase, ra, dec, filters)
>>> assert outdic['fit'] == 0, "Fit failed with code {}!".format(outdic['fit'])
>>> assert outdic['status'] == 2, "Convergence failed with code {}!".format(outdic['status'])
"""
ufilters = np.unique(filters)

params = ['R', 'alpha0', 'delta0']
spin_params = ['H', 'G1', 'G2']
for filt in filters:
for filt in ufilters:
spin_params_with_filt = [i + '_{}'.format(str(filt)) for i in spin_params]
params = np.concatenate((params, spin_params_with_filt))

initial_guess = p0[3:]
for filt in filters:
for filt in ufilters:
initial_guess = np.concatenate((initial_guess, p0[:3]))

lower_bounds = bounds[0][3:]
upper_bounds = bounds[1][3:]
for filt in filters:
for filt in ufilters:
lower_bounds = np.concatenate((lower_bounds, bounds[0][:3]))
upper_bounds = np.concatenate((upper_bounds, bounds[1][:3]))

if not np.alltrue([i == i for i in ydata.values]):
outdic = {'chi2red': None}
for i in range(len(params)):
outdic[params[i]] = {'value': None, 'err': None}
if not np.alltrue([i == i for i in magpsf_red]):
outdic = {'fit': 1, 'status': -2}
return outdic

try:
Expand All @@ -358,31 +453,51 @@ def estimate_combined_sso_params(
bounds=(lower_bounds, upper_bounds),
jac='2-point',
loss='linear',
args=(pdf['i:fid'].values, alpha, ra, dec, ydata.values)
args=(filters, phase, ra, dec, magpsf_red)
)

popt = res_lsq.x
except RuntimeError:
outdic = {'fit': 3, 'status': -2}
return outdic

popt = res_lsq.x

# estimate covariance matrix using the jacobian
# estimate covariance matrix using the jacobian
try:
cov = linalg.inv(res_lsq.jac.T @ res_lsq.jac)
chi2dof = np.sum(res_lsq.fun**2) / (res_lsq.fun.size - res_lsq.x.size)
cov *= chi2dof

# 1sigma uncertainty on fitted parameters
perr = np.sqrt(np.diag(cov))
except np.linalg.LinAlgError:
# raised if jacobian is degenerated
outdic = {'fit': 4, 'status': res_lsq.status}
return outdic

# For the chi2, we use the error estimate from the data directly
chisq = np.sum((res_lsq.fun / pdf['i:sigmapsf'])**2)
chisq_red = chisq / (res_lsq.fun.size - res_lsq.x.size - 1)
rms = np.sqrt(np.mean(res_lsq.fun**2))

outdic = {'chi2red': chisq_red}
for i in range(len(params)):
outdic[params[i]] = {'value': popt[i], 'err': perr[i]}
# For the chi2, we use the error estimate from the data directly
chisq = np.sum((res_lsq.fun / sigmapsf)**2)
chisq_red = chisq / (res_lsq.fun.size - res_lsq.x.size - 1)

except RuntimeError as e:
print(e)
outdic = {'chi2red': None}
for i in range(len(params)):
outdic[params[i]] = {'value': None, 'err': None}
outdic = {
'chi2red': chisq_red,
'rms': rms,
'minphase': np.min(phase),
'maxphase': np.max(phase),
'status': res_lsq.status,
'fit': 0
}
for i in range(len(params)):
outdic[params[i]] = popt[i]
outdic['err' + params[i]] = perr[i]

return outdic


if __name__ == "__main__":
"""Execute the unit test suite"""

# Run the Spark test suite
regular_unit_tests(globals())
94 changes: 94 additions & 0 deletions fink_utils/sso/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
from astropy.time import Time
import astropy.units as u

from scipy import signal

from fink_utils.test.tester import regular_unit_tests

def query_miriade(ident, jd, observer='I41', rplane='1', tcoor=5, shift=15.):
""" Gets asteroid or comet ephemerides from IMCCE Miriade for a suite of JD for a single SSO
Expand Down Expand Up @@ -287,3 +291,93 @@ def get_miriade_data(pdf, observer='I41', rplane='1', tcoor=5, withecl=True, met
info_out = infos[0]

return info_out

def is_peak(x, y, xpeak, band=50):
""" Estimate if `xpeak` corresponds to a true extremum for a periodic signal `y`
Assuming `y` a sparse signal along `x`, we would first estimate
the period of the signal assuming a sine wave. We would then generate
predictions, and locate the extrema of the sine.
But this first step would generate false positives:
1. As `y` is sparse, some extrema will not coincide with measurements
2. As the signal is not a perfect sine, the fitted signal might shift
from the real signal after several periods.
This function is an extremely quick and dirty attempt to reduce false
positives by looking at the data around a fitted peak, and
estimating if the peak is real:
1. take a band around the peak, and look if data is present
2. if data is present, check the data is above the mean
Parameters
----------
xpeak: int
Candidate peak position
x: array
Array of times
y: array
Array of elongation
band: optional, int
Bandwidth in units of x
Returns
----------
out: bool
True if `xpeak` corresponds to the location of a peak.
False otherwise.
"""
xband = np.where((x > xpeak - band) & (x < xpeak + band))[0]
if (len(xband) >= 10) and (np.mean(y[xband]) > np.mean(y)):
return True
return False

def get_num_opposition(elong, width=4):
""" Estimate the number of opposition according to the solar elongation
Under the hood, it assumes `elong` is peroidic, and uses a periodogram.
Parameters
----------
elong: array
array of solar elongation corresponding to jd
width: optional, int
width of peaks in samples.
Returns
----------
nopposition: int
Number of oppositions estimate
Examples
----------
>>> import io
>>> import requests
>>> import pandas as pd
>>> r = requests.post(
... 'https://fink-portal.org/api/v1/sso',
... json={
... 'n_or_d': '8467',
... 'withEphem': True,
... 'output-format': 'json'
... }
... )
>>> pdf = pd.read_json(io.BytesIO(r.content))
# estimate number of oppositions
>>> noppositions = get_num_opposition(
... pdf['Elong.'].values,
... width=4
... )
>>> assert noppositions == 2, "Found {} oppositions for 8467 instead of 2!".format(noppositions)
"""
peaks, _ = signal.find_peaks(elong, width=4)
return len(peaks)


if __name__ == "__main__":
"""Execute the unit test suite"""

# Run the Spark test suite
regular_unit_tests(globals())

0 comments on commit c4d23cd

Please sign in to comment.