diff --git a/bin/run_vega_mc_mpi.py b/bin/run_vega_mc_mpi.py new file mode 100644 index 0000000..bfe7c3c --- /dev/null +++ b/bin/run_vega_mc_mpi.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +from vega import VegaInterface +from mpi4py import MPI +from astropy.io import fits +import argparse +import sys + + +if __name__ == '__main__': + pars = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + description='Run Vega in parallel.') + + pars.add_argument('config', type=str, default=None, help='Config file') + args = pars.parse_args() + + mpi_comm = MPI.COMM_WORLD + cpu_rank = mpi_comm.Get_rank() + num_cpus = mpi_comm.Get_size() + + def print_func(message): + if cpu_rank == 0: + print(message) + sys.stdout.flush() + mpi_comm.barrier() + + print_func('Initializing Vega') + + # Initialize Vega and get the sampling parameters + vega = VegaInterface(args.config) + sampling_params = vega.sample_params['limits'] + + print_func('Finished initializing Vega') + + # Check if we need the distortion + use_distortion = vega.main_config['control'].getboolean('use_distortion', True) + if not use_distortion: + for key, data in vega.data.items(): + data._distortion_mat = None + test_model = vega.compute_model(vega.params, run_init=True) + + # Run monte carlo + run_montecarlo = vega.main_config['control'].getboolean('run_montecarlo', False) + if not run_montecarlo or (vega.mc_config is None): + raise ValueError('Warning: You called "run_vega_mc_mpi.py" without asking' + ' for monte carlo. Add "run_montecarlo = True" to the "[control]" section.') + + # Check if we need to run a forecast + forecast = vega.main_config['control'].getboolean('forecast', False) + if forecast: + raise ValueError('You asked to run a forecast. Use run_vega.py instead.') + + # Get the MC seed and the number of mocks to run + seed = vega.main_config['control'].getint('mc_seed', 0) + num_mc_mocks = vega.main_config['control'].getint('num_mc_mocks', 1) + num_local_mc = num_mc_mocks // num_cpus + if num_mc_mocks % num_cpus != 0: + num_local_mc += 1 + + # Get fiducial model + fiducial_path = vega.main_config['control'].get('mc_fiducial', None) + if fiducial_path is not None: + with fits.open('fiducial_path') as hdul: + fiducial_model = hdul[1].data['DA'] + else: + fiducial_model = vega.compute_model(vega.mc_config['params'], run_init=False) + + # Run the mocks + local_seed = int(seed + cpu_rank) + vega.analysis.run_monte_carlo(fiducial_model, num_mocks=num_local_mc, seed=local_seed) + + # Write output + vega.output.write_monte_carlo() diff --git a/vega/analysis.py b/vega/analysis.py index 207e634..5fd33a4 100644 --- a/vega/analysis.py +++ b/vega/analysis.py @@ -1,5 +1,9 @@ +import sys + import numpy as np +from vega.minimizer import Minimizer + class Analysis: """Vega analysis class. @@ -11,23 +15,32 @@ class Analysis: - Run FastMC analysis """ - def __init__(self, minimizer, main_config, mc_config=None): + def __init__(self, chi2_func, sampler_params, main_config, corr_items, data, mc_config=None): """ Parameters ---------- - minimizer : Minimizer - Minimizer object initialized from the same vega instance + chi2_func : function + Chi2 function to minimize + sampler_params : dict + Dictionary with the sampling parameters main_config : ConfigParser Main config file + corr_items : dict + Dictionary with the correlation items + data : dict + Dictionary with the data mc_config : dict, optional Monte Carlo config with the model and sample parameters, by default None """ self.config = main_config - self.minimizer = minimizer + self._chi2_func = chi2_func + self._scan_minimizer = Minimizer(chi2_func, sampler_params) + self._corr_items = corr_items + self._data = data self.mc_config = mc_config - pass + self.has_monte_carlo = False def chi2_scan(self): """Compute a chi^2 scan over one or two parameters. @@ -75,9 +88,9 @@ def chi2_scan(self): sample_params['values'][par1] = value # Minimize and get bestfit values - self.minimizer.minimize(sample_params) - result = self.minimizer.values - result['fval'] = self.minimizer.fmin.fval + self._scan_minimizer.minimize(sample_params) + result = self._scan_minimizer.values + result['fval'] = self._scan_minimizer.fmin.fval self.scan_results.append(result) print('INFO: finished chi2scan iteration {} of {}'.format( @@ -91,9 +104,9 @@ def chi2_scan(self): sample_params['values'][par2] = value_2 # Minimize and get bestfit values - self.minimizer.minimize(sample_params) - result = self.minimizer.values - result['fval'] = self.minimizer.fmin.fval + self._scan_minimizer.minimize(sample_params) + result = self._scan_minimizer.values + result['fval'] = self._scan_minimizer.fmin.fval self.scan_results.append(result) print('INFO: finished chi2scan iteration {} of {}'.format( @@ -101,3 +114,109 @@ def chi2_scan(self): len(self.grids[par1]) * len(self.grids[par2]))) return self.scan_results + + def create_monte_carlo_sim(self, fiducial_model, seed=None, scale=None, forecast=False): + """Compute Monte Carlo simulations for each Correlation item. + + Parameters + ---------- + fiducial_model : dict + Fiducial model for the correlation functions + seed : int, optional + Seed for the random number generator, by default None + scale : float/dict, optional + Scaling for the covariance, by default None + forecast : boolean, optional + Forecast option. If true, we don't add noise to the mock, + by default False + + Returns + ------- + dict + Dictionary with MC mocks for each item + """ + mocks = {} + for name in self._corr_items: + # Get scale + if scale is None: + item_scale = self._corr_items[name].cov_rescale + elif type(scale) is float or type(scale) is int: + item_scale = scale + elif type(scale) is dict and name in scale: + item_scale = scale[name] + else: + item_scale = 1. + + # Create the mock + mocks[name] = self._data[name].create_monte_carlo( + fiducial_model, item_scale, seed, forecast) + + return mocks + + def run_monte_carlo(self, fiducial_model, num_mocks=1, seed=0, scale=None): + """Run Monte Carlo simulations + + Parameters + ---------- + fiducial_model : dict + Fiducial model for the correlation functions + num_mocks : int, optional + Number of mocks to run, by default 1 + seed : int, optional + Starting seed, by default 0 + scale : float/dict, optional + Scaling for the covariance, by default None + """ + assert self.mc_config is not None, 'No Monte Carlo config provided' + + np.random.seed(seed) + sample_params = self.mc_config['sample'] + minimizer = Minimizer(self._chi2_func, sample_params) + + self.mc_bestfits = {} + self.mc_covariances = [] + self.mc_chisq = [] + self.mc_valid_minima = [] + self.mc_valid_hesse = [] + self.mc_mocks = {} + self.mc_failed_mask = [] + + for i in range(num_mocks): + print(f'INFO: running Monte Carlo realization {i}') + sys.stdout.flush() + + # Create the mocks + mocks = self.create_monte_carlo_sim( + fiducial_model, seed=None, scale=scale, forecast=False) + + for name, cf_mock in mocks.keys(): + if name not in self.mc_mocks: + self.mc_mocks[name] = [] + self.mc_mocks[name].append(cf_mock) + + try: + # Run minimizer + minimizer.minimize() + self.mc_failed_mask.append(False) + except ValueError: + print('WARNING: minimizer failed for mock {}'.format(i)) + self.mc_failed_mask.append(True) + self.mc_chisq.append(np.nan) + self.mc_valid_minima.append(False) + self.mc_valid_hesse.append(False) + continue + + for param, value in minimizer.values.items(): + if param not in self.mc_bestfits: + self.mc_bestfits[param] = [] + self.mc_bestfits[param].append([value, minimizer.errors[param]]) + + for param in self.mc_bestfits.keys(): + self.mc_bestfits[param] = np.array(self.mc_bestfits[param]) + + self.mc_covariances.append(minimizer.covariance) + self.mc_chisq.append(minimizer.fmin.fval) + self.mc_valid_minima.append(minimizer.fmin.is_valid) + self.mc_valid_hesse.append(not minimizer.fmin.hesse_failed) + + self.has_monte_carlo = True diff --git a/vega/data.py b/vega/data.py index 5b7a7ef..72e1a3c 100644 --- a/vega/data.py +++ b/vega/data.py @@ -446,7 +446,7 @@ def _build_mask(self, rp_grid, rt_grid, cuts_config, rp_min, bin_size_rp, bin_si mask &= (bin_center_mu > self.mu_min_cut) & (bin_center_mu < self.mu_max_cut) return mask - + def _init_metal_tracers(self, metal_config): assert ('in tracer1' in metal_config) or ('in tracer2' in metal_config) @@ -470,9 +470,9 @@ def _init_metal_tracers(self, metal_config): if metals_in_tracer2 is not None: for metal in metals_in_tracer2: tracer_catalog[metal] = {'name': metal, 'type': 'continuous'} - + return metals_in_tracer1, metals_in_tracer2, tracer_catalog - + def _init_metal_correlations(self, metal_config, metals_in_tracer1, metals_in_tracer2): metal_correlations = [] if 'in tracer2' in metal_config: @@ -495,7 +495,7 @@ def _init_metal_correlations(self, metal_config, metals_in_tracer1, metals_in_tr if not self._use_correlation(metal1, metal2): continue metal_correlations.append((metal1, metal2)) - + return metal_correlations def _init_metals(self, metal_config): @@ -628,7 +628,7 @@ def _read_metal_correlation(self, metal_hdul, tracers, name, dm_prefix): raise ValueError("Cannot find correct metal matrices." " Check that blinding is consistent between cf and metal files.") - def create_monte_carlo(self, fiducial_model, scale=1., seed=0, + def create_monte_carlo(self, fiducial_model, scale=1., seed=None, forecast=False): """Create monte carlo mock of data using a fiducial model. @@ -668,7 +668,8 @@ def create_monte_carlo(self, fiducial_model, scale=1., seed=0, self._cholesky = linalg.cholesky(self._scale * self.cov_mat) # Create the mock - np.random.seed(seed) + if seed is not None: + np.random.seed(seed) if forecast: self.mc_mock = fiducial_model else: @@ -676,4 +677,4 @@ def create_monte_carlo(self, fiducial_model, scale=1., seed=0, self.mc_mock = self._cholesky.dot(ran_vec) + fiducial_model self.masked_mc_mock = self.mc_mock[self.model_mask] - return self.masked_mc_mock + return self.mc_mock diff --git a/vega/output.py b/vega/output.py index e834fdc..f57a4a1 100644 --- a/vega/output.py +++ b/vega/output.py @@ -412,6 +412,67 @@ def _get_components(model_components, name_prefix=''): return columns + def write_monte_carlo(self, cpu_id=None): + assert self.analysis is not None + assert self.analysis.has_monte_carlo + + primary_hdu = fits.PrimaryHDU() + + bestfits = self.analysis.mc_bestfits + covariances = np.array(self.analysis.mc_covariances) + + names = np.array(list(bestfits.keys())) + bestfit_table = np.array([bestfits[name][:, 0] for name in names]) + errors_table = np.array([bestfits[name][:, 1] for name in names]) + covariances = covariances.reshape(bestfit_table.shape[0]*len(names), len(names)) + + # Get the data types for the columns + max_length = np.max([len(name) for name in names]) + name_format = str(max_length) + 'A' + fit_format = f'{bestfit_table.shape[0]}D' + cov_format = f'{covariances.shape[0]}D' + + # Create the columns with the bestfit data + col1 = fits.Column(name='names', format=name_format, array=names) + col2 = fits.Column(name='values', format=fit_format, array=bestfit_table) + col3 = fits.Column(name='errors', format=fit_format, array=errors_table) + col4 = fits.Column(name='covariance', format=cov_format, array=covariances) + + # Create the Table HDU from the columns + bestfit_hdu = fits.BinTableHDU.from_columns([col1, col2, col3, col4]) + bestfit_hdu.name = 'Bestfit' + + # Create the columns with the fit information + col1 = fits.Column(name='chisq', format='D', array=self.analysis.mc_chisq) + col2 = fits.Column(name='valid_minima', format='L', array=self.analysis.mc_valid_minima) + col3 = fits.Column(name='valid_hesse', format='L', array=self.analysis.mc_valid_hesse) + col4 = fits.Column(name='failed_mask', format='L', array=self.analysis.mc_failed_mask) + + # Create the Table HDU from the columns + fitinfo_hdu = fits.BinTableHDU.from_columns([col1, col2, col3, col4]) + fitinfo_hdu.name = 'FitInfo' + + mocks = self.analysis.mc_mocks + columns = [] + for name in mocks.keys(): + table = np.array(mocks[name]).T + columns.append(fits.Column(name=name, format=f'{table.shape[0]}D', array=table)) + + mocks_hdu = fits.BinTableHDU.from_columns(columns) + mocks_hdu.name = 'Mocks' + + hdu_list = [primary_hdu, bestfit_hdu, fitinfo_hdu, mocks_hdu] + + hdul = fits.HDUList(hdu_list) + dir_path = Path(self.outfile).parent / 'monte_carlo' + dir_path.mkdir(parents=True, exist_ok=True) + if cpu_id is None: + filepath = dir_path / 'monte_carlo.fits' + else: + filepath = dir_path / f'monte_carlo_{cpu_id}.fits' + + hdul.writeto(filepath, overwrite=self.overwrite) + def write_results_hdf(self, minimizer, scan_results=None): """Write Vega analysis results, including the bestfit and chi2 scan results if they exist. diff --git a/vega/vega_interface.py b/vega/vega_interface.py index 1542a58..f1a68fc 100644 --- a/vega/vega_interface.py +++ b/vega/vega_interface.py @@ -138,8 +138,10 @@ def __init__(self, main_path): self.minimizer = None else: self.minimizer = Minimizer(self.chi2, self.sample_params) - self.analysis = Analysis(Minimizer(self.chi2, self.sample_params), - self.main_config, self.mc_config) + self.analysis = Analysis( + self.chi2, self.sample_params, self.main_config, + self.corr_items, self.data, self.mc_config + ) # Check for sampler self.has_sampler = False @@ -441,8 +443,8 @@ def monte_carlo_sim(self, params=None, scale=None, seed=0, forecast=False): item_scale = 1. # Create the mock - mocks[name] = self.data[name].create_monte_carlo(fiducial_model, item_scale, seed, - forecast) + mocks[name] = self.data[name].create_monte_carlo( + fiducial_model, item_scale, seed, forecast) self.monte_carlo = True return mocks