From 225231d20abf807442bf735d74a459a2bba90ac6 Mon Sep 17 00:00:00 2001 From: Esteban Vohringer-Martinez Date: Mon, 24 Aug 2020 11:33:42 -0400 Subject: [PATCH 1/2] Distance based selection of alchemical ions --- Yank/experiment.py | 200 ++---------- Yank/pipeline.py | 551 ++++++++++------------------------ Yank/schema/validator.py | 3 +- Yank/tests/test_experiment.py | 107 +------ Yank/tests/test_pipeline.py | 148 ++------- Yank/tests/test_yank.py | 38 ++- Yank/yank.py | 26 +- 7 files changed, 243 insertions(+), 830 deletions(-) diff --git a/Yank/experiment.py b/Yank/experiment.py index 2b266403..15bbe7c5 100644 --- a/Yank/experiment.py +++ b/Yank/experiment.py @@ -234,7 +234,7 @@ def create_alchemical_phase(self): if self.options['store_solute_trajectory']: # "Solute" is basically just not water. Includes all non-water atoms and ions # Topography ensures the union of solute_atoms and ions_atoms is a null set - solute_atoms = sorted(self.topography.solute_atoms + self.topography.ions_atoms) + solute_atoms = self.topography.solute_atoms + self.topography.ions_atoms if checkpoint_interval == 1: logger.warning("WARNING! You have specified both a solute-only trajectory AND a checkpoint " "interval of 1! You are about write the trajectory of the solute twice!\n" @@ -798,10 +798,6 @@ def status(self): in the experiments have been completed, "pending" if they are both pending, and "ongoing" otherwise. - Sometimes the netcdf file can't be read while the simulation is - running. In this case, the status of the phase (n.b., not the - experiment) is set to "unavailable". - Yields ------ experiment_status : namedtuple @@ -849,10 +845,6 @@ def status(self): except FileNotFoundError: iteration = None phase_status = 'pending' - except OSError: - # The simulation is probably running and the netcdf file is locked. - iteration = None - phase_status = 'unavailable (probably currently running)' else: iteration = phase_status.iteration if _is_phase_completed(phase_status, number_of_iterations): @@ -1510,21 +1502,15 @@ def _validate_protocols(protocols_description): n_samples_per_state: type: integer default: 100 - thermodynamic_distance: + std_potential_threshold: type: float default: 0.5 - distance_tolerance: + threshold_tolerance: type: float default: 0.05 reversed_direction: type: boolean default: yes - bidirectional_redistribution: - type: boolean - default: yes - bidirectional_search_thermo_dist: - required: no - type: float function_variable_name: required: no type: string @@ -1706,14 +1692,6 @@ def _is_pipeline_solvent_with_receptor(field, solvent_id, error): type: string dependencies: [phase1_path, phase2_path] check_with: directory_exists - charmm_parameter_files: - required: no - type: list - schema: - type: string - check_with: file_exists - dependencies: [phase1_path, phase2_path] - # Solvents solvent: @@ -2463,15 +2441,11 @@ def __getattr__(self, _): phase_factory.options['number_of_equilibration_iterations'] = 0 # We only need to create a single thermo state for the equilibration, - # and we need to equilibrate in the state used to start the algorithm. - if trailblazer_options['reversed_direction']: - equilibration_state_idx = -1 - else: - equilibration_state_idx = 0 - # AlchemicalPhase doesn't support alchemical functions yet so we need to set the state manually. - equilibration_protocol = {par[0]: [par[1][equilibration_state_idx]] for par in state_parameters} + # but AlchemicalPhase currently doesn't support alchemical functions yet + # so we need to set the state manually. + equilibration_protocol = {par[0]: [par[1][0]] for par in state_parameters} if function_variable_name is not None: - variable_value = equilibration_protocol.pop(function_variable_name)[equilibration_state_idx] + variable_value = equilibration_protocol.pop(function_variable_name)[0] expression_context = {function_variable_name: variable_value} for parameter_name, alchemical_function in alchemical_functions.items(): equilibration_protocol[parameter_name] = [alchemical_function(expression_context)] @@ -2522,125 +2496,7 @@ def __getattr__(self, _): self._generate_yaml(experiment, script_path, overwrite_protocol=protocol) @staticmethod - def _generate_lambda_alchemical_function(lambda0, lambda1): - """Generate an alchemical function expression for a variable. - - The function will go from 0 to 1 between lambda0 and lambda1 and - remain constant for lambda values above or below the given ones. - """ - if lambda0 < lambda1: - # The function increases from lambda0 to lambda1. - return (f'1 - ({lambda1} - lambda) / ({lambda1} - {lambda0}) * step({lambda1} - lambda) - ' - f'(lambda - {lambda0}) / ({lambda1} - {lambda0}) * step({lambda0} - lambda)') - else: - # The function decreases from lambda0 to lambda1. - return (f'({lambda0} - lambda) / ({lambda0} - {lambda1}) * step({lambda0} - lambda) - ' - f'(({lambda0} - lambda) / ({lambda0} - {lambda1}) - 1)* step({lambda1} - lambda)') - - @classmethod - def _generate_auto_alchemical_functions(cls, phase_factory): - """Generate the default auto protocol with AlchemicalFunctions. - - Using alchemical functions allow us to "cut corners" and avoid forcing - states when variables are completely turned off/on if not necessary to - maintain a constant thermodynamic length between states (e.g. lambda_electrostatics=0 - and lambda_sterics=1). - - On the other hand, these "corners" are tricky because the thermodynamic - tensor can change a lot at these points and the Newton-like algorithm - has more troubles converging. - """ - alchemical_functions = {} - - # In the normal path, we turn on the restraint from lambda=3 to lambda=2, - # then we turn off the electrostatics from lambda=2 to lambda=1, - # and finally the sterics from lambda=1 to lambda=0. - is_vacuum = (len(phase_factory.topography.receptor_atoms) == 0 and - len(phase_factory.topography.solvent_atoms) == 0) - - # Initialize lambda end states. lambda_end_states[0] and [1] - # are the coupled and decoupled states respectively - lambda_end_states = [0, 0] - - # Starting from the decoupled state, first turn the RMSD restraints on 0 to 1. - # At this point the restraint is activated and lambda_restraint == 1 - # so the alchemical function have to go from 0 to -1 between - # lambda == 1 and lambda == 0. - if isinstance(phase_factory.restraint, restraints.RMSD): - lambda_end_states[0] += 1 - alchemical_function = cls._generate_lambda_alchemical_function(*lambda_end_states) - alchemical_functions['lambda_restraints'] = ' - (' + alchemical_function + ')' - - # Remove the sterics right before that. However, there's no need - # to go through electrostatics if this is a vacuum calculation and - # we are only decoupling the Lennard-Jones interactions. - if is_vacuum and not phase_factory.alchemical_regions.annihilate_sterics: - alchemical_functions['lambda_sterics'] = '1.0' - else: - alchemical_function = cls._generate_lambda_alchemical_function(lambda_end_states[0], - lambda_end_states[0]+1) - alchemical_functions['lambda_sterics'] = alchemical_function - lambda_end_states[0] += 1 - - # Same for electrostatics. - if is_vacuum and not phase_factory.alchemical_regions.annihilate_electrostatics: - alchemical_functions['lambda_electrostatics'] = '1.0' - else: - alchemical_function = cls._generate_lambda_alchemical_function(lambda_end_states[0], - lambda_end_states[0]+1) - alchemical_functions['lambda_electrostatics'] = alchemical_function - lambda_end_states[0] += 1 - - # Finally turn off the restraint if there is any. - if phase_factory.restraint is not None: - alchemical_function = cls._generate_lambda_alchemical_function(lambda_end_states[0]+1, - lambda_end_states[0]) - # If this is a RMSD restraint, there should be already a segment - # of the function in which the RMSD restraint is turned off. - try: - alchemical_functions['lambda_restraints'] = alchemical_function + alchemical_functions['lambda_restraints'] - except KeyError: - alchemical_functions['lambda_restraints'] = alchemical_function - lambda_end_states[0] += 1 - - # Convert strings to actual alchemical function objects. - for parameter_name, alchemical_function in alchemical_functions.items(): - alchemical_functions[parameter_name] = mmtools.alchemy.AlchemicalFunction(alchemical_function) - - # The end states for lambda feeded to the trailblaze function have - # to go from the coupled to the decoupled direction. - state_parameters = [('lambda', lambda_end_states)] - - return alchemical_functions, state_parameters - - @staticmethod - def _generate_auto_state_parameters(phase_factory): - """Generate the default auto protocol as a sequence of state_parameters.""" - is_vacuum = (len(phase_factory.topography.receptor_atoms) == 0 and - len(phase_factory.topography.solvent_atoms) == 0) - state_parameters = [] - - # First, turn on the restraint if there are any. - if phase_factory.restraint is not None: - state_parameters.append(('lambda_restraints', [0.0, 1.0])) - # We support only lambda sterics and electrostatics for now. - if is_vacuum and not phase_factory.alchemical_regions.annihilate_electrostatics: - state_parameters.append(('lambda_electrostatics', [1.0, 1.0])) - else: - state_parameters.append(('lambda_electrostatics', [1.0, 0.0])) - if is_vacuum and not phase_factory.alchemical_regions.annihilate_sterics: - state_parameters.append(('lambda_sterics', [1.0, 1.0])) - else: - state_parameters.append(('lambda_sterics', [1.0, 0.0])) - # Turn the RMSD restraints off slowly at the end - if isinstance(phase_factory.restraint, restraints.RMSD): - state_parameters.append(('lambda_restraints', [1.0, 0.0])) - - return state_parameters - - @classmethod - def _determine_trailblaze_path(cls, phase_factory, alchemical_path, - generate_alchemical_functions=False): + def _determine_trailblaze_path(phase_factory, alchemical_path): """Determine the path to discretize feeded to the trailblaze algorithm. Parameters @@ -2650,23 +2506,33 @@ def _determine_trailblaze_path(cls, phase_factory, alchemical_path, alchemical_path : Union[str, Dict[str, Union[str, float Quantity]]] Either 'auto' or the dictionary including mathematical expressions or end states of the path to discretize. - generate_alchemical_functions : bool, optional - If True, ``_generate_auto_alchemical_functions`` is used instead - of ``_generate_auto_state_parameters`` this slightly reduces the - number of intermediate states but it slows down the convergence - of the trailblaze algorithm. """ + state_parameters = [] + alchemical_functions = {} + # If alchemical_path is set to 'auto', discretize the default path. if alchemical_path == 'auto': - if generate_alchemical_functions: - alchemical_functions, state_parameters = cls._generate_auto_alchemical_functions(phase_factory) + is_vacuum = (len(phase_factory.topography.receptor_atoms) == 0 and + len(phase_factory.topography.solvent_atoms) == 0) + state_parameters = [] + + # First, turn on the restraint if there are any. + if phase_factory.restraint is not None: + state_parameters.append(('lambda_restraints', [0.0, 1.0])) + # We support only lambda sterics and electrostatics for now. + if is_vacuum and not phase_factory.alchemical_regions.annihilate_electrostatics: + state_parameters.append(('lambda_electrostatics', [1.0, 1.0])) + else: + state_parameters.append(('lambda_electrostatics', [1.0, 0.0])) + if is_vacuum and not phase_factory.alchemical_regions.annihilate_sterics: + state_parameters.append(('lambda_sterics', [1.0, 1.0])) else: - alchemical_functions = {} - state_parameters = cls._generate_auto_state_parameters(phase_factory) + state_parameters.append(('lambda_sterics', [1.0, 0.0])) + # Turn the RMSD restraints off slowly at the end + if isinstance(phase_factory.restraint, restraints.RMSD): + state_parameters.append(('lambda_restraints', [1.0, 0.0])) else: # Otherwise, differentiate between mathematical expressions and the function variable to build. - state_parameters = [] - alchemical_functions = {} for parameter_name, value in alchemical_path.items(): if isinstance(value, str): alchemical_functions[parameter_name] = mmtools.alchemy.AlchemicalFunction(value) @@ -3089,7 +2955,6 @@ def _build_experiment(self, experiment_path, experiment, use_dummy_protocol=Fals # Get system files. system_files_paths = self._db.get_system(system_id) gromacs_include_dir = self._db.systems[system_id].get('gromacs_include_dir', None) - charmm_parameter_files = self._db.systems[system_id].get('charmm_parameter_files', None) # Prepare Yank arguments phases = [None, None] @@ -3116,7 +2981,7 @@ def _build_experiment(self, experiment_path, experiment, use_dummy_protocol=Fals logger.info("Reading phase {}".format(phase_name)) system, topology, sampler_state = pipeline.read_system_files( positions_file_path, parameters_file_path, system_options, - gromacs_include_dir=gromacs_include_dir, charmm_parameter_files=charmm_parameter_files) + gromacs_include_dir=gromacs_include_dir) # Identify system components. There is a ligand only in the complex phase. if phase_idx == 0: @@ -3142,7 +3007,8 @@ def _build_experiment(self, experiment_path, experiment, use_dummy_protocol=Fals # Start from AlchemicalPhase default alchemical region # and modified it according to the user options. phase_protocol = protocol[phase_name]['alchemical_path'] - alchemical_region = AlchemicalPhase._build_default_alchemical_region(system, topography, + alchemical_region = AlchemicalPhase._build_default_alchemical_region(system, topography, + sampler_state, phase_protocol) alchemical_region = alchemical_region._replace(**alchemical_region_opts) @@ -3165,7 +3031,7 @@ def _build_experiment(self, experiment_path, experiment, use_dummy_protocol=Fals # If we have generated samples with trailblaze, we start the # simulation from those samples. Also, we can turn off minimization # as it has been already performed before trailblazing. - if not use_dummy_protocol and exp_opts['start_from_trailblaze_samples'] is True: + if exp_opts['start_from_trailblaze_samples'] is True: trailblaze_checkpoint_dir_path = self._get_trailblaze_checkpoint_dir_path( experiment_path, phase_name) try: diff --git a/Yank/pipeline.py b/Yank/pipeline.py index 41bbe935..b03b2bd0 100644 --- a/Yank/pipeline.py +++ b/Yank/pipeline.py @@ -16,15 +16,14 @@ # GLOBAL IMPORTS # ============================================================================================= -import collections -import copy -import inspect -import itertools -import json -import logging import os import re import sys +import copy +import inspect +import logging +import itertools +import collections import mdtraj import mpiplus @@ -67,6 +66,10 @@ def compute_squared_distances(molecule1_positions, molecule2_positions): distance between atom i of molecule1 and atom j of molecule 2. """ + # if the molecule is just one atom with 3 dimensions (example counterion) + #if len(molecule1_positions) == 3: + # squared_distances = np.array([((molecule2_positions - molecule1_positions)**2).sum(1)]) + #else: squared_distances = np.array([((molecule2_positions - atom_pos)**2).sum(1) for atom_pos in molecule1_positions]) return squared_distances @@ -100,7 +103,6 @@ def compute_min_dist(mol_positions, *args): # Compute squared distances. Each row is an array of distances # from a mol_positions atom to all argmol_positions atoms. distances2 = compute_squared_distances(mol_positions, argmol_positions) - # Find closest atoms and their distance min_idx = np.unravel_index(distances2.argmin(), distances2.shape) try: @@ -233,15 +235,55 @@ def compute_net_charge(system, atom_indices): net_charge = int(round(net_charge / unit.elementary_charge)) return net_charge +def order_counterions(system, topography, sampler_state): + """Order the counterions in decreasing distance to the ligand/solute. + + Parameters + ---------- + system : simtk.openmm.System + The system object containing the atoms of interest. + topography : Yank topography + The topography object holding the indices of the ions and the + ligand (for binding free energy) or solute (for transfer free + energy). + sampler_state : openmmtools.mdtools.sampler_state + State with positions + + Returns + ------- + ion_index, charge : list + List of ion index and charge -def find_alchemical_counterions(system, topography, region_name): + """ + pos_unit = sampler_state.positions.unit + # atoms of the ligand/solute which are part of the alchemical transformation + alchemical_atoms = topography.ligand_atoms + # if no ligand present but a charged solute + if not alchemical_atoms: + alchemical_atoms = topography.solute_atoms + alchemical_positions = sampler_state.positions[alchemical_atoms] / pos_unit + # compute minimal distance between each ion and the alchemical atoms + ions_distances = np.zeros(len(topography.ions_atoms)) + for i, ion_id in enumerate(topography.ions_atoms): + ion_position = sampler_state.positions[ion_id] / pos_unit + ions_distances[i] = compute_min_dist([ion_position], alchemical_positions) + logger.debug('Min distance of ion {} {}: {}'.format(ion_id, + topography.topology.atom(ion_id).residue.name, + ions_distances[i])) + # list with ion_id ordered in decreasing distance of the ligand + ordered_ion_id = [topography.ions_atoms[ion_id] for ion_id in np.argsort(ions_distances)[::-1]] + return [(ion_id, compute_net_charge(system, [ion_id])) + for ion_id in ordered_ion_id] + +def find_alchemical_counterions(system, topography, sampler_state, region_name): """Return the atom indices of the ligand or solute counter ions. In periodic systems, the solvation box needs to be neutral, and if the decoupled molecule is charged, it will cause trouble. This can be used to find a set of ions in the system that neutralize the molecule, so that the solvation box will remain neutral all - the time. + the time. Ions are selected starting with the ones with the largest + distance to the ligand. Parameters ---------- @@ -251,6 +293,8 @@ def find_alchemical_counterions(system, topography, region_name): The topography object holding the indices of the ions and the ligand (for binding free energy) or solute (for transfer free energy). + sampler_state : openmmtools.mdtools.sampler_state + State with positions region_name : str The region name in the topography (e.g. "ligand_atoms") for which to find counter ions. @@ -280,19 +324,20 @@ def find_alchemical_counterions(system, topography, region_name): if mol_net_charge == 0: return [] - # Find net charge of all ions in the system. - ions_net_charges = [(ion_id, compute_net_charge(system, [ion_id])) - for ion_id in topography.ions_atoms] + # Find net charge of all ions in the system and order them according to the + # largest distance from the ligand/solute + ions_net_charges = order_counterions(system, topography, sampler_state) topology = topography.topology ions_names_charges = [(topology.atom(ion_id).residue.name, ion_net_charge) for ion_id, ion_net_charge in ions_net_charges] - logger.debug('Ions net charges: {}'.format(ions_names_charges)) + # Find minimal subset of counterions whose charges sums to -mol_net_charge. for n_ions in range(1, len(ions_net_charges) + 1): for ion_subset in itertools.combinations(ions_net_charges, n_ions): counterions_indices, counterions_charges = zip(*ion_subset) if sum(counterions_charges) == -mol_net_charge: + logger.debug('Index of alchemical counter ion {}'.format(counterions_indices)) return counterions_indices # We couldn't find any subset of counterions neutralizing the region. @@ -411,7 +456,7 @@ def create_system(parameters_file, box_vectors, create_system_args, system_optio def read_system_files(positions_file_path, parameters_file_path, system_options, - gromacs_include_dir=None, charmm_parameter_files=None): + gromacs_include_dir=None): """Create a Yank arguments for a phase from system files. Parameters @@ -419,7 +464,7 @@ def read_system_files(positions_file_path, parameters_file_path, system_options, positions_file_path : str Path to system position file (e.g. 'complex.inpcrd/.gro/.pdb'). parameters_file_path : str - Path to system parameters file (e.g. 'complex.prmtop/.top/.xml/.psf'). + Path to system parameters file (e.g. 'complex.prmtop/.top/.xml'). system_options : dict ``system_options[phase]`` is a a dictionary containing options to pass to ``createSystem()``. If the parameters file is an OpenMM @@ -427,8 +472,6 @@ def read_system_files(positions_file_path, parameters_file_path, system_options, gromacs_include_dir : str, optional Path to directory in which to look for other files included from the gromacs top file. - charmm_parameter_files : str, optional - Path to additional parameter files Returns ------- @@ -495,24 +538,6 @@ def read_system_files(positions_file_path, parameters_file_path, system_options, system = create_system(parameters_file, box_vectors, create_system_args, system_options) - # Read CHARMM format psf and pdb files - elif parameters_file_extension == '.psf': - logger.debug("psf: {}".format(parameters_file_path)) - logger.debug("pdb: {}".format(positions_file_path)) - - parameters_file = openmm.app.CharmmPsfFile(parameters_file_path) - positions_file = openmm.app.PDBFile(positions_file_path) - params = openmm.app.CharmmParameterSet(*charmm_parameter_files) - - box_vectors = positions_file.topology.getPeriodicBoxVectors() - if box_vectors is None: - box_vectors = system.getDefaultPeriodicBoxVectors() - - parameters_file.setBox(box_vectors[0][0], box_vectors[1][1], box_vectors[2][2]) - create_system_args = set(inspect.getargspec(openmm.app.CharmmPsfFile.createSystem).args) - system_options['params'] = params - system = create_system(parameters_file, box_vectors, create_system_args, system_options) - # Unsupported file format. else: raise ValueError('Unsupported format for parameter file {}'.format(parameters_file_extension)) @@ -1178,7 +1203,7 @@ def get_system_files_paths(self, system_id): parameters_path=os.path.join(system_dir, 'solvent2.prmtop')) ] else: - parameter_file_extensions = {'prmtop', 'top', 'xml', 'psf'} + parameter_file_extensions = {'prmtop', 'top', 'xml'} system_files_paths = [] for phase_path_name in ['phase1_path', 'phase2_path']: file_paths = self.systems[system_id][phase_path_name] @@ -2020,17 +2045,13 @@ def write_sampler_state(self, sampler_state): (a, b, c), (alpha, beta, gamma)) -def read_trailblaze_checkpoint_coordinates(checkpoint_dir_path, redistributed=True): +def read_trailblaze_checkpoint_coordinates(checkpoint_dir_path): """Read positions and box vectors stored as checkpoint by the trailblaze algorithm. Parameters ---------- checkpoint_dir_path : str The path to the directory containing the checkpoint information. - redistributed : bool, optional - If True, the function will check if the states were redistributed, - and will returned the set of coordinates that are more representative - of the redistributed protocol. Returns ------- @@ -2044,7 +2065,6 @@ def read_trailblaze_checkpoint_coordinates(checkpoint_dir_path, redistributed=Tr If no file with the coordinates was found. """ positions_file_path = os.path.join(checkpoint_dir_path, 'coordinates.dcd') - states_map_file_path = os.path.join(checkpoint_dir_path, 'states_map.json') # Open the file if it exist. try: @@ -2057,236 +2077,15 @@ def read_trailblaze_checkpoint_coordinates(checkpoint_dir_path, redistributed=Tr sampler_states = trajectory_file.read_as_sampler_states() finally: trajectory_file.close() - - # If the protocol was redistributed, use the states map to create - # a new set of sampler states that can be used as starting conditions - # for the redistributed protocol. - if redistributed: - try: - with open(states_map_file_path, 'r') as f: - states_map = json.load(f) - sampler_states = [sampler_states[i] for i in states_map] - except FileNotFoundError: - pass - return sampler_states -def _resume_thermodynamic_trailblazing(checkpoint_dir_path, initial_protocol): - """Resume a previously-run trailblaze execution. - - Parameters - ---------- - checkpoint_dir_path : str - The path to the directory used to store the trailblaze information. - initial_protocol : Dict[str, List[float]] - The initial protocol containing only the first state of the path. - If no checkpoint file for the protocol is found, the file will - be initialized using this state. - - Returns - ------- - resumed_protocol : Dict[str, List[float]] - The resumed optimal protocol. - trajectory_file : _DCDTrajectoryFile - A DCD file open for appening. - sampler_state : SamplerState or None - The last saved SamplerState or None if no frame was saved yet. - - """ - # We save the protocol in a YAML file and the positions in netcdf. - protocol_file_path = os.path.join(checkpoint_dir_path, 'protocol.yaml') - stds_file_path = os.path.join(checkpoint_dir_path, 'states_stds.json') - positions_file_path = os.path.join(checkpoint_dir_path, 'coordinates.dcd') - - # Create the directory, if it doesn't exist. - os.makedirs(checkpoint_dir_path, exist_ok=True) - - # Load protocol and stds checkpoint file. - try: - # Parse the previously calculated optimal_protocol dict. - with open(protocol_file_path, 'r') as file_stream: - resumed_protocol = yaml.load(file_stream, Loader=yaml.FullLoader) - - # Load the energy difference stds. - with open(stds_file_path, 'r') as f: - states_stds = json.load(f) - except FileNotFoundError: - resumed_protocol = initial_protocol - states_stds = [[], []] - - # Check if there's an existing positions information. - try: - # We want the coordinates of the states that were sampled - # during the search not the states after redistribution. - sampler_states = read_trailblaze_checkpoint_coordinates( - checkpoint_dir_path, redistributed=False) - except FileNotFoundError: - len_trajectory = 0 - else: - len_trajectory = len(sampler_states) - - # Raise an error if the algorithm was interrupted *during* - # writing on disk. We store only the states that we simulated - # so there should be one less here. - for state_values in resumed_protocol.values(): - if len_trajectory < len(state_values) - 1: - err_msg = ("The trailblaze algorithm was interrupted while " - "writing the checkpoint file and it is now unable " - "to resume. Please delete the files " - f"in {checkpoint_dir_path} and restart.") - raise RuntimeError(err_msg) - - # When this is resumed, but the trajectory is already completed, - # the frame of the final end state has been already written, but - # we don't want to add it twice at the end of the trailblaze function. - len_trajectory = len(state_values) - 1 - - # Whether the file exist or not, MDTraj doesn't support appending - # files so we open a new one and rewrite the configurations we - # have generated in the previous run. - trajectory_file = _DCDTrajectoryFile(positions_file_path, 'w', - force_overwrite=True) - if len_trajectory > 0: - for i in range(len_trajectory): - trajectory_file.write_sampler_state(sampler_states[i]) - # Make sure the next search starts from the last saved position - # unless the previous calculation was interrupted before the - # first position could be saved. - sampler_state = sampler_states[-1] - else: - sampler_state = None - - return resumed_protocol, states_stds, trajectory_file, sampler_state - - -def _cache_trailblaze_data(checkpoint_dir_path, optimal_protocol, states_stds, - trajectory_file, sampler_state): - """Store on disk current state of the trailblaze run.""" - - # Determine the file paths of the stored data. - protocol_file_path = os.path.join(checkpoint_dir_path, 'protocol.yaml') - stds_file_path = os.path.join(checkpoint_dir_path, 'states_stds.json') - - # Update protocol. - with open(protocol_file_path, 'w') as file_stream: - yaml.dump(optimal_protocol, file_stream) - - # Update the stds between states. - with open(stds_file_path, 'w') as f: - json.dump(states_stds, f) - - # Append positions of the state that we just simulated. - trajectory_file.write_sampler_state(sampler_state) - - -def _redistribute_trailblaze_states(old_protocol, states_stds, thermodynamic_distance): - """Redistribute the states using a bidirectional estimate of the thermodynamic length. - - Parameters - ---------- - old_protocol : Dict[str, List[float]] - The unidirectional optimal protocol. - states_stds : List[List[float]] - states_stds[j][i] is the standard deviation of the potential - difference between states i-1 and i computed in the j direction. - thermodynamic_distance : float - The distance between each pair of states. - - Returns - ------- - new_protocol : Dict[str, List[float]] - The new estimate of the optimal protocol. - states_map : List[int] - states_map[i] is the index of the state in the old protocol that - is closest to the i-th state in the new protocol. This allows to - map coordinates generated during trailblazing to the redistributed - protocol. - """ - # The parameter names in a fixed order. - parameter_names = [par_name for par_name in old_protocol] - - # Initialize the new protocol from the first state the optimal protocol. - new_protocol = {par_name: [values[0]] for par_name, values in old_protocol.items()} - - # The first state of the new protocol always maps to the first state of the old one. - states_map = [0] - - def _get_old_protocol_state(state_idx): - """Return a representation of the thermo state as a list of parameter values.""" - return np.array([old_protocol[par_name][state_idx] for par_name in parameter_names]) - - def _add_state_to_new_protocol(state): - for parameter_name, new_state_value in zip(parameter_names, state.tolist()): - new_protocol[parameter_name].append(new_state_value) - - # The thermodynamic length at 0 is 0.0. - states_stds[0] = [0.0] + states_stds[0] - states_stds[1] = [0.0] + states_stds[1] - - # We don't have the energy difference std in the - # direction opposite to the search direction so we - # pad the list. - states_stds[1].append(states_stds[0][-1]) - states_stds = np.array(states_stds) - - # Compute a bidirectional estimate of the thermodynamic length. - old_protocol_thermo_length = np.cumsum(np.mean(states_stds, axis=0)) - - # Trailblaze again interpolating the thermodynamic length function. - current_state_idx = 0 - current_state = _get_old_protocol_state(0) - last_state = _get_old_protocol_state(-1) - new_protocol_cum_thermo_length = 0.0 - while (current_state != last_state).any(): - # Find first state for which the accumulated standard - # deviation is greater than the thermo length threshold. - try: - while old_protocol_thermo_length[current_state_idx+1] - new_protocol_cum_thermo_length <= thermodynamic_distance: - current_state_idx += 1 - except IndexError: - # If we got to the end, we just add the last state - # to the protocol and stop the while loop. - _add_state_to_new_protocol(last_state) - break - - # Update current state. - current_state = _get_old_protocol_state(current_state_idx) - - # The thermodynamic length from the last redistributed state to current state. - pair_thermo_length = old_protocol_thermo_length[current_state_idx] - new_protocol_cum_thermo_length - - # Now interpolate between the current state state and the next to find - # the exact state for which the thermo length equal the threshold. - next_state = _get_old_protocol_state(current_state_idx+1) - differential = thermodynamic_distance - pair_thermo_length - differential /= old_protocol_thermo_length[current_state_idx+1] - old_protocol_thermo_length[current_state_idx] - new_state = current_state + differential * (next_state - current_state) - - # Update cumulative thermo length. - new_protocol_cum_thermo_length += thermodynamic_distance - - # Update states map. - closest_state_idx = current_state_idx if differential <= 0.5 else current_state_idx+1 - states_map.append(closest_state_idx) - - # Update redistributed protocol. - _add_state_to_new_protocol(new_state) - - # The last state of the new protocol always maps to the last state of the old one. - states_map.append(len(old_protocol_thermo_length)-1) - - return new_protocol, states_map - - def run_thermodynamic_trailblazing( thermodynamic_state, sampler_state, mcmc_move, state_parameters, - parameter_setters=None, thermodynamic_distance=1.0, - distance_tolerance=0.05, n_samples_per_state=100, - reversed_direction=False, bidirectional_redistribution=True, - bidirectional_search_thermo_dist='auto', - global_parameter_functions=None, function_variables=tuple(), - checkpoint_dir_path=None + parameter_setters=None, std_potential_threshold=0.5, + threshold_tolerance=0.05, n_samples_per_state=100, + reversed_direction=False, global_parameter_functions=None, + function_variables=tuple(), checkpoint_dir_path=None ): """ Find an alchemical path by placing alchemical states at a fixed distance. @@ -2296,17 +2095,15 @@ def run_thermodynamic_trailblazing( and computing the standard deviation of the difference of potential energies between the two states at those configurations. - The states of the protocol are chosen so that each pair has a distance - (in thermodynamic length) of ``thermodynamic_distance +- distance_tolerance``. - The thermodynamic length estimate (in kT) is based on the standard deviation - of the difference in potential energy between the two states. + Two states are chosen for the protocol if their standard deviation is + within ``std_potential_threshold +- threshold_tolerance``. The function is capable of resuming when interrupted if ``checkpoint_dir_path`` is specified. This will create two files called 'protocol.yaml' and 'coordinates.dcd' storing the protocol and initial positions and box vectors for each state that are generated while running the algorithm. - It is also possible to discretize a path specified through maathematical + It is also possibleto discretize a path specified through maathematical expressions through the arguments ``global_parameter_function`` and ``function_variables``. @@ -2330,33 +2127,19 @@ def run_thermodynamic_trailblazing( ``setter(thermodynamic_state, parameter_name, value)``. This is useful for example to set global parameter function variables with ``openmmtools.states.GlobalParameterState.set_function_variable``. - thermodynamic_distance : float, optional - The target distance (in thermodynamic length) between each pair of - states in kT. Default is 1.0 (kT). - distance_tolerance : float, optional - The tolerance on the found standard deviation. Default is 0.05 (kT). + std_potential_threshold : float, optional + The threshold that determines how to separate the states between + each others. + threshold_tolerance : float, optional + The tolerance on the found standard deviation. n_samples_per_state : int, optional How many samples to collect to estimate the overlap between two - states. Default is 100. + states. reversed_direction : bool, optional - If ``True``, the algorithm starts from the final state and traverses + If True, the algorithm starts from the final state and traverses the path from the end to the beginning. The returned path discretization will still be ordered from the beginning to the - end following the order in ``state_parameters``. Default is ``False``. - bidirectional_redistribution : bool, optional - If ``True``, the states will be redistributed using the standard - deviation of the potential difference between states in both - directions. Default is ``True``. - bidirectional_search_thermo_dist : float or 'auto', optional - If ``bidirectional_redistribution`` is ``True``, the thermodynamic - distance between the sampled states used to collect data along - the path can be different than the thermodynamic distance after - redistribution. The default ('auto') caps the thermodynamic - distance used for trailblazing at 1 kT. Keeping this value small - lower the chance of obtaining very large stds in the opposite direction - due to rare, dominating events in sections of the path where the overlap - decreases quickly, which in turn may results in unreasonably long - protocols. + end following the order in ``state_parameters``. global_parameter_functions : Dict[str, Union[str, openmmtools.states.GlobalParameterFunction]], optional Map a parameter name to a mathematical expression as a string or a ``openmmtools.states.GlobalParameterFunction`` object. @@ -2389,15 +2172,6 @@ def _function_variable_setter(state, parameter_name, value): # Make sure that the state parameters to optimize have a clear order. assert (isinstance(state_parameters, list) or isinstance(state_parameters, tuple)) - # Determine the thermo distance to achieve during the search. - if not bidirectional_redistribution: - search_thermo_dist = thermodynamic_distance - else: - if bidirectional_search_thermo_dist == 'auto': - search_thermo_dist = min(1.0, thermodynamic_distance) - else: - search_thermo_dist = bidirectional_search_thermo_dist - # Create unordered helper variable. state_parameter_dict = {x[0]: x[1] for x in state_parameters} @@ -2446,42 +2220,71 @@ def _function_variable_setter(state, parameter_name, value): # Reverse the direction of the algorithm if requested. if reversed_direction: state_parameters = [(par_name, end_states.__class__(reversed(end_states))) - for par_name, end_states in reversed(state_parameters)] + for par_name, end_states in state_parameters] # Initialize protocol with the starting value. optimal_protocol = {par: [values[0]] for par, values in state_parameters} - # Keep track of potential std between states in both directions - # of the path so that we can redistribute the states later. - # At the end of the protocol this will have the same length - # of the protocol minus one. The inner lists are for the forward - # and reversed direction stds respectively. - states_stds = [[], []] - # Check to see whether a trailblazing algorithm is already in progress, # and if so, restore to the previously checkpointed state. if checkpoint_dir_path is not None: - optimal_protocol, states_stds, trajectory_file, resumed_sampler_state = _resume_thermodynamic_trailblazing( - checkpoint_dir_path, optimal_protocol) - # Start from the last saved conformation. - if resumed_sampler_state is not None: - sampler_state = resumed_sampler_state - - # We keep track of the previous state in the optimal protocol - # that we'll use to compute the stds in the opposite direction. - if len(states_stds[0]) == 0: - previous_thermo_state = None - else: - previous_thermo_state = copy.deepcopy(thermodynamic_state) + # We save the protocol in a YAML file and the positions in netcdf. + protocol_file_path = os.path.join(checkpoint_dir_path, 'protocol.yaml') + positions_file_path = os.path.join(checkpoint_dir_path, 'coordinates.dcd') + + # Create the directory, if it doesn't exist. + os.makedirs(checkpoint_dir_path, exist_ok=True) + + # Create/load protocol checkpoint file. + try: + # Parse the previously calculated optimal_protocol dict. + with open(protocol_file_path, 'r') as file_stream: + optimal_protocol = yaml.load(file_stream, Loader=yaml.FullLoader) + except FileNotFoundError: + # The checkpoint doesn't exist. Create one. + with open(protocol_file_path, 'w') as file_stream: + yaml.dump(optimal_protocol, file_stream) + + # Check if there's an existing positions information. + try: + sampler_states = read_trailblaze_checkpoint_coordinates(checkpoint_dir_path) + except FileNotFoundError: + len_trajectory = 0 + else: + len_trajectory = len(sampler_states) + + # Raise an error if the algorithm was interrupted *during* + # writing on disk. We store only the states that we simulated + # so there should be one less here. + if len_trajectory < len(optimal_protocol[state_parameters[0][0]])-1: + err_msg = ("The trailblaze algorithm was interrupted while " + "writing the checkpoint file and it is now unable " + "to resume. Please delete the files " + f"in {checkpoint_dir_path} and restart.") + raise RuntimeError(err_msg) + + # When this is resumed, but the trajectory is already completed, + # the frame of the final end state has been already written, but + # we don't want to add it twice at the end of this function. + len_trajectory = len(optimal_protocol[state_parameters[0][0]])-1 + + # Whether the file exist or not, MDTraj doesn't support appending + # files so we open a new one and rewrite the configurations we + # have generated in the previous run. + trajectory_file = _DCDTrajectoryFile(positions_file_path, 'w', + force_overwrite=True) + if len_trajectory > 0: + for i in range(len_trajectory): + trajectory_file.write_sampler_state(sampler_states[i]) + # Make sure the next search starts from the last saved position + # unless the previous calculation was interrupted before the + # first position could be saved. + sampler_state = sampler_states[-1] # Make sure that thermodynamic_state is in the last explored # state, whether the algorithm was resumed or not. for state_parameter in optimal_protocol: - parameter_setters[state_parameter](thermodynamic_state, state_parameter, - optimal_protocol[state_parameter][-1]) - if previous_thermo_state is not None: - parameter_setters[state_parameter](previous_thermo_state, state_parameter, - optimal_protocol[state_parameter][-2]) + parameter_setters[state_parameter](thermodynamic_state, state_parameter, optimal_protocol[state_parameter][-1]) # We change only one parameter at a time. for state_parameter, values in state_parameters: @@ -2495,24 +2298,23 @@ def _function_variable_setter(state, parameter_name, value): # Gather data until we get to the last value. while optimal_protocol[state_parameter][-1] != values[-1]: - # Simulate current thermodynamic state to collect samples. + # Simulate current thermodynamic state to obtain energies. sampler_states = [] simulated_energies = np.zeros(n_samples_per_state) for i in range(n_samples_per_state): mcmc_move.apply(thermodynamic_state, sampler_state) + context, _ = mmtools.cache.global_context_cache.get_context(thermodynamic_state) + sampler_state.apply_to_context(context, ignore_velocities=True) + simulated_energies[i] = thermodynamic_state.reduced_potential(context) sampler_states.append(copy.deepcopy(sampler_state)) - # Keep track of the thermo state we use for the reweighting. - reweighted_thermo_state = None - # Find first state that doesn't overlap with simulated one - # with std(du) within search_thermo_dist +- distance_tolerance. + # with std(du) within std_potential_threshold +- threshold_tolerance. # We stop anyway if we reach the last value of the protocol. std_energy = 0.0 current_parameter_value = optimal_protocol[state_parameter][-1] - while (abs(std_energy - search_thermo_dist) > distance_tolerance and - not (current_parameter_value == values[1] and std_energy < search_thermo_dist)): - + while (abs(std_energy - std_potential_threshold) > threshold_tolerance and + not (current_parameter_value == values[1] and std_energy < std_potential_threshold)): # Determine next parameter value to compute. if np.isclose(std_energy, 0.0): # This is the first iteration or the two state overlap significantly @@ -2524,59 +2326,31 @@ def _function_variable_setter(state, parameter_name, value): derivative_std_energy = ((std_energy - old_std_energy) / (current_parameter_value - old_parameter_value)) old_parameter_value = current_parameter_value - current_parameter_value += (search_thermo_dist - std_energy) / derivative_std_energy + current_parameter_value += (std_potential_threshold - std_energy) / derivative_std_energy # Keep current_parameter_value inside bound interval. if search_direction * current_parameter_value > values[1]: current_parameter_value = values[1] assert search_direction * (optimal_protocol[state_parameter][-1] - current_parameter_value) < 0 - # Determine the thermo states at which we need to compute the energies. - # If this is the first attempt, compute also the reduced potential of - # the simulated energies and the previous state to estimate the standard - # deviation in the opposite direction. - if reweighted_thermo_state is None: - # First attempt. - reweighted_thermo_state = copy.deepcopy(thermodynamic_state) - computed_thermo_states = [reweighted_thermo_state, thermodynamic_state] - if previous_thermo_state is not None: - computed_thermo_states.append(previous_thermo_state) - else: - computed_thermo_states = [reweighted_thermo_state] - - # Set the reweighted state to the current parameter value. - parameter_setters[state_parameter](reweighted_thermo_state, state_parameter, current_parameter_value) + # Get context in new thermodynamic state. + parameter_setters[state_parameter](thermodynamic_state, state_parameter, current_parameter_value) + context, integrator = mmtools.cache.global_context_cache.get_context(thermodynamic_state) - # Compute all energies. - energies = np.empty(shape=(len(computed_thermo_states), n_samples_per_state)) + # Compute the energies at the sampled positions. + reweighted_energies = np.zeros(n_samples_per_state) for i, sampler_state in enumerate(sampler_states): - energies[:,i] = mmtools.states.reduced_potential_at_states( - sampler_state, computed_thermo_states, mmtools.cache.global_context_cache) - - # Cache the simulated energies for the next iteration. - if len(computed_thermo_states) > 1: - simulated_energies = energies[1] + sampler_state.apply_to_context(context, ignore_velocities=True) + reweighted_energies[i] = thermodynamic_state.reduced_potential(context) - # Compute the energy difference std in the direction: simulated state -> previous state. - if len(computed_thermo_states) > 2: - denergies = energies[2] - simulated_energies - states_stds[1].append(float(np.std(denergies, ddof=1))) - - # Compute the energy difference std between the currently simulated and the reweighted states. + # Compute standard deviation of the difference. old_std_energy = std_energy - denergies = energies[0] - simulated_energies - std_energy = np.std(denergies, ddof=1) + denergies = reweighted_energies - simulated_energies + std_energy = np.std(denergies) logger.debug('trailblazing: state_parameter {}, simulated_value {}, current_parameter_value {}, ' 'std_du {}'.format(state_parameter, optimal_protocol[state_parameter][-1], current_parameter_value, std_energy)) - # Store energy difference std in the direction: simulated state -> reweighted state. - states_stds[0].append(float(std_energy)) - - # Update variables for next iteration. - previous_thermo_state = copy.deepcopy(thermodynamic_state) - thermodynamic_state = reweighted_thermo_state - # Update the optimal protocol with the new value of this parameter. # The other parameters remain fixed. for par_name in optimal_protocol: @@ -2590,32 +2364,17 @@ def _function_variable_setter(state, parameter_name, value): # Save the updated checkpoint file to disk. if checkpoint_dir_path is not None: - _cache_trailblaze_data(checkpoint_dir_path, optimal_protocol, states_stds, - trajectory_file, sampler_state) + # Update protocol. + with open(protocol_file_path, 'w') as file_stream: + yaml.dump(optimal_protocol, file_stream) + # Update positions of the state that we just simulated. + trajectory_file.write_sampler_state(sampler_state) if checkpoint_dir_path is not None: # We haven't simulated the last state so we just set the positions of the second to last. trajectory_file.write_sampler_state(sampler_state) trajectory_file.close() - # Redistribute the states using the standard deviation estimates in both directions. - if bidirectional_redistribution: - optimal_protocol, states_map = _redistribute_trailblaze_states( - optimal_protocol, states_stds, thermodynamic_distance) - - # Save the states map for reading the coordinates correctly. - if checkpoint_dir_path is not None: - states_map_file_path = os.path.join(checkpoint_dir_path, 'states_map.json') - - if bidirectional_redistribution: - with open(states_map_file_path, 'w') as f: - json.dump(states_map, f) - elif os.path.isfile(states_map_file_path): - # If there's an old file because the path was previously redistributed, - # we delete it so that read_trailblaze_checkpoint_coordinates will - # return the coordinates associated to the most recently-generated path. - os.remove(states_map_file_path) - # If we have traversed the path in the reversed direction, re-invert # the order of the discretized path. if reversed_direction: @@ -2623,7 +2382,7 @@ def _function_variable_setter(state, parameter_name, value): optimal_protocol[par_name] = values.__class__(reversed(values)) # If we used global parameter functions, the optimal_protocol at this - # point is a discretization of the function_variables, not the original + # point is a discratization of the function_variables, not the original # parameters so we convert it back. len_protocol = len(next(iter(optimal_protocol.values()))) function_variables_protocol = {var: optimal_protocol.pop(var) for var in function_variables} diff --git a/Yank/schema/validator.py b/Yank/schema/validator.py index 7ad030ca..83332bbd 100644 --- a/Yank/schema/validator.py +++ b/Yank/schema/validator.py @@ -138,8 +138,7 @@ def _check_with_supported_system_file_format(self, field, file_paths): ('amber', {'inpcrd', 'prmtop'}), ('amber', {'rst7', 'prmtop'}), ('gromacs', {'gro', 'top'}), - ('openmm', {'pdb', 'xml'}), - ('charmm', {'pdb', 'psf'}) + ('openmm', {'pdb', 'xml'}) ] file_extension_type = None for extension_type, valid_extensions in expected_extensions: diff --git a/Yank/tests/test_experiment.py b/Yank/tests/test_experiment.py index 840737ae..f1c7beeb 100644 --- a/Yank/tests/test_experiment.py +++ b/Yank/tests/test_experiment.py @@ -14,12 +14,10 @@ # ============================================================================================= import itertools -from pprint import pformat import shutil import tempfile import textwrap import time -import typing import unittest import mdtraj @@ -995,9 +993,9 @@ def test_validation_correct_protocols(): trailblazer_options = [ {'n_equilibration_iterations': 1000, 'n_samples_per_state': 100, - 'thermodynamic_distance': 0.5, 'distance_tolerance': 0.05}, + 'std_potential_threshold': 0.5, 'threshold_tolerance': 0.05}, {'n_equilibration_iterations': 100, 'n_samples_per_state': 10}, - {'thermodynamic_distance': 1.0, 'distance_tolerance': 0.5}, + {'std_potential_threshold': 1.0, 'threshold_tolerance': 0.5}, {'function_variable_name': 'lambda'}, {'function_variable_name': 'lambda', 'reversed_direction': False} ] @@ -2965,7 +2963,7 @@ def test_run_solvation_experiment(): class TestTrailblazeAlchemicalPath: """Test suite for the automatic discretization of the alchemical path.""" - def _get_hydration_free_energy_script(self, tmp_dir, alchemical_path='auto', trailblazer_options=None): + def _get_harmonic_oscillator_script(self, tmp_dir, alchemical_path='auto', trailblazer_options=None): # Setup only 1 hydration free energy system in implicit solvent and vacuum. yaml_script = get_template_script(tmp_dir, systems=['hydration-system']) yaml_script['systems']['hydration-system']['solvent1'] = 'GBSA-OBC2' @@ -2986,104 +2984,11 @@ def _get_hydration_free_energy_script(self, tmp_dir, alchemical_path='auto', tra return yaml_script - def test_generate_lambda_alchemical_function(self): - """The alchemical functions generated by ExperimentBuilder._generate_lambda_alchemical_function are correct.""" - from openmmtools.utils import math_eval - - def evaluate(expr, l): - variables = {'lambda': l} - return math_eval(expr, variables) - - # Each test case are [(lambda0, lambda1), (f(lambda0), f([lambda0+lambda1]/2), f(lambda1))] - # where the second tuple in the list are the expected values of the function. - test_cases = [(0, 1), (1, 0), (2, 3), (3, 2), (4, 8), (10, 5)] - - for lambda0, lambda1 in test_cases: - expr = ExperimentBuilder._generate_lambda_alchemical_function(lambda0, lambda1) - print(lambda0, lambda1, ':', expr) - assert evaluate(expr, lambda0) == 0.0 - assert evaluate(expr, (lambda0 + lambda1)/2) == 0.5 - assert evaluate(expr, lambda1) == 1.0 - - # The funciton must be constant after the end states. - if lambda0 < lambda1: - assert evaluate(expr, lambda0-1) == 0.0 - assert evaluate(expr, lambda1+1) == 1.0 - else: - assert evaluate(expr, lambda0+1) == 0.0 - assert evaluate(expr, lambda1-1) == 1.0 - - def test_determine_trailblaze_path(self): - """Test the various paths generated by ExperimentBuilder._determine_trailblaze_path with alchemical functions""" - - # Mock objects to test the static function. - class MockTopography(typing.NamedTuple): - receptor_atoms: typing.List - solvent_atoms: typing.List - - class MockAlchemicalRegions(typing.NamedTuple): - annihilate_electrostatics: typing.List - annihilate_sterics: typing.List - - class MockPhaseFactory: - def __init__(self, restraint, is_vacuum, annihilate_electrostatics, annihilate_sterics): - self.restraint = restraint - if is_vacuum: - self.topography = MockTopography([], []) - else: - self.topography = MockTopography([0], [1]) - self.alchemical_regions = MockAlchemicalRegions(annihilate_electrostatics, annihilate_sterics) - - # Each test case is (mock_phase_factory_init_kwargs, expected_alchemical_functions, expected_state_parameters). - p = ExperimentBuilder._generate_lambda_alchemical_function # Shortcut. - test_cases = [ - ( - dict(restraint=True, is_vacuum=False, annihilate_electrostatics=True, annihilate_sterics=False), - dict(lambda_restraints=p(3, 2), lambda_electrostatics=p(1, 2), lambda_sterics=p(0, 1)), - [('lambda', [3.0, 0.0])] - ), - ( - dict(restraint=None, is_vacuum=False, annihilate_electrostatics=True, annihilate_sterics=False), - dict(lambda_electrostatics=p(1, 2), lambda_sterics=p(0, 1)), - [('lambda', [2.0, 0.0])] - ), - ( - dict(restraint=None, is_vacuum=True, annihilate_electrostatics=True, annihilate_sterics=False), - dict(lambda_electrostatics=p(0, 1), lambda_sterics='1.0'), - [('lambda', [1.0, 0.0])] - ), - ( - dict(restraint=True, is_vacuum=True, annihilate_electrostatics=False, annihilate_sterics=False), - dict(lambda_restraints=p(1, 0), lambda_electrostatics='1.0', lambda_sterics='1.0'), - [('lambda', [1.0, 0.0])] - ), - ( - dict(restraint=restraints.RMSD(), is_vacuum=False, annihilate_electrostatics=True, annihilate_sterics=False), - dict(lambda_restraints=p(4, 3) + ' - (' + p(1, 0) + ')', lambda_electrostatics=p(2, 3), lambda_sterics=p(1, 2)), - [('lambda', [4.0, 0.0])] - ), - ] - - for idx, (phase_factory_kwargs, expected_alchemical_functions, expected_state_parameters) in enumerate(test_cases): - phase_factory = MockPhaseFactory(**phase_factory_kwargs) - alchemical_functions, states_parameters = ExperimentBuilder._determine_trailblaze_path( - phase_factory, alchemical_path='auto', generate_alchemical_functions=True) - - # Convert alchemical functions objects to string expressions for comparison. - for parameter_name, alchemical_function in alchemical_functions.items(): - alchemical_functions[parameter_name] = alchemical_function._expression - - err_msg = 'test case ' + str(idx) + ':\n\nExpected:\n{}\n\nObtained:\n{}' - assert alchemical_functions == expected_alchemical_functions, err_msg.format(pformat(expected_alchemical_functions), - pformat(alchemical_functions)) - assert states_parameters == expected_state_parameters, err_msg.format(pformat(expected_state_parameters), - pformat(states_parameters)) - def test_auto_alchemical_path(self): """Test automatic alchemical path found by thermodynamic trailblazing when the option 'auto' is set.""" with mmtools.utils.temporary_directory() as tmp_dir: # Setup only 1 hydration free energy system in implicit solvent and vacuum. - yaml_script = self._get_hydration_free_energy_script( + yaml_script = self._get_harmonic_oscillator_script( tmp_dir, alchemical_path='auto', trailblazer_options={'n_equilibration_iterations': 0} ) @@ -3131,7 +3036,7 @@ def test_start_from_trailblaze_samples_path(self): """Test the correct implementation of the option start_from_trailblaze_samples.""" with mmtools.utils.temporary_directory() as tmp_dir: # Setup only 1 hydration free energy system in implicit solvent and vacuum. - yaml_script = self._get_hydration_free_energy_script( + yaml_script = self._get_harmonic_oscillator_script( tmp_dir, alchemical_path='auto', trailblazer_options={'n_equilibration_iterations': 0} ) @@ -3157,7 +3062,7 @@ def test_alchemical_functions_path(self): """Test automatic alchemical path found from alchemical functions.""" with mmtools.utils.temporary_directory() as tmp_dir: # Setup only 1 hydration free energy system in implicit solvent and vacuum. - yaml_script = self._get_hydration_free_energy_script( + yaml_script = self._get_harmonic_oscillator_script( tmp_dir, alchemical_path={'lambda_electrostatics': 'lambda', 'lambda_sterics': 'lambda', diff --git a/Yank/tests/test_pipeline.py b/Yank/tests/test_pipeline.py index 780b93ed..8b3c40de 100644 --- a/Yank/tests/test_pipeline.py +++ b/Yank/tests/test_pipeline.py @@ -14,7 +14,6 @@ # ============================================================================= from yank.pipeline import * -from yank.pipeline import _redistribute_trailblaze_states from nose.tools import assert_raises_regexp @@ -92,8 +91,7 @@ def test_pack_transformation(): class TestThermodynamicTrailblazing: """Test suite for the thermodynamic trailblazing function.""" - PAR_NAME_X0 = 'testsystems_HarmonicOscillator_x0' - PAR_NAME_K = 'testsystems_HarmonicOscillator_K' + PAR_NAME = 'testsystems_HarmonicOscillator_x0' @classmethod def get_harmonic_oscillator(cls): @@ -103,20 +101,13 @@ def get_harmonic_oscillator(cls): # Create composable state that control offset of harmonic oscillator. class X0State(GlobalParameterState): - testsystems_HarmonicOscillator_x0 = GlobalParameterState.GlobalParameter( - cls.PAR_NAME_X0, 1.0) - testsystems_HarmonicOscillator_K = GlobalParameterState.GlobalParameter( - cls.PAR_NAME_K, (1.0*unit.kilocalories_per_mole/unit.nanometer**2).value_in_unit_system(unit.md_unit_system)) + testsystems_HarmonicOscillator_x0 = GlobalParameterState.GlobalParameter(cls.PAR_NAME, 1.0) # Create a harmonic oscillator thermo state. - k = 1.0*unit.kilocalories_per_mole/unit.nanometer**2 - oscillator = mmtools.testsystems.HarmonicOscillator(K=k) + oscillator = mmtools.testsystems.HarmonicOscillator(K=1.0*unit.kilocalories_per_mole/unit.nanometer**2) sampler_state = SamplerState(positions=oscillator.positions) thermo_state = ThermodynamicState(oscillator.system, temperature=300*unit.kelvin) - x0_state = X0State( - testsystems_HarmonicOscillator_x0=0.0, - testsystems_HarmonicOscillator_K=k.value_in_unit_system(unit.md_unit_system) - ) + x0_state = X0State(testsystems_HarmonicOscillator_x0=0.0) compound_state = CompoundThermodynamicState(thermo_state, composable_states=[x0_state]) return compound_state, sampler_state @@ -145,7 +136,7 @@ def _check_checkpoint_files(checkpoint_dir_path, expected_protocol, n_atoms): assert checkpoint_protocol == expected_protocol # The positions and box vectors have the correct dimension. - expected_n_states = len(expected_protocol[self.PAR_NAME_X0]) + expected_n_states = len(expected_protocol[self.PAR_NAME]) trajectory_file = mdtraj.formats.DCDTrajectoryFile(checkpoint_positions_path, 'r') xyz, cell_lengths, cell_angles = trajectory_file.read() assert (xyz.shape[0], xyz.shape[1]) == (expected_n_states, n_atoms) @@ -163,7 +154,7 @@ def _check_checkpoint_files(checkpoint_dir_path, expected_protocol, n_atoms): first_protocol = run_thermodynamic_trailblazing( compound_state, sampler_state, mcmc_move, checkpoint_dir_path=checkpoint_dir_path, - state_parameters=[(self.PAR_NAME_X0, [0.0, 1.0])] + state_parameters=[(self.PAR_NAME, [0.0, 1.0])] ) # The info in the checkpoint files is correct. @@ -174,11 +165,10 @@ def _check_checkpoint_files(checkpoint_dir_path, expected_protocol, n_atoms): second_protocol = run_thermodynamic_trailblazing( compound_state, sampler_state, mcmc_move, checkpoint_dir_path=checkpoint_dir_path, - state_parameters=[(self.PAR_NAME_X0, [0.0, 2.0])], - bidirectional_redistribution=False + state_parameters=[(self.PAR_NAME, [0.0, 2.0])] ) - len_first_protocol = len(first_protocol[self.PAR_NAME_X0]) - assert second_protocol[self.PAR_NAME_X0][:len_first_protocol] == first_protocol[self.PAR_NAME_X0] + len_first_protocol = len(first_protocol[self.PAR_NAME]) + assert second_protocol[self.PAR_NAME][:len_first_protocol] == first_protocol[self.PAR_NAME] # The info in the checkpoint files is correct. _check_checkpoint_files(checkpoint_dir_path, second_protocol, n_atoms) @@ -191,15 +181,15 @@ def test_trailblaze_setters(self): # Assign to the parameter a function and run the # trailblaze algorithm over the function variable. - global_parameter_functions = {self.PAR_NAME_X0: 'lambda**2'} + global_parameter_functions = {self.PAR_NAME: 'lambda**2'} function_variables = ['lambda'] # Make sure it's not possible to have a parameter defined as a function and as a parameter state as well. - err_msg = f"Cannot specify {self.PAR_NAME_X0} in 'state_parameters' and 'global_parameter_functions'" + err_msg = f"Cannot specify {self.PAR_NAME} in 'state_parameters' and 'global_parameter_functions'" with assert_raises_regexp(ValueError, err_msg): run_thermodynamic_trailblazing( compound_state, sampler_state, mcmc_move, - state_parameters=[(self.PAR_NAME_X0, [0.0, 1.0])], + state_parameters=[(self.PAR_NAME, [0.0, 1.0])], global_parameter_functions=global_parameter_functions, function_variables=function_variables, ) @@ -211,8 +201,8 @@ def test_trailblaze_setters(self): global_parameter_functions=global_parameter_functions, function_variables=function_variables, ) - assert list(protocol.keys()) == [self.PAR_NAME_X0] - parameter_protocol = protocol[self.PAR_NAME_X0] + assert list(protocol.keys()) == [self.PAR_NAME] + parameter_protocol = protocol[self.PAR_NAME] assert parameter_protocol[0] == 0 assert parameter_protocol[-1] == 1 @@ -221,113 +211,9 @@ def test_reversed_direction(self): # Create a harmonic oscillator system to test. compound_state, sampler_state = self.get_harmonic_oscillator() mcmc_move = self.get_langevin_dynamics_move() - - # For this, we run through two variables to make sure they are executed in the correct order. - k_start = getattr(compound_state, self.PAR_NAME_K) - k_end = k_start * 2 protocol = run_thermodynamic_trailblazing( compound_state, sampler_state, mcmc_move, - state_parameters=[ - (self.PAR_NAME_X0, [1.0, 0.0]), - (self.PAR_NAME_K, [k_start, k_end]), - ], - reversed_direction=True, - bidirectional_redistribution=False + state_parameters=[(self.PAR_NAME, [1.0, 0.0])], + reversed_direction=True ) - assert protocol[self.PAR_NAME_X0] == [1.0, 0.0, 0.0], protocol[self.PAR_NAME_X0] - assert protocol[self.PAR_NAME_K] == [k_start, k_start, k_end], protocol[self.PAR_NAME_K] - - def test_redistribute_trailblaze_states(self): - """States are redistributed correctly as a function of the bidirectional thermo length.""" - # This simulates the protocol found after trailblazing. - optimal_protocol = { - 'lambda_electrostatics': [1.0, 0.5, 0.0, 0.0, 0.0], - 'lambda_sterics': [1.0, 1.0, 1.0, 0.5, 0.0] - } - - # Each test case is a tuple (states_stds, expected_redistributed_protocol, expected_states_map). - test_cases = [ - ( - [[0.5, 0.5, 0.5, 0.5], - [1.5, 1.5, 1.5]], - {'lambda_electrostatics': [1.0, 0.75, 0.5, 0.25, 0.0, 0.0, 0.0, 0.0], - 'lambda_sterics': [1.0, 1.0, 1.0, 1.0, 1.0, 0.75, 0.5, 0.0]}, - [0, 0, 1, 1, 2, 2, 3, 4] - ), - ( - [[0.5, 0.5, 0.5, 0.5], - [0.0, 0.0, 0.0]], - {'lambda_electrostatics': [1.0, 0.0, 0.0, 0.0], - 'lambda_sterics': [1.0, 1.0, 0.25, 0.0]}, - [0, 2, 3, 4] - ), - ] - - for states_stds, expected_redistributed_protocol, expected_states_map in test_cases: - redistributed_protocol, states_map = _redistribute_trailblaze_states( - optimal_protocol, states_stds, thermodynamic_distance=0.5) - assert expected_redistributed_protocol == redistributed_protocol, \ - f'{expected_redistributed_protocol} != {redistributed_protocol}' - assert expected_states_map == states_map, f'{expected_states_map} != {states_map}' - - def test_read_trailblaze_checkpoint_coordinates(self): - """read_trailblaze_checkpoint_coordinates() returns the correct number of SamplerStates.""" - # Create a harmonic oscillator system to test. - compound_state, sampler_state = self.get_harmonic_oscillator() - mcmc_move = self.get_langevin_dynamics_move() - - # The end states differ only in the spring constant - k_initial = getattr(compound_state, self.PAR_NAME_K) - k_final = 250 * k_initial - - # Helper function. - def _call_run_thermodynamic_trailblazing( - thermodynamic_distance, bidirectional_redistribution, - bidirectional_search_thermo_dist, checkpoint_dir_path - ): - return run_thermodynamic_trailblazing( - compound_state, sampler_state, mcmc_move, - state_parameters=[ - (self.PAR_NAME_X0, [0.0, 1.0]), - (self.PAR_NAME_K, [k_initial, k_final]) - ], - thermodynamic_distance=thermodynamic_distance, - bidirectional_redistribution=bidirectional_redistribution, - bidirectional_search_thermo_dist=bidirectional_search_thermo_dist, - checkpoint_dir_path=checkpoint_dir_path - ) - - # Each test case is (bidirectional_redistribution, thermodynamic_distance, bidirectional_search_thermo_dist). - test_cases = [ - (False, 1.0, 'auto'), - (True, 2.0, 1.0), - (True, 0.5, 1.0), - ] - - for bidirectional_redistribution, thermodynamic_distance, bidirectional_search_thermo_dist in test_cases: - with mmtools.utils.temporary_directory() as checkpoint_dir_path: - # Compute the protocol. - protocol = _call_run_thermodynamic_trailblazing( - thermodynamic_distance, bidirectional_redistribution, - bidirectional_search_thermo_dist, checkpoint_dir_path - ) - - # The number of frames returned should be equal to the number of - # states in the protocol. If this was redistributed, this might - # be different than the number of frames generated. - sampler_states = read_trailblaze_checkpoint_coordinates(checkpoint_dir_path) - len_protocol = len(protocol['testsystems_HarmonicOscillator_x0']) - err_msg = ( - f'bidirectional_redistribution={bidirectional_redistribution}, ' - f'thermodynamic_distance={thermodynamic_distance}, ' - f'bidirectional_search_thermo_dist={bidirectional_search_thermo_dist}: ' - f'{len(sampler_states)} != {len_protocol}: {protocol}' - ) - assert len(sampler_states) == len_protocol, err_msg - - # Now, resuming should give me the same exact protocol. - resumed_protocol = _call_run_thermodynamic_trailblazing( - thermodynamic_distance, bidirectional_redistribution, - bidirectional_search_thermo_dist, checkpoint_dir_path - ) - assert protocol == resumed_protocol + assert protocol[self.PAR_NAME] == [1.0, 0.0] diff --git a/Yank/tests/test_yank.py b/Yank/tests/test_yank.py index 7ae42e85..3856bbe0 100644 --- a/Yank/tests/test_yank.py +++ b/Yank/tests/test_yank.py @@ -71,23 +71,20 @@ def test_topography(): def test_topography_subset_regions(): """Test that topography subset region selection works""" - # This test relies on all other tests for topography passing. + # This test relies on all other tests for topography passing host_guest_explicit = testsystems.HostGuestExplicit() topography = Topography(host_guest_explicit.topology, ligand_atoms='resname B2') ligand_list = list(range(126, 156)) receptor_list = list(range(126)) n_slice = 3 # Number of atoms to slice from front and back assert topography.ligand_atoms == ligand_list - - # Add regions that will be tested. topography.add_region('lig_dsl', 'resname B2') topography.add_region('lig_list', ligand_list) topography.add_region('rec_list', receptor_list) topography.add_region('rec_lig_slice', receptor_list[-n_slice:] + ligand_list[:n_slice]) - - # Selection list: selection, subset, sort_by, expected result. + # Selection list: selection, subset, sort_by, result selections = ( - # DSL select, same DSL subset, small to large index. + # DSL select, same DSL subset, small to large ('resname B2', 'resname B2', 'index', ligand_list), # DSL select, region subset ('resname B2', 'lig_list', 'index', ligand_list), @@ -101,19 +98,19 @@ def test_topography_subset_regions(): ('lig_list', 'resname CB7', 'index', []), # compound Region, partial subset in bad order, order of region ('lig_list or rec_list', 'rec_lig_slice', 'region_order', ligand_list[:n_slice] + receptor_list[-n_slice:]) - ) + ) for selection, subset, sort_by, result in selections: selected = topography.select(selection, sort_by=sort_by, subset=subset, as_set=False) - assert result == selected, (f"Failed to match {selection}, subset {subset}, " - f"by {sort_by} to {result},\ngot {selected}") + assert result == selected, "Failed to match {}, subset {}, by {} to {},\ngot {}".format(selection, subset, + sort_by, result, + selected) def test_topography_regions(): """Test that topography regions are created and fetched""" toluene_vacuum = testsystems.TolueneVacuum() topography = Topography(toluene_vacuum.topology) - - # Should do nothing and return nothing without error. + # Should do nothing and return nothing without error assert topography.remove_region('Nothing') is None topography.add_region('A hard list', [0, 1, 2]) assert 'A hard list' in topography @@ -121,13 +118,12 @@ def test_topography_regions(): topography.remove_region('A junk list') assert 'A junk list' not in topography assert topography.get_region('A hard list') == [0, 1, 2] - - # Confirm that string typing is handled. + # Confirm that string typing is handled topography.add_region('carbon', 'element C') assert len(topography.get_region('carbon')) > 0 with nose.tools.assert_raises(ValueError): topography.add_region('failure', 'Bad selection string') - # Ensure region was not added. + # Ensure region was not added assert 'failure' not in topography @@ -393,14 +389,14 @@ def test_default_alchemical_region(self): # Test case: (system, topography, topography_region) test_cases = [ - (self.host_guest_implicit[1].system, self.host_guest_implicit[3], 'ligand_atoms'), - (self.host_guest_explicit[1].system, self.host_guest_explicit[3], 'ligand_atoms'), - (self.alanine_explicit[1].system, self.alanine_explicit[3], 'solute_atoms'), - (sodium_chloride.system, negative_solute, 'solute_atoms'), - (sodium_chloride.system, positive_solute, 'solute_atoms') + (self.host_guest_implicit[1].system, self.host_guest_implicit[3], self.host_guest_implicit[2], 'ligand_atoms'), + (self.host_guest_explicit[1].system, self.host_guest_explicit[3], self.host_guest_explicit[2], 'ligand_atoms'), + (self.alanine_explicit[1].system, self.alanine_explicit[3], self.alanine_explicit[2], 'solute_atoms'), + (sodium_chloride.system, negative_solute, sodium_chloride[2], 'solute_atoms'), + (sodium_chloride.system, positive_solute, sodium_chloride[2], 'solute_atoms') ] - for system, topography, alchemical_region_name in test_cases: + for system, topography, sampler_state, alchemical_region_name in test_cases: expected_alchemical_atoms = getattr(topography, alchemical_region_name) # Compute net charge of alchemical atoms. @@ -411,7 +407,7 @@ def test_default_alchemical_region(self): for protocol in protocols: alchemical_region = AlchemicalPhase._build_default_alchemical_region( - system, topography, protocol) + system, topography, sampler_state, protocol) assert alchemical_region.alchemical_atoms == expected_alchemical_atoms # The alchemical region must be neutralized. diff --git a/Yank/yank.py b/Yank/yank.py index b117365a..c13eb905 100644 --- a/Yank/yank.py +++ b/Yank/yank.py @@ -325,14 +325,16 @@ def select(self, selection, as_set=False, sort_by='auto', subset=None) -> Union[ no effect. """ - selection = copy.deepcopy(selection) - topology = self.topology - # Make sure that the subset of atoms is a list of atom indices - # and slice the topology will be matching against accordingly. + selection = copy.deepcopy(selection) + # Handle subset. Define a subset topology to manipulate, then define a common call to convert subset atom + # into absolute atom if subset is not None: - subset = self.select(subset, sort_by='index', as_set=False, subset=None) - topology = self.topology.subset(subset) + subset_atoms = self.select(subset, sort_by='index', as_set=False, subset=None) + topology = self.topology.subset(subset_atoms) + else: + subset_atoms = None + topology = self.topology class AtomMap(object): """Atom mapper class""" @@ -357,7 +359,7 @@ def __contains__(self, item): else: return item in self.subset_atoms - atom_map = AtomMap(subset) + atom_map = AtomMap(subset_atoms) # Shorthand for later atom_mapping = atom_map.atom_mapping @@ -1007,7 +1009,7 @@ def create(self, thermodynamic_state, sampler_states, topography, protocol, # Handle default alchemical region. if alchemical_regions is None: alchemical_regions = self._build_default_alchemical_region( - reference_system, topography, protocol) + reference_system, topography, sampler_states[0], protocol) # Check that we have atoms to alchemically modify. if len(alchemical_regions.alchemical_atoms) == 0: @@ -1296,7 +1298,7 @@ def _expand_state_cutoff(thermodynamic_state, expanded_cutoff_distance, raise RuntimeError('Barostated box sides must be at least {} Angstroms ' 'to correct for missing dispersion interactions. The ' 'minimum dimension of the provided box is {} Angstroms' - ''.format(expanded_cutoff_distance/unit.angstrom * 2 / fluctuation_size, + ''.format(expanded_cutoff_distance/unit.angstrom * 2, min_box_dimension/unit.angstrom)) logger.debug('Setting cutoff for fully interacting system to {}. The minimum box ' @@ -1331,7 +1333,7 @@ def _expand_state_cutoff(thermodynamic_state, expanded_cutoff_distance, return thermodynamic_state @staticmethod - def _build_default_alchemical_region(system, topography, protocol): + def _build_default_alchemical_region(system, topography, sampler_state, protocol): """Create a default AlchemicalRegion if the user hasn't provided one.""" # TODO: we should probably have a second region that annihilate sterics of counterions. alchemical_region_kwargs = {} @@ -1348,8 +1350,8 @@ def _build_default_alchemical_region(system, topography, protocol): # counterions to make sure that the solvation box is always neutral. if system.usesPeriodicBoundaryConditions(): alchemical_counterions = mpiplus.run_single_node(0, pipeline.find_alchemical_counterions, - system, topography, alchemical_region_name, - broadcast_result=True) + system, topography, sampler_state, + alchemical_region_name, broadcast_result=True) alchemical_atoms += alchemical_counterions # Sort them by index for safety. We don't want to From 66bd09508fa4616d83496f5b473b595a61bde476 Mon Sep 17 00:00:00 2001 From: Esteban Vohringer-Martinez Date: Tue, 25 Aug 2020 07:37:54 -0400 Subject: [PATCH 2/2] update to version 0.25.3 --- Yank/experiment.py | 197 +++++++++++--- Yank/pipeline.py | 496 ++++++++++++++++++++++++++-------- Yank/schema/validator.py | 3 +- Yank/tests/test_experiment.py | 107 +++++++- Yank/tests/test_pipeline.py | 147 ++++++++-- Yank/tests/test_yank.py | 23 +- Yank/yank.py | 12 +- 7 files changed, 808 insertions(+), 177 deletions(-) diff --git a/Yank/experiment.py b/Yank/experiment.py index 15bbe7c5..5e4d3f73 100644 --- a/Yank/experiment.py +++ b/Yank/experiment.py @@ -234,7 +234,7 @@ def create_alchemical_phase(self): if self.options['store_solute_trajectory']: # "Solute" is basically just not water. Includes all non-water atoms and ions # Topography ensures the union of solute_atoms and ions_atoms is a null set - solute_atoms = self.topography.solute_atoms + self.topography.ions_atoms + solute_atoms = sorted(self.topography.solute_atoms + self.topography.ions_atoms) if checkpoint_interval == 1: logger.warning("WARNING! You have specified both a solute-only trajectory AND a checkpoint " "interval of 1! You are about write the trajectory of the solute twice!\n" @@ -798,6 +798,10 @@ def status(self): in the experiments have been completed, "pending" if they are both pending, and "ongoing" otherwise. + Sometimes the netcdf file can't be read while the simulation is + running. In this case, the status of the phase (n.b., not the + experiment) is set to "unavailable". + Yields ------ experiment_status : namedtuple @@ -845,6 +849,10 @@ def status(self): except FileNotFoundError: iteration = None phase_status = 'pending' + except OSError: + # The simulation is probably running and the netcdf file is locked. + iteration = None + phase_status = 'unavailable (probably currently running)' else: iteration = phase_status.iteration if _is_phase_completed(phase_status, number_of_iterations): @@ -1502,15 +1510,21 @@ def _validate_protocols(protocols_description): n_samples_per_state: type: integer default: 100 - std_potential_threshold: + thermodynamic_distance: type: float default: 0.5 - threshold_tolerance: + distance_tolerance: type: float default: 0.05 reversed_direction: type: boolean default: yes + bidirectional_redistribution: + type: boolean + default: yes + bidirectional_search_thermo_dist: + required: no + type: float function_variable_name: required: no type: string @@ -1692,6 +1706,14 @@ def _is_pipeline_solvent_with_receptor(field, solvent_id, error): type: string dependencies: [phase1_path, phase2_path] check_with: directory_exists + charmm_parameter_files: + required: no + type: list + schema: + type: string + check_with: file_exists + dependencies: [phase1_path, phase2_path] + # Solvents solvent: @@ -2441,11 +2463,15 @@ def __getattr__(self, _): phase_factory.options['number_of_equilibration_iterations'] = 0 # We only need to create a single thermo state for the equilibration, - # but AlchemicalPhase currently doesn't support alchemical functions yet - # so we need to set the state manually. - equilibration_protocol = {par[0]: [par[1][0]] for par in state_parameters} + # and we need to equilibrate in the state used to start the algorithm. + if trailblazer_options['reversed_direction']: + equilibration_state_idx = -1 + else: + equilibration_state_idx = 0 + # AlchemicalPhase doesn't support alchemical functions yet so we need to set the state manually. + equilibration_protocol = {par[0]: [par[1][equilibration_state_idx]] for par in state_parameters} if function_variable_name is not None: - variable_value = equilibration_protocol.pop(function_variable_name)[0] + variable_value = equilibration_protocol.pop(function_variable_name)[equilibration_state_idx] expression_context = {function_variable_name: variable_value} for parameter_name, alchemical_function in alchemical_functions.items(): equilibration_protocol[parameter_name] = [alchemical_function(expression_context)] @@ -2496,7 +2522,125 @@ def __getattr__(self, _): self._generate_yaml(experiment, script_path, overwrite_protocol=protocol) @staticmethod - def _determine_trailblaze_path(phase_factory, alchemical_path): + def _generate_lambda_alchemical_function(lambda0, lambda1): + """Generate an alchemical function expression for a variable. + + The function will go from 0 to 1 between lambda0 and lambda1 and + remain constant for lambda values above or below the given ones. + """ + if lambda0 < lambda1: + # The function increases from lambda0 to lambda1. + return (f'1 - ({lambda1} - lambda) / ({lambda1} - {lambda0}) * step({lambda1} - lambda) - ' + f'(lambda - {lambda0}) / ({lambda1} - {lambda0}) * step({lambda0} - lambda)') + else: + # The function decreases from lambda0 to lambda1. + return (f'({lambda0} - lambda) / ({lambda0} - {lambda1}) * step({lambda0} - lambda) - ' + f'(({lambda0} - lambda) / ({lambda0} - {lambda1}) - 1)* step({lambda1} - lambda)') + + @classmethod + def _generate_auto_alchemical_functions(cls, phase_factory): + """Generate the default auto protocol with AlchemicalFunctions. + + Using alchemical functions allow us to "cut corners" and avoid forcing + states when variables are completely turned off/on if not necessary to + maintain a constant thermodynamic length between states (e.g. lambda_electrostatics=0 + and lambda_sterics=1). + + On the other hand, these "corners" are tricky because the thermodynamic + tensor can change a lot at these points and the Newton-like algorithm + has more troubles converging. + """ + alchemical_functions = {} + + # In the normal path, we turn on the restraint from lambda=3 to lambda=2, + # then we turn off the electrostatics from lambda=2 to lambda=1, + # and finally the sterics from lambda=1 to lambda=0. + is_vacuum = (len(phase_factory.topography.receptor_atoms) == 0 and + len(phase_factory.topography.solvent_atoms) == 0) + + # Initialize lambda end states. lambda_end_states[0] and [1] + # are the coupled and decoupled states respectively + lambda_end_states = [0, 0] + + # Starting from the decoupled state, first turn the RMSD restraints on 0 to 1. + # At this point the restraint is activated and lambda_restraint == 1 + # so the alchemical function have to go from 0 to -1 between + # lambda == 1 and lambda == 0. + if isinstance(phase_factory.restraint, restraints.RMSD): + lambda_end_states[0] += 1 + alchemical_function = cls._generate_lambda_alchemical_function(*lambda_end_states) + alchemical_functions['lambda_restraints'] = ' - (' + alchemical_function + ')' + + # Remove the sterics right before that. However, there's no need + # to go through electrostatics if this is a vacuum calculation and + # we are only decoupling the Lennard-Jones interactions. + if is_vacuum and not phase_factory.alchemical_regions.annihilate_sterics: + alchemical_functions['lambda_sterics'] = '1.0' + else: + alchemical_function = cls._generate_lambda_alchemical_function(lambda_end_states[0], + lambda_end_states[0]+1) + alchemical_functions['lambda_sterics'] = alchemical_function + lambda_end_states[0] += 1 + + # Same for electrostatics. + if is_vacuum and not phase_factory.alchemical_regions.annihilate_electrostatics: + alchemical_functions['lambda_electrostatics'] = '1.0' + else: + alchemical_function = cls._generate_lambda_alchemical_function(lambda_end_states[0], + lambda_end_states[0]+1) + alchemical_functions['lambda_electrostatics'] = alchemical_function + lambda_end_states[0] += 1 + + # Finally turn off the restraint if there is any. + if phase_factory.restraint is not None: + alchemical_function = cls._generate_lambda_alchemical_function(lambda_end_states[0]+1, + lambda_end_states[0]) + # If this is a RMSD restraint, there should be already a segment + # of the function in which the RMSD restraint is turned off. + try: + alchemical_functions['lambda_restraints'] = alchemical_function + alchemical_functions['lambda_restraints'] + except KeyError: + alchemical_functions['lambda_restraints'] = alchemical_function + lambda_end_states[0] += 1 + + # Convert strings to actual alchemical function objects. + for parameter_name, alchemical_function in alchemical_functions.items(): + alchemical_functions[parameter_name] = mmtools.alchemy.AlchemicalFunction(alchemical_function) + + # The end states for lambda feeded to the trailblaze function have + # to go from the coupled to the decoupled direction. + state_parameters = [('lambda', lambda_end_states)] + + return alchemical_functions, state_parameters + + @staticmethod + def _generate_auto_state_parameters(phase_factory): + """Generate the default auto protocol as a sequence of state_parameters.""" + is_vacuum = (len(phase_factory.topography.receptor_atoms) == 0 and + len(phase_factory.topography.solvent_atoms) == 0) + state_parameters = [] + + # First, turn on the restraint if there are any. + if phase_factory.restraint is not None: + state_parameters.append(('lambda_restraints', [0.0, 1.0])) + # We support only lambda sterics and electrostatics for now. + if is_vacuum and not phase_factory.alchemical_regions.annihilate_electrostatics: + state_parameters.append(('lambda_electrostatics', [1.0, 1.0])) + else: + state_parameters.append(('lambda_electrostatics', [1.0, 0.0])) + if is_vacuum and not phase_factory.alchemical_regions.annihilate_sterics: + state_parameters.append(('lambda_sterics', [1.0, 1.0])) + else: + state_parameters.append(('lambda_sterics', [1.0, 0.0])) + # Turn the RMSD restraints off slowly at the end + if isinstance(phase_factory.restraint, restraints.RMSD): + state_parameters.append(('lambda_restraints', [1.0, 0.0])) + + return state_parameters + + @classmethod + def _determine_trailblaze_path(cls, phase_factory, alchemical_path, + generate_alchemical_functions=False): """Determine the path to discretize feeded to the trailblaze algorithm. Parameters @@ -2506,33 +2650,23 @@ def _determine_trailblaze_path(phase_factory, alchemical_path): alchemical_path : Union[str, Dict[str, Union[str, float Quantity]]] Either 'auto' or the dictionary including mathematical expressions or end states of the path to discretize. + generate_alchemical_functions : bool, optional + If True, ``_generate_auto_alchemical_functions`` is used instead + of ``_generate_auto_state_parameters`` this slightly reduces the + number of intermediate states but it slows down the convergence + of the trailblaze algorithm. """ - state_parameters = [] - alchemical_functions = {} - # If alchemical_path is set to 'auto', discretize the default path. if alchemical_path == 'auto': - is_vacuum = (len(phase_factory.topography.receptor_atoms) == 0 and - len(phase_factory.topography.solvent_atoms) == 0) - state_parameters = [] - - # First, turn on the restraint if there are any. - if phase_factory.restraint is not None: - state_parameters.append(('lambda_restraints', [0.0, 1.0])) - # We support only lambda sterics and electrostatics for now. - if is_vacuum and not phase_factory.alchemical_regions.annihilate_electrostatics: - state_parameters.append(('lambda_electrostatics', [1.0, 1.0])) - else: - state_parameters.append(('lambda_electrostatics', [1.0, 0.0])) - if is_vacuum and not phase_factory.alchemical_regions.annihilate_sterics: - state_parameters.append(('lambda_sterics', [1.0, 1.0])) + if generate_alchemical_functions: + alchemical_functions, state_parameters = cls._generate_auto_alchemical_functions(phase_factory) else: - state_parameters.append(('lambda_sterics', [1.0, 0.0])) - # Turn the RMSD restraints off slowly at the end - if isinstance(phase_factory.restraint, restraints.RMSD): - state_parameters.append(('lambda_restraints', [1.0, 0.0])) + alchemical_functions = {} + state_parameters = cls._generate_auto_state_parameters(phase_factory) else: # Otherwise, differentiate between mathematical expressions and the function variable to build. + state_parameters = [] + alchemical_functions = {} for parameter_name, value in alchemical_path.items(): if isinstance(value, str): alchemical_functions[parameter_name] = mmtools.alchemy.AlchemicalFunction(value) @@ -2955,6 +3089,7 @@ def _build_experiment(self, experiment_path, experiment, use_dummy_protocol=Fals # Get system files. system_files_paths = self._db.get_system(system_id) gromacs_include_dir = self._db.systems[system_id].get('gromacs_include_dir', None) + charmm_parameter_files = self._db.systems[system_id].get('charmm_parameter_files', None) # Prepare Yank arguments phases = [None, None] @@ -2981,7 +3116,7 @@ def _build_experiment(self, experiment_path, experiment, use_dummy_protocol=Fals logger.info("Reading phase {}".format(phase_name)) system, topology, sampler_state = pipeline.read_system_files( positions_file_path, parameters_file_path, system_options, - gromacs_include_dir=gromacs_include_dir) + gromacs_include_dir=gromacs_include_dir, charmm_parameter_files=charmm_parameter_files) # Identify system components. There is a ligand only in the complex phase. if phase_idx == 0: @@ -3031,7 +3166,7 @@ def _build_experiment(self, experiment_path, experiment, use_dummy_protocol=Fals # If we have generated samples with trailblaze, we start the # simulation from those samples. Also, we can turn off minimization # as it has been already performed before trailblazing. - if exp_opts['start_from_trailblaze_samples'] is True: + if not use_dummy_protocol and exp_opts['start_from_trailblaze_samples'] is True: trailblaze_checkpoint_dir_path = self._get_trailblaze_checkpoint_dir_path( experiment_path, phase_name) try: diff --git a/Yank/pipeline.py b/Yank/pipeline.py index b03b2bd0..5649e850 100644 --- a/Yank/pipeline.py +++ b/Yank/pipeline.py @@ -16,14 +16,15 @@ # GLOBAL IMPORTS # ============================================================================================= -import os -import re -import sys +import collections import copy import inspect -import logging import itertools -import collections +import json +import logging +import os +import re +import sys import mdtraj import mpiplus @@ -66,10 +67,6 @@ def compute_squared_distances(molecule1_positions, molecule2_positions): distance between atom i of molecule1 and atom j of molecule 2. """ - # if the molecule is just one atom with 3 dimensions (example counterion) - #if len(molecule1_positions) == 3: - # squared_distances = np.array([((molecule2_positions - molecule1_positions)**2).sum(1)]) - #else: squared_distances = np.array([((molecule2_positions - atom_pos)**2).sum(1) for atom_pos in molecule1_positions]) return squared_distances @@ -103,6 +100,7 @@ def compute_min_dist(mol_positions, *args): # Compute squared distances. Each row is an array of distances # from a mol_positions atom to all argmol_positions atoms. distances2 = compute_squared_distances(mol_positions, argmol_positions) + # Find closest atoms and their distance min_idx = np.unravel_index(distances2.argmin(), distances2.shape) try: @@ -456,7 +454,7 @@ def create_system(parameters_file, box_vectors, create_system_args, system_optio def read_system_files(positions_file_path, parameters_file_path, system_options, - gromacs_include_dir=None): + gromacs_include_dir=None, charmm_parameter_files=None): """Create a Yank arguments for a phase from system files. Parameters @@ -464,7 +462,7 @@ def read_system_files(positions_file_path, parameters_file_path, system_options, positions_file_path : str Path to system position file (e.g. 'complex.inpcrd/.gro/.pdb'). parameters_file_path : str - Path to system parameters file (e.g. 'complex.prmtop/.top/.xml'). + Path to system parameters file (e.g. 'complex.prmtop/.top/.xml/.psf'). system_options : dict ``system_options[phase]`` is a a dictionary containing options to pass to ``createSystem()``. If the parameters file is an OpenMM @@ -472,6 +470,8 @@ def read_system_files(positions_file_path, parameters_file_path, system_options, gromacs_include_dir : str, optional Path to directory in which to look for other files included from the gromacs top file. + charmm_parameter_files : str, optional + Path to additional parameter files Returns ------- @@ -538,6 +538,24 @@ def read_system_files(positions_file_path, parameters_file_path, system_options, system = create_system(parameters_file, box_vectors, create_system_args, system_options) + # Read CHARMM format psf and pdb files + elif parameters_file_extension == '.psf': + logger.debug("psf: {}".format(parameters_file_path)) + logger.debug("pdb: {}".format(positions_file_path)) + + parameters_file = openmm.app.CharmmPsfFile(parameters_file_path) + positions_file = openmm.app.PDBFile(positions_file_path) + params = openmm.app.CharmmParameterSet(*charmm_parameter_files) + + box_vectors = positions_file.topology.getPeriodicBoxVectors() + if box_vectors is None: + box_vectors = system.getDefaultPeriodicBoxVectors() + + parameters_file.setBox(box_vectors[0][0], box_vectors[1][1], box_vectors[2][2]) + create_system_args = set(inspect.getargspec(openmm.app.CharmmPsfFile.createSystem).args) + system_options['params'] = params + system = create_system(parameters_file, box_vectors, create_system_args, system_options) + # Unsupported file format. else: raise ValueError('Unsupported format for parameter file {}'.format(parameters_file_extension)) @@ -1203,7 +1221,7 @@ def get_system_files_paths(self, system_id): parameters_path=os.path.join(system_dir, 'solvent2.prmtop')) ] else: - parameter_file_extensions = {'prmtop', 'top', 'xml'} + parameter_file_extensions = {'prmtop', 'top', 'xml', 'psf'} system_files_paths = [] for phase_path_name in ['phase1_path', 'phase2_path']: file_paths = self.systems[system_id][phase_path_name] @@ -2045,13 +2063,17 @@ def write_sampler_state(self, sampler_state): (a, b, c), (alpha, beta, gamma)) -def read_trailblaze_checkpoint_coordinates(checkpoint_dir_path): +def read_trailblaze_checkpoint_coordinates(checkpoint_dir_path, redistributed=True): """Read positions and box vectors stored as checkpoint by the trailblaze algorithm. Parameters ---------- checkpoint_dir_path : str The path to the directory containing the checkpoint information. + redistributed : bool, optional + If True, the function will check if the states were redistributed, + and will returned the set of coordinates that are more representative + of the redistributed protocol. Returns ------- @@ -2065,6 +2087,7 @@ def read_trailblaze_checkpoint_coordinates(checkpoint_dir_path): If no file with the coordinates was found. """ positions_file_path = os.path.join(checkpoint_dir_path, 'coordinates.dcd') + states_map_file_path = os.path.join(checkpoint_dir_path, 'states_map.json') # Open the file if it exist. try: @@ -2077,15 +2100,236 @@ def read_trailblaze_checkpoint_coordinates(checkpoint_dir_path): sampler_states = trajectory_file.read_as_sampler_states() finally: trajectory_file.close() + + # If the protocol was redistributed, use the states map to create + # a new set of sampler states that can be used as starting conditions + # for the redistributed protocol. + if redistributed: + try: + with open(states_map_file_path, 'r') as f: + states_map = json.load(f) + sampler_states = [sampler_states[i] for i in states_map] + except FileNotFoundError: + pass + return sampler_states +def _resume_thermodynamic_trailblazing(checkpoint_dir_path, initial_protocol): + """Resume a previously-run trailblaze execution. + + Parameters + ---------- + checkpoint_dir_path : str + The path to the directory used to store the trailblaze information. + initial_protocol : Dict[str, List[float]] + The initial protocol containing only the first state of the path. + If no checkpoint file for the protocol is found, the file will + be initialized using this state. + + Returns + ------- + resumed_protocol : Dict[str, List[float]] + The resumed optimal protocol. + trajectory_file : _DCDTrajectoryFile + A DCD file open for appening. + sampler_state : SamplerState or None + The last saved SamplerState or None if no frame was saved yet. + + """ + # We save the protocol in a YAML file and the positions in netcdf. + protocol_file_path = os.path.join(checkpoint_dir_path, 'protocol.yaml') + stds_file_path = os.path.join(checkpoint_dir_path, 'states_stds.json') + positions_file_path = os.path.join(checkpoint_dir_path, 'coordinates.dcd') + + # Create the directory, if it doesn't exist. + os.makedirs(checkpoint_dir_path, exist_ok=True) + + # Load protocol and stds checkpoint file. + try: + # Parse the previously calculated optimal_protocol dict. + with open(protocol_file_path, 'r') as file_stream: + resumed_protocol = yaml.load(file_stream, Loader=yaml.FullLoader) + + # Load the energy difference stds. + with open(stds_file_path, 'r') as f: + states_stds = json.load(f) + except FileNotFoundError: + resumed_protocol = initial_protocol + states_stds = [[], []] + + # Check if there's an existing positions information. + try: + # We want the coordinates of the states that were sampled + # during the search not the states after redistribution. + sampler_states = read_trailblaze_checkpoint_coordinates( + checkpoint_dir_path, redistributed=False) + except FileNotFoundError: + len_trajectory = 0 + else: + len_trajectory = len(sampler_states) + + # Raise an error if the algorithm was interrupted *during* + # writing on disk. We store only the states that we simulated + # so there should be one less here. + for state_values in resumed_protocol.values(): + if len_trajectory < len(state_values) - 1: + err_msg = ("The trailblaze algorithm was interrupted while " + "writing the checkpoint file and it is now unable " + "to resume. Please delete the files " + f"in {checkpoint_dir_path} and restart.") + raise RuntimeError(err_msg) + + # When this is resumed, but the trajectory is already completed, + # the frame of the final end state has been already written, but + # we don't want to add it twice at the end of the trailblaze function. + len_trajectory = len(state_values) - 1 + + # Whether the file exist or not, MDTraj doesn't support appending + # files so we open a new one and rewrite the configurations we + # have generated in the previous run. + trajectory_file = _DCDTrajectoryFile(positions_file_path, 'w', + force_overwrite=True) + if len_trajectory > 0: + for i in range(len_trajectory): + trajectory_file.write_sampler_state(sampler_states[i]) + # Make sure the next search starts from the last saved position + # unless the previous calculation was interrupted before the + # first position could be saved. + sampler_state = sampler_states[-1] + else: + sampler_state = None + + return resumed_protocol, states_stds, trajectory_file, sampler_state + + +def _cache_trailblaze_data(checkpoint_dir_path, optimal_protocol, states_stds, + trajectory_file, sampler_state): + """Store on disk current state of the trailblaze run.""" + + # Determine the file paths of the stored data. + protocol_file_path = os.path.join(checkpoint_dir_path, 'protocol.yaml') + stds_file_path = os.path.join(checkpoint_dir_path, 'states_stds.json') + + # Update protocol. + with open(protocol_file_path, 'w') as file_stream: + yaml.dump(optimal_protocol, file_stream) + + # Update the stds between states. + with open(stds_file_path, 'w') as f: + json.dump(states_stds, f) + + # Append positions of the state that we just simulated. + trajectory_file.write_sampler_state(sampler_state) + + +def _redistribute_trailblaze_states(old_protocol, states_stds, thermodynamic_distance): + """Redistribute the states using a bidirectional estimate of the thermodynamic length. + + Parameters + ---------- + old_protocol : Dict[str, List[float]] + The unidirectional optimal protocol. + states_stds : List[List[float]] + states_stds[j][i] is the standard deviation of the potential + difference between states i-1 and i computed in the j direction. + thermodynamic_distance : float + The distance between each pair of states. + + Returns + ------- + new_protocol : Dict[str, List[float]] + The new estimate of the optimal protocol. + states_map : List[int] + states_map[i] is the index of the state in the old protocol that + is closest to the i-th state in the new protocol. This allows to + map coordinates generated during trailblazing to the redistributed + protocol. + """ + # The parameter names in a fixed order. + parameter_names = [par_name for par_name in old_protocol] + + # Initialize the new protocol from the first state the optimal protocol. + new_protocol = {par_name: [values[0]] for par_name, values in old_protocol.items()} + + # The first state of the new protocol always maps to the first state of the old one. + states_map = [0] + + def _get_old_protocol_state(state_idx): + """Return a representation of the thermo state as a list of parameter values.""" + return np.array([old_protocol[par_name][state_idx] for par_name in parameter_names]) + + def _add_state_to_new_protocol(state): + for parameter_name, new_state_value in zip(parameter_names, state.tolist()): + new_protocol[parameter_name].append(new_state_value) + + # The thermodynamic length at 0 is 0.0. + states_stds[0] = [0.0] + states_stds[0] + states_stds[1] = [0.0] + states_stds[1] + + # We don't have the energy difference std in the + # direction opposite to the search direction so we + # pad the list. + states_stds[1].append(states_stds[0][-1]) + states_stds = np.array(states_stds) + + # Compute a bidirectional estimate of the thermodynamic length. + old_protocol_thermo_length = np.cumsum(np.mean(states_stds, axis=0)) + + # Trailblaze again interpolating the thermodynamic length function. + current_state_idx = 0 + current_state = _get_old_protocol_state(0) + last_state = _get_old_protocol_state(-1) + new_protocol_cum_thermo_length = 0.0 + while (current_state != last_state).any(): + # Find first state for which the accumulated standard + # deviation is greater than the thermo length threshold. + try: + while old_protocol_thermo_length[current_state_idx+1] - new_protocol_cum_thermo_length <= thermodynamic_distance: + current_state_idx += 1 + except IndexError: + # If we got to the end, we just add the last state + # to the protocol and stop the while loop. + _add_state_to_new_protocol(last_state) + break + + # Update current state. + current_state = _get_old_protocol_state(current_state_idx) + + # The thermodynamic length from the last redistributed state to current state. + pair_thermo_length = old_protocol_thermo_length[current_state_idx] - new_protocol_cum_thermo_length + + # Now interpolate between the current state state and the next to find + # the exact state for which the thermo length equal the threshold. + next_state = _get_old_protocol_state(current_state_idx+1) + differential = thermodynamic_distance - pair_thermo_length + differential /= old_protocol_thermo_length[current_state_idx+1] - old_protocol_thermo_length[current_state_idx] + new_state = current_state + differential * (next_state - current_state) + + # Update cumulative thermo length. + new_protocol_cum_thermo_length += thermodynamic_distance + + # Update states map. + closest_state_idx = current_state_idx if differential <= 0.5 else current_state_idx+1 + states_map.append(closest_state_idx) + + # Update redistributed protocol. + _add_state_to_new_protocol(new_state) + + # The last state of the new protocol always maps to the last state of the old one. + states_map.append(len(old_protocol_thermo_length)-1) + + return new_protocol, states_map + + def run_thermodynamic_trailblazing( thermodynamic_state, sampler_state, mcmc_move, state_parameters, - parameter_setters=None, std_potential_threshold=0.5, - threshold_tolerance=0.05, n_samples_per_state=100, - reversed_direction=False, global_parameter_functions=None, - function_variables=tuple(), checkpoint_dir_path=None + parameter_setters=None, thermodynamic_distance=1.0, + distance_tolerance=0.05, n_samples_per_state=100, + reversed_direction=False, bidirectional_redistribution=True, + bidirectional_search_thermo_dist='auto', + global_parameter_functions=None, function_variables=tuple(), + checkpoint_dir_path=None ): """ Find an alchemical path by placing alchemical states at a fixed distance. @@ -2095,15 +2339,17 @@ def run_thermodynamic_trailblazing( and computing the standard deviation of the difference of potential energies between the two states at those configurations. - Two states are chosen for the protocol if their standard deviation is - within ``std_potential_threshold +- threshold_tolerance``. + The states of the protocol are chosen so that each pair has a distance + (in thermodynamic length) of ``thermodynamic_distance +- distance_tolerance``. + The thermodynamic length estimate (in kT) is based on the standard deviation + of the difference in potential energy between the two states. The function is capable of resuming when interrupted if ``checkpoint_dir_path`` is specified. This will create two files called 'protocol.yaml' and 'coordinates.dcd' storing the protocol and initial positions and box vectors for each state that are generated while running the algorithm. - It is also possibleto discretize a path specified through maathematical + It is also possible to discretize a path specified through maathematical expressions through the arguments ``global_parameter_function`` and ``function_variables``. @@ -2127,19 +2373,33 @@ def run_thermodynamic_trailblazing( ``setter(thermodynamic_state, parameter_name, value)``. This is useful for example to set global parameter function variables with ``openmmtools.states.GlobalParameterState.set_function_variable``. - std_potential_threshold : float, optional - The threshold that determines how to separate the states between - each others. - threshold_tolerance : float, optional - The tolerance on the found standard deviation. + thermodynamic_distance : float, optional + The target distance (in thermodynamic length) between each pair of + states in kT. Default is 1.0 (kT). + distance_tolerance : float, optional + The tolerance on the found standard deviation. Default is 0.05 (kT). n_samples_per_state : int, optional How many samples to collect to estimate the overlap between two - states. + states. Default is 100. reversed_direction : bool, optional - If True, the algorithm starts from the final state and traverses + If ``True``, the algorithm starts from the final state and traverses the path from the end to the beginning. The returned path discretization will still be ordered from the beginning to the - end following the order in ``state_parameters``. + end following the order in ``state_parameters``. Default is ``False``. + bidirectional_redistribution : bool, optional + If ``True``, the states will be redistributed using the standard + deviation of the potential difference between states in both + directions. Default is ``True``. + bidirectional_search_thermo_dist : float or 'auto', optional + If ``bidirectional_redistribution`` is ``True``, the thermodynamic + distance between the sampled states used to collect data along + the path can be different than the thermodynamic distance after + redistribution. The default ('auto') caps the thermodynamic + distance used for trailblazing at 1 kT. Keeping this value small + lower the chance of obtaining very large stds in the opposite direction + due to rare, dominating events in sections of the path where the overlap + decreases quickly, which in turn may results in unreasonably long + protocols. global_parameter_functions : Dict[str, Union[str, openmmtools.states.GlobalParameterFunction]], optional Map a parameter name to a mathematical expression as a string or a ``openmmtools.states.GlobalParameterFunction`` object. @@ -2172,6 +2432,15 @@ def _function_variable_setter(state, parameter_name, value): # Make sure that the state parameters to optimize have a clear order. assert (isinstance(state_parameters, list) or isinstance(state_parameters, tuple)) + # Determine the thermo distance to achieve during the search. + if not bidirectional_redistribution: + search_thermo_dist = thermodynamic_distance + else: + if bidirectional_search_thermo_dist == 'auto': + search_thermo_dist = min(1.0, thermodynamic_distance) + else: + search_thermo_dist = bidirectional_search_thermo_dist + # Create unordered helper variable. state_parameter_dict = {x[0]: x[1] for x in state_parameters} @@ -2220,71 +2489,42 @@ def _function_variable_setter(state, parameter_name, value): # Reverse the direction of the algorithm if requested. if reversed_direction: state_parameters = [(par_name, end_states.__class__(reversed(end_states))) - for par_name, end_states in state_parameters] + for par_name, end_states in reversed(state_parameters)] # Initialize protocol with the starting value. optimal_protocol = {par: [values[0]] for par, values in state_parameters} + # Keep track of potential std between states in both directions + # of the path so that we can redistribute the states later. + # At the end of the protocol this will have the same length + # of the protocol minus one. The inner lists are for the forward + # and reversed direction stds respectively. + states_stds = [[], []] + # Check to see whether a trailblazing algorithm is already in progress, # and if so, restore to the previously checkpointed state. if checkpoint_dir_path is not None: - # We save the protocol in a YAML file and the positions in netcdf. - protocol_file_path = os.path.join(checkpoint_dir_path, 'protocol.yaml') - positions_file_path = os.path.join(checkpoint_dir_path, 'coordinates.dcd') - - # Create the directory, if it doesn't exist. - os.makedirs(checkpoint_dir_path, exist_ok=True) - - # Create/load protocol checkpoint file. - try: - # Parse the previously calculated optimal_protocol dict. - with open(protocol_file_path, 'r') as file_stream: - optimal_protocol = yaml.load(file_stream, Loader=yaml.FullLoader) - except FileNotFoundError: - # The checkpoint doesn't exist. Create one. - with open(protocol_file_path, 'w') as file_stream: - yaml.dump(optimal_protocol, file_stream) - - # Check if there's an existing positions information. - try: - sampler_states = read_trailblaze_checkpoint_coordinates(checkpoint_dir_path) - except FileNotFoundError: - len_trajectory = 0 - else: - len_trajectory = len(sampler_states) - - # Raise an error if the algorithm was interrupted *during* - # writing on disk. We store only the states that we simulated - # so there should be one less here. - if len_trajectory < len(optimal_protocol[state_parameters[0][0]])-1: - err_msg = ("The trailblaze algorithm was interrupted while " - "writing the checkpoint file and it is now unable " - "to resume. Please delete the files " - f"in {checkpoint_dir_path} and restart.") - raise RuntimeError(err_msg) - - # When this is resumed, but the trajectory is already completed, - # the frame of the final end state has been already written, but - # we don't want to add it twice at the end of this function. - len_trajectory = len(optimal_protocol[state_parameters[0][0]])-1 - - # Whether the file exist or not, MDTraj doesn't support appending - # files so we open a new one and rewrite the configurations we - # have generated in the previous run. - trajectory_file = _DCDTrajectoryFile(positions_file_path, 'w', - force_overwrite=True) - if len_trajectory > 0: - for i in range(len_trajectory): - trajectory_file.write_sampler_state(sampler_states[i]) - # Make sure the next search starts from the last saved position - # unless the previous calculation was interrupted before the - # first position could be saved. - sampler_state = sampler_states[-1] + optimal_protocol, states_stds, trajectory_file, resumed_sampler_state = _resume_thermodynamic_trailblazing( + checkpoint_dir_path, optimal_protocol) + # Start from the last saved conformation. + if resumed_sampler_state is not None: + sampler_state = resumed_sampler_state + + # We keep track of the previous state in the optimal protocol + # that we'll use to compute the stds in the opposite direction. + if len(states_stds[0]) == 0: + previous_thermo_state = None + else: + previous_thermo_state = copy.deepcopy(thermodynamic_state) # Make sure that thermodynamic_state is in the last explored # state, whether the algorithm was resumed or not. for state_parameter in optimal_protocol: - parameter_setters[state_parameter](thermodynamic_state, state_parameter, optimal_protocol[state_parameter][-1]) + parameter_setters[state_parameter](thermodynamic_state, state_parameter, + optimal_protocol[state_parameter][-1]) + if previous_thermo_state is not None: + parameter_setters[state_parameter](previous_thermo_state, state_parameter, + optimal_protocol[state_parameter][-2]) # We change only one parameter at a time. for state_parameter, values in state_parameters: @@ -2298,23 +2538,24 @@ def _function_variable_setter(state, parameter_name, value): # Gather data until we get to the last value. while optimal_protocol[state_parameter][-1] != values[-1]: - # Simulate current thermodynamic state to obtain energies. + # Simulate current thermodynamic state to collect samples. sampler_states = [] simulated_energies = np.zeros(n_samples_per_state) for i in range(n_samples_per_state): mcmc_move.apply(thermodynamic_state, sampler_state) - context, _ = mmtools.cache.global_context_cache.get_context(thermodynamic_state) - sampler_state.apply_to_context(context, ignore_velocities=True) - simulated_energies[i] = thermodynamic_state.reduced_potential(context) sampler_states.append(copy.deepcopy(sampler_state)) + # Keep track of the thermo state we use for the reweighting. + reweighted_thermo_state = None + # Find first state that doesn't overlap with simulated one - # with std(du) within std_potential_threshold +- threshold_tolerance. + # with std(du) within search_thermo_dist +- distance_tolerance. # We stop anyway if we reach the last value of the protocol. std_energy = 0.0 current_parameter_value = optimal_protocol[state_parameter][-1] - while (abs(std_energy - std_potential_threshold) > threshold_tolerance and - not (current_parameter_value == values[1] and std_energy < std_potential_threshold)): + while (abs(std_energy - search_thermo_dist) > distance_tolerance and + not (current_parameter_value == values[1] and std_energy < search_thermo_dist)): + # Determine next parameter value to compute. if np.isclose(std_energy, 0.0): # This is the first iteration or the two state overlap significantly @@ -2326,31 +2567,59 @@ def _function_variable_setter(state, parameter_name, value): derivative_std_energy = ((std_energy - old_std_energy) / (current_parameter_value - old_parameter_value)) old_parameter_value = current_parameter_value - current_parameter_value += (std_potential_threshold - std_energy) / derivative_std_energy + current_parameter_value += (search_thermo_dist - std_energy) / derivative_std_energy # Keep current_parameter_value inside bound interval. if search_direction * current_parameter_value > values[1]: current_parameter_value = values[1] assert search_direction * (optimal_protocol[state_parameter][-1] - current_parameter_value) < 0 - # Get context in new thermodynamic state. - parameter_setters[state_parameter](thermodynamic_state, state_parameter, current_parameter_value) - context, integrator = mmtools.cache.global_context_cache.get_context(thermodynamic_state) + # Determine the thermo states at which we need to compute the energies. + # If this is the first attempt, compute also the reduced potential of + # the simulated energies and the previous state to estimate the standard + # deviation in the opposite direction. + if reweighted_thermo_state is None: + # First attempt. + reweighted_thermo_state = copy.deepcopy(thermodynamic_state) + computed_thermo_states = [reweighted_thermo_state, thermodynamic_state] + if previous_thermo_state is not None: + computed_thermo_states.append(previous_thermo_state) + else: + computed_thermo_states = [reweighted_thermo_state] - # Compute the energies at the sampled positions. - reweighted_energies = np.zeros(n_samples_per_state) + # Set the reweighted state to the current parameter value. + parameter_setters[state_parameter](reweighted_thermo_state, state_parameter, current_parameter_value) + + # Compute all energies. + energies = np.empty(shape=(len(computed_thermo_states), n_samples_per_state)) for i, sampler_state in enumerate(sampler_states): - sampler_state.apply_to_context(context, ignore_velocities=True) - reweighted_energies[i] = thermodynamic_state.reduced_potential(context) + energies[:,i] = mmtools.states.reduced_potential_at_states( + sampler_state, computed_thermo_states, mmtools.cache.global_context_cache) + + # Cache the simulated energies for the next iteration. + if len(computed_thermo_states) > 1: + simulated_energies = energies[1] + + # Compute the energy difference std in the direction: simulated state -> previous state. + if len(computed_thermo_states) > 2: + denergies = energies[2] - simulated_energies + states_stds[1].append(float(np.std(denergies, ddof=1))) - # Compute standard deviation of the difference. + # Compute the energy difference std between the currently simulated and the reweighted states. old_std_energy = std_energy - denergies = reweighted_energies - simulated_energies - std_energy = np.std(denergies) + denergies = energies[0] - simulated_energies + std_energy = np.std(denergies, ddof=1) logger.debug('trailblazing: state_parameter {}, simulated_value {}, current_parameter_value {}, ' 'std_du {}'.format(state_parameter, optimal_protocol[state_parameter][-1], current_parameter_value, std_energy)) + # Store energy difference std in the direction: simulated state -> reweighted state. + states_stds[0].append(float(std_energy)) + + # Update variables for next iteration. + previous_thermo_state = copy.deepcopy(thermodynamic_state) + thermodynamic_state = reweighted_thermo_state + # Update the optimal protocol with the new value of this parameter. # The other parameters remain fixed. for par_name in optimal_protocol: @@ -2364,17 +2633,32 @@ def _function_variable_setter(state, parameter_name, value): # Save the updated checkpoint file to disk. if checkpoint_dir_path is not None: - # Update protocol. - with open(protocol_file_path, 'w') as file_stream: - yaml.dump(optimal_protocol, file_stream) - # Update positions of the state that we just simulated. - trajectory_file.write_sampler_state(sampler_state) + _cache_trailblaze_data(checkpoint_dir_path, optimal_protocol, states_stds, + trajectory_file, sampler_state) if checkpoint_dir_path is not None: # We haven't simulated the last state so we just set the positions of the second to last. trajectory_file.write_sampler_state(sampler_state) trajectory_file.close() + # Redistribute the states using the standard deviation estimates in both directions. + if bidirectional_redistribution: + optimal_protocol, states_map = _redistribute_trailblaze_states( + optimal_protocol, states_stds, thermodynamic_distance) + + # Save the states map for reading the coordinates correctly. + if checkpoint_dir_path is not None: + states_map_file_path = os.path.join(checkpoint_dir_path, 'states_map.json') + + if bidirectional_redistribution: + with open(states_map_file_path, 'w') as f: + json.dump(states_map, f) + elif os.path.isfile(states_map_file_path): + # If there's an old file because the path was previously redistributed, + # we delete it so that read_trailblaze_checkpoint_coordinates will + # return the coordinates associated to the most recently-generated path. + os.remove(states_map_file_path) + # If we have traversed the path in the reversed direction, re-invert # the order of the discretized path. if reversed_direction: @@ -2382,7 +2666,7 @@ def _function_variable_setter(state, parameter_name, value): optimal_protocol[par_name] = values.__class__(reversed(values)) # If we used global parameter functions, the optimal_protocol at this - # point is a discratization of the function_variables, not the original + # point is a discretization of the function_variables, not the original # parameters so we convert it back. len_protocol = len(next(iter(optimal_protocol.values()))) function_variables_protocol = {var: optimal_protocol.pop(var) for var in function_variables} diff --git a/Yank/schema/validator.py b/Yank/schema/validator.py index 83332bbd..7ad030ca 100644 --- a/Yank/schema/validator.py +++ b/Yank/schema/validator.py @@ -138,7 +138,8 @@ def _check_with_supported_system_file_format(self, field, file_paths): ('amber', {'inpcrd', 'prmtop'}), ('amber', {'rst7', 'prmtop'}), ('gromacs', {'gro', 'top'}), - ('openmm', {'pdb', 'xml'}) + ('openmm', {'pdb', 'xml'}), + ('charmm', {'pdb', 'psf'}) ] file_extension_type = None for extension_type, valid_extensions in expected_extensions: diff --git a/Yank/tests/test_experiment.py b/Yank/tests/test_experiment.py index f1c7beeb..840737ae 100644 --- a/Yank/tests/test_experiment.py +++ b/Yank/tests/test_experiment.py @@ -14,10 +14,12 @@ # ============================================================================================= import itertools +from pprint import pformat import shutil import tempfile import textwrap import time +import typing import unittest import mdtraj @@ -993,9 +995,9 @@ def test_validation_correct_protocols(): trailblazer_options = [ {'n_equilibration_iterations': 1000, 'n_samples_per_state': 100, - 'std_potential_threshold': 0.5, 'threshold_tolerance': 0.05}, + 'thermodynamic_distance': 0.5, 'distance_tolerance': 0.05}, {'n_equilibration_iterations': 100, 'n_samples_per_state': 10}, - {'std_potential_threshold': 1.0, 'threshold_tolerance': 0.5}, + {'thermodynamic_distance': 1.0, 'distance_tolerance': 0.5}, {'function_variable_name': 'lambda'}, {'function_variable_name': 'lambda', 'reversed_direction': False} ] @@ -2963,7 +2965,7 @@ def test_run_solvation_experiment(): class TestTrailblazeAlchemicalPath: """Test suite for the automatic discretization of the alchemical path.""" - def _get_harmonic_oscillator_script(self, tmp_dir, alchemical_path='auto', trailblazer_options=None): + def _get_hydration_free_energy_script(self, tmp_dir, alchemical_path='auto', trailblazer_options=None): # Setup only 1 hydration free energy system in implicit solvent and vacuum. yaml_script = get_template_script(tmp_dir, systems=['hydration-system']) yaml_script['systems']['hydration-system']['solvent1'] = 'GBSA-OBC2' @@ -2984,11 +2986,104 @@ def _get_harmonic_oscillator_script(self, tmp_dir, alchemical_path='auto', trail return yaml_script + def test_generate_lambda_alchemical_function(self): + """The alchemical functions generated by ExperimentBuilder._generate_lambda_alchemical_function are correct.""" + from openmmtools.utils import math_eval + + def evaluate(expr, l): + variables = {'lambda': l} + return math_eval(expr, variables) + + # Each test case are [(lambda0, lambda1), (f(lambda0), f([lambda0+lambda1]/2), f(lambda1))] + # where the second tuple in the list are the expected values of the function. + test_cases = [(0, 1), (1, 0), (2, 3), (3, 2), (4, 8), (10, 5)] + + for lambda0, lambda1 in test_cases: + expr = ExperimentBuilder._generate_lambda_alchemical_function(lambda0, lambda1) + print(lambda0, lambda1, ':', expr) + assert evaluate(expr, lambda0) == 0.0 + assert evaluate(expr, (lambda0 + lambda1)/2) == 0.5 + assert evaluate(expr, lambda1) == 1.0 + + # The funciton must be constant after the end states. + if lambda0 < lambda1: + assert evaluate(expr, lambda0-1) == 0.0 + assert evaluate(expr, lambda1+1) == 1.0 + else: + assert evaluate(expr, lambda0+1) == 0.0 + assert evaluate(expr, lambda1-1) == 1.0 + + def test_determine_trailblaze_path(self): + """Test the various paths generated by ExperimentBuilder._determine_trailblaze_path with alchemical functions""" + + # Mock objects to test the static function. + class MockTopography(typing.NamedTuple): + receptor_atoms: typing.List + solvent_atoms: typing.List + + class MockAlchemicalRegions(typing.NamedTuple): + annihilate_electrostatics: typing.List + annihilate_sterics: typing.List + + class MockPhaseFactory: + def __init__(self, restraint, is_vacuum, annihilate_electrostatics, annihilate_sterics): + self.restraint = restraint + if is_vacuum: + self.topography = MockTopography([], []) + else: + self.topography = MockTopography([0], [1]) + self.alchemical_regions = MockAlchemicalRegions(annihilate_electrostatics, annihilate_sterics) + + # Each test case is (mock_phase_factory_init_kwargs, expected_alchemical_functions, expected_state_parameters). + p = ExperimentBuilder._generate_lambda_alchemical_function # Shortcut. + test_cases = [ + ( + dict(restraint=True, is_vacuum=False, annihilate_electrostatics=True, annihilate_sterics=False), + dict(lambda_restraints=p(3, 2), lambda_electrostatics=p(1, 2), lambda_sterics=p(0, 1)), + [('lambda', [3.0, 0.0])] + ), + ( + dict(restraint=None, is_vacuum=False, annihilate_electrostatics=True, annihilate_sterics=False), + dict(lambda_electrostatics=p(1, 2), lambda_sterics=p(0, 1)), + [('lambda', [2.0, 0.0])] + ), + ( + dict(restraint=None, is_vacuum=True, annihilate_electrostatics=True, annihilate_sterics=False), + dict(lambda_electrostatics=p(0, 1), lambda_sterics='1.0'), + [('lambda', [1.0, 0.0])] + ), + ( + dict(restraint=True, is_vacuum=True, annihilate_electrostatics=False, annihilate_sterics=False), + dict(lambda_restraints=p(1, 0), lambda_electrostatics='1.0', lambda_sterics='1.0'), + [('lambda', [1.0, 0.0])] + ), + ( + dict(restraint=restraints.RMSD(), is_vacuum=False, annihilate_electrostatics=True, annihilate_sterics=False), + dict(lambda_restraints=p(4, 3) + ' - (' + p(1, 0) + ')', lambda_electrostatics=p(2, 3), lambda_sterics=p(1, 2)), + [('lambda', [4.0, 0.0])] + ), + ] + + for idx, (phase_factory_kwargs, expected_alchemical_functions, expected_state_parameters) in enumerate(test_cases): + phase_factory = MockPhaseFactory(**phase_factory_kwargs) + alchemical_functions, states_parameters = ExperimentBuilder._determine_trailblaze_path( + phase_factory, alchemical_path='auto', generate_alchemical_functions=True) + + # Convert alchemical functions objects to string expressions for comparison. + for parameter_name, alchemical_function in alchemical_functions.items(): + alchemical_functions[parameter_name] = alchemical_function._expression + + err_msg = 'test case ' + str(idx) + ':\n\nExpected:\n{}\n\nObtained:\n{}' + assert alchemical_functions == expected_alchemical_functions, err_msg.format(pformat(expected_alchemical_functions), + pformat(alchemical_functions)) + assert states_parameters == expected_state_parameters, err_msg.format(pformat(expected_state_parameters), + pformat(states_parameters)) + def test_auto_alchemical_path(self): """Test automatic alchemical path found by thermodynamic trailblazing when the option 'auto' is set.""" with mmtools.utils.temporary_directory() as tmp_dir: # Setup only 1 hydration free energy system in implicit solvent and vacuum. - yaml_script = self._get_harmonic_oscillator_script( + yaml_script = self._get_hydration_free_energy_script( tmp_dir, alchemical_path='auto', trailblazer_options={'n_equilibration_iterations': 0} ) @@ -3036,7 +3131,7 @@ def test_start_from_trailblaze_samples_path(self): """Test the correct implementation of the option start_from_trailblaze_samples.""" with mmtools.utils.temporary_directory() as tmp_dir: # Setup only 1 hydration free energy system in implicit solvent and vacuum. - yaml_script = self._get_harmonic_oscillator_script( + yaml_script = self._get_hydration_free_energy_script( tmp_dir, alchemical_path='auto', trailblazer_options={'n_equilibration_iterations': 0} ) @@ -3062,7 +3157,7 @@ def test_alchemical_functions_path(self): """Test automatic alchemical path found from alchemical functions.""" with mmtools.utils.temporary_directory() as tmp_dir: # Setup only 1 hydration free energy system in implicit solvent and vacuum. - yaml_script = self._get_harmonic_oscillator_script( + yaml_script = self._get_hydration_free_energy_script( tmp_dir, alchemical_path={'lambda_electrostatics': 'lambda', 'lambda_sterics': 'lambda', diff --git a/Yank/tests/test_pipeline.py b/Yank/tests/test_pipeline.py index 8b3c40de..838e8547 100644 --- a/Yank/tests/test_pipeline.py +++ b/Yank/tests/test_pipeline.py @@ -91,7 +91,8 @@ def test_pack_transformation(): class TestThermodynamicTrailblazing: """Test suite for the thermodynamic trailblazing function.""" - PAR_NAME = 'testsystems_HarmonicOscillator_x0' + PAR_NAME_X0 = 'testsystems_HarmonicOscillator_x0' + PAR_NAME_K = 'testsystems_HarmonicOscillator_K' @classmethod def get_harmonic_oscillator(cls): @@ -101,13 +102,20 @@ def get_harmonic_oscillator(cls): # Create composable state that control offset of harmonic oscillator. class X0State(GlobalParameterState): - testsystems_HarmonicOscillator_x0 = GlobalParameterState.GlobalParameter(cls.PAR_NAME, 1.0) + testsystems_HarmonicOscillator_x0 = GlobalParameterState.GlobalParameter( + cls.PAR_NAME_X0, 1.0) + testsystems_HarmonicOscillator_K = GlobalParameterState.GlobalParameter( + cls.PAR_NAME_K, (1.0*unit.kilocalories_per_mole/unit.nanometer**2).value_in_unit_system(unit.md_unit_system)) # Create a harmonic oscillator thermo state. - oscillator = mmtools.testsystems.HarmonicOscillator(K=1.0*unit.kilocalories_per_mole/unit.nanometer**2) + k = 1.0*unit.kilocalories_per_mole/unit.nanometer**2 + oscillator = mmtools.testsystems.HarmonicOscillator(K=k) sampler_state = SamplerState(positions=oscillator.positions) thermo_state = ThermodynamicState(oscillator.system, temperature=300*unit.kelvin) - x0_state = X0State(testsystems_HarmonicOscillator_x0=0.0) + x0_state = X0State( + testsystems_HarmonicOscillator_x0=0.0, + testsystems_HarmonicOscillator_K=k.value_in_unit_system(unit.md_unit_system) + ) compound_state = CompoundThermodynamicState(thermo_state, composable_states=[x0_state]) return compound_state, sampler_state @@ -136,7 +144,7 @@ def _check_checkpoint_files(checkpoint_dir_path, expected_protocol, n_atoms): assert checkpoint_protocol == expected_protocol # The positions and box vectors have the correct dimension. - expected_n_states = len(expected_protocol[self.PAR_NAME]) + expected_n_states = len(expected_protocol[self.PAR_NAME_X0]) trajectory_file = mdtraj.formats.DCDTrajectoryFile(checkpoint_positions_path, 'r') xyz, cell_lengths, cell_angles = trajectory_file.read() assert (xyz.shape[0], xyz.shape[1]) == (expected_n_states, n_atoms) @@ -154,7 +162,7 @@ def _check_checkpoint_files(checkpoint_dir_path, expected_protocol, n_atoms): first_protocol = run_thermodynamic_trailblazing( compound_state, sampler_state, mcmc_move, checkpoint_dir_path=checkpoint_dir_path, - state_parameters=[(self.PAR_NAME, [0.0, 1.0])] + state_parameters=[(self.PAR_NAME_X0, [0.0, 1.0])] ) # The info in the checkpoint files is correct. @@ -165,10 +173,11 @@ def _check_checkpoint_files(checkpoint_dir_path, expected_protocol, n_atoms): second_protocol = run_thermodynamic_trailblazing( compound_state, sampler_state, mcmc_move, checkpoint_dir_path=checkpoint_dir_path, - state_parameters=[(self.PAR_NAME, [0.0, 2.0])] + state_parameters=[(self.PAR_NAME_X0, [0.0, 2.0])], + bidirectional_redistribution=False ) - len_first_protocol = len(first_protocol[self.PAR_NAME]) - assert second_protocol[self.PAR_NAME][:len_first_protocol] == first_protocol[self.PAR_NAME] + len_first_protocol = len(first_protocol[self.PAR_NAME_X0]) + assert second_protocol[self.PAR_NAME_X0][:len_first_protocol] == first_protocol[self.PAR_NAME_X0] # The info in the checkpoint files is correct. _check_checkpoint_files(checkpoint_dir_path, second_protocol, n_atoms) @@ -181,15 +190,15 @@ def test_trailblaze_setters(self): # Assign to the parameter a function and run the # trailblaze algorithm over the function variable. - global_parameter_functions = {self.PAR_NAME: 'lambda**2'} + global_parameter_functions = {self.PAR_NAME_X0: 'lambda**2'} function_variables = ['lambda'] # Make sure it's not possible to have a parameter defined as a function and as a parameter state as well. - err_msg = f"Cannot specify {self.PAR_NAME} in 'state_parameters' and 'global_parameter_functions'" + err_msg = f"Cannot specify {self.PAR_NAME_X0} in 'state_parameters' and 'global_parameter_functions'" with assert_raises_regexp(ValueError, err_msg): run_thermodynamic_trailblazing( compound_state, sampler_state, mcmc_move, - state_parameters=[(self.PAR_NAME, [0.0, 1.0])], + state_parameters=[(self.PAR_NAME_X0, [0.0, 1.0])], global_parameter_functions=global_parameter_functions, function_variables=function_variables, ) @@ -201,8 +210,8 @@ def test_trailblaze_setters(self): global_parameter_functions=global_parameter_functions, function_variables=function_variables, ) - assert list(protocol.keys()) == [self.PAR_NAME] - parameter_protocol = protocol[self.PAR_NAME] + assert list(protocol.keys()) == [self.PAR_NAME_X0] + parameter_protocol = protocol[self.PAR_NAME_X0] assert parameter_protocol[0] == 0 assert parameter_protocol[-1] == 1 @@ -211,9 +220,113 @@ def test_reversed_direction(self): # Create a harmonic oscillator system to test. compound_state, sampler_state = self.get_harmonic_oscillator() mcmc_move = self.get_langevin_dynamics_move() + + # For this, we run through two variables to make sure they are executed in the correct order. + k_start = getattr(compound_state, self.PAR_NAME_K) + k_end = k_start * 2 protocol = run_thermodynamic_trailblazing( compound_state, sampler_state, mcmc_move, - state_parameters=[(self.PAR_NAME, [1.0, 0.0])], - reversed_direction=True + state_parameters=[ + (self.PAR_NAME_X0, [1.0, 0.0]), + (self.PAR_NAME_K, [k_start, k_end]), + ], + reversed_direction=True, + bidirectional_redistribution=False ) - assert protocol[self.PAR_NAME] == [1.0, 0.0] + assert protocol[self.PAR_NAME_X0] == [1.0, 0.0, 0.0], protocol[self.PAR_NAME_X0] + assert protocol[self.PAR_NAME_K] == [k_start, k_start, k_end], protocol[self.PAR_NAME_K] + + def test_redistribute_trailblaze_states(self): + """States are redistributed correctly as a function of the bidirectional thermo length.""" + # This simulates the protocol found after trailblazing. + optimal_protocol = { + 'lambda_electrostatics': [1.0, 0.5, 0.0, 0.0, 0.0], + 'lambda_sterics': [1.0, 1.0, 1.0, 0.5, 0.0] + } + + # Each test case is a tuple (states_stds, expected_redistributed_protocol, expected_states_map). + test_cases = [ + ( + [[0.5, 0.5, 0.5, 0.5], + [1.5, 1.5, 1.5]], + {'lambda_electrostatics': [1.0, 0.75, 0.5, 0.25, 0.0, 0.0, 0.0, 0.0], + 'lambda_sterics': [1.0, 1.0, 1.0, 1.0, 1.0, 0.75, 0.5, 0.0]}, + [0, 0, 1, 1, 2, 2, 3, 4] + ), + ( + [[0.5, 0.5, 0.5, 0.5], + [0.0, 0.0, 0.0]], + {'lambda_electrostatics': [1.0, 0.0, 0.0, 0.0], + 'lambda_sterics': [1.0, 1.0, 0.25, 0.0]}, + [0, 2, 3, 4] + ), + ] + + for states_stds, expected_redistributed_protocol, expected_states_map in test_cases: + redistributed_protocol, states_map = _redistribute_trailblaze_states( + optimal_protocol, states_stds, thermodynamic_distance=0.5) + assert expected_redistributed_protocol == redistributed_protocol, \ + f'{expected_redistributed_protocol} != {redistributed_protocol}' + assert expected_states_map == states_map, f'{expected_states_map} != {states_map}' + + def test_read_trailblaze_checkpoint_coordinates(self): + """read_trailblaze_checkpoint_coordinates() returns the correct number of SamplerStates.""" + # Create a harmonic oscillator system to test. + compound_state, sampler_state = self.get_harmonic_oscillator() + mcmc_move = self.get_langevin_dynamics_move() + + # The end states differ only in the spring constant + k_initial = getattr(compound_state, self.PAR_NAME_K) + k_final = 250 * k_initial + + # Helper function. + def _call_run_thermodynamic_trailblazing( + thermodynamic_distance, bidirectional_redistribution, + bidirectional_search_thermo_dist, checkpoint_dir_path + ): + return run_thermodynamic_trailblazing( + compound_state, sampler_state, mcmc_move, + state_parameters=[ + (self.PAR_NAME_X0, [0.0, 1.0]), + (self.PAR_NAME_K, [k_initial, k_final]) + ], + thermodynamic_distance=thermodynamic_distance, + bidirectional_redistribution=bidirectional_redistribution, + bidirectional_search_thermo_dist=bidirectional_search_thermo_dist, + checkpoint_dir_path=checkpoint_dir_path + ) + + # Each test case is (bidirectional_redistribution, thermodynamic_distance, bidirectional_search_thermo_dist). + test_cases = [ + (False, 1.0, 'auto'), + (True, 2.0, 1.0), + (True, 0.5, 1.0), + ] + + for bidirectional_redistribution, thermodynamic_distance, bidirectional_search_thermo_dist in test_cases: + with mmtools.utils.temporary_directory() as checkpoint_dir_path: + # Compute the protocol. + protocol = _call_run_thermodynamic_trailblazing( + thermodynamic_distance, bidirectional_redistribution, + bidirectional_search_thermo_dist, checkpoint_dir_path + ) + + # The number of frames returned should be equal to the number of + # states in the protocol. If this was redistributed, this might + # be different than the number of frames generated. + sampler_states = read_trailblaze_checkpoint_coordinates(checkpoint_dir_path) + len_protocol = len(protocol['testsystems_HarmonicOscillator_x0']) + err_msg = ( + f'bidirectional_redistribution={bidirectional_redistribution}, ' + f'thermodynamic_distance={thermodynamic_distance}, ' + f'bidirectional_search_thermo_dist={bidirectional_search_thermo_dist}: ' + f'{len(sampler_states)} != {len_protocol}: {protocol}' + ) + assert len(sampler_states) == len_protocol, err_msg + + # Now, resuming should give me the same exact protocol. + resumed_protocol = _call_run_thermodynamic_trailblazing( + thermodynamic_distance, bidirectional_redistribution, + bidirectional_search_thermo_dist, checkpoint_dir_path + ) + assert protocol == resumed_protocol diff --git a/Yank/tests/test_yank.py b/Yank/tests/test_yank.py index 3856bbe0..c5d502d8 100644 --- a/Yank/tests/test_yank.py +++ b/Yank/tests/test_yank.py @@ -71,20 +71,23 @@ def test_topography(): def test_topography_subset_regions(): """Test that topography subset region selection works""" - # This test relies on all other tests for topography passing + # This test relies on all other tests for topography passing. host_guest_explicit = testsystems.HostGuestExplicit() topography = Topography(host_guest_explicit.topology, ligand_atoms='resname B2') ligand_list = list(range(126, 156)) receptor_list = list(range(126)) n_slice = 3 # Number of atoms to slice from front and back assert topography.ligand_atoms == ligand_list + + # Add regions that will be tested. topography.add_region('lig_dsl', 'resname B2') topography.add_region('lig_list', ligand_list) topography.add_region('rec_list', receptor_list) topography.add_region('rec_lig_slice', receptor_list[-n_slice:] + ligand_list[:n_slice]) - # Selection list: selection, subset, sort_by, result + + # Selection list: selection, subset, sort_by, expected result. selections = ( - # DSL select, same DSL subset, small to large + # DSL select, same DSL subset, small to large index. ('resname B2', 'resname B2', 'index', ligand_list), # DSL select, region subset ('resname B2', 'lig_list', 'index', ligand_list), @@ -98,11 +101,11 @@ def test_topography_subset_regions(): ('lig_list', 'resname CB7', 'index', []), # compound Region, partial subset in bad order, order of region ('lig_list or rec_list', 'rec_lig_slice', 'region_order', ligand_list[:n_slice] + receptor_list[-n_slice:]) - ) + ) for selection, subset, sort_by, result in selections: selected = topography.select(selection, sort_by=sort_by, subset=subset, as_set=False) - assert result == selected, "Failed to match {}, subset {}, by {} to {},\ngot {}".format(selection, subset, - sort_by, result, + assert result == selected, (f"Failed to match {selection}, subset {subset}, " + f"by {sort_by} to {result},\ngot {selected}") selected) @@ -110,7 +113,8 @@ def test_topography_regions(): """Test that topography regions are created and fetched""" toluene_vacuum = testsystems.TolueneVacuum() topography = Topography(toluene_vacuum.topology) - # Should do nothing and return nothing without error + + # Should do nothing and return nothing without error. assert topography.remove_region('Nothing') is None topography.add_region('A hard list', [0, 1, 2]) assert 'A hard list' in topography @@ -118,12 +122,13 @@ def test_topography_regions(): topography.remove_region('A junk list') assert 'A junk list' not in topography assert topography.get_region('A hard list') == [0, 1, 2] - # Confirm that string typing is handled + + # Confirm that string typing is handled. topography.add_region('carbon', 'element C') assert len(topography.get_region('carbon')) > 0 with nose.tools.assert_raises(ValueError): topography.add_region('failure', 'Bad selection string') - # Ensure region was not added + # Ensure region was not added. assert 'failure' not in topography diff --git a/Yank/yank.py b/Yank/yank.py index c13eb905..40f718dd 100644 --- a/Yank/yank.py +++ b/Yank/yank.py @@ -327,14 +327,12 @@ def select(self, selection, as_set=False, sort_by='auto', subset=None) -> Union[ """ selection = copy.deepcopy(selection) + topology = self.topology # Handle subset. Define a subset topology to manipulate, then define a common call to convert subset atom # into absolute atom if subset is not None: - subset_atoms = self.select(subset, sort_by='index', as_set=False, subset=None) - topology = self.topology.subset(subset_atoms) - else: - subset_atoms = None - topology = self.topology + subset = self.select(subset, sort_by='index', as_set=False, subset=None) + topology = self.topology.subset(subset) class AtomMap(object): """Atom mapper class""" @@ -359,7 +357,7 @@ def __contains__(self, item): else: return item in self.subset_atoms - atom_map = AtomMap(subset_atoms) + atom_map = AtomMap(subset) # Shorthand for later atom_mapping = atom_map.atom_mapping @@ -1298,7 +1296,7 @@ def _expand_state_cutoff(thermodynamic_state, expanded_cutoff_distance, raise RuntimeError('Barostated box sides must be at least {} Angstroms ' 'to correct for missing dispersion interactions. The ' 'minimum dimension of the provided box is {} Angstroms' - ''.format(expanded_cutoff_distance/unit.angstrom * 2, + ''.format(expanded_cutoff_distance/unit.angstrom * 2 / fluctuation_size, min_box_dimension/unit.angstrom)) logger.debug('Setting cutoff for fully interacting system to {}. The minimum box '