Skip to content

Commit

Permalink
Add MPI script for MC mocks
Browse files Browse the repository at this point in the history
  • Loading branch information
andreicuceu committed Nov 17, 2023
1 parent 873bc2d commit 31bdd03
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 22 deletions.
73 changes: 73 additions & 0 deletions bin/run_vega_mc_mpi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python
from vega import VegaInterface
from mpi4py import MPI
from astropy.io import fits
import argparse
import sys


if __name__ == '__main__':
pars = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description='Run Vega in parallel.')

pars.add_argument('config', type=str, default=None, help='Config file')
args = pars.parse_args()

mpi_comm = MPI.COMM_WORLD
cpu_rank = mpi_comm.Get_rank()
num_cpus = mpi_comm.Get_size()

def print_func(message):
if cpu_rank == 0:
print(message)
sys.stdout.flush()
mpi_comm.barrier()

print_func('Initializing Vega')

# Initialize Vega and get the sampling parameters
vega = VegaInterface(args.config)
sampling_params = vega.sample_params['limits']

print_func('Finished initializing Vega')

# Check if we need the distortion
use_distortion = vega.main_config['control'].getboolean('use_distortion', True)
if not use_distortion:
for key, data in vega.data.items():
data._distortion_mat = None
test_model = vega.compute_model(vega.params, run_init=True)

# Run monte carlo
run_montecarlo = vega.main_config['control'].getboolean('run_montecarlo', False)
if not run_montecarlo or (vega.mc_config is None):
raise ValueError('Warning: You called "run_vega_mc_mpi.py" without asking'
' for monte carlo. Add "run_montecarlo = True" to the "[control]" section.')

# Check if we need to run a forecast
forecast = vega.main_config['control'].getboolean('forecast', False)
if forecast:
raise ValueError('You asked to run a forecast. Use run_vega.py instead.')

# Get the MC seed and the number of mocks to run
seed = vega.main_config['control'].getint('mc_seed', 0)
num_mc_mocks = vega.main_config['control'].getint('num_mc_mocks', 1)
num_local_mc = num_mc_mocks // num_cpus
if num_mc_mocks % num_cpus != 0:
num_local_mc += 1

# Get fiducial model
fiducial_path = vega.main_config['control'].get('mc_fiducial', None)
if fiducial_path is not None:
with fits.open('fiducial_path') as hdul:
fiducial_model = hdul[1].data['DA']
else:
fiducial_model = vega.compute_model(vega.mc_config['params'], run_init=False)

# Run the mocks
local_seed = int(seed + cpu_rank)
vega.analysis.run_monte_carlo(fiducial_model, num_mocks=num_local_mc, seed=local_seed)

# Write output
vega.output.write_monte_carlo()
141 changes: 130 additions & 11 deletions vega/analysis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import sys

import numpy as np

from vega.minimizer import Minimizer


class Analysis:
"""Vega analysis class.
Expand All @@ -11,23 +15,32 @@ class Analysis:
- Run FastMC analysis
"""

def __init__(self, minimizer, main_config, mc_config=None):
def __init__(self, chi2_func, sampler_params, main_config, corr_items, data, mc_config=None):
"""
Parameters
----------
minimizer : Minimizer
Minimizer object initialized from the same vega instance
chi2_func : function
Chi2 function to minimize
sampler_params : dict
Dictionary with the sampling parameters
main_config : ConfigParser
Main config file
corr_items : dict
Dictionary with the correlation items
data : dict
Dictionary with the data
mc_config : dict, optional
Monte Carlo config with the model and sample parameters,
by default None
"""
self.config = main_config
self.minimizer = minimizer
self._chi2_func = chi2_func
self._scan_minimizer = Minimizer(chi2_func, sampler_params)
self._corr_items = corr_items
self._data = data
self.mc_config = mc_config
pass
self.has_monte_carlo = False

def chi2_scan(self):
"""Compute a chi^2 scan over one or two parameters.
Expand Down Expand Up @@ -75,9 +88,9 @@ def chi2_scan(self):
sample_params['values'][par1] = value

# Minimize and get bestfit values
self.minimizer.minimize(sample_params)
result = self.minimizer.values
result['fval'] = self.minimizer.fmin.fval
self._scan_minimizer.minimize(sample_params)
result = self._scan_minimizer.values
result['fval'] = self._scan_minimizer.fmin.fval
self.scan_results.append(result)

print('INFO: finished chi2scan iteration {} of {}'.format(
Expand All @@ -91,13 +104,119 @@ def chi2_scan(self):
sample_params['values'][par2] = value_2

# Minimize and get bestfit values
self.minimizer.minimize(sample_params)
result = self.minimizer.values
result['fval'] = self.minimizer.fmin.fval
self._scan_minimizer.minimize(sample_params)
result = self._scan_minimizer.values
result['fval'] = self._scan_minimizer.fmin.fval
self.scan_results.append(result)

print('INFO: finished chi2scan iteration {} of {}'.format(
i * len(self.grids[par2]) + j + 1,
len(self.grids[par1]) * len(self.grids[par2])))

return self.scan_results

def create_monte_carlo_sim(self, fiducial_model, seed=None, scale=None, forecast=False):
"""Compute Monte Carlo simulations for each Correlation item.
Parameters
----------
fiducial_model : dict
Fiducial model for the correlation functions
seed : int, optional
Seed for the random number generator, by default None
scale : float/dict, optional
Scaling for the covariance, by default None
forecast : boolean, optional
Forecast option. If true, we don't add noise to the mock,
by default False
Returns
-------
dict
Dictionary with MC mocks for each item
"""
mocks = {}
for name in self._corr_items:
# Get scale
if scale is None:
item_scale = self._corr_items[name].cov_rescale
elif type(scale) is float or type(scale) is int:
item_scale = scale
elif type(scale) is dict and name in scale:
item_scale = scale[name]
else:
item_scale = 1.

# Create the mock
mocks[name] = self._data[name].create_monte_carlo(
fiducial_model, item_scale, seed, forecast)

return mocks

def run_monte_carlo(self, fiducial_model, num_mocks=1, seed=0, scale=None):
"""Run Monte Carlo simulations
Parameters
----------
fiducial_model : dict
Fiducial model for the correlation functions
num_mocks : int, optional
Number of mocks to run, by default 1
seed : int, optional
Starting seed, by default 0
scale : float/dict, optional
Scaling for the covariance, by default None
"""
assert self.mc_config is not None, 'No Monte Carlo config provided'

np.random.seed(seed)
sample_params = self.mc_config['sample']
minimizer = Minimizer(self._chi2_func, sample_params)

self.mc_bestfits = {}
self.mc_covariances = []
self.mc_chisq = []
self.mc_valid_minima = []
self.mc_valid_hesse = []
self.mc_mocks = {}
self.mc_failed_mask = []

for i in range(num_mocks):
print(f'INFO: running Monte Carlo realization {i}')
sys.stdout.flush()

# Create the mocks
mocks = self.create_monte_carlo_sim(
fiducial_model, seed=None, scale=scale, forecast=False)

for name, cf_mock in mocks.keys():
if name not in self.mc_mocks:
self.mc_mocks[name] = []
self.mc_mocks[name].append(cf_mock)

try:
# Run minimizer
minimizer.minimize()
self.mc_failed_mask.append(False)
except ValueError:
print('WARNING: minimizer failed for mock {}'.format(i))
self.mc_failed_mask.append(True)
self.mc_chisq.append(np.nan)
self.mc_valid_minima.append(False)
self.mc_valid_hesse.append(False)
continue

for param, value in minimizer.values.items():
if param not in self.mc_bestfits:
self.mc_bestfits[param] = []
self.mc_bestfits[param].append([value, minimizer.errors[param]])

for param in self.mc_bestfits.keys():
self.mc_bestfits[param] = np.array(self.mc_bestfits[param])

self.mc_covariances.append(minimizer.covariance)
self.mc_chisq.append(minimizer.fmin.fval)
self.mc_valid_minima.append(minimizer.fmin.is_valid)
self.mc_valid_hesse.append(not minimizer.fmin.hesse_failed)

self.has_monte_carlo = True
15 changes: 8 additions & 7 deletions vega/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ def _build_mask(self, rp_grid, rt_grid, cuts_config, rp_min, bin_size_rp, bin_si
mask &= (bin_center_mu > self.mu_min_cut) & (bin_center_mu < self.mu_max_cut)

return mask

def _init_metal_tracers(self, metal_config):
assert ('in tracer1' in metal_config) or ('in tracer2' in metal_config)

Expand All @@ -470,9 +470,9 @@ def _init_metal_tracers(self, metal_config):
if metals_in_tracer2 is not None:
for metal in metals_in_tracer2:
tracer_catalog[metal] = {'name': metal, 'type': 'continuous'}

return metals_in_tracer1, metals_in_tracer2, tracer_catalog

def _init_metal_correlations(self, metal_config, metals_in_tracer1, metals_in_tracer2):
metal_correlations = []
if 'in tracer2' in metal_config:
Expand All @@ -495,7 +495,7 @@ def _init_metal_correlations(self, metal_config, metals_in_tracer1, metals_in_tr
if not self._use_correlation(metal1, metal2):
continue
metal_correlations.append((metal1, metal2))

return metal_correlations

def _init_metals(self, metal_config):
Expand Down Expand Up @@ -628,7 +628,7 @@ def _read_metal_correlation(self, metal_hdul, tracers, name, dm_prefix):
raise ValueError("Cannot find correct metal matrices."
" Check that blinding is consistent between cf and metal files.")

def create_monte_carlo(self, fiducial_model, scale=1., seed=0,
def create_monte_carlo(self, fiducial_model, scale=1., seed=None,
forecast=False):
"""Create monte carlo mock of data using a fiducial model.
Expand Down Expand Up @@ -668,12 +668,13 @@ def create_monte_carlo(self, fiducial_model, scale=1., seed=0,
self._cholesky = linalg.cholesky(self._scale * self.cov_mat)

# Create the mock
np.random.seed(seed)
if seed is not None:
np.random.seed(seed)
if forecast:
self.mc_mock = fiducial_model
else:
ran_vec = np.random.randn(self.full_data_size)
self.mc_mock = self._cholesky.dot(ran_vec) + fiducial_model
self.masked_mc_mock = self.mc_mock[self.model_mask]

return self.masked_mc_mock
return self.mc_mock
61 changes: 61 additions & 0 deletions vega/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,67 @@ def _get_components(model_components, name_prefix=''):

return columns

def write_monte_carlo(self, cpu_id=None):
assert self.analysis is not None
assert self.analysis.has_monte_carlo

primary_hdu = fits.PrimaryHDU()

bestfits = self.analysis.mc_bestfits
covariances = np.array(self.analysis.mc_covariances)

names = np.array(list(bestfits.keys()))
bestfit_table = np.array([bestfits[name][:, 0] for name in names])
errors_table = np.array([bestfits[name][:, 1] for name in names])
covariances = covariances.reshape(bestfit_table.shape[0]*len(names), len(names))

# Get the data types for the columns
max_length = np.max([len(name) for name in names])
name_format = str(max_length) + 'A'
fit_format = f'{bestfit_table.shape[0]}D'
cov_format = f'{covariances.shape[0]}D'

# Create the columns with the bestfit data
col1 = fits.Column(name='names', format=name_format, array=names)
col2 = fits.Column(name='values', format=fit_format, array=bestfit_table)
col3 = fits.Column(name='errors', format=fit_format, array=errors_table)
col4 = fits.Column(name='covariance', format=cov_format, array=covariances)

# Create the Table HDU from the columns
bestfit_hdu = fits.BinTableHDU.from_columns([col1, col2, col3, col4])
bestfit_hdu.name = 'Bestfit'

# Create the columns with the fit information
col1 = fits.Column(name='chisq', format='D', array=self.analysis.mc_chisq)
col2 = fits.Column(name='valid_minima', format='L', array=self.analysis.mc_valid_minima)
col3 = fits.Column(name='valid_hesse', format='L', array=self.analysis.mc_valid_hesse)
col4 = fits.Column(name='failed_mask', format='L', array=self.analysis.mc_failed_mask)

# Create the Table HDU from the columns
fitinfo_hdu = fits.BinTableHDU.from_columns([col1, col2, col3, col4])
fitinfo_hdu.name = 'FitInfo'

mocks = self.analysis.mc_mocks
columns = []
for name in mocks.keys():
table = np.array(mocks[name]).T
columns.append(fits.Column(name=name, format=f'{table.shape[0]}D', array=table))

mocks_hdu = fits.BinTableHDU.from_columns(columns)
mocks_hdu.name = 'Mocks'

hdu_list = [primary_hdu, bestfit_hdu, fitinfo_hdu, mocks_hdu]

hdul = fits.HDUList(hdu_list)
dir_path = Path(self.outfile).parent / 'monte_carlo'
dir_path.mkdir(parents=True, exist_ok=True)
if cpu_id is None:
filepath = dir_path / 'monte_carlo.fits'
else:
filepath = dir_path / f'monte_carlo_{cpu_id}.fits'

hdul.writeto(filepath, overwrite=self.overwrite)

def write_results_hdf(self, minimizer, scan_results=None):
"""Write Vega analysis results, including the bestfit
and chi2 scan results if they exist.
Expand Down
Loading

0 comments on commit 31bdd03

Please sign in to comment.