Source code for teneto.trajectory.compression

"""Calculate compression of trajectory."""
import numpy as np
from ..utils import process_input


[docs]def create_traj_ranges(start, stop, N): """ Fills in the trajectory range. # Adapted from https://stackoverflow.com/a/40624614 """ steps = (1.0/(N-1)) * (stop - start) if np.isscalar(steps): return steps*np.arange(N) + start else: return steps[:, None]*np.arange(N) + start[:, None]
[docs]def rdp(datin, delta=1, report=10, quiet=True): """ """ # This needs to be added to utils for trajectory detection # T will be for trajectory or timeseries data, (roi x time). datin, datinfo = process_input(datin, ['C', 'G', 'TO', 'T']) # if network, then make to roi,time shape. If T, nothing needs to be done. if len(datin.shape) == 3: ind = np.triu_indices(datin.shape[0], k=1) datin = datin[ind[0], ind[1], :] index_out = np.array([ind[0], ind[1]]).transpose() else: index_out = np.arange(0, datin.shape[0]) # Interprets data to be rois x time. datin = np.array(datin, ndmin=2) s = 0 e = datin.shape[-1] # Create straight line between start and end point trajectory = create_traj_ranges( datin[:, s], datin[:, e-1], e-s) # Create lists of trajectories trajectory_points = [np.array([s, e-1]) for n in np.arange(datin.shape[0])] # Preset some outputs reduction = np.zeros(len(trajectory_points)) error = np.zeros(len(trajectory_points)) amp_weighted_error = np.zeros(len(trajectory_points)) # Stopping condition and round preset stop_cond = 0 round_count = 0 while stop_cond < datin.shape[0]: traj_data_diff = np.abs(datin-trajectory) ind = np.argmax(traj_data_diff, axis=1) for i, ix in enumerate(ind): ind_bool = traj_data_diff[i, ix] > delta if ind_bool: # Get trajectory breaking point indicies traj_ind = np.searchsorted( trajectory_points[i], ix, side='left') traj_start = trajectory_points[i][traj_ind-1] traj_end = trajectory_points[i][traj_ind] # Make new trajectories r1 = np.arange(traj_start, ix+1) trajectory[i, r1] = create_traj_ranges( trajectory[i, traj_start], datin[i, ix], len(r1)) r2 = np.arange(ix, traj_end+1) trajectory[i, r2] = create_traj_ranges( datin[i, ix], trajectory[i, traj_end], len(r2)) # Add new point trajectory_points[i] = np.insert( trajectory_points[i], traj_ind, ix) else: stop_cond += 1 round_count += 1 if np.remainder(round_count, report) == 0 and not quiet: print('After round: ' + str(round_count) + ', ' + str(np.round((stop_cond/datin.shape[0])*100, 2)) + '% completed data reduction') for i in np.arange(datin.shape[0]): reduction[i] = float(1-len(trajectory_points[i])/datin.shape[1]) error[i] = float(np.mean(traj_data_diff[i])) amp_weighted_error[i] = float( np.mean(np.abs(datin[i, :])*traj_data_diff[i])) # Construct ouput dictionary traj = datinfo traj['trajectory'] = trajectory traj['trajectory_points'] = trajectory_points traj['reduction'] = reduction traj['error'] = error traj['weighted_error'] = amp_weighted_error traj['nettype'] = 't' traj['index'] = index_out traj.pop('inputtype') return traj