-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetadynamics.py
270 lines (246 loc) · 13.3 KB
/
metadynamics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import simtk.openmm as mm
import simtk.unit as unit
import numpy as np
from collections import namedtuple
from functools import reduce
import os
import re
import yank
import logging
cv_logger = logging.getLogger(__name__)
cv_logger.setLevel(logging.DEBUG)
handler = logging.FileHandler('cv.log')
cv_logger.addHandler(handler)
yank.utils.config_root_logger(verbose=True, log_file_path=None)
class Metadynamics(object):
"""Performs metadynamics.
This class implements well-tempered metadynamics, as described in Barducci et al.,
"Well-Tempered Metadynamics: A Smoothly Converging and Tunable Free-Energy Method"
(https://doi.org/10.1103/PhysRevLett.100.020603). You specify from one to three
collective variables whose sampling should be accelerated. A biasing force that
depends on the collective variables is added to the simulation. Initially the bias
is zero. As the simulation runs, Gaussian bumps are periodically added to the bias
at the current location of the simulation. This pushes the simulation away from areas
it has already explored, encouraging it to sample other regions. At the end of the
simulation, the bias function can be used to calculate the system's free energy as a
function of the collective variables.
To use the class you create a Metadynamics object, passing to it the System you want
to simulate and a list of BiasVariable objects defining the collective variables.
It creates a biasing force and adds it to the System. You then run the simulation
as usual, but call step() on the Metadynamics object instead of on the Simulation.
You can optionally specify a directory on disk where the current bias function should
periodically be written. In addition, it loads biases from any other files in the
same directory and includes them in the simulation. It loads files when the
Metqdynamics object is first created, and also checks for any new files every time it
updates its own bias on disk.
This serves two important functions. First, it lets you stop a metadynamics run and
resume it later. When you begin the new simulation, it will load the biases computed
in the earlier simulation and continue adding to them. Second, it provides an easy
way to parallelize metadynamics sampling across many computers. Just point all of
them to a shared directory on disk. Each process will save its biases to that
directory, and also load in and apply the biases added by other processes.
"""
def __init__(self, system, variables, force, temperature, biasFactor, height, frequency, saveFrequency=None, biasDir=None):
"""Create a Metadynamics object.
Parameters
----------
system: System
the System to simulate. A CustomCVForce implementing the bias is created and
added to the System.
variables: list of BiasVariables
the collective variables to sample
temperature: temperature
the temperature at which the simulation is being run. This is used in computing
the free energy.
biasFactor: float
used in scaling the height of the Gaussians added to the bias. The collective
variables are sampled as if the effective temperature of the simulation were
temperature*biasFactor.
height: energy
the initial height of the Gaussians to add
frequency: int
the interval in time steps at which Gaussians should be added to the bias potential
saveFrequency: int (optional)
the interval in time steps at which to write out the current biases to disk. At
the same it it writes biases, it also checks for updated biases written by other
processes and loads them in. This must be a multiple of frequency.
biasDir: str (optional)
the directory to which biases should be written, and from which biases written by
other processes should be loaded
"""
if not unit.is_quantity(temperature):
temperature = temperature*unit.kelvin
if not unit.is_quantity(height):
height = height*unit.kilojoules_per_mole
if biasFactor < 1.0:
raise ValueError('biasFactor must be >= 1')
if (saveFrequency is None and biasDir is not None) or (saveFrequency is not None and biasDir is None):
raise ValueError('Must specify both saveFrequency and biasDir')
if saveFrequency is not None and (saveFrequency < frequency or saveFrequency%frequency != 0):
raise ValueError('saveFrequency must be a multiple of frequency')
self.variables = variables
self.temperature = temperature
self.biasFactor = biasFactor
self.height = height
self.frequency = frequency
self.biasDir = biasDir
self.saveFrequency = saveFrequency
self._id = np.random.randint(0xFFFFFFFFFFFF)
self._saveIndex = 0
self._selfBias = np.zeros(tuple(v.gridWidth for v in variables))
self._totalBias = np.zeros(tuple(v.gridWidth for v in variables))
self._loadedBiases = {}
self._deltaT = temperature*(biasFactor-1)
varNames = ['cv%d' % i for i in range(len(variables))]
self._force = force
#for name, var in zip(varNames, variables):
# self._force.addCollectiveVariable(name, var.force)
widths = [v.gridWidth for v in variables]
mins = [v.minValue for v in variables]
maxs = [v.maxValue for v in variables]
if len(variables) == 1:
self._table = mm.Continuous1DFunction(self._totalBias.flatten(), mins[0], maxs[0])
elif len(variables) == 2:
self._table = mm.Continuous2DFunction(widths[0], widths[1], self._totalBias.flatten(), mins[0], maxs[0], mins[1], maxs[1])
elif len(variables) == 3:
self._table = mm.Continuous3DFunction(widths[0], widths[1], widths[2], self._totalBias.flatten(), mins[0], maxs[0], mins[1], maxs[1], mins[2], maxs[2])
else:
raise ValueError('Metadynamics requires 1, 2, or 3 collective variables')
self._force.addTabulatedFunction('table', self._table)
self._force.setForceGroup(31)
system.addForce(self._force)
self._syncWithDisk()
def step(self, simulation, steps):
"""Advance the simulation by integrating a specified number of time steps.
Parameters
----------
simulation: Simulation
the Simulation to advance
steps: int
the number of time steps to integrate
"""
stepsToGo = steps
while stepsToGo > 0:
nextSteps = stepsToGo
if simulation.currentStep % self.frequency == 0:
nextSteps = min(nextSteps, self.frequency)
else:
nextSteps = min(nextSteps, simulation.currentStep % self.frequency)
simulation.step(nextSteps)
if simulation.currentStep % self.frequency == 0:
simulation.context.setParameter('report', 1.0)
# This prints the current value of r_parallel and r_orthogonal
cv_logger.debug('{} {}'.format(simulation.context.getState(getEnergy=True, groups=2**29).getPotentialEnergy()._value,
simulation.context.getState(getEnergy=True,groups=2**30).getPotentialEnergy()._value))
# position is the current value of r_parallel
position =[simulation.context.getState(getEnergy=True, groups=2**29).getPotentialEnergy()._value]
simulation.context.setParameter('report', 0.0)
energy = simulation.context.getState(getEnergy=True, groups={31}).getPotentialEnergy()
height = self.height*np.exp(-energy/(unit.MOLAR_GAS_CONSTANT_R*self._deltaT))
self._addGaussian(position, height, simulation.context)
if self.saveFrequency is not None and simulation.currentStep % self.saveFrequency == 0:
self._syncWithDisk()
stepsToGo -= nextSteps
def getFreeEnergy(self):
"""Get the free energy of the system as a function of the collective variables.
The result is returned as a N-dimensional NumPy array, where N is the number of collective
variables. The values are in kJ/mole. The i'th position along an axis corresponds to
minValue + i*(maxValue-minValue)/gridWidth.
"""
return -((self.temperature+self._deltaT)/self._deltaT)*self._totalBias
def _addGaussian(self, position, height, context):
"""Add a Gaussian to the bias function."""
# Compute a Gaussian along each axis.
axisGaussians = []
for i,v in enumerate(self.variables):
x = (position[i]-v.minValue) / (v.maxValue-v.minValue)
if v.periodic:
x = x % 1.0
dist = np.abs(np.arange(0, 1, 1.0/v.gridWidth) - x)
if v.periodic:
dist = np.min(np.array([dist, np.abs(dist-1)]), axis=0)
axisGaussians.append(np.exp(-dist*dist*v.gridWidth/v.biasWidth))
# Compute their outer product.
if len(self.variables) == 1:
gaussian = axisGaussians[0]
else:
gaussian = reduce(np.multiply.outer, reversed(axisGaussians))
# Add it to the bias.
height = height.value_in_unit(unit.kilojoules_per_mole)
self._selfBias += height*gaussian
self._totalBias += height*gaussian
widths = [v.gridWidth for v in self.variables]
mins = [v.minValue for v in self.variables]
maxs = [v.maxValue for v in self.variables]
if len(self.variables) == 1:
self._table.setFunctionParameters(self._totalBias.flatten(), mins[0], maxs[0])
elif len(self.variables) == 2:
self._table.setFunctionParameters(widths[0], widths[1], self._totalBias.flatten(), mins[0], maxs[0], mins[1], maxs[1])
elif len(self.variables) == 3:
self._table.setFunctionParameters(widths[0], widths[1], widths[2], self._totalBias.flatten(), mins[0], maxs[0], mins[1], maxs[1], mins[2], maxs[2])
self._force.updateParametersInContext(context)
def _syncWithDisk(self):
"""Save biases to disk, and check for updated files created by other processes."""
if self.biasDir is None:
return
# Use a safe save to write out the biases to disk, then delete the older file.
oldName = os.path.join(self.biasDir, 'bias_%d_%d.npy' % (self._id, self._saveIndex))
self._saveIndex += 1
tempName = os.path.join(self.biasDir, 'temp_%d_%d.npy' % (self._id, self._saveIndex))
fileName = os.path.join(self.biasDir, 'bias_%d_%d.npy' % (self._id, self._saveIndex))
np.save(tempName, self._selfBias)
cv_logger.debug(fileName)
os.rename(tempName, fileName)
#if os.path.exists(oldName):
# os.remove(oldName)
# Check for any files updated by other processes.
fileLoaded = False
pattern = re.compile('bias_(.*)_(.*)\.npy')
for filename in os.listdir(self.biasDir):
match = pattern.match(filename)
if match is not None:
matchId = int(match.group(1))
matchIndex = int(match.group(2))
if matchId != self._id and (matchId not in self._loadedBiases or matchIndex > self._loadedBiases[matchId].index):
data = np.load(os.path.join(self.biasDir, filename))
self._loadedBiases[matchId] = _LoadedBias(matchId, matchIndex, data)
fileLoaded = True
# If we loaded any files, recompute the total bias from all processes.
if fileLoaded:
self._totalBias = self._selfBias
for bias in self._loadedBiases.values():
self._totalBias += bias.bias
if os.path.exists(oldName):
os.remove(oldName)
class BiasVariable(object):
"""A collective variable that can be used to bias a simulation with metadynamics."""
def __init__(self, force, minValue, maxValue, biasWidth, periodic=False, gridWidth=None):
"""Create a BiasVariable.
Parameters
----------
force: Force
the Force object whose potential energy defines the collective variable
minValue: float
the minimum value the collective variable can take. If it should ever go below this,
the bias force will be set to 0.
maxValue: float
the maximum value the collective variable can take. If it should ever go above this,
the bias force will be set to 0.
biasWidth: float
the width (standard deviation) of the Gaussians added to the bias during metadynamics
periodic: bool
whether this is a periodic variable, such that minValue and maxValue are physical equivalent
gridWidth: int
the number of grid points to use when tabulating the bias function. If this is omitted,
a reasonable value is chosen automatically.
"""
self.force = force
self.minValue = minValue
self.maxValue = maxValue
self.biasWidth = biasWidth
self.periodic = periodic
if gridWidth is None:
self.gridWidth = int(np.ceil(5*(maxValue-minValue)/biasWidth))
else:
self.gridWidth = gridWidth
_LoadedBias = namedtuple('LoadedBias', ['id', 'index', 'bias'])