# -*- coding: utf-8 -*-
"""A collection of useful functions for manipulating trajectory data and
dynamical basis set objects.
@author: Erik
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import numpy as np
def tlist_to_flat(trajs):
    """Flatten a list of trajectories into a single two dimensional array.

    The flattened array is returned along with an array of edges giving
    the location of each trajectory inside it.

    Parameters
    ----------
    trajs : list of array-likes
        List where each element n is an array-like object of shape N_n x d,
        where N_n is the number of data points in that trajectory and d is
        the number of coordinates for each datapoint.  If every trajectory
        is one dimensional (shape N_n), each is treated as N_n x 1.

    Returns
    -------
    traj_2d : 2D numpy array
        Numpy array containing the flattened trajectory information.
    traj_edges : 1D numpy array
        Numpy array where each element is the start of each trajectory:
        the n'th trajectory runs from traj_edges[n] to traj_edges[n+1].
        It has len(trajs) + 1 entries.

    Raises
    ------
    ValueError
        If the trajectories do not all have the same number of array
        dimensions, are not two dimensional (after the one dimensional
        case has been promoted), or disagree on d.
    """
    # Convert up front so plain Python lists work and shapes can be read
    # directly, even for trajectories that contain no points.
    trajs = [np.asarray(traj) for traj in trajs]

    # Nothing to flatten: d is unknown, so return an empty 0 x 0 array.
    if not trajs:
        return np.zeros((0, 0)), np.array([0])

    # Check all trajectories are same order tensors.
    first_order = trajs[0].ndim
    if any(traj.ndim != first_order for traj in trajs):
        raise ValueError("Input Trajectories have varying dimension")
    if first_order == 1:
        trajs = [traj.reshape(-1, 1) for traj in trajs]

    # Validate every trajectory and record where each one ends.
    d = None
    traj_edges = [0]
    for i, traj in enumerate(trajs):
        if traj.ndim != 2:
            raise ValueError('Trajectory %d is not two dimensional!' % i)
        d2 = traj.shape[1]
        if d is None:
            d = d2
        if d2 != d:
            raise ValueError('Trajectories are of incompatible dimension. The first trajectory has dimension %d and trajectory %d has dimension %d' % (d, i, d2))
        traj_edges.append(traj_edges[-1] + len(traj))

    # A single concatenate avoids building a Python list of every row.
    return np.concatenate(trajs, axis=0), np.array(traj_edges)
def flat_to_tlist(traj_2d, traj_edges):
    """Split a flattened trajectory back into a list of trajectories.

    Parameters
    ----------
    traj_2d : 2D numpy array
        Numpy array containing the flattened trajectory information.
    traj_edges : 1D numpy array
        Numpy array where each element is the start of each trajectory:
        the n'th trajectory runs from traj_edges[n] to traj_edges[n+1].

    Returns
    -------
    trajs : list of array-likes
        List where each element n is the slice
        traj_2d[traj_edges[n]:traj_edges[n+1]], of shape N_n x d, where
        N_n is the number of data points in that trajectory and d is the
        number of coordinates for each datapoint.  The list has
        len(traj_edges) - 1 entries (none if traj_edges has fewer than
        two entries).
    """
    # Pair each edge with its successor to get the (start, stop) bounds.
    return [traj_2d[start:stop]
            for start, stop in zip(traj_edges[:-1], traj_edges[1:])]
def get_initial_final_split(traj_edges, lag=1):
    """Return the indices of the initial and final sample points.

    The indices refer to positions in the flat trajectory.  In this
    context, initial means the first N-lag points of each trajectory, and
    final means the last N-lag points, so that t_lag_indices[k] is always
    t_0_indices[k] + lag within the same trajectory.  Trajectories with
    lag points or fewer contribute no indices.

    Parameters
    ----------
    traj_edges : 1D numpy array
        Array where each element is the start of each trajectory: the n'th
        trajectory runs from traj_edges[n] to traj_edges[n+1].
    lag : int, optional
        Number of timepoints in the future to look into the future for the
        transfer operator. Default is 1.

    Returns
    -------
    t_0_indices : 1D numpy array
        Indices in the flattened trajectory data of all the points at the
        initial times.
    t_lag_indices : 1D numpy array
        Indices in the flattened trajectory data of all the points at the
        final times.
    """
    t_0_indices = []
    t_lag_indices = []
    for t_start, t_stop in zip(traj_edges[:-1], traj_edges[1:]):
        if (t_stop - t_start) > lag:
            t_0_indices.extend(range(t_start, t_stop - lag))
            t_lag_indices.extend(range(t_start + lag, t_stop))
    # Force an integer dtype: an empty list would otherwise become a float
    # array, which numpy refuses to use as an index.
    return (np.array(t_0_indices, dtype=int),
            np.array(t_lag_indices, dtype=int))
def delay_embed(traj_data, n_embed, lag=1, verbosity=0):
    """Perform delay embedding on the trajectory data.

    Takes in trajectory data in one of several formats and returns the
    delay embedded data in the same format.  Each embedded point is the
    concatenation of n_embed + 1 datapoints: the point itself first,
    followed by the points lag, 2 * lag, ..., n_embed * lag steps earlier.
    A trajectory of N points therefore yields N - lag * n_embed embedded
    points.

    Parameters
    ----------
    traj_data : list of arrays OR tuple of two arrays OR single numpy array
        Dynamical data on which to perform the delay embedding. This can
        be of multiple types, and the type dictates the format of the
        data.  Specifically, it can be either a list of trajectories, the
        internal flattened format (flat array, trajectory edges), or a
        single trajectory in the form of an array.  One dimensional
        trajectories are treated as having a single coordinate.
    n_embed : int
        The number of delay embeddings to perform.
    lag : int, optional
        The number of timesteps to look back in time for each delay.
        Default is 1.
    verbosity : int, optional
        The level of status messages that are output. Default is 0 (no
        messages).  Currently unused by this function.

    Returns
    -------
    embedded_data : list of arrays OR tuple of two arrays OR single numpy array
        Dynamical data with delay embedding performed, of the same type as
        the trajectory data.  Trajectories with lag * n_embed points or
        fewer cannot be embedded and are dropped, so the list and flat
        formats may contain fewer trajectories than the input.

    Raises
    ------
    ValueError
        If the type of traj_data is not recognized, or if traj_data is a
        single array that is too short to be embedded.
    """
    if isinstance(traj_data, list):
        input_type = 'list_of_trajs'
        tlist = traj_data
    elif isinstance(traj_data, tuple):
        input_type = 'flat'
        tlist = flat_to_tlist(traj_data[0], traj_data[1])
    elif isinstance(traj_data, np.ndarray):
        input_type = 'single_array'
        tlist = [traj_data]
    else:
        raise ValueError("Unable to recognize the format of the input from the type: type must either be tuple, list, or numpy array")

    max_shift = lag * n_embed
    embed_traj_list = []
    for traj_i in tlist:
        traj_i = np.asarray(traj_i)
        # Promote a 1D trajectory to a single column so that the delayed
        # copies can be stacked along axis 1.
        if traj_i.ndim == 1:
            traj_i = traj_i.reshape(-1, 1)
        N_i = len(traj_i)
        if N_i <= max_shift:  # Must be longer than max embedding
            continue
        # Copy n is shifted back by n * lag steps; every copy is trimmed
        # to the common length N_i - max_shift.
        delayed = [traj_i[lag * (n_embed - n):N_i - lag * n]
                   for n in range(n_embed + 1)]
        embed_traj_list.append(np.concatenate(delayed, axis=1))

    if input_type == 'list_of_trajs':
        return embed_traj_list
    elif input_type == 'flat':
        return tlist_to_flat(embed_traj_list)
    # Only the single array case remains.
    if not embed_traj_list:
        raise ValueError("Trajectory of length %d is too short for %d delay embeddings with lag %d" % (len(traj_data), n_embed, lag))
    return embed_traj_list[0]
def lift_function(function, n_embed, lag=1):
    """Lift a function into the delay-embedded space.

    The function is given by its values along each trajectory.  Each
    trajectory of N values is trimmed to the N - lag * n_embed values at
    the centers of the delay-embedding windows: (lag * n_embed) // 2
    values are dropped from the start and the remainder from the end.
    The result therefore has the same number of points per trajectory as
    delay embedding with the same n_embed and lag would produce.

    Parameters
    ----------
    function : list of arrays OR tuple of two arrays OR single numpy array
        Function values, either as a list of per-trajectory arrays, in the
        flattened format (flat array, trajectory edges), or as a single
        array for one trajectory.
    n_embed : int
        The number of delay embeddings.
    lag : int, optional
        The number of timesteps between successive delays. Default is 1.

    Returns
    -------
    lifted_fxn : list of arrays OR tuple of two arrays OR single numpy array
        The trimmed function values, of the same type as the input.
        Trajectories with lag * n_embed points or fewer are dropped.

    Raises
    ------
    ValueError
        If the type of function is not recognized, or if function is a
        single array that is too short to be lifted.
    """
    if isinstance(function, list):
        input_type = 'list_of_trajs'
        tlist = function
    elif isinstance(function, tuple):
        input_type = 'flat'
        tlist = flat_to_tlist(function[0], function[1])
    elif isinstance(function, np.ndarray):
        input_type = 'single_array'
        tlist = [function]
    else:
        raise ValueError("Unable to recognize the format of the input from the type: type must either be tuple, list, or numpy array")

    max_shift = lag * n_embed
    # Offset of the window center from the window's earliest point.  The
    # lag must be included so the output length matches the length guard
    # below; for lag == 1 this equals int(n_embed / 2).
    start_ndx = max_shift // 2
    lifted_fxn = []
    for fxn_i in tlist:
        N_i = len(fxn_i)
        if N_i <= max_shift:  # Must be longer than max embedding
            continue
        stop_ndx = start_ndx + (N_i - max_shift)
        lifted_fxn.append(fxn_i[start_ndx:stop_ndx])

    if input_type == 'list_of_trajs':
        return lifted_fxn
    elif input_type == 'flat':
        return tlist_to_flat(lifted_fxn)
    # Only the single array case remains.
    if not lifted_fxn:
        raise ValueError("Function of length %d is too short for %d delay embeddings with lag %d" % (len(function), n_embed, lag))
    return lifted_fxn[0]
def _as_flat(traj_data):
    """Convert trajectory data of any supported type to the flat format.

    Parameters
    ----------
    traj_data : list of arrays OR tuple of two arrays OR single numpy array
        A list of trajectories, data already in the flattened format
        (flat array, trajectory edges), or a single trajectory array.

    Returns
    -------
    flat : 2D numpy array
        The flattened trajectory data.  For tuple input this is the first
        element of the tuple, returned as-is.
    edges : 1D numpy array
        The trajectory edges.  For tuple input this is the second element
        of the tuple, returned as-is.
    input_type : str
        One of 'list_of_trajs', 'flat' or 'single_array', recording which
        format traj_data was given in.

    Raises
    ------
    ValueError
        If traj_data is not a list, tuple, or numpy array.
    """
    # isinstance (rather than an exact type match) also accepts subclasses
    # of list, tuple and numpy array.
    if isinstance(traj_data, list):
        input_type = 'list_of_trajs'
        flat, edges = tlist_to_flat(traj_data)
    elif isinstance(traj_data, tuple):
        input_type = 'flat'
        flat, edges = traj_data
    elif isinstance(traj_data, np.ndarray):
        input_type = 'single_array'
        flat, edges = tlist_to_flat([traj_data])
    else:
        raise ValueError("Unable to recognize the format of the input from the type: type must either be tuple, list, or numpy array")
    return flat, edges, input_type
def _flat_to_orig(traj, edges, input_type):
    """Convert flat-format trajectory data back to a given input format.

    Parameters
    ----------
    traj : 2D numpy array
        The flattened trajectory data.
    edges : 1D numpy array
        The trajectory edges: the n'th trajectory runs from edges[n] to
        edges[n+1].
    input_type : str
        The format to convert to: 'list_of_trajs', 'flat' or
        'single_array'.

    Returns
    -------
    data : list of arrays OR tuple of two arrays OR single numpy array
        For 'list_of_trajs', the list of per-trajectory slices of traj;
        for 'flat', the tuple (traj, edges); for 'single_array', traj
        itself.

    Raises
    ------
    ValueError
        If input_type is not one of the three recognized names.
    """
    if input_type == 'list_of_trajs':
        return flat_to_tlist(traj, edges)
    if input_type == 'flat':
        return traj, edges
    if input_type == 'single_array':
        return traj
    # Fail loudly rather than silently handing None back to the caller.
    raise ValueError("Unable to recognize input_type %r: must be 'list_of_trajs', 'flat', or 'single_array'" % (input_type,))