import dask
import datetime
import logging
import time
import numpy as np
from ml4chem.utils import convert_elapsed_time, get_chunks
from collections import OrderedDict
from scipy.linalg import cholesky

logger = logging.getLogger()

[docs]class KernelRidge(object): """Kernel Ridge Regression Parameters ---------- sigma : float, list, or dict Length scale of the Gaussian in the case of RBF, exponential, and laplacian kernels. Default is 1. (float) and it computes isotropic kernels. Pass a list if you would like to compute anisotropic kernels, or a dictionary if you want sigmas for each model. Example: >>> sigma={'energy': {'H': value, 'O': value}, 'forces': {'H': {0: value, 1: value, 2: value}, 'O': {0: value, 1: value, 2: value}}} `value` can be a float or a list. kernel : str Choose the kernel. Available kernels are: 'linear', 'rbf', 'laplacian', and 'exponential'. Default is 'rbf'. lamda : float, or dictionary Strength of the regularization. If you pass a dictionary then force and energy will have different regularization: >>> lamda = {'energy': value, 'forces': value} Dictionaries are only used when performing Cholesky factorization. trainingimages : str Path to Trajectory file containing the images in the training set. This is useful for predicting new structures. cholesky : bool Whether or not we are using Cholesky decomposition to determine the weights. This method returns an unique set of regression coefficients. weights_independent : bool Whether or not the weights are going to be split for energy and forces. forcetraining : bool Turn force training true. nnpartition : str Use per-atom energy partition from a neural network calculator. You have to set the path to .amp file. Useful for energy training with Cholesky factorization. Default is set to None. scheduler : str The scheduler to be used with the dask backend. sum_rule : bool Whether or not we sum of fingerprintprime elements over a given axis. This applies np.sum(fingerprint_list, axis=0). batch_size : int Number of elements per batch in order to split computations. Useful when number of local chemical environments is too large. weights : dict Dictionary of weights. Notes ----- In the case of training total energies, we need to apply either an atomic decomposition Ansatz (ADA) during training or an energy partition scheme to the training set. ADA can be achieved based on Ref. 1. For an explanation of what they do, see the Master thesis by Sonja Mathias. ADA is the default way of training total energies in this KernelRidge class. An energy partition scheme for total energies can be obtained from an artificial neural network or methods such as the interacting quantum atoms theory (IQA). I implemented the nnpartition mode for which users can provide the path to a NN calculator and we take the energies per-atom from the function .calculate_atomic_energy(). The strategy would be to use train the NN with a very tight convergence criterion (1e-6 RSME). Then, calling .calculate_atomic_energy() would give you the atomic energies for such set. For forces is a different history because we do know the derivative of the energy with respect to atom positions (a per-atom quantity). So we rely on the method in the algorithm shown by Rupp in Ref. 2. References ---------- 1. Bartók, A. P. & Csányi, G. Gaussian approximation potentials: A brief tutorial introduction. Int. J. Quantum Chem. 115, 1051–1057 (2015). 2. Rupp, M. Machine learning for quantum mechanics in a nutshell. Int. J. Quantum Chem. 115, 1058–1073 (2015). """ NAME = "KernelRidge"
[docs] @classmethod def name(cls): """Returns name of class""" return cls.NAME
def __init__( self, sigma=1.0, kernel="rbf", scheduler="distributed", lamda=1e-5, trainingimages=None, checkpoints=None, cholesky=True, weights_independent=True, forcetraining=False, nnpartition=None, sum_rule=True, batch_size=None, weights=None, **kwargs ): np.set_printoptions(precision=30, threshold=999999999) self.kernel = kernel self.sigma = sigma self.scheduler = scheduler self.lamda = lamda self.batch_size = batch_size # Let's add parameters that are going to be stored in the .params json # file. self.params = OrderedDict() self.params["name"] = self.params["type"] = "svm" self.params["class_name"] = self.__class__.__name__ # This is a very general way of not forgetting to save variables _params = vars() # Delete useless variables del _params["self"] for k, v in _params.items(): if v is not None: self.params[k] = v # Everything that is added here is not going to be part of the json # params file self.fingerprint_map = [] if weights is None: self.weights = {} else: self.weights = weights
[docs] def prepare_model( self, feature_space, reference_features, data=None, purpose="training" ): """Prepare the Kernel Ridge Regression model Parameters ---------- feature_space : dict A dictionary with hash, fingerprint structure. reference_features : dict A dictionary with raveled tuples of symbol, atomic fingerprint. data : object Data object created from the handler. purpose : str Purpose of this model: 'training', 'inference'. Notes ----- This method builds the atomic kernel matrices and the LT vectors needed to apply the atomic decomposition Ansatz. """ if self.batch_size == None: self.batch_size = data.get_total_number_atoms() if purpose == "training": now =" ")"Model")"=====") "Module accessed on {}.".format(now.strftime("%Y-%m-%d %H:%M:%S")) )"Model name: {}.".format("Kernel parameters:")" - Kernel function: {}.".format(self.kernel))" - Sigma: {}.".format(self.sigma))" - Lamda: {}.".format(self.lamda))" ") dim = len(reference_features) """ Atomic kernel matrices """"Computing Kernel Matrix...") # We start populating computations with delayed functions to # operate with dask's scheduler kernel_matrix = self.get_kernel_matrix( feature_space, reference_features, purpose=purpose ) # futures = self.get_kernel_matrix( # feature_space, reference_features, purpose=purpose # ) # if self.batch_size is not None: # futures = list(get_chunks(futures, self.batch_size)) # # " The calculations are in batches of {}.".format(self.batch_size) # ) # We compute the calculations with dask and the result is converted # to numpy array. # if self.batch_size is None: # kernel_matrix = client.gather(futures) # else: # kernel_matrix = [] # for chunk in futures: # kernel_matrix += client.gather(chunk) # FIXME probably not very efficient yet. # Found at _K = np.zeros([dim, dim]) indices_upper = np.triu_indices(dim) _K[indices_upper] = kernel_matrix self.K = _K.T + _K np.fill_diagonal(self.K, np.diag(_K)) del _K
[docs] def get_kernel_matrix(self, feature_space, reference_features, purpose): """Get kernel matrix delayed computations Parameters ---------- features : dict, list Dictionary with hash and features, or a list. reference_space : array Array with reference feature space. purpose : str Purpose of this kernel matrix. Accepted arguments are 'training', and 'inference'. Returns ------- kernel_matrix List with kernel matrix values. Notes ----- This class method expects the feature_space to be an OrderedDict and reference_space but it turns out that for computing variances, it might be the case the feature_space is also a list. """ call = {"exponential": exponential, "laplacian": laplacian, "rbf": rbf} initial_time = time.time() if isinstance(reference_features, dict): # This is the case when the reference_features are a # dictionary, too. If that's true we have to convert it to a list. reference_features = list(reference_features.values())[0] chunks = list(get_chunks(feature_space, self.batch_size)) " The calculations are distributed in {} batches of {} atoms.".format( len(chunks), self.batch_size ) ) counter = 0 kernel_matrix = [] for c, chunk in enumerate(chunks): chunk_initial_time = time.time()" Computing kernel functions for chunk {}...".format(c)) intermediates = [] if isinstance(feature_space, dict) and isinstance(reference_features, list): if isinstance(chunk, dict) is False: chunk = OrderedDict(chunk) reference_lenght = len(reference_features) for hash, _feature_space in chunk.items(): f_map = [] for i_symbol, i_afp in _feature_space: i_symbol = decode(i_symbol) f_map.append(1) if purpose == "training": for j in range(counter, reference_lenght): j_symbol, j_afp = reference_features[j] kernel = call[self.kernel]( i_afp, j_afp, i_symbol, j_symbol, self.sigma ) intermediates.append(kernel) counter += 1 else: for j_symbol, j_afp in reference_features: j_symbol = decode(j_symbol) kernel = call[self.kernel]( i_afp, j_afp, i_symbol, j_symbol, self.sigma ) intermediates.append(kernel) self.fingerprint_map.append(f_map) elif isinstance(feature_space, list) and isinstance( reference_features, list ): for i_symbol, i_afp in chunk: for j_symbol, j_afp in reference_features: i_symbol = decode(i_symbol) j_symbol = decode(j_symbol) kernel = call[self.kernel]( i_afp, j_afp, i_symbol, j_symbol, self.sigma ) intermediates.append(kernel) # Compute stuff from above kernel_matrix += dask.compute(intermediates, scheduler=self.scheduler)[0] del intermediates chunk_final_time = time.time() - chunk_initial_time h, m, s = convert_elapsed_time(chunk_final_time) " ...finished in {} hours {} minutes {:.2f} " "seconds.".format(h, m, s) ) # dask.distributed.wait(kernel_matrix) del reference_features # kernel_matrix = client.gather(kernel_matrix) build_time = time.time() - initial_time h, m, s = convert_elapsed_time(build_time) "Kernel matrix built in {} hours {} minutes {:.2f} " "seconds.".format(h, m, s) ) """ LT Vectors """ # We build the LT matrix needed for ADA if purpose == "training": self.LT = []"Building LT matrix") computations = [] for index, feature_space in enumerate(feature_space.items()): computations.append(self.get_lt(index)) computations = list(get_chunks(computations, self.batch_size)) " The calculations are distributed in {} batches of {} molecules.".format( len(computations), self.batch_size ) ) for chunk in computations: self.LT += dask.compute(*chunk, scheduler=self.scheduler) self.LT = np.array(self.LT) del computations del chunk lt_time = time.time() - initial_time h, m, s = convert_elapsed_time(lt_time) "LT matrix built in {} hours {} minutes {:.2f} seconds.".format(h, m, s) ) return kernel_matrix
[docs] def train(self, inputs, targets, data=None): """Train the model Parameters ---------- inputs : dict Dictionary with hashed feature space. targets : list The expected values that the model has to learn aka y. data : object Data object created from the handler. """ if isinstance(self.lamda, dict): lamda = self.lamda["energy"] else: lamda = self.lamda size = len(targets) I_e = np.identity(size) K = del self.LT"Size of the Kernel matrix is {}.".format(K.shape))"Starting Cholesky Factorization...") cholesky_U = cholesky((K + (lamda * I_e)))"Cholesky Factorization finished...") betas = np.linalg.solve(cholesky_U.T, targets) _weights = np.linalg.solve(cholesky_U, betas) _weights = [ w * g for index, w in enumerate(_weights) for g in self.fingerprint_map[index] ] self.weights["energy"] = _weights
[docs] def get_potential_energy(self, features, reference_space, purpose): """Get potential energy with Kernel Ridge Parameters ---------- features : dict Dictionary with hash and features. reference_space : array Array with reference feature space. purpose : str Purpose of this function: 'training', 'inference'. Returns ------- energy Energy of a molecule. """ client = dask.distributed.get_client() try: reference_space = reference_space[b"reference_space"] except KeyError: reference_space = reference_space["reference_space"] futures = self.get_kernel_matrix(features, reference_space, purpose=purpose) kernel = np.array(client.gather(futures)) weights = np.array(self.weights["energy"]) dim = int(kernel.shape[0] / weights.shape[0]) kernel = kernel.reshape(dim, len(weights)) energy_per_atom =, weights) energy = np.sum(energy_per_atom) return energy
@dask.delayed def get_lt(self, index): """Return LT vectors Parameters ---------- index : int Index of image. Returns ------- _LT : list Returns a list that maps atomic features in the images. """ _LT = [] for i, group in enumerate(self.fingerprint_map): if i == index: for _ in group: _LT.append(1.0) else: for _ in group: _LT.append(0.0) return _LT
[docs] def get_sigma(self, sigma, forcetraining=False): """Function to build sigma Parameters ---------- sigma : float, list or dict. This is user's raw input for sigma. forcetraining : bool Whether or not force training is set to true. Returns ------- _sigma : dict Universal sigma dictionary for KernelRidge. """ _sigma = {} if isinstance(sigma, int): for symbol in unique_element_symbols: _sigma[symbol] = sigma return _sigma
""" Auxiliary functions """ @dask.delayed def linear(feature_i, feature_j, i_symbol=None, j_symbol=None): """ Compute a linear kernel Parameters ---------- feature_i : np.array Atomic fingerprint for central atom. feature_j : np.array Atomic fingerprint for j atom. i_symbol : str Chemical symbol for central atom. j_symbol : str Chemical symbol for j atom. Returns ------- linear :float Linear kernel. """ if i_symbol != j_symbol: return 0.0 else: linear =, feature_j) return linear @dask.delayed def rbf(feature_i, feature_j, i_symbol=None, j_symbol=None, sigma=1.0): """ Compute the rbf (AKA Gaussian) kernel. Parameters ---------- feature_i : np.array Atomic fingerprint for central atom. feature_j : np.array Atomic fingerprint for j atom. i_symbol : str Chemical symbol for central atom. j_symbol : str Chemical symbol for j atom. sigma : float, or list. Gaussian width. If passed as a list or np.darray, kernel can become anisotropic. Returns ------- rbf :float RBF kernel. """ if i_symbol != j_symbol: return 0.0 else: if isinstance(sigma, list) or isinstance(sigma, np.ndarray): assert len(sigma) == len(feature_i) and len(sigma) == len(feature_j), ( "Length of sigma does not " "match atomic fingerprint " "length." ) sigma = np.array(sigma) anisotropic_rbf = np.exp( -( np.sum( np.divide( np.square(np.subtract(feature_i, feature_j)), (2.0 * np.square(sigma)), ) ) ) ) return anisotropic_rbf else: rbf = np.exp( -(np.linalg.norm(feature_i - feature_j) ** 2.0) / (2.0 * sigma ** 2.0) ) return rbf @dask.delayed def exponential(feature_i, feature_j, i_symbol=None, j_symbol=None, sigma=1.0): """ Compute the exponential kernel Parameters ---------- feature_i : np.array Atomic fingerprint for central atom. feature_j : np.array Atomic fingerprint for j atom. i_symbol : str Chemical symbol for central atom. j_symbol : str Chemical symbol for j atom. sigma : float, or list. Gaussian width. Returns ------- exponential : float Exponential kernel. """ if i_symbol != j_symbol: return 0.0 else: if isinstance(sigma, list) or isinstance(sigma, np.ndarray): assert len(sigma) == len(feature_i) and len(sigma) == len(feature_j), ( "Length of sigma does not " "match atomic fingerprint " "length." ) sigma = np.array(sigma) anisotropic_exp = np.exp( -( np.sqrt( np.sum( np.square( np.divide( np.subtract(feature_i, feature_j), (2.0 * np.square(sigma)), ) ) ) ) ) ) return anisotropic_exp else: exponential = np.exp( -(np.linalg.norm(feature_i - feature_j)) / (2.0 * sigma ** 2) ) return exponential @dask.delayed def laplacian(feature_i, feature_j, i_symbol=None, j_symbol=None, sigma=1.0): """ Compute the laplacian kernel Parameters ---------- feature_i : np.array Atomic fingerprint for central atom. feature_j : np.array Atomic fingerprint for j atom. i_symbol : str Chemical symbol for central atom. j_symbol : str Chemical symbol for j atom. sigma : float Gaussian width. Returns ------- laplacian : float Laplacian kernel. """ if i_symbol != j_symbol: return 0.0 else: if isinstance(sigma, list) or isinstance(sigma, np.ndarray): assert len(sigma) == len(feature_i) and len(sigma) == len(feature_j), ( "Length of sigma does not " "match atomic fingerprint " "length." ) sigma = np.array(sigma) sum_ij = np.sum( np.square(np.divide(np.subtract(feature_i, feature_j), sigma)) ) anisotropic_lap = np.exp(-(np.sqrt(sum_ij))) return anisotropic_lap else: laplacian = np.exp(-(np.linalg.norm(feature_i - feature_j)) / sigma) return laplacian
[docs]def decode(symbol): """Decode from binary to string Parameters ---------- symbol : binary A string in binary form, e.g. b'hola'. Returns ------- str Symbol as a string. """ try: return symbol.decode() except AttributeError: return symbol