Source code for pyedgar.basis

# -*- coding: utf-8 -*-
"""Routines and Class definitions for constructing basis sets using the
diffusion maps algorithm.

@author: Erik

"""
from __future__ import absolute_import, division, print_function

import numpy as np
import scipy.sparse as sps
import scipy.sparse.linalg as spsl

from pydiffmap.diffusion_map import DiffusionMap
from .data_manipulation import _flat_to_orig, _as_flat


[docs]class DiffusionAtlas(object): """The diffusion atlas is a factory object for constructing diffusion map bases with various boundary conditions. """ def __init__(self, dmap_object=None): """ Builds the Diffusion Atlas a diffusion map object. Parameters ---------- dmap_object : pyDiffMap DiffusionMap object, optional Diffusion map object to use in the atlas. If None, uses default parameters, which are similar to the LSDmap. """ if dmap_object is None: dmap_object = DiffusionMap.from_sklearn(alpha=0, k=500, bandwidth_type='-1/d', epsilon='bgh_generous') self.dmap = dmap_object
[docs] @classmethod def from_kernel(cls, kernel_object, alpha=0.5, weight_fxn=None, density_fxn=None, bandwidth_normalize=False, oos='nystroem'): """ Builds the Diffusion Atlas using a pyDiffMap kernel. See the pyDiffMap.DiffusionMap constructor for a description of arguments. """ dmap = DiffusionMap(kernel_object=kernel_object, alpha=alpha, weight_fxn=weight_fxn, density_fxn=density_fxn, bandwidth_normalize=bandwidth_normalize, oos=oos) return cls(dmap)
[docs] @classmethod def from_sklearn(cls, alpha=0.5, k=64, kernel_type='gaussian', epsilon='bgh', neighbor_params=None, metric='euclidean', metric_params=None, weight_fxn=None, density_fxn=None, bandwidth_type=None, bandwidth_normalize=False, oos='nystroem'): """ Builds the Diffusion Atlas using the standard pyDiffMap kernel. See the pyDiffMap.DiffusionMap.from_sklearn for a description of arguments. """ dmap = DiffusionMap.from_sklearn(alpha=alpha, k=k, kernel_type=kernel_type, epsilon=epsilon, neighbor_params=neighbor_params, metric=metric, metric_params=metric_params, weight_fxn=weight_fxn, density_fxn=density_fxn, bandwidth_type=bandwidth_type, bandwidth_normalize=bandwidth_normalize, oos=oos) return cls(dmap)
[docs] def fit(self, data): """Constructs the diffusion map on the dataset. Parameters ---------- data : 2D array-like OR list of trajectories OR Flat data format Dataset on which to construct the diffusion map. """ # Default Parameter Selection and Type Cleaning data, edges, input_type = _as_flat(data) if len(data.shape) == 1: data = data.reshape(-1, 1) self.data = data self.edges = edges self.input_type = input_type self.dmap.construct_Lmat(data) return self
[docs] def make_dirichlet_basis(self, k, in_domain=None, return_evals=False): """Creates a diffusion map basis set that obeys the homogeneous Dirichlet boundary conditions on the domain. This is done by taking the eigenfunctions of the diffusion map submatrix on the domain. Parameters ---------- k : int Number of basis functions to create. in_domain : 1D array-like, OR list of such arrays, OR flat data format, optional Array of the same shape as the data, where each element is 1 or True if that datapoint is inside the domain, and 0 or False if it is in the domain. Naturally, this must be the length as the current dataset. If None (default), all points assumed to be in the domain. return_evals : Boolean, optional Whether or not to return the eigenvalues as well. These are useful for out of sample extension. Returns ------- basis : Dataset of same type as the data The basis functions evaluated on each datapoint. Of the same type as the input data. evals : 1D numpy array, optional The eigenvalues corresponding to each basis vector. Only returned if return_evals is True. """ submat = self.dmap.L npoints = submat.shape[0] # Take the submatrix if necessary if in_domain is not None: in_domain = _as_flat(in_domain)[0].ravel() domain = np.where(in_domain > 0)[0] submat = submat[domain][:, domain] evals, evecs = spsl.eigs(submat, k, which='LR') # Sort by eigenvalue. idx = evals.argsort()[::-1] evals = evals[idx] evecs = evecs[:, idx] # If using a submatrix, expand back to full size if in_domain is not None: full_evecs = np.zeros((npoints, k)) full_evecs[domain, :] = np.real(evecs) else: full_evecs = evecs full_evecs = _flat_to_orig(full_evecs, self.edges, self.input_type) if return_evals: return full_evecs, evals else: return full_evecs
[docs] def extend_dirichlet_basis(self, Y, in_domain, basis, evals): """ Performs out-of-sample extension an a dirichlet basis set. Parameters ---------- Y : 2D array-like OR list of trajectories OR flat data format Data for which to perform the out-of-sample extension. in_domain : 1D array-like, OR list of such arrays, OR flat data format Dataset of the same shape as the input datapoints, where each element is 1 or True if that datapoint is inside the domain, and 0 or False if it is in the domain. basis : 2D array-like OR list of trajectories OR Flat data format The basis functions. evals : 1D numpy array The eigenvalues corresponding to each basis vector. Returns ------- basis_extended : Dataset of same type as the data Transformed value of the given values. """ Y, edges, Y_input_type = _as_flat(Y) Y = np.asanyarray(Y) in_domain = _as_flat(in_domain)[0].ravel() basis = _as_flat(basis)[0] if len(Y.shape) == 1: Y = Y.reshape(-1, 1) if np.array_equal(Y, self.dmap.data): return _flat_to_orig(basis, edges, Y_input_type) if self.dmap.oos == "nystroem": basis_extended = nystroem_oos(self.dmap, Y, basis, evals) elif self.dmap.oos == "power": basis_extended = power_oos(self.dmap, Y, basis, evals) else: raise ValueError('Did not understand the OOS algorithm specified') outside_locs = np.where(1 - in_domain)[0] basis_extended[outside_locs] = 0. return _flat_to_orig(basis_extended, edges, Y_input_type)
[docs] def make_FK_soln(self, b, in_domain): """ Solves a Feynman-Kac problem on the data. Specifically, solves Lx = b on the domain and x=b off of the domain. In the DGA framework, this is intended to be used to solve for guess functions. Parameters ---------- b : 1D array-like, OR list of such arrays, OR flat data format. Dataset of the same shape as the input datapoints. Right hand side of the Feynman-Kac equation. in_domain : 1D array-like, OR list of such arrays, OR flat data format. Dataset of the same shape as the input datapoints, where each element is 1 or True if that datapoint is inside the domain, and 0 or False if it is in the domain. Returns ------- soln : Dataset of same type as the data. Solution to the Feynman-Kac problem. """ in_domain = _as_flat(in_domain)[0].ravel() b = _as_flat(b)[0].ravel() domain_locs = np.where(in_domain)[0] complement_locs = np.where(1. - in_domain)[0] # Solve the FK problem L_sub = self.dmap.L[domain_locs, :] L_comp = L_sub[:, complement_locs] b_comp = b[complement_locs] Lb = L_comp.dot(b_comp) L_sub = L_sub[:, domain_locs] # Add the boundary conditions back in. soln_sub = spsl.spsolve(L_sub, b[domain_locs] - Lb) soln = np.copy(b) soln[domain_locs] = soln_sub return _flat_to_orig(soln, self.edges, self.input_type)
[docs] def extend_FK_soln(self, soln, Y, b, in_domain): """ Extends the values of the Feynman-Kac solution onto new points. In the DGA framework, this is intended to be used to extend guess functions onto new datapoints. Parameters ---------- soln : Dataset of same type as the data. Solution to the Feynman-Kac problem on the original type. Y : 2D array-like OR list of trajectories OR flat data format Data for which to perform the out-of-sample extension. b :1D array-like, OR list of such arrays, OR flat data format. Values of the right hand-side for the OOS points. in_domain : 1D array-like, OR list of such arrays, OR flat data format. Dataset of the same shape as the input datapoints, where each element is 1 or True if that datapoint is inside the domain, and 0 or False if it is in the domain. Returns ------- extended_soln : Dataset of same type as the data. Solution to the Feynman-Kac problem. """ Y, edges, Y_input_type = _as_flat(Y) b = _as_flat(b)[0].ravel() in_domain = _as_flat(in_domain)[0].ravel() Y = np.asanyarray(Y) if len(Y.shape) == 1: Y = Y.reshape(-1, 1) domain_locs = np.where(in_domain)[0] Y_sub = Y[domain_locs] L_yx, L_yy = _get_L_oos(self.dmap, Y_sub) # L_yx, L_yy = _get_L_oos(self.dmap, Y soln_sub = b[domain_locs] - L_yx.dot(soln) soln_sub /= L_yy soln = np.copy(b) soln[domain_locs] = np.copy(soln_sub) return _flat_to_orig(soln, edges, Y_input_type)
[docs]def nystroem_oos(dmap_object, Y, evecs, evals): """ Performs Nystroem out-of-sample extension to calculate the values of the diffusion coordinates at each given point. Parameters ---------- dmap_object : DiffusionMap object Diffusion map upon which to perform the out-of-sample extension. Y : array-like, shape (n_query, n_features) Data for which to perform the out-of-sample extension. Returns ------- phi : numpy array, shape (n_query, n_eigenvectors) Transformed value of the given values. """ # check if Y is equal to data. If yes, no computation needed. # compute the values of the kernel matrix kernel_extended = dmap_object.local_kernel.compute(Y) weights = dmap_object._compute_weights(dmap_object.local_kernel.data) P = dmap_object._left_normalize(dmap_object._right_normalize(kernel_extended, dmap_object.right_norm_vec, weights)) oos_evecs = P * evecs # evals_p = dmap_object.local_kernel.epsilon_fitted * dmap_object.evals + 1. # oos_dmap = np.dot(oos_evecs, np.diag(1. / evals_p)) return oos_evecs
[docs]def power_oos(dmap_object, Y, evecs, evals): """ Performs out-of-sample extension to calculate the values of the diffusion coordinates at each given point using the power-like method. Parameters ---------- dmap_object : DiffusionMap object Diffusion map upon which to perform the out-of-sample extension. Y : array-like, shape (n_query, n_features) Data for which to perform the out-of-sample extension. Returns ------- phi : numpy array, shape (n_query, n_eigenvectors) Transformed value of the given values. """ L_yx, L_yy = _get_L_oos(dmap_object, Y) adj_evals = evals - L_yy.reshape(-1, 1) dot_part = np.array(L_yx.dot(evecs)) return (1. / adj_evals) * dot_part
def _get_L_oos(dmap_object, Y): M = int(Y.shape[0]) k_yx, y_bandwidths = dmap_object.local_kernel.compute(Y, return_bandwidths=True) # Evaluate on ref points yy_right_norm_vec = dmap_object._make_right_norm_vec(k_yx, y_bandwidths)[1] k_yy_diag = dmap_object.local_kernel.kernel_fxn(0, dmap_object.epsilon_fitted) data_full = np.vstack([dmap_object.local_kernel.data, Y]) k_full = sps.hstack([k_yx, sps.eye(M) * k_yy_diag]) right_norm_full = np.hstack([dmap_object.right_norm_vec, yy_right_norm_vec]) weights = dmap_object._compute_weights(data_full) P = dmap_object._left_normalize(dmap_object._right_normalize(k_full, right_norm_full, weights)) L = dmap_object._build_generator(P, dmap_object.epsilon_fitted, y_bandwidths) L_yx = L[:, :-M] L_yy = np.array(L[:, -M:].diagonal()) return L_yx, L_yy