r"""
This module defines tools for geometric analysis of one-dimensional
(time-series) data sets. The main classes are
- :class:`Signal`
- :class:`SpaceCurve`
See `timeseries-data` for a more general outline.
Copyright
---------
- This file is part of https://github.com/geomdata/gda-public/
- 2015, 2016, 2017 by Geometric Data Analytics, Inc. (http://geomdata.com)
- AGPL license. See `LICENSE` or https://github.com/geomdata/gda-public/blob/master/LICENSE
Examples
--------
>>> el = Signal([2.0, 3.0, 0.0, 5.0, 2.5, 2.9])
>>> el.make_pers()
>>> el.pers.diagram
birth_index death_index birth death pers
0 0 1 2.0 3.0 1.0
1 4 3 2.5 5.0 2.5
2 2 3 0.0 5.0 5.0
>>> bin_counts, bins = el.pers.bin(1.0)
>>> print(bins)
[-inf 0. 1. 2. 3. 4. 5. inf]
>>> print(bin_counts)
[[ 0. 0. 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0. 0. 1.]
[ 0. 0. 0. 0. 0. 0. 0.]
[ 0. 0. 1. 1. 0. 0. 0.]
[ 0. 0. 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0. 0. 0.]]
>>> sorted(list(el.pers.domains.keys()))
[(0, 1), (2, 3), (3, 4)]
>>> for interval in el.iter_intervals():
... print("h_A( {} ) == {}".format(interval,el.interval_height(interval)))
h_A( (0, 1) ) == 1.0
h_A( (0, 2) ) == 0.0
h_A( (0, 3) ) == 0.0
h_A( (0, 4) ) == 0.0
h_A( (0, 5) ) == 0.0
h_A( (1, 2) ) == 0.0
h_A( (1, 3) ) == 0.0
h_A( (1, 4) ) == 0.0
h_A( (1, 5) ) == 0.0
h_A( (2, 3) ) == 5.0
h_A( (2, 4) ) == 0.0
h_A( (2, 5) ) == 0.0
h_A( (3, 4) ) == 2.5
h_A( (3, 5) ) == 0.0
h_A( (4, 5) ) == 0.0
>>> list(el.pers.forest.keys())
[None]
>>> sorted(list(el.pers.forest[None]))
[0, 1, 2]
>>> el.jagged(6.0)
0 0.0
1 0.0
2 0.0
3 0.0
4 0.0
5 0.0
dtype: float64
>>> el = Signal([6.5,0.0,2.0])
>>> el.make_pers()
>>> el.pers.diagram
birth_index death_index birth death pers
0 1 0 0.0 6.5 6.5
>>> list(el.pers.forest[None])
[0]
>>> el.vertices
time height
0 0.0 6.5
1 1.0 0.0
2 2.0 2.0
>>> el.jagged() #el.pers.diagram,el.vertices.index,el.vertices.dtype)
0 6.5
1 0.0
2 0.0
dtype: float64
>>> elN = el.sample_near(sigma=0.1)
>>> elN.make_pers()
>>> elN.pers.domains == el.pers.domains
True
>>> el = Signal([6.5,7.0,2.0,4.5,3.9,9.0,8.3,8.7,5.5,9.9])
>>> el.vertices
time height
0 0.0 6.5
1 1.0 7.0
2 2.0 2.0
3 3.0 4.5
4 4.0 3.9
5 5.0 9.0
6 6.0 8.3
7 7.0 8.7
8 8.0 5.5
9 9.0 9.9
>>> el.edges
src dst max height
2 2 3 3 4.5
3 3 4 3 4.5
0 0 1 1 7.0
1 1 2 1 7.0
6 6 7 7 8.7
7 7 8 7 8.7
4 4 5 5 9.0
5 5 6 5 9.0
8 8 9 9 9.9
>>> el.make_pers()
>>> el.pers.diagram
birth_index death_index birth death pers
2 6 7 8.3 8.7 0.4
1 0 1 6.5 7.0 0.5
0 4 3 3.9 4.5 0.6
3 8 5 5.5 9.0 3.5
4 2 9 2.0 9.9 7.9
>>> el.feature()
time height
2 2.0 2.0
3 3.0 4.5
4 4.0 3.9
5 5.0 9.0
6 6.0 8.3
7 7.0 8.7
8 8.0 5.5
9 9.0 9.9
>>> el.pers.syzygy((0,0))
2 0.4
1 0.5
0 0.6
3 3.5
4 7.9
dtype: float64
>>> el.pers.syzygy((1,2))
2 251.2908
1 159.2500
0 47.3850
3 1559.2500
4 1548.5580
dtype: float64
>>> el.pers.forest_parents == {0: 4, 1: None, 2: 3, 3: 4, 4: None}
True
>>> el.vertices['height'].sum()/len(el.vertices)
6.5299999999999994
>>> el.normalize()
>>> el2 = Signal(el.vertices)
>>> el2.make_pers()
>>> all(el2.pers.diagram == el.pers.diagram)
True
>>> el = Signal([0.0,3.0,1.5,2.2,0.001])
>>> el.make_pers()
>>> el.vertices
time height
0 0.0 0.000
1 1.0 3.000
2 2.0 1.500
3 3.0 2.200
4 4.0 0.001
>>> el.edges
src dst max height
2 2 3 3 2.2
3 3 4 3 2.2
0 0 1 1 3.0
1 1 2 1 3.0
>>> el.pers.diagram
birth_index death_index birth death pers
0 2 3 1.500 2.2 0.700
1 4 1 0.001 3.0 2.999
2 0 1 0.000 3.0 3.000
>>> el = Signal([0.0,0.7,0.45,0.55,0.3, 1.0],
... times=[0.1, 0.2, 0.3, 0.6, 0.8, 0.85])
>>> el.vertices
time height
0 0.10 0.00
1 0.20 0.70
2 0.30 0.45
3 0.60 0.55
4 0.80 0.30
5 0.85 1.00
>>> el.edges
src dst max height
2 2 3 3 0.55
3 3 4 3 0.55
0 0 1 1 0.70
1 1 2 1 0.70
4 4 5 5 1.00
>>> el.make_pers()
>>> el.pers.diagram
birth_index death_index birth death pers
0 2 3 0.45 0.55 0.1
1 4 1 0.30 0.70 0.4
2 0 5 0.00 1.00 1.0
>>> el = Signal([0.0,0.5,0.4,0.9,0.1,1.0])
>>> el.make_pers()
>>> el.pers.diagram
birth_index death_index birth death pers
0 2 1 0.4 0.5 0.1
1 4 3 0.1 0.9 0.8
2 0 5 0.0 1.0 1.0
>>> for F in el.iter_features(min_pers=0.5): print(F)
time height
0 0.0 0.0
1 1.0 0.5
2 2.0 0.4
3 3.0 0.9
4 4.0 0.1
5 5.0 1.0
time height
3 3.0 0.9
4 4.0 0.1
>>> el = Signal(np.sin(np.arange(0,8*np.pi,0.1)))
>>> el.make_pers()
>>> el.pers.domains == {(110, 204): 4, (204, 236): 3, (0, 16): 2, (47, 79): 0, (141, 173): 1}
True
>>> s = Signal([50.0, 120.0, 100, 180, 200, 150, 135])
>>> s.make_pers()
>>> s.pers.diagram
birth_index death_index birth death pers
0 2 1 100.0 120.0 20.0
1 6 4 135.0 200.0 65.0
2 0 4 50.0 200.0 150.0
>>> s.pers.mergetree
{1: (0, 2), 4: (1, 6)}
"""
import collections, itertools
import numpy as np
import pandas as pd
import scipy.spatial.distance as ssd
from . import fast_algorithms, curve_geometry
import homology.dim0
def jagged(persdiag, index):
    """ Produce a piecewise-linear function that matches the given persistence
    diagram. This assumes that the index for the vertices is sequential and
    linear, so that the mergetree can be ignored.

    Parameters
    ----------
    persdiag : :class:`homology.PersDiag`
        A 0-dimensional persistence diagram, with columns
        'birth_index', 'death_index', 'birth', 'death'.
    index : list-like
        The domain index for the function.

    Returns
    -------
    function : `pandas.Series`
        A series over `index` whose local extrema reproduce the
        birth/death values of `persdiag`.

    See Also
    --------
    :func:`timeseries.Signal.jagged` :func:`timeseries.Signal.make_pers`
    """
    V = pd.Series(index=index, dtype=np.float64)
    if len(persdiag) == 0:
        # no features: the function is identically zero
        V[:] = 0
        return V
    # Use .values on both sides so assignment pairs positionally.
    # Assigning the raw Series would make pandas align the values by the
    # diagram's own row index, silently writing NaN where labels differ.
    V[persdiag['birth_index'].values] = persdiag['birth'].values
    V[persdiag['death_index'].values] = persdiag['death'].values
    # fill between the extrema with linear interpolation
    V.interpolate(method='linear', inplace=True)
    # make sure we don't lose domain: extend constantly past the last
    # extremum (ffill) and before the first one (bfill).
    # (.ffill/.bfill replace the deprecated fillna(method=...).)
    V = V.ffill().bfill()
    # MUST ADD CIRCULAR INTERPOLATION!
    return V
def wavy(persdiag, index):
    r""" Produce a piecewise-sine function that matches the given persistence
    diagram. This assumes that the index for the vertices is sequential and
    linear, so that the mergetree can be ignored.

    .. note::
        Not implemented yet. The previous version silently returned ``None``
        (its entire body was commented out) and misspelled the first
        parameter as ``pasrdiad``; it now fails loudly instead.

    Parameters
    ----------
    persdiag : :class:`homology.PersDiag`
        A 0-dimensional persistence diagram
    index : list-like
        The domain index for the function

    Returns
    -------
    function : `pandas.Series`

    Raises
    ------
    NotImplementedError
        Always, until the piecewise-sine construction is written.

    See Also
    --------
    :func:`timeseries.jagged` :func:`timeseries.Signal.make_pers`
    """
    # Sketch: mirror jagged() -- seed a Series at the birth/death indices,
    # then join consecutive extrema with half-period sine arcs instead of
    # straight segments (circular interpolation still needed at the ends).
    raise NotImplementedError(
        "wavy() is not implemented yet; use jagged() instead.")
class Signal(object):
    """A one-dimensional (time-series) signal with 0-dimensional persistence
    tools.

    The signal is stored as two DataFrames:

    - ``vertices``: columns ``['time', 'height']``, integer-indexed samples.
    - ``edges``: one row per consecutive pair of vertices, columns
      ``['src', 'dst', 'max', 'height']``, sorted by height.

    Call :func:`make_pers` to populate ``self.pers`` before using any of the
    persistence-based methods (``gap``, ``feature``, ``jagged``, ...).
    """

    def __init__(self, values, times=None):
        """ Produce a Signal from function output values. The function input
        domain is implicitly range(len(values)). The resulting Signal has
        vertices with function values and edges with max of adjacent pairs.

        A Signal is a pair of pd DataFrames that act as indexed
        lists of numerical values. The vertices are stored as a pd Series,
        Signal.index provides the vertex indices.
        The edges are a DataFrame, giving triples labeled by
        ('src', 'dst', 'max', 'height'), where src and dst are the *indices* (not
        the *values*) of vertices. Vertices should be considered fixed on
        initialization. Overloaded functions refer to edges, not vertices.

        Parameters
        ----------
        values : list-like of float, or `pandas.DataFrame`
            Function values, or a pre-built ``['time', 'height']`` frame
            (e.g. the ``vertices`` of another Signal).
        times : list-like of float, optional
            Sample times; defaults to 0, 1, 2, ... Ignored when `values`
            is already a DataFrame.
        """
        if type(values) == pd.core.frame.DataFrame:
            # adopt an existing vertices frame; enforce the exact schema
            assert values.index.dtype == np.int64
            assert all(values.columns == ['time', 'height'])
            assert values['height'].dtype == np.float64
            assert values['time'].dtype == np.float64
            self.vertices = values.copy()
        else:
            values = np.array(values, dtype=np.float64)
            assert len(values.shape) == 1
            N = values.shape[0]
            if times is None:
                times = np.arange(N, dtype=np.float64)
            index = np.arange(N, dtype=np.int64)
            self.vertices = pd.DataFrame({
                'time': times,
                'height': values},
                columns=['time', 'height'],
                index=index)
        # if times is not None:
        #    self.times
        #
        # if type(values) == pd.core.series.Series and values.index.dtype == 'int64':
        #    self.vertices = values.copy()
        # else:
        #    self.vertices = pd.Series(values, dtype=np.float64,
        #        index=np.arange(len(values), dtype=np.int64))

        # figure out whether the value is from the left (0) or right (1)
        # this is now done in homology.dim0.unionfind.
        # leftright[i] == 0 if vertex i is the taller endpoint of edge
        # (i, i+1), else 1; argmax breaks ties toward the left (0).
        leftright = np.array([self.vertices['height'].values[:-1],
                              self.vertices['height'].values[1:]]).argmax(axis=0)
        # 'max' holds the vertex index of the taller endpoint of each edge
        maxes = pd.Series(self.vertices.index[:-1] + leftright, dtype=np.int64)
        edges = pd.DataFrame({
            'src': self.vertices['height'].index[:-1],
            'dst': self.vertices['height'].index[1:],
            'max': maxes.values,
            'height': self.vertices['height'].values[maxes]},
            columns=['src', 'dst', 'max', 'height'])
        self.edges = pd.DataFrame(edges)
        # sort edges by height (then src) -- the order unionfind consumes
        self.edges.sort_values(by=['height', 'src'],
                               ascending=[True, True],
                               inplace=True)
        # each vertex starts in its own connected component
        self.components = pd.Series(self.vertices.index,
                                    index=self.vertices.index)
        # persistence data; populated by make_pers()
        self.pers = None
        # assert pos.size == len(self.edges.index)
        # self.edges['pos'] = pos

    def make_pers(self):
        """Compute 0-dimensional persistence of the height function and
        store it as ``self.pers`` (a :class:`homology.PersDiag`).
        The ``-1.0`` is passed through to ``homology.dim0.unionfind``
        -- presumably a sweep-direction/threshold flag; see that module.
        """
        tbirth_index, tdeath_index, ybirth_index, ydeath_index, mergetree = homology.dim0.unionfind(self, -1.0)
        self.pers = homology.PersDiag(tbirth_index, tdeath_index, ybirth_index, ydeath_index, mergetree)

    def __len__(self):
        """Number of vertices (samples) in the Signal."""
        return len(self.vertices)

    def gap(self):
        """ Return the largest homology, i.e. the last row of the
        persistence diagram (the diagram is sorted by persistence, so
        this is the most persistent feature). Requires make_pers().
        """
        bc = self.pers.diagram
        if len(bc) > 0:
            return bc.loc[bc.index[-1]]  # sorted by length!
        else:
            return bc

    def iter_intervals(self):
        """ return the itertools combinations iterator over all sub-intervals
        (pairs of vertex indices, i < j).
        """
        return itertools.combinations(self.vertices.index.values, 2)

    def interval_height(self, interval):
        """ the indicator-persistence function for intervals,
        called h_U(A) in the notes: the persistence of the feature
        supported exactly on `interval`, or 0.0 if no feature matches.
        """
        if interval in self.pers.domains:
            index = self.pers.domains[interval]
            return self.pers.diagram['pers'].loc[index]
        return 0.0

    def sample_near(self, sigma=1.0):
        """ return a Signal object that is L2-near self in the normal
        distribution (iid Gaussian noise of scale `sigma` added to heights).
        NOTE: the new Signal is built from heights only, so custom `times`
        are reset to 0, 1, 2, ...
        """
        diff = np.random.randn(self.vertices['height'].values.shape[0])
        return self.__class__(self.vertices['height'].values + sigma*diff)

    def height_measure(self, sigma=1.0, num_samples=1000, parallel=True, min_pers=0):
        """ Use a simulation to estimate the height-measure of an interval.

        Draws `num_samples` noisy copies (via fast_algorithms.sample_height),
        averages their per-interval heights, and returns a dict of dicts
        keyed as measures[interval_length][left_endpoint] = mean height.
        Heights below `min_pers` are zeroed out before averaging.
        """
        all_data = [(self, sigma) for _ in range(num_samples)]
        if parallel:
            from concurrent.futures import ProcessPoolExecutor
            pool = ProcessPoolExecutor(max_workers=None)
            all_heights = list(pool.map(fast_algorithms.sample_height, all_data))
        else:
            all_heights = [fast_algorithms.sample_height(x) for x in all_data]
        all_heights = np.stack(all_heights)
        if min_pers > 0:
            all_heights[all_heights < min_pers] = 0
        tot_heights = all_heights.sum(axis=0)/num_samples
        measures = dict()
        # tot_heights is indexed [i, j] by interval endpoints; collect the
        # nonzero entries, grouped by interval length j-i+1
        for U in np.stack(tot_heights.nonzero()).T:
            i, j = U
            length = j-i+1
            if length not in measures:
                measures[length] = dict()
            measures[length][i] = tot_heights[i, j]
        return measures

    def feature(self, index=None):
        """ return the vertices spanned by the feature with the given
        diagram index (default: the most persistent feature, from gap()).
        """
        if index is None:
            index = self.gap().name
        left = self.pers.lefts[index]
        right = self.pers.rights[index]
        # label-based slice on vertices: inclusive of both endpoints
        V = self.vertices[left:right+1]
        return V

    def iter_features(self, min_pers=0, depth_first=False):
        """ walk the feature tree, yielding feature() for every node whose
        persistence exceeds `min_pers`. Breadth-first by default;
        set depth_first=True for depth-first order.
        """
        bc = self.pers.diagram
        tree = self.pers.forest
        to_walk = collections.deque(tree[None])  # start at root of tree
        while to_walk:
            idx = to_walk.popleft()
            if bc.loc[idx]['pers'] > min_pers:
                yield self.feature(index=idx)
            if depth_first:
                to_walk.extendleft(tree[idx])
            else:
                to_walk.extend(tree[idx])

    def feature_match(self, other, sigma=1.0, num_samples=1000):
        """Score how well features of self match against `other`, weighting
        each interval's best profile() match by its estimated height measure.

        NOTE(review): height_measure() has no interval parameter -- the
        positional (left, right) tuple here binds to its `sigma` argument,
        which looks like a latent bug; confirm the intended signature
        before relying on this method.
        """
        total_match = 0
        for left, right in self.iter_intervals():
            feature = self.vertices[left:right+1].values
            match_number = other.profile(feature).max()
            interval_height = self.height_measure((left, right),
                                                  sigma=sigma,
                                                  num_samples=num_samples)
            total_match += match_number * interval_height
        return total_match

    def jagged(self, beta=0):
        """ call :func:`timeseries.jagged` on this :class:`Signal`'s
        own persistence diagram, clipped at persistence `beta`. This
        effectively makes a piecewise-linear version of the same function,
        with the same extrema. """
        # simple cache: self._jagged maps beta -> computed Series.
        # The attribute is created lazily on first call.
        try:
            self._jagged
            if beta in self._jagged:
                return self._jagged[beta]
        except AttributeError as e:
            self._jagged = dict()
        keepbc = self.pers.clip(beta)['keepcode']
        self._jagged[beta] = jagged(keepbc,
                                    self.vertices.index)
        return self._jagged[beta]

    def smoothen(self, beta):
        """Return a new Signal with the same values, re-ordered within each
        inter-extremum segment to be monotone, discarding features with
        persistence below `beta`. Times and the multiset of heights are
        preserved (asserted below)."""
        T = self.vertices['time'].values
        F = self.vertices['height'].values
        N = T.shape[0]
        # NOTE(review): this local `pd` shadows the pandas module inside
        # this method; rename (e.g. to `kept`) if pandas is ever needed here.
        pd = self.pers.clip(beta)['keepcode']
        # segment boundaries: every surviving birth/death index, plus the ends
        cut_indices = np.concatenate([pd[['birth_index', 'death_index']].values.flatten(),
                                      np.array([0, N], dtype='int')])
        cut_indices = np.unique(cut_indices)
        times = [ ]
        segments = [ ]
        for j0, j1 in zip(cut_indices[:-1], cut_indices[1:]):
            times.append(T[j0:j1])
            # sort each segment to be monotone toward its right endpoint
            if F[j0] > F[min(j1, N-1)]:
                segments.append(np.sort(F[j0:j1])[::-1])
            elif F[j0] <= F[min(j1, N-1)]:
                segments.append(np.sort(F[j0:j1]))
            #else:
            #    assert F[j0:j1].min() == F[j0:j1].max()
        times = np.concatenate(times)
        segments = np.concatenate(segments)
        # sanity: same heights (as a multiset) and identical times
        assert np.all(np.sort(segments) == np.sort(F))
        assert np.all(times == T)
        return Signal(segments, times=times)

    def profile(self, arch, normalize=False, norm=np.linalg.norm):
        """ produce profile by dragging an archetype across self,
        looking for matches. You may want to normalize arch first.

        Returns an array of length len(self)-len(arch)+1 with values in
        (0, 1]; 1.0 means a perfect (zero-distance) match at that offset.
        Note: windows are taken from self.vertices.values, i.e. both the
        time and height columns -- arch is assumed to broadcast against
        that (len(arch), 2) window; confirm against callers.
        """
        a = len(arch)
        v = len(self.vertices)
        assert a <= v, "Archetype is too long."
        d = v - a + 1
        # if normalize: arch = (arch-arch.mean())/(arch.max() - arch.min())
        s = []
        for i in range(d):
            snip = self.vertices.values[i:a+i]
            if normalize:
                snip = (snip-snip.mean())/(snip.max() - snip.min())
            s.append(norm(arch - snip))
        # map distances to similarity scores: 2**(-distance)
        p = np.exp2(-np.array(s))
        return p
        # if len(p) > 1:
        #    P = Signal(p)
        # return P

    def iter_windows_by_index(self, window, step=1, start=None, stop=None):
        """ Produce equal-length Signals using a sliding-window on self. This
        slides by index, not by abstract time.
        window = length of window (number of indices)
        time_step = step size in index. (default = 1)
        time_start = start index (default = None, min index)
        time_stop = stop index (default = None, max index)
        normalize = renormalize by N(0,1) on window? (default = False)
        norm = norm function to use for comparison (default = np.linalg.norm)
        return: iterator of np arrays, whose columns are timestamp, value.
        To access the tracks as a list,use
        list(Signal.iter_windows_index())

        Examples
        --------
        >>> S = Signal([2.0,3.0,0.0,5.0,2.5,2.9])
        >>> for s in S.iter_windows_by_index(4, step=2):
        ...     print(s[:,1])
        [ 2.  3.  0.  5.]
        [ 0.   5.   2.5  2.9]
        """
        if start is None:
            start = self.vertices.index.values[0]
        if stop is None:
            stop = self.vertices.index.values[-1]
        slices = np.arange(start, stop, step)
        for i, start_i in enumerate(slices):
            stop_i = start_i + window
            yield self.vertices.values[start_i:stop_i, :]
            # stop once a window has reached the end of the data
            if stop_i >= stop:
                break

    def self_similarity(self, window, step=1, start=None, stop=None,
                        dist=ssd.euclidean, normalizer=None):
        """ Compare sliding windows of this Signal using a distance function.

        Parameters
        ----------
        window (length of segment)
        step (steps to move between windows)
        start (index to start at)
        stop (index to stop at)
        dist (distance function to use. Default:`scipy.spatial.distance.euclidean`
        normalizer (function to use to renormalize each window. default:None)

        Returns
        -------
        an iterator of the window comparisons.
        (0,0), (0,1), (0,2), ... (0, n-1), (1,1), (1,2), ... (n-2, n-1)
        The return elements are pairs ((index_lo, index_hi), norm), which
        can be used to populate a dictionary or array.

        Examples
        --------
        >>> S = Signal([0.0, 0.0, 3.0, 4.0, 6.0, 8.0])
        >>> Sss = list(S.self_similarity(window=2, step=2))
        >>> for (ij, d) in Sss:
        ...     print("{} -> {}".format(ij,d))
        (0, 0) -> 0.0
        (0, 2) -> 5.0
        (0, 4) -> 10.0
        (2, 2) -> 0.0
        (2, 4) -> 5.0
        (4, 4) -> 0.0
        >>> D = np.array([d for ij,d in Sss])
        >>> print(D)
        [  0.   5.  10.   0.   5.   0.]
        >>> print(ssd.squareform(D)) # scipy.spatial.distance
        [[  0.   0.   5.  10.]
         [  0.   0.   0.   5.]
         [  5.   0.   0.   0.]
         [ 10.   5.   0.   0.]]
        """
        if start is None:
            start = self.vertices.index.values[0]
        if stop is None:
            stop = self.vertices.index.values[-1]
        slices = np.arange(start, stop, step)
        for i, start_i in enumerate(slices):
            stop_i = start_i + window
            win_i = self.vertices.values[start_i:stop_i, :]
            if normalizer is not None:
                win_i = normalizer(win_i)
            # only compare to windows at or after win_i (upper triangle)
            for j, start_j in enumerate(slices[i:]):
                stop_j = start_j + window
                win_j = self.vertices.values[start_j:stop_j, :]
                if normalizer is not None:
                    win_j = normalizer(win_j)
                # distance is taken on the height column only
                yield ((start_i, start_j), dist(win_i[:, 1], win_j[:, 1]))
            if stop_i >= stop:
                break
    #

    def plot(self, canvas, title="Signal"):
        """ Plot the Signal.

        Parameters
        ----------
        canvas : class:`bokeh.plotting.figure.Figure` or :class:`matplotlib.axes._subplots.AxesSubplot`
            A bokeh or pyplot canvas to draw on. Create one with
            :code:`canvas = bokeh.plotting.figure()` or
            :code:`fig,canvas = matplotlib.pyplot.subplots()`

        Notes
        -----
        You have to save or show axes after running this call.
        """
        # detect the plotting backend from the canvas object's module
        if type(canvas).__module__ == 'bokeh.plotting.figure':
            canvas_type = "bokeh"
            import bokeh.plotting
        elif type(canvas).__module__ == 'matplotlib.axes._subplots':
            canvas_type = "pyplot"
            import matplotlib.pyplot as plt
        else:
            raise NotImplementedError(
                "canvas must be a bokeh.plotting.figure() or a matplotlib.pyplot.subplots()[1]. You gave me {}".format(type(canvas))
            )
        if canvas_type == "bokeh":
            canvas.title = title
        elif canvas_type == "pyplot":
            canvas.set_title(title)
        # draw the samples as points and connect them with a line.
        # NOTE(review): the pyplot branch plots against the integer index,
        # while the bokeh branch uses the 'time' column -- confirm which
        # x-axis is intended.
        if canvas_type == "bokeh":
            canvas.circle(self.vertices['time'].values, self.vertices['height'].values)
            canvas.line(self.vertices['time'].values, self.vertices['height'].values)
        elif canvas_type == "pyplot":
            canvas.scatter(self.vertices['height'].index.values, self.vertices['height'].values)
            canvas.plot(self.vertices['height'].index.values, self.vertices['height'].values)
        pass

    @classmethod
    def from_pointcloud(cls, points, direction, norm):
        """Build a Signal by projecting a point cloud onto the unit vector
        in the given `direction` (normalized with `norm`)."""
        values = np.dot(points, direction/norm(direction))
        return cls(values)

    def normalize(self):
        """ change this Signal object to have mean = 0 and max-min = 1.
        The persistence diagram (births, deaths, pers) is rescaled in
        place to stay consistent with the rescaled heights.
        Requires make_pers() -- the scale factor is the gap() persistence.
        """
        bc = self.pers.diagram
        h = self.gap()['pers']
        mean = self.vertices['height'].mean()
        self.vertices['height'] = (self.vertices['height'] - mean)/h
        self.edges['height'] = (self.edges['height'] - mean)/h
        bc['birth'] = (bc['birth'] - mean)/h
        bc['death'] = (bc['death'] - mean)/h
        bc['pers'] = (bc['pers'])/h
        pass
[docs]class SpaceCurve(object):
r""" SpaceCurve is a Python class for studying curves in
:math:`\mathbb{R}^2` or :math:`\mathbb{R}^3`.
For example, a SpaceCurve could represent kinematic flight data, or
trajectories of vehicles given by GPS coordinates.
All arguments other than :code:`tn` are optional.
Parameters
----------
tn : list-like
Integer timestamps, typically in 'numpy.int64` nanoseconds-since-epoch
px : list-like
py : list-like
pz : list-like
Positions in :class:`numpy.float64` meters
quality : list-like
Quality/accuracy of a particular location
trackid : int
An integer label for the track
platform : str
A descriptive label
activity : str
A descriptive label
mollified: bool
Whether this track has undegone mollification, for example with :func:`clean_copy`
Attributes
----------
data : :class:`pandas.DataFrame`
The original position and velocity data, as originally provided.
The index of this DataFrame is the :code:`tn` integer time index.
info : :class:`pandas.DataFrame`
Data computed using various algorithms. This is filled by
:func:`compute`, but more can be added.
"""
def __init__(self, tn, px=None, py=None, pz=None,
# vx=None, vy=None, vz=None,
quality=None,
trackid=-1, platform=None, activity=None, mollified=False):
tn = np.array(tn, dtype=np.int64)
assert len(tn) > 0,\
"A SpaceCurve cannot have empty nanosecond index. You gave me {}".format(tn)
if px is None:
px = np.zeros(tn.shape, dtype=np.float64)
if py is None:
py = np.zeros(tn.shape, dtype=np.float64)
if pz is None:
pz = np.zeros(tn.shape, dtype=np.float64)
#if vx is None:
# vx = np.zeros(tn.shape, dtype=np.float64)
#if vy is None:
# vy = np.zeros(tn.shape, dtype=np.float64)
#if vz is None:
# vz = np.zeros(tn.shape, dtype=np.float64)
if quality is None:
quality = -np.ones(tn.shape, dtype=np.int64)
px = np.array(px, dtype=np.float64)
py = np.array(py, dtype=np.float64)
pz = np.array(pz, dtype=np.float64)
#vx = np.array(vx, dtype=np.float64)
#vy = np.array(vy, dtype=np.float64)
#vz = np.array(vz, dtype=np.float64)
quality = np.array(quality, dtype=np.int64)
assert len(tn) == len(px)
assert len(tn) == len(py)
assert len(tn) == len(pz)
#assert len(tn) == len(vx)
#assert len(tn) == len(vy)
#assert len(tn) == len(vz)
assert len(tn) == len(quality)
sort_by_time = tn.argsort()
tn = tn[sort_by_time]
px = px[sort_by_time]
py = py[sort_by_time]
pz = pz[sort_by_time]
#vx = vx[sort_by_time]
#vy = vy[sort_by_time]
#vz = vz[sort_by_time]
quality = quality[sort_by_time]
ts = (tn - tn[0]).astype(np.float64) / (10 ** 9)
self.data = pd.DataFrame({'time': ts,
'pos_x': px,
'pos_y': py,
'pos_z': pz,
#'vel_x': vx,
#'vel_y': vy,
#'vel_z': vz,
'quality': quality},
columns=['time', 'pos_x', 'pos_y', 'pos_z',
#'vel_x', 'vel_y', 'vel_z',
'quality'],
index=tn)
self.info = pd.DataFrame({}, index=self.data.index)
self.trackid = trackid
self.platform = platform
self.activity = activity
self.mollified = mollified
def __getitem__(self, key):
""" get raw data via index """
return self.data.loc[self.data.index[key]]
[docs] def accel(self, rate):
r"""Change time parametrization, to represent a constant tangential
acceleration (or deceleration). Locations, initial time, and
arc-length are preserved.
The first timestep is changed to have (1-rate) times the speed of the
original's first timestep. The last timestep is changed to have
(1+rate) times the speed of the original's last timestep.
That is, if S is a `SpaceCurve` of constant speed, then S.accel(-0.1)
will start 10% faster and end 10% slower than S.
If speed[i] changes to speed[i]*q[i], then delta_t[i] changes to
delta_t[i]*p[i], where p[i] = 1 / q[i]
Examples
--------
>>> tn = np.arange(0, 5*1e9, 1e9)
>>> s = SpaceCurve(tn=tn, px=10.0*tn/1e9) # drive straight at 10m/s
>>> s
SpaceCurve with 5 entries and duration 4.000000000
>>> s.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
1000000000 1.0 10.0 0.0 0.0 -1
2000000000 2.0 20.0 0.0 0.0 -1
3000000000 3.0 30.0 0.0 0.0 -1
4000000000 4.0 40.0 0.0 0.0 -1
>>> s.compute()
>>> s.info['speed']
0 10.0
1000000000 10.0
2000000000 10.0
3000000000 10.0
4000000000 10.0
Name: speed, dtype: float64
>>> a = s.accel(0.25)
>>> a
SpaceCurve with 5 entries and duration 4.147319347
>>> a.compute()
>>> a.info['speed']
0 7.500000
1333333333 8.250000
2424242424 9.930556
3347319347 11.607143
4147319347 12.500000
Name: speed, dtype: float64
>>> a.data
time pos_x pos_y pos_z quality
0 0.000000 0.0 0.0 0.0 -1
1333333333 1.333333 10.0 0.0 0.0 -1
2424242424 2.424242 20.0 0.0 0.0 -1
3347319347 3.347319 30.0 0.0 0.0 -1
4147319347 4.147319 40.0 0.0 0.0 -1
>>> b = s.accel(-0.25)
>>> b
SpaceCurve with 5 entries and duration 4.147319347
>>> b.compute()
>>> b.info['speed']
0 12.500000
800000000 11.607143
1723076923 9.930556
2813986013 8.250000
4147319347 7.500000
Name: speed, dtype: float64
>>> b.data
time pos_x pos_y pos_z quality
0 0.000000 0.0 0.0 0.0 -1
800000000 0.800000 10.0 0.0 0.0 -1
1723076923 1.723077 20.0 0.0 0.0 -1
2813986013 2.813986 30.0 0.0 0.0 -1
4147319347 4.147319 40.0 0.0 0.0 -1
"""
n = len(self.data)
ts = self.data['time'].values
change_of_speed = np.linspace(1.-rate, 1.+rate, num=n-1)
new_Dt = np.diff(ts)/change_of_speed
accum_time = np.cumsum(new_Dt)
new_ts = np.ndarray(shape=ts.shape, dtype=ts.dtype)
new_ts[0] = ts[0]
new_ts[1:] = accum_time + ts[0]
new_tn = np.int64(new_ts*10**9)
return SpaceCurve(new_tn,
px=self.data['pos_x'].values,
py=self.data['pos_y'].values,
pz=self.data['pos_z'].values)
def __matmul__(self, array):
""" Apply a matrix (NumPy array) to the positions to produce a new
SpaceCurve. Used for rotating SpaceCurves.
Note that this is the LEFT action from a group-theory perspective.
Examples
--------
>>> sc = SpaceCurve(np.arange(4)*10**9, px=np.arange(4))
>>> sc.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
1000000000 1.0 1.0 0.0 0.0 -1
2000000000 2.0 2.0 0.0 0.0 -1
3000000000 3.0 3.0 0.0 0.0 -1
>>> g = np.array([[np.cos(np.pi/6), -np.sin(np.pi/6), 0],
... [np.sin(np.pi/6), np.cos(np.pi/6), 0],
... [ 0, 0, 1]])
>>> sc2 = sc.__matmul__(g).__matmul__(g).__matmul__(g) # use @ in Py3
>>> np.all(sc2.data['pos_y'].values == sc.data['pos_x'].values)
True
"""
pos = self.data[['pos_x', 'pos_y', 'pos_z']].values
new_pos = pos.dot(array.T)
return SpaceCurve(self.data.index,
px=new_pos[:, 0],
py=new_pos[:, 1],
pz=new_pos[:, 2])
def __add__(self, other):
""" Concatenate SpaceCurves, end-to-end in space and time. The other
SpaceCurve is set to begin at a time and position where self ends.
This way, the number of points shrinks by one, but the total duration
adds.
Examples
--------
>>> sc1 = SpaceCurve(np.arange(4)*10**9, px=np.arange(4), py=2*np.arange(4))
>>> sc1
SpaceCurve with 4 entries and duration 3.000000000
>>> sc1.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
1000000000 1.0 1.0 2.0 0.0 -1
2000000000 2.0 2.0 4.0 0.0 -1
3000000000 3.0 3.0 6.0 0.0 -1
>>> sc2 = SpaceCurve(np.arange(8)*10**8, px=np.arange(0,0.4,0.05), py=3*np.arange(8))
>>> sc2
SpaceCurve with 8 entries and duration .700000000
>>> sc2.data
time pos_x pos_y pos_z quality
0 0.0 0.00 0.0 0.0 -1
100000000 0.1 0.05 3.0 0.0 -1
200000000 0.2 0.10 6.0 0.0 -1
300000000 0.3 0.15 9.0 0.0 -1
400000000 0.4 0.20 12.0 0.0 -1
500000000 0.5 0.25 15.0 0.0 -1
600000000 0.6 0.30 18.0 0.0 -1
700000000 0.7 0.35 21.0 0.0 -1
>>> sc3 = sc1 + sc2
>>> sc3
SpaceCurve with 11 entries and duration 3.700000000
>>> sc3.data
time pos_x pos_y pos_z quality
0 0.0 0.00 0.0 0.0 -1
1000000000 1.0 1.00 2.0 0.0 -1
2000000000 2.0 2.00 4.0 0.0 -1
3000000000 3.0 3.00 6.0 0.0 -1
3100000000 3.1 3.05 9.0 0.0 -1
3200000000 3.2 3.10 12.0 0.0 -1
3300000000 3.3 3.15 15.0 0.0 -1
3400000000 3.4 3.20 18.0 0.0 -1
3500000000 3.5 3.25 21.0 0.0 -1
3600000000 3.6 3.30 24.0 0.0 -1
3700000000 3.7 3.35 27.0 0.0 -1
"""
tn_shift = self.data.index.values[-1] - other.data.index.values[0]
px_shift = self.data['pos_x'].values[-1] - other.data['pos_x'].values[0]
py_shift = self.data['pos_y'].values[-1] - other.data['pos_y'].values[0]
pz_shift = self.data['pos_z'].values[-1] - other.data['pos_z'].values[0]
new_tn = np.concatenate([self.data.index.values,
other.data.index.values[1:] + tn_shift])
new_px = np.concatenate([self.data['pos_x'].values,
other.data['pos_x'].values[1:] + px_shift])
new_py = np.concatenate([self.data['pos_y'].values,
other.data['pos_y'].values[1:] + py_shift])
new_pz = np.concatenate([self.data['pos_z'].values,
other.data['pos_z'].values[1:] + pz_shift])
return self.__class__(new_tn, px=new_px, py=new_py, pz=new_pz)
[docs] def arclength_param(self):
""" Change time parametrization to the universal speed=1 arclength
parametrization.
Examples
--------
>>> tn = np.arange(0, 5e9, 1e9)
>>> s = SpaceCurve(tn=tn, px=(tn/1e9)**2)
>>> s.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
1000000000 1.0 1.0 0.0 0.0 -1
2000000000 2.0 4.0 0.0 0.0 -1
3000000000 3.0 9.0 0.0 0.0 -1
4000000000 4.0 16.0 0.0 0.0 -1
>>> a = s.arclength_param()
>>> a.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
1000000000 1.0 1.0 0.0 0.0 -1
4000000000 4.0 4.0 0.0 0.0 -1
9000000000 9.0 9.0 0.0 0.0 -1
16000000000 16.0 16.0 0.0 0.0 -1
"""
pos = self.data[['pos_x', 'pos_y', 'pos_z']].values
gap = np.diff(pos, axis=0)
dist = np.linalg.norm(gap, axis=1)
accum_time = dist.cumsum()
ts = self.data['time'].values
new_ts = np.ndarray(shape=ts.shape, dtype=ts.dtype)
new_ts[0] = ts[0]
new_ts[1:] = accum_time + ts[0]
new_tn = np.int64(new_ts*10**9)
return SpaceCurve(new_tn,
px=self.data['pos_x'].values,
py=self.data['pos_y'].values,
pz=self.data['pos_z'].values)
[docs] def reverse(self):
"""
Reverse the time parametrization of the SpaceCurve.
Examples
--------
>>> tn = np.arange(0, 5*1e9, 1e9)
>>> s = SpaceCurve(tn=tn, px=10.0*tn/1e9) # drive straight at 10m/s
>>> s.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
1000000000 1.0 10.0 0.0 0.0 -1
2000000000 2.0 20.0 0.0 0.0 -1
3000000000 3.0 30.0 0.0 0.0 -1
4000000000 4.0 40.0 0.0 0.0 -1
>>> a = s.reverse()
>>> a.data
time pos_x pos_y pos_z quality
0 0.0 40.0 0.0 0.0 -1
1000000000 1.0 30.0 0.0 0.0 -1
2000000000 2.0 20.0 0.0 0.0 -1
3000000000 3.0 10.0 0.0 0.0 -1
4000000000 4.0 0.0 0.0 0.0 -1
"""
return SpaceCurve(self.data.index.values,
px=self.data['pos_x'].values[::-1],
py=self.data['pos_y'].values[::-1],
pz=self.data['pos_z'].values[::-1])
[docs] def reparam(self, rate):
r"""Change time parametrization, to represent a constant change of speed.
Locations, initial time, and arc-length are preserved.
Parameters
----------
rate : float
A (positive) ratio by which to increase the speed.
Notes
-----
If :code:`speed[i]` changes to :code:`speed[i]*q[i]`, then
:code:`delta_t[i]` changes to :code:`delta_t[i]*p[i]`, where
:code:`p[i] = 1 / q[i]`.
Examples
--------
>>> tn = np.arange(0, 5*1e9, 1e9)
>>> s = SpaceCurve(tn=tn, px=10.0*tn/1e9) # drive straight at 10m/s
>>> s
SpaceCurve with 5 entries and duration 4.000000000
>>> s.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
1000000000 1.0 10.0 0.0 0.0 -1
2000000000 2.0 20.0 0.0 0.0 -1
3000000000 3.0 30.0 0.0 0.0 -1
4000000000 4.0 40.0 0.0 0.0 -1
>>> a = s.reparam(0.5)
>>> a.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
2000000000 2.0 10.0 0.0 0.0 -1
4000000000 4.0 20.0 0.0 0.0 -1
6000000000 6.0 30.0 0.0 0.0 -1
8000000000 8.0 40.0 0.0 0.0 -1
>>> a
SpaceCurve with 5 entries and duration 8.000000000
>>> b = s.reparam(2.0)
>>> b
SpaceCurve with 5 entries and duration 2.000000000
>>> b.data
time pos_x pos_y pos_z quality
0 0.0 0.0 0.0 0.0 -1
500000000 0.5 10.0 0.0 0.0 -1
1000000000 1.0 20.0 0.0 0.0 -1
1500000000 1.5 30.0 0.0 0.0 -1
2000000000 2.0 40.0 0.0 0.0 -1
"""
ts = self.data['time'].values
new_ts = ts/rate
new_tn = np.int64(new_ts*10**9)
return SpaceCurve(new_tn,
px=self.data['pos_x'].values,
py=self.data['pos_y'].values,
pz=self.data['pos_z'].values)
def plot(self, canvas, title="SpaceCurve", color="blue"):
    r""" Plot the SpaceCurve in 3D.

    Parameters
    -----------
    canvas : :class:`matplotlib.axes._subplots.AxesSubplot`
        Be sure that 3D plotting is enabled on this canvas, with
        :code:`mpl_toolkits.mplot3d import Axes3D` and
        :code:`canvas = matplotlib.pyplot.subplot(projection='3d')`
    title : str
        A title for the figure.
    color : str
        The name of a color for the points to draw. Passed to the
        appropriate drawing library (bokeh or matplotlib).

    Notes
    -----
    Because we are using ECEF coordinates, the horizontal projection should
    be taken as an approximation!
    """
    # Dispatch on the module of the canvas object to decide which
    # plotting backend the caller handed us.
    backend = type(canvas).__module__
    if backend == 'bokeh.plotting.figure':
        canvas_type = "bokeh"
        import bokeh.plotting
    elif backend == 'matplotlib.axes._subplots':
        canvas_type = "pyplot"
        import matplotlib.pyplot as plt
        # Importing Axes3D registers the 3D projection as a side effect.
        from mpl_toolkits.mplot3d import Axes3D
    else:
        raise NotImplementedError(
            "canvas must be a bokeh.plotting.figure() or a matplotlib.pyplot.subplots()[1]. You gave me {}".format(
                type(canvas))
        )
    # Note: we use raw ECEF positions here, so the horizontal projection
    # is only approximate.
    xyz = self.data[['pos_x', 'pos_y', 'pos_z']].values
    if xyz.shape[0] == 0:
        # Nothing to draw.
        return False
    if canvas_type == "pyplot":
        canvas.plot(xyz[:, 0], xyz[:, 1], xyz[:, 2], color=color)
    else:
        # bokeh 3D drawing is not implemented yet.
        raise NotImplementedError
def slide(self, window=60 * 10 ** 9,
          time_step=1 * 10 ** 9,
          time_start=None,
          time_stop=None,
          overlap_ends=True):
    r""" Produce equal-time SpaceCurves using a sliding window on self.

    Parameters
    ----------
    window : int
        Window length in nanoseconds (default 60*10**9, i.e. one minute).
    time_step : int
        Nanoseconds between successive window start points
        (default 1*10**9, i.e. one second).
    time_start : int
        Start time in nanoseconds (default None, meaning the first index
        entry).
    time_stop : int
        Stop time in nanoseconds (default None, meaning the last index
        entry).
    overlap_ends : bool
        Should window endpoints overlap (True) or abut (False)?
        (default True)

    Yields
    ------
    SpaceCurve
        One windowed copy of self per start point.  To collect them as a
        list, use :code:`list(FT.slide())`.

    Notes
    -----
    The trackids are unchanged!  You might want to change them manually.
    """
    index = self.data.index.values
    if time_start is None:
        time_start = index[0]
    if time_stop is None:
        time_stop = index[-1]
    if not overlap_ends:
        # Shrink the window by one nanosecond so that consecutive
        # windows abut instead of sharing their boundary sample.
        window -= 1
    for start_ns in np.arange(time_start, time_stop, time_step):
        stop_ns = start_ns + window
        # .loc label slicing is inclusive on both ends.
        piece = self.copy()
        piece.data = self.data.loc[start_ns:stop_ns].copy()
        piece.info = self.info.loc[start_ns:stop_ns].copy()
        yield piece
        if stop_ns >= time_stop:
            break
def snip(self, time_step=60 * 10 ** 9, time_start=None, time_stop=None,
         overlap_ends=True):
    """ Cut this SpaceCurve into equal-time snippets.

    This is a thin wrapper around :meth:`slide` with
    :code:`window == time_step` (default 60*10**9, i.e. one minute),
    so the windows tile the track instead of overlapping.

    Note!  The trackids are unchanged!  You might want to change them
    manually.

    Yields
    ------
    SpaceCurve
        One snippet per step.  To collect the tracks as a list, use
        :code:`list(FT.snip())`.
    """
    yield from self.slide(window=time_step,
                          time_step=time_step,
                          time_start=time_start,
                          time_stop=time_stop,
                          overlap_ends=overlap_ends)
def clean_copy(self, cleanup_func=curve_geometry.mollifier, **kwargs):
    r""" Make a copy in which a cleanup function has been applied to
    the position data.

    cleanup_func is the interface to the functions in curve_geometry.
    In particular:

    1. :func:`curve_geometry.cleanup` does slope comparison
    2. :func:`curve_geometry.mollifier` does a convolution

    cleanup_func must accept time and position as positional arguments
    1 and 2 and return both a new time array and a new position array.
    All keyword arguments (:code:`**kwargs`) are passed on to
    cleanup_func.

    One can tune the various cleanup functions by passing
    a user manufactured function, for example

    .. code::

        my_cleanup = lambda time, x:curve_geometry.mollifier(time,x,width=0.5)
        clean_track = track.clean_copy(cleanup_func = my_cleanup)

    would be equivalent to

    .. code::

        clean_track = track.clean_copy(cleanup_func=curve_geometry.mollifier, width=0.5)
    """
    raw_time = self.data['time'].values
    time = None
    cleaned = {}
    # Clean each coordinate independently; every call must report the
    # same resampled time array.
    for col in ('pos_x', 'pos_y', 'pos_z'):
        col_time, cleaned[col] = cleanup_func(raw_time,
                                              self.data[col].values,
                                              **kwargs)
        if time is None:
            time = col_time
        else:
            assert np.all(time == col_time)
    # Rebuild an integer-nanosecond index from the (float, seconds)
    # time array, anchored at the original first index entry.
    new_index_start = self.data.index.values[0]
    new_index = np.int64(time*10**9)+new_index_start
    assert new_index.shape[0] == time.shape[0],\
        """New index is shape {} but time is shape {}""".format(new_index.shape[0], time.shape[0])
    # Instantiate a new flighttrack object from the cleaned versions.
    newFT = self.__class__(new_index,
                           trackid=self.trackid,
                           platform=self.platform,
                           activity=self.activity,
                           mollified=True)
    newFT.data = pd.DataFrame({'time': time,
                               'pos_x': cleaned['pos_x'],
                               'pos_y': cleaned['pos_y'],
                               'pos_z': cleaned['pos_z'],
                               'quality': -np.ones(time.shape, dtype=np.int64)},
                              columns=['time', 'pos_x', 'pos_y', 'pos_z', 'quality'],
                              index=new_index)
    return newFT
def compute(self):
    """ Compute some nice invariants and store them to self.info.
    Vector quantities are generally in the fixed frame.
    """
    # Raw samples in the ECEF frame.
    T = self.data['time'].values
    P = self.data[['pos_x', 'pos_y', 'pos_z']].values
    # Successive secant-difference derivatives: velocity, acceleration,
    # jerk; plus cumulative arc length of the sampled polyline.
    V = curve_geometry.secant_derivative(T, P)
    A = curve_geometry.secant_derivative(T, V)
    J = curve_geometry.secant_derivative(T, A)
    arclengthS = curve_geometry.secant_arclength(P)
    # The discrete signature curve needs at least 5 samples
    # (it uses 2-interior points).
    has_signature = len(self) > 4
    if has_signature:
        kap, kap_s, tau, tau_s = self.signature_curve()
    # Store componentwise velocity, then acceleration.
    for axis, label in enumerate(('x', 'y', 'z')):
        self.info['vel_' + label] = V[:, axis]
    for axis, label in enumerate(('x', 'y', 'z')):
        self.info['acc_' + label] = A[:, axis]
    self.info['len'] = arclengthS
    # Euclidean norms of the derivative vectors.
    self.info['speed'] = np.linalg.norm(V, axis=1).flatten()
    self.info['acc'] = np.linalg.norm(A, axis=1).flatten()
    self.info['jerk'] = np.linalg.norm(J, axis=1).flatten()
    if has_signature:
        self.info['curv'] = kap
        self.info['curv_s'] = kap_s
        self.info['tors'] = tau
        # tors_s from signature_curve is currently a zero placeholder,
        # so a scalar 0.0 column is stored instead.
        self.info['tors_s'] = 0.0
def featurize(self, sort_and_grab_num=None):
    """ A convenience function to compute everything we think might be
    important, as persistence data of the per-sample invariants.

    Parameters
    ----------
    sort_and_grab_num : int or None
        If given, each persistence diagram is reduced with
        :code:`pers.grab(sort_and_grab_num)['keepcode']`; otherwise the
        full diagram is stored.

    Returns
    -------
    dict
        Maps invariant name to its (possibly grabbed) persistence data.

    Notes
    -----
    NOTE(review): several requested names ('up', 'friction',
    'Dcurv/Dlen', 'curv/alt', ...) are not populated by compute() in
    this file, so the lookup would raise KeyError for them — confirm
    against the version of compute() that produced these columns.
    """
    feature_dict = dict()
    self.compute()
    for name in ['curv', 'tors', 'up', 'acc',
                 'Dcurv/Dlen', 'Dtors/Dlen', 'Dup/Dlen',
                 'speed', 'friction', 'curv/alt', 'tors/alt', 'acc/alt',
                 ]:
        S = Signal(self.info[name].values)
        # Signal.pers only exists after make_pers() has been called
        # (see the Signal doctests at the top of this module); the
        # previous code accessed S.pers without building it.
        S.make_pers()
        if sort_and_grab_num is not None:
            feature_dict[name] = S.pers.grab(sort_and_grab_num)['keepcode']
        else:
            feature_dict[name] = S.pers.diagram
    return feature_dict
def copy(self):
    """ Make an identical copy of self. """
    data_copy = self.data.copy()
    # Rebuild through the constructor so metadata fields are set, then
    # overwrite the frames with deep copies.
    duplicate = self.__class__(data_copy.index.values,
                               trackid=self.trackid,
                               platform=self.platform,
                               activity=self.activity,
                               )
    duplicate.data = data_copy
    duplicate.info = self.info.copy()
    return duplicate
@classmethod
def load(cls, filename):
    """ Simple tab-separated CSV reader.

    The file must provide the columns
    ('time', 'pos_x', 'pos_y', 'pos_z').
    """
    table = pd.read_csv(filename, sep="\t")
    return cls(tn=table['time'].values,
               px=table['pos_x'].values,
               py=table['pos_y'].values,
               pz=table['pos_z'].values)
def duration(self):
    """ Return the duration of the SpaceCurve as a string of seconds,
    converting the integer-nanosecond index span to a decimal string.

    Returns
    -------
    str or int
        "seconds.fraction" with exactly 9 fractional digits, or the
        int 0 when the curve is empty (kept as int 0, not "0", for
        backward compatibility with existing callers).
    """
    times = self.data.index.values
    if len(times) == 0:
        return 0
    ns = int(times[-1] - times[0])
    # Format with integer arithmetic.  The previous implementation
    # sliced repr(ns), which produced strings like ".500000000"
    # (missing the leading zero) for sub-second durations.
    return "{}.{:09d}".format(ns // 10 ** 9, ns % 10 ** 9)
def __len__(self):
    """ Number of samples, i.e. rows of self.data. """
    return self.data.shape[0]
def __repr__(self):
    """ Summary string: sample count and duration in seconds. """
    n_entries = len(self)
    return "SpaceCurve with {} entries and duration {}".format(
        n_entries, self.duration())
def mass_profile(self, num_bins=100, underflow=-50, overflow=50):
    """ Histogram of the lift_and_mass() output, binned by the mass
    (second) column.  Extra bins capture (-inf, underflow) and
    [overflow, inf). """
    LM = self.lift_and_mass()
    width = np.float64(overflow - underflow) / np.float64(num_bins)
    inner_edges = np.arange(underflow, overflow + width, width)
    # Surround the finite edges with +-inf catch-all bins.
    edges = np.concatenate(([-np.inf], inner_edges, [np.inf]))
    return np.histogram(LM[:, 1], edges)
def lift_profile(self, num_bins=500, underflow=0, overflow=500000):
    """ Histogram of the lift_and_mass() output, binned by the lift
    (first) column.  Extra bins capture (-inf, underflow) and
    [overflow, inf). """
    LM = self.lift_and_mass()
    width = np.float64(overflow - underflow) / np.float64(num_bins)
    inner_edges = np.arange(underflow, overflow + width, width)
    # Surround the finite edges with +-inf catch-all bins.
    edges = np.concatenate(([-np.inf], inner_edges, [np.inf]))
    return np.histogram(LM[:, 0], edges)
def lift_and_mass(self):
    r""" Produce the terms of the force-balance equations, to help derive
    coefficient-of-lift and mass from pure trajectory information.

    Consider an aircraft flying with velocity :math:`v`, banking angle
    :math:`\theta`, air density :math:`\rho`, mass :math:`m`, gravity
    :math:`g`, vertical acceleration :math:`\ddot{z}`, and lift coefficient
    :math:`C_{\text{lift}}`.

    The force-balance equation is

    ..math::
        LHS = \frac12 \|v\|^2 \, \cos(\theta)\, \rho
        RHS = (g + \ddot{z})
        LHS\, C_{\text{lift}} = RHS\, m

    Returns
    -------
    coefficients : :class:np.array
        the columns are LHS and RHS.
    """
    # Atmospheric air density as a function of altitude: a 4th-order
    # polynomial fit to the 1976 international standard atmosphere.
    # https://en.wikipedia.org/wiki/International_Standard_Atmosphere
    # Fit inputs were:
    #   alts = np.array([-610, 11000, 20000, 32000, 47000])
    #   dens = np.array([1.2985, 0.3639, 0.0880, .0105, 0.0020])
    #   air_density = np.poly1d(np.polyfit(alts, dens, 4))
    air_density = np.poly1d([2.70588959e-19, -5.57103078e-14,
                             3.91598431e-09, -1.15140013e-04,
                             1.22679477e+00])
    speed = self.info['speed'].values
    vertical_acc = self.info['acc_z'].values
    cos_bank = np.cos(self.info['bank'].values)
    rho = air_density(self.info['alt'].values)
    # LHS * C_lift == RHS * m  (see docstring).
    lhs = 0.5 * speed ** 2 * cos_bank * rho
    rhs = 9.8 + vertical_acc
    return np.array([lhs, rhs]).T
def auto_bin(self, num_bins=10, underflow=0, overflow=100):
    """ Count bins on the transformed persistence diagrams of
         -Speed/7             (so it is expected to be between 0 and 100)
         -Climb/3             (so it is expected to be between 0 and 100)
         -Curvature*10000     (so it is expected to be between 0 and 100)
         -Torsion*10000       (so it is expected to be between 0 and 100)
         -Bank*100/(pi/4)     (==grade, between 0 and 100)

    Bins are added for (-inf, underflow] and (overflow, inf).
    The defaults are underflow=0 and overflow=100.

    NOTE(review): requires self.info to already provide 'speed', 'climb',
    'curv', 'tors', and 'bank' columns; compute() in this file does not
    populate 'climb' or 'bank' — confirm against the intended pipeline.
    """
    speed01 = Signal(self.info['speed'].values / 7)
    climb01 = Signal(self.info['climb'].values / 3)
    curv01 = Signal(self.info['curv'].values * 10000)
    tors01 = Signal(self.info['tors'].values * 10000)
    bank01 = Signal(self.info['bank'].values * 400 / np.pi)
    # Signal.pers only exists after make_pers() has been called (see the
    # Signal doctests at the top of this module); the previous code
    # accessed .pers without building it.
    for signal in (speed01, climb01, curv01, tors01, bank01):
        signal.make_pers()
    width = np.float64(overflow - underflow) / np.float64(num_bins)
    speed_hist, speed_bins = speed01.pers.bin(width=width,
                                              underflow=underflow,
                                              overflow=overflow)
    climb_hist, climb_bins = climb01.pers.bin(width=width,
                                              underflow=underflow,
                                              overflow=overflow)
    curv_hist, curv_bins = curv01.pers.bin(width=width,
                                           underflow=underflow,
                                           overflow=overflow)
    tors_hist, tors_bins = tors01.pers.bin(width=width,
                                           underflow=underflow,
                                           overflow=overflow)
    bank_hist, bank_bins = bank01.pers.bin(width=width,
                                           underflow=underflow,
                                           overflow=overflow)
    # All five signals were binned with identical parameters, so a single
    # shared edge array is returned.
    assert np.all(curv_bins == tors_bins) and np.all(
        curv_bins == bank_bins)
    return {'bins': curv_bins,
            'speed': speed_hist,
            'climb': climb_hist,
            'curv': curv_hist,
            'tors': tors_hist,
            'bank': bank_hist}
def signature_curve(self):
    r""" Olver/Boutin signature curve.
    (kappa, kappa_s, tau, tau_s)
    due to difference methods, the lengths are (n-2, n-4, n-4, n-6),
    but we pad them all to length n

    Usage:
    ------

    >>> ts = np.arange(0,12,0.1)
    >>> # Line
    >>> C = SpaceCurve(tn = np.arange(ts.shape[0]),
    ...                px = 5*ts + 3, py=2*ts + 5)
    >>> kappa, kappa_s, tau, tau_s = C.signature_curve()
    >>> np.allclose(kappa, 0.0, atol=1.5e-7)
    True
    >>> np.allclose(kappa_s, 0.0, atol=1.5e-7)
    True
    >>> np.allclose(tau, 0.0, atol=1.5e-7)
    True
    >>> np.allclose(tau_s, 0.0, atol=1.5e-7)
    True

    >>> ts = np.arange(0,12,0.1)
    >>> # Circle with constant speed
    >>> C = SpaceCurve(tn = np.arange(ts.shape[0]),
    ...                px = np.cos(ts),
    ...                py = np.sin(ts))
    >>> kappa, kappa_s, tau, tau_s = C.signature_curve()
    >>> kappa.shape  # padded
    (120,)
    >>> kappa_s.shape
    (120,)
    >>> np.allclose(kappa, 1.)
    True
    >>> np.allclose(kappa_s, 0.)
    True

    >>> ts = np.arange(0,12,0.1)
    >>> # Circle with varying speed
    >>> C = SpaceCurve(tn = np.arange(ts.shape[0]),
    ...                px = np.cos(ts**2),
    ...                py = np.sin(ts**2))
    >>> kappa, kappa_s, tau, tau_s = C.signature_curve()
    >>> kappa_s.shape  # padded
    (120,)
    >>> np.allclose(kappa, 1.)
    True
    >>> np.allclose(kappa_s, 0.)
    True
    >>> np.allclose(tau, 0.)
    True
    >>> np.allclose(tau_s, 0.)
    True

    >>> ts = np.arange(1,13,0.01)
    >>> # A Spiral
    >>> C = SpaceCurve(tn = np.arange(ts.shape[0]),
    ...                px = np.exp(0.75*ts)*np.cos(ts),
    ...                py = np.exp(0.75*ts)*np.sin(ts))
    >>> kappa, kappa_s, tau, tau_s = C.signature_curve()
    >>> kappa.shape  # padded
    (1200,)
    >>> np.allclose(kappa[1:-1], np.exp(-0.75*ts[1:-1]), atol=0.1)
    True
    >>> np.allclose(kappa_s[2:-2]*np.exp(1.5*ts[2:-2]), -12./25., atol=0.01)
    True

    >>> # A Helix
    >>> C = SpaceCurve(tn = np.arange(ts.shape[0]),
    ...                px = 3*np.cos(ts),
    ...                py = 3*np.sin(ts),
    ...                pz = 4*ts)
    >>> kappa, kappa_s, tau, tau_s = C.signature_curve()
    >>> np.allclose(kappa, 3/25.)
    True
    >>> np.allclose(kappa_s, 0.0)
    True
    >>> np.allclose(tau, 4/25.)
    True
    >>> np.allclose(tau_s, 0.0)
    True

    >>> # A Helix (reversed)
    >>> C = SpaceCurve(tn = np.arange(ts.shape[0]),
    ...                px = 3*np.cos(-ts),
    ...                py = 3*np.sin(-ts),
    ...                pz = 4*ts)
    >>> kappa, kappa_s, tau, tau_s = C.signature_curve()
    >>> np.allclose(kappa, -3/25.,)
    True
    >>> np.allclose(kappa_s, 0.0)
    True
    >>> np.allclose(tau, -4/25.,)
    True
    >>> np.allclose(tau_s, 0.0)
    True
    """
    pos = self.data[['pos_x', 'pos_y', 'pos_z']].values
    n = pos.shape[0]
    # Discrete curvature via Heron's formula on consecutive point
    # triples.  We follow Calabi's naming convention and deal with
    # 1-interior points (indices 1..n-2).
    P_i_mns_1 = pos[:-2, :]
    P_i = pos[1:-1, :]
    P_i_pls_1 = pos[2:, :]
    # Use the determinant of each point triple to set a right-handed
    # sign for curvature (+1 or -1, never 0 thanks to signbit).
    triples = np.ndarray(shape=(n-2, 3,3), dtype='float')
    triples[:, 0, :] = P_i_mns_1
    triples[:, 1, :] = P_i
    triples[:, 2, :] = P_i_pls_1
    sign = (-1)**np.signbit(np.linalg.det(triples))
    # Side lengths of each triangle and its semi-perimeter s.
    a = np.sqrt(np.sum((P_i       - P_i_mns_1)**2, axis=1))
    b = np.sqrt(np.sum((P_i_pls_1 - P_i      )**2, axis=1))
    c = np.sqrt(np.sum((P_i_pls_1 - P_i_mns_1)**2, axis=1))
    s = 0.5*(a+b+c)
    # If a,b,c are co-linear, then we might get s-c to be negative
    # due to roundoff error. (or other permutations)  Clip at 0.
    s_minus_a = np.clip(s-a, 0., np.infty)
    s_minus_b = np.clip(s-b, 0., np.infty)
    s_minus_c = np.clip(s-c, 0., np.infty)
    abc = a*b*c
    # Calabi,et al eqn (2.2): kappa = 4*Area / (a*b*c).
    # Repeated points make abc == 0; define kappa = 0 there.
    non_trivial = (abc != 0)
    kappa = sign*4*np.sqrt(s*s_minus_a*s_minus_b*s_minus_c)
    kappa[non_trivial] = kappa[non_trivial] / abc[non_trivial]
    kappa[~ non_trivial] = 0.0
    assert kappa.shape[0] == n-2
    # Now, we follow Boutin's naming convention.
    # We deal with 2-interior points (indices 2..n-3).
    P_i = pos[2:-2, :]
    P_i_pls_1 = pos[3:-1, :]
    P_i_pls_2 = pos[4:, :]
    P_i_mns_1 = pos[1:-3, :]
    P_i_mns_2 = pos[0:-4, :]
    # Pairwise distances among the 5-point stencil.
    a = np.sqrt(np.sum((P_i       - P_i_mns_1)**2, axis=1))
    b = np.sqrt(np.sum((P_i_pls_1 - P_i      )**2, axis=1))
    # c = np.sqrt(np.sum((P_i_pls_1 - P_i_mns_1)**2, axis=1))
    d = np.sqrt(np.sum((P_i_pls_2 - P_i_pls_1)**2, axis=1))
    e = np.sqrt(np.sum((P_i_pls_2 - P_i      )**2, axis=1))
    f = np.sqrt(np.sum((P_i_pls_2 - P_i_mns_1)**2, axis=1))
    g = np.sqrt(np.sum((P_i_mns_2 - P_i_mns_1)**2, axis=1))
    # reverse collections, for reverse tau
    # NOTE(review): dd, ee, ff are computed here but never used below;
    # the backward-tau denominator at the bottom reuses d*e*f instead of
    # dd*ee*ff.  Looks like an oversight — confirm against Boutin's
    # formulas before changing, since the two agree on symmetric
    # stencils (e.g. the doctest helices).
    dd = g
    ee = np.sqrt(np.sum((P_i_mns_2 - P_i      )**2, axis=1))
    ff = np.sqrt(np.sum((P_i_mns_2 - P_i_pls_1)**2, axis=1))
    assert a.shape[0] == n-4
    # Note that the index of a goes 0..n-5, and
    # Note that the index of kappa goes 0..n-3.
    # and P[i] corresponds to a[i] and kappa[i+1]
    # Centered difference of kappa over the stencil arc length.
    denom_ks = 2*a + 2*b + d + g
    non_trivial = (denom_ks != 0)
    kappa_s = 3*(kappa[2:] - kappa[:-2])
    kappa_s[non_trivial] = kappa_s[non_trivial]/denom_ks[non_trivial]
    kappa_s[~ non_trivial] = 0.0
    # tau according to Boutin's \tilde{tau}_1, in the forward direction.
    # tetra_height is the height of the tetrahedron spanned by the
    # stencil, read off the last diagonal entry of the QR factor R.
    tetra_height = np.ndarray(shape = kappa_s.shape, dtype='float')
    for i in range(P_i.shape[0]):
        tetrahedron = np.array([P_i_mns_1[i] - P_i[i],
                                P_i_pls_1[i] - P_i[i],
                                P_i_pls_2[i] - P_i[i]]).T
        tetra_height[i] = np.linalg.qr(tetrahedron, mode='r')[-1,-1]
    # we want tau = 6 * tetra_height / denom_t, but
    # don't want to divide by zero, which happens if points repeat.
    tau_fwd = 6 * tetra_height
    denom_t = d * e * f * kappa[1:-1]  # sign is inherited!
    non_trivial = (denom_t != 0) & (tetra_height != 0)
    tau_fwd[non_trivial] = tau_fwd[non_trivial] / denom_t[non_trivial]
    tau_fwd[~ non_trivial] = 0.0
    # tau according to Boutin's \tilde{tau}_1, in the backard direction
    tetra_height = np.ndarray(shape = kappa_s.shape, dtype='float')
    for i in range(P_i.shape[0]):
        tetrahedron = np.array([P_i_mns_2[i] - P_i[i],
                                P_i_mns_1[i] - P_i[i],
                                P_i_pls_1[i] - P_i[i]]).T
        tetra_height[i] = np.linalg.qr(tetrahedron, mode='r')[-1,-1]
    # we want tau = 6 * tetra_height / denom_t, but
    # don't want to divide by zero, which happens if points repeat.
    tau_bwd = 6 * tetra_height
    denom_t = d * e * f * kappa[1:-1]  # sign is inherited!
    non_trivial = (denom_t != 0) & (tetra_height != 0)
    tau_bwd[non_trivial] = tau_bwd[non_trivial] / denom_t[non_trivial]
    tau_bwd[~ non_trivial] = 0.0
    # Average the two one-sided estimates.
    tau = (tau_fwd + tau_bwd)/2
    # tau_s (Boutin's (17)) is currently disabled; the padded output is
    # filled with zeros below.
    # # tau_s according to Boutin's (17), in the forward direction
    # P_i_pls_3 = pos[4:, :]
    # h = np.sqrt(np.sum((P_i_pls_3 - P_i_pls_2[:-1])**2, axis=1))
    # dh = d[:-1] + h
    # denom_ts = denom_ks[1:-1] + dh
    # old_settings = np.seterr(divide='ignore')  #seterr to known value
    # tau_s_fwd = 4*(tau[1:] - tau[:-1] + (denom_ks[1:-1] -3*dh) * tau * kappa_s / (6 * kappa[2:-2]))/denom_ts
    # tau_s_fwd[denom_ts == 0] = 0
    # np.seterr(**old_settings)
    #
    # # tau_s according to Boutin's (17), in the backward direction
    # P_i_mns_3 = pos[:-4, :]
    # h = np.sqrt(np.sum((P_i_mns_3 - P_i_mns_2[1:])**2, axis=1))
    # dh = d[1:] + h
    # denom_ts = denom_ks[1:-1] + dh
    # old_settings = np.seterr(divide='ignore')  #seterr to known value
    # tau_s_bwd = 4*(tau[1:] - tau[:-1] + (denom_ks[1:-1] -3*dh) * tau * kappa_s / (6 * kappa[2:-2]))/denom_ts
    # tau_s_bwd[denom_ts == 0] = 0
    # np.seterr(**old_settings)
    #
    # tau_s = (tau_s_fwd + tau_s_bwd)/2
    assert kappa.shape == (n-2,)
    assert kappa_s.shape == (n-4,)
    assert tau.shape == (n-4,)
    #assert tau_s.shape == (n-6,)
    # Pad every output to length n by repeating the end values, so the
    # arrays line up with self.data rows.
    kappa_pad = np.ndarray(shape=(n,), dtype='float')
    kappa_s_pad = np.ndarray(shape=(n,), dtype='float')
    tau_pad = np.ndarray(shape=(n,), dtype='float')
    tau_s_pad = np.ndarray(shape=(n,), dtype='float')
    kappa_pad[1:-1] = kappa
    kappa_pad[:1] = kappa[0]
    kappa_pad[-1:] = kappa[-1]
    kappa_s_pad[2:-2] = kappa_s
    kappa_s_pad[:2] = kappa_s[0]
    kappa_s_pad[-2:] = kappa_s[-1]
    tau_pad[2:-2] = tau
    tau_pad[:2] = tau[0]
    tau_pad[-2:] = tau[-1]
    # tau_s is a zero placeholder until the commented code above is
    # validated.
    tau_s_pad[:] = 0.0
    #tau_s_pad[2:-2] = tau_s
    #tau_s_pad[:2] = tau_s[0]
    #tau_s_pad[-2:] = tau_s[-1]
    return kappa_pad, kappa_s_pad, tau_pad, tau_s_pad
# end of class SpaceCurve