Created
September 22, 2020 06:35
-
-
Save andersx/d16e3daa982d4b74e25ed5f2b3a72fda to your computer and use it in GitHub Desktop.
GPR MD ASE example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
from copy import deepcopy | |
import numpy as np | |
import scipy.linalg | |
from scipy.linalg import norm | |
from ase import Atoms | |
from ase.io import read, write | |
from ase.optimize import * | |
from ase import units | |
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution | |
from ase.md.langevin import Langevin | |
from ase.md.verlet import VelocityVerlet | |
from ase.io.trajectory import Trajectory | |
from os.path import splitext | |
from ase.visualize import view | |
# from ase_util import transform_to_eckart_frame | |
import uuid | |
import ase | |
from ase import Atoms | |
from ase.optimize import BFGS | |
from ase.vibrations import Vibrations | |
import qml | |
from qml.representations import generate_fchl_acsf | |
from qml.math import cho_solve | |
from tqdm import tqdm | |
import numpy as np | |
import time | |
from ase import Atoms | |
from ase.calculators.calculator import Calculator, all_changes | |
from qml.kernels import get_gp_kernel | |
from qml.kernels import get_symmetric_gp_kernel | |
from qml.representations import generate_fchl_acsf | |
new_cut = 8.0 | |
cut_parameters = { | |
"rcut": new_cut, | |
"acut": new_cut, | |
"nRs2": int(24 * new_cut / 8.0), | |
"nRs3": int(20 * new_cut / 8.0), | |
} | |
def gen_representations(data): | |
nuclear_charges = [] | |
print(list(data.keys())) | |
print(data["Z"]) | |
max_atoms = 4 | |
elements = [1,6,8] | |
print("max_atoms", max_atoms) | |
print("elements", elements) | |
reps = [] | |
dreps = [] | |
for i in tqdm(range(len(data["E"]))): | |
x, dx = generate_fchl_acsf(data["Z"][i], data["R"][i], elements=elements, gradients=True, | |
**cut_parameters) | |
reps.append(x) | |
dreps.append(dx) | |
energies = data["E"].flatten() | |
nuclear_charges = data["Z"].tolist() | |
reps = np.array(reps) | |
dreps = np.array(dreps) | |
return reps, dreps, nuclear_charges, energies | |
class QMLCalculatorGPR(Calculator): | |
name = 'QMLCalculator' | |
implemented_properties = ['energy', 'forces'] | |
def __init__(self, parameters, representations, deriv_reps, charges, alphas, reducer=None, **kwargs): | |
super().__init__(**kwargs) | |
# unpack parameters | |
offset = parameters["offset"] | |
sigma = parameters["sigma"] | |
self.set_model(alphas, representations, deriv_reps, charges, offset, sigma) | |
self.energy = 0.0 | |
self.reducer = reducer | |
self.last_atoms = -1 | |
if reducer is not None: | |
self.repr = np.einsum("ijk,kl->ijl", representations, reducer) | |
self.drepr = np.einsum("ijkmn,kl->ijlmn", deriv_reps, reducer) | |
def calculate(self, atoms: Atoms = None, properties=('energy', 'forces'), | |
system_changes=all_changes): | |
if atoms is None: | |
atoms = self.atoms | |
if atoms is None: | |
raise ValueError( | |
'No ASE atoms supplied to calculation, and no ASE atoms supplied with initialisation.') | |
self.query(atoms) | |
if 'energy' in properties: | |
self.results['energy'] = self.energy | |
if 'forces' in properties: | |
self.results['forces'] = self.forces | |
return | |
def set_model(self, alphas, representations, deriv_reps, charges, offset, sigma): | |
self.alphas = alphas | |
self.repr = representations | |
self.drepr = deriv_reps | |
self.charges = charges | |
# Offset from training | |
self.offset = offset | |
# Hyper-parameters | |
self.sigma = sigma | |
self.max_atoms = self.repr.shape[1] | |
self.n_atoms = len(charges[0]) | |
return | |
def query(self, atoms=None, print_time=True): | |
if print_time: | |
start = time.time() | |
# kcal/mol til ev | |
# kcal/mol/aangstrom til ev / aangstorm | |
conv_energy = 1.0 #0.0433635093659 | |
conv_force = 1.0 # 0.0433635093659 | |
coordinates = atoms.get_positions() | |
nuclear_charges = atoms.get_atomic_numbers() | |
n_atoms = coordinates.shape[0] | |
rep_start = time.time() | |
rep, drep = generate_fchl_acsf( | |
nuclear_charges, | |
coordinates, | |
gradients=True, | |
elements=[1,6,8], | |
# pad=self.max_atoms, | |
**cut_parameters) | |
rep_end = time.time() | |
einsum_start = time.time() | |
# Put data into arrays | |
Qs = [nuclear_charges] | |
Xs = np.array([rep], order="F") | |
dXs = np.array([drep], order="F") | |
if self.reducer is not None: | |
Xs = np.einsum("ijk,kl->ijl", Xs, self.reducer) | |
dXs = np.einsum("ijkmn,kl->ijlmn", dXs, self.reducer) | |
einsum_end = time.time() | |
kernel_start = time.time() | |
Ks = get_gp_kernel(self.repr, Xs, self.drepr, dXs, self.charges, Qs, self.sigma) | |
kernel_end = time.time() | |
pred_start = time.time() | |
# Energy prediction | |
energy_predicted = np.dot(Ks[:1,:], self.alphas)[0] + self.offset | |
self.energy = energy_predicted * conv_energy | |
# Force prediction | |
forces_predicted = np.dot(Ks[1:,:], self.alphas).reshape((n_atoms, 3)) | |
self.forces = forces_predicted * conv_force | |
pred_end = time.time() | |
if print_time: | |
end = time.time() | |
# print("rep ", rep_end - rep_start) | |
# print("einsum ", einsum_end - einsum_start) | |
# print("kernel ", kernel_end - kernel_start) | |
# print("prediciton ", pred_end - pred_start) | |
print("qml query {:7.3f}s {:10.3f} ".format(end-start, energy_predicted)) | |
return | |
def calculation_required(atoms, quantities): | |
print("TEST") | |
if atoms == self.last_atoms: | |
print("Not required") | |
return False | |
else: | |
print("Required") | |
return True | |
def get_potential_energy(self, atoms=None, force_consistent=False): | |
energy = self.energy | |
return energy | |
def get_forces(self, atoms=None): | |
# print(atoms.get_positions()) | |
# print(self.last_atoms) | |
do_query = True | |
try: | |
D = atoms.get_positions() - self.last_atoms | |
if norm(D) < 1e-12: | |
do_query = False | |
except: | |
do_query = True | |
if do_query: | |
self.last_atoms = atoms.get_positions() | |
self.query(atoms=atoms) | |
forces = self.forces | |
return forces | |
def train_alphas(reps, dreps, nuclear_charges, E, F, train_idx, parameters): | |
kernel_name = "../scratch/K_gpr_%f.npy" % parameters["sigma"] | |
K = np.load(kernel_name) | |
print(reps.shape) | |
all_idx = np.array(list(range(4001))) | |
test_idx = np.array([i for i in all_idx if i not in train_idx]) | |
print(train_idx) | |
print(test_idx) | |
natoms = 4 | |
nmols = 4001 | |
atoms = np.array([i for i in range(natoms*3)]) | |
train_idx_force = np.array([atoms + (3*natoms)*j + nmols for j in train_idx]).flatten() | |
test_idx_force = np.array([atoms + (3*natoms)*j + nmols for j in test_idx]).flatten() | |
idx = np.concatenate((train_idx, train_idx_force)) | |
C = deepcopy(K[idx[:,np.newaxis], idx]) | |
eKs = deepcopy(K[test_idx[:,np.newaxis],idx]) | |
fKs = deepcopy(K[test_idx_force[:,np.newaxis],idx]) | |
Y = np.concatenate((E[train_idx], F[train_idx].flatten())) | |
alphas = cho_solve(C, Y, l2reg=llambda, destructive=True) | |
eYs = deepcopy(E[test_idx]) | |
fYs = deepcopy(F[test_idx]).flatten() | |
eYss = np.dot(eKs, alphas) | |
fYss = np.dot(fKs, alphas) | |
ermse_test = np.sqrt(np.mean(np.square(eYss - eYs))) | |
emae_test = np.mean(np.abs(eYss - eYs)) | |
frmse_test = np.sqrt(np.mean(np.square(fYss - fYs))) | |
fmae_test = np.mean(np.abs(fYss - fYs)) | |
schnet_score = 0.01 * sum(np.square(eYss - eYs)) | |
schnet_score += sum(np.square(fYss - fYs)) / natoms | |
print("TEST %5.2f %.2E %6.4e %10.8f %10.8f %10.8f %10.8f" % \ | |
(parameters["sigma"], parameters["llambda"], schnet_score, emae_test, ermse_test, fmae_test, frmse_test)) | |
repsize = reps.shape[2] | |
n_train = len(train_idx) | |
n_test = len(test_idx) | |
X = reps[train_idx] | |
Xs = reps[test_idx] | |
dX = dreps[train_idx] | |
dXs = dreps[test_idx] | |
Q = nuclear_charges[train_idx] | |
Qs = nuclear_charges[test_idx] | |
print(X.shape) | |
Xpca = X.reshape((len(train_idx)*natoms,repsize)) | |
eigen_vecs, eigen_vals, Vh = np.linalg.svd(Xpca.T, full_matrices=False, compute_uv=True) | |
print(eigen_vecs.shape) | |
NPCA = 80 | |
print(eigen_vals[NPCA-1]) | |
cev = 100 - (np.sum(eigen_vals) - np.sum(eigen_vals[:NPCA])) / np.sum(eigen_vals) * 100 | |
print("Cumulative explained variance = %f %%" % cev ) | |
reducer = eigen_vecs[:,:NPCA] | |
print(reducer.shape) | |
print(X.shape) | |
X = np.einsum("ijk,kl->ijl", X, reducer) | |
print(X.shape) | |
print(Xs.shape) | |
Xs = np.einsum("ijk,kl->ijl", Xs, reducer) | |
print(Xs.shape) | |
print(dX.shape) | |
dX = np.einsum("ijkmn,kl->ijlmn", dX, reducer) | |
print(dX.shape) | |
print(dXs.shape) | |
dXs = np.einsum("ijkmn,kl->ijlmn", dXs, reducer) | |
print(dXs.shape) | |
# C = deepcopy(K[idx[:,np.newaxis], idx]) | |
C = get_symmetric_gp_kernel(X, dX, Q, parameters["sigma"]) | |
Ks = get_gp_kernel(X, Xs, dX, dXs, Q, Qs, parameters["sigma"]) | |
# eKs = deepcopy(K[test_idx[:,np.newaxis],idx]) | |
# fKs = deepcopy(K[test_idx_force[:,np.newaxis],idx]) | |
eKs = deepcopy(Ks[:n_test]) | |
fKs = deepcopy(Ks[n_test:]) | |
print(C.shape) | |
print(eKs.shape) | |
print(fKs.shape) | |
Y = np.concatenate((E[train_idx], F[train_idx].flatten())) | |
alphas = cho_solve(C, Y, l2reg=llambda, destructive=True) | |
eYs = deepcopy(E[test_idx]) | |
fYs = deepcopy(F[test_idx]).flatten() | |
eYss = np.dot(eKs, alphas) | |
fYss = np.dot(fKs, alphas) | |
ermse_test = np.sqrt(np.mean(np.square(eYss - eYs))) | |
emae_test = np.mean(np.abs(eYss - eYs)) | |
frmse_test = np.sqrt(np.mean(np.square(fYss - fYs))) | |
fmae_test = np.mean(np.abs(fYss - fYs)) | |
schnet_score = 0.01 * sum(np.square(eYss - eYs)) | |
schnet_score += sum(np.square(fYss - fYs)) / natoms | |
print("TEST %5.2f %.2E %6.4e %10.8f %10.8f %10.8f %10.8f" % \ | |
(parameters["sigma"], parameters["llambda"], schnet_score, emae_test, ermse_test, fmae_test, frmse_test)) | |
return alphas, reducer | |
if __name__ == "__main__": | |
temperature = 300 | |
timestep = 0.5 | |
friction = 0.01 | |
data = np.load(sys.argv[1]) | |
train_idx = sorted(np.loadtxt(sys.argv[2], dtype=int)) | |
sigma = float(sys.argv[3]) | |
llambda = float(sys.argv[4]) | |
E = data["E"] # * 23.06035 # * 627.51 | |
F = data["F"] # * 23.06035 # * 627.51 # / 0.52917721092 | |
# reps = np.load("../learning_curves_new_splits/npy_data/X.npy") | |
# dreps = np.load("../learning_curves_new_splits/npy_data/dX.npy") | |
reps, dreps, nuclear_charges, energies = gen_representations(data) | |
nuclear_charges = np.load("../learning_curves_new_splits/npy_data/Q.npy") | |
parameters = { | |
"sigma": sigma, | |
"offset": 0.0, | |
"llambda": llambda, | |
} | |
alphas, reducer = train_alphas(reps, dreps, nuclear_charges, E, F, train_idx, parameters) | |
np.save("alphas.npy", alphas) | |
# alphas = np.load("npy_data/alphas_gpr_%f.npy" % parameters["sigma"]) | |
calculator = QMLCalculatorGPR(parameters, reps[train_idx], dreps[train_idx], nuclear_charges[train_idx], | |
alphas, reducer=reducer) | |
coordinates = np.array([ | |
[ 0.20123873, -0.39602112, -0.32005839], | |
[-0.23296217, 0.48683071, 0.37425374], | |
[ 0.6071696 , -0.20890494, -1.3476949 ], | |
[ 0.22926032, -1.46543064, 0.01371154]]) | |
nuclear_charges = np.array([6, 8, 1, 1,]) | |
molecule = ase.Atoms(nuclear_charges, coordinates) | |
molecule.set_calculator(calculator) | |
BFGS(molecule).run(fmax=0.00001) | |
vib = Vibrations(molecule, nfree=4) | |
vib.run() | |
vib.summary() | |
# Set the momenta corresponding to a temperature T | |
MaxwellBoltzmannDistribution(molecule, temperature * units.kB) | |
# define the algorithm for MD: here Langevin alg. with with a time step of 0.1 fs, | |
# the temperature T and the friction coefficient to 0.02 atomic units. | |
dyn = Langevin(molecule, timestep * units.fs, temperature * units.kB, friction) | |
R_list = [] | |
P_list = [] | |
# Equilibrate in NVT ensemble for 100'000 steps (=50ps) | |
for i in range(100000): | |
dyn.run(1) | |
print("NVT equilibration at step: " + str(i)) | |
pos = molecule.get_positions() | |
mom = molecule.get_momenta() | |
R_list.append(pos) | |
P_list.append(mom) | |
R_list = np.array(R_list) | |
P_list = np.array(R_list) | |
np.save("pos_equil.npy", R_list) | |
np.save("mom_equil.npy", P_list) | |
#change to NVE ensemble | |
dyn = VelocityVerlet(molecule, timestep * units.fs) | |
R_list = [] | |
P_list = [] | |
E_list = [] | |
F_list = [] | |
for i in range(400000): | |
dyn.run(1) | |
# pos = atoms.get_positions() | |
# mom = atoms.get_momenta() | |
pos = molecule.get_positions() | |
mom = molecule.get_momenta() | |
R_list.append(pos) | |
P_list.append(mom) | |
R_list = np.array(R_list) | |
P_list = np.array(R_list) | |
np.save("pos.npy", R_list) | |
np.save("mom.npy", P_list) | |
#print("running md simulation at step: " + str(i)) | |
# epot = atoms.get_potential_energy() | |
# ekin = atoms.get_kinetic_energy() | |
# etot = epot + ekin | |
# | |
# for i in range(args.steps): | |
# dyn.run(1) | |
# if i%args.interval == 0: # run md now | |
# | |
# qi = atoms.get_charges() | |
# pos = atoms.get_positions() | |
# | |
# R_list.append(pos) | |
# q_list.append(qi) | |
# p_list.append(np.dot(qi, pos)) | |
# | |
# #print("running md simulation at step: " + str(i)) | |
# # traj.write() | |
# # time += 1 | |
# mom_last_frame = atoms.get_momenta() | |
# #print(results) | |
# positions = np.asarray(R_list) | |
# charges = np.asarray(q_list) | |
# dipole = np.asarray(p_list) | |
# etot = np.asarray(etot) | |
# mom_last_frame = np.asarray(mom_last_frame) | |
# | |
# np.savez("ir_md_"+filename +'.npz', positions= positions, charges=charges, dipole = dipole, total_energy = etot, mom_last_frame = mom_last_frame) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment