r"""This module contains classifiers based on the SciKitLearn framework.

    - :class:`CDER`
    - :class:`GaussianMixtureClassifier`

We use these for modelling marked-point processes and labelled pointclouds.

Copyright
---------
- This file is part of https://github.com/geomdata/gda-public/ 
- 2015, 2016, 2017 by Geometric Data Analytics, Inc. (http://geomdata.com)
- AGPL license. See `LICENSE` or https://github.com/geomdata/gda-public/blob/master/LICENSE
"""

from __future__ import print_function

import logging
import numpy as np
import sklearn
import multidim
import multidim.covertree 
from .fast_algorithms import gaussian, gaussian_fit, entropy, distance_cache_None


class GaussianMixtureClassifier(sklearn.base.BaseEstimator, sklearn.base.ClassifierMixin):
    r""" A scikit-learn classification estimator, for any sort of Gaussian
    mixture model.  A Gaussian mixture model is a collection of labelled
    Gaussian functions.

    This class is provided mainly as a parent class for `CDER`.
    """

    def __init__(self, stop_level=None, **kwargs):
        self.stop_level = stop_level
        self.kwargs = kwargs
        self.pointcloud = None
        self.all_labels = None
        self.covertree = None
        self.gaussians = None

    def get_params(self, deep=False):
        r""" Pass original kwargs internally."""
        return self.kwargs

    def gausscoords(self):
        """ This is the method to overwrite for specific models!

        Note that :func:`fit` assigns the return value of this method
        directly to :code:`self.gaussians`; subclasses such as :class:`CDER`
        return just the list of Gaussian dicts.
        """
        gaussians = [{'mean': None,      # mean point as numpy 1d
                      'std': None,       # signal diagonals as numpy 1d
                      'rotation': None,  # rotation as numpy 2d
                      'weight': None,    # weight as calculated by fit
                      'label': None,     # best label from labels_train
                      'adult': None,     # adult coordinates
                      'index': None,     # adult index
                      'radius': None,    # radius of region
                      }]
        tree = None  # some sort of object for re-use.  It should have a
                     # pointcloud attribute of PointCloud type.
        return tree, gaussians

    def fit(self, *training):
        """ Fit this estimator to training data.  This is the step that
        builds the actual model.

        Parameters
        ----------
        *training
            Can be any of the following:

            - a pair of list-like objects: pointclouds_train and labels_train.
              The items in pointclouds_train should be :class:`numpy.ndarray`
              objects.  The items in labels_train should be strings
              (preferably, color names).
            - a single labelled and weighted :class:`multidim.PointCloud`
            - a single :class:`multidim.covertree.CoverTree` obtained from a
              labelled and weighted :class:`multidim.PointCloud`
        """
        if len(training) == 1 and type(training[0]) == multidim.covertree.CoverTree:
            self.covertree = training[0]
            self.pointcloud = self.covertree.pointcloud
            self.all_labels = self.pointcloud.label_info.index.values
        elif len(training) == 1 and type(training[0]) == multidim.PointCloud:
            self.pointcloud = training[0]
            self.all_labels = self.pointcloud.label_info.index.values
            self.covertree = multidim.covertree.CoverTree(self.pointcloud)
        elif len(training) == 2:
            pointclouds_train = training[0]
            labels_train = training[1]
            self.pointcloud = multidim.PointCloud.from_multisample_multilabel(
                pointclouds_train, labels_train)
            self.all_labels = self.pointcloud.label_info.index.values
            self.covertree = multidim.covertree.CoverTree(self.pointcloud)
        else:
            raise ValueError("bad input to fit()")

        # Do something here to make gaussians and tree. May use self.kwargs.
        self.gaussians = self.gausscoords(**self.kwargs)

    def evaluate(self, x):
        r""" Evaluate all gaussians against a pointcloud.

        Parameters
        ----------
        x : :class:`numpy.ndarray`
            An array, giving a pointcloud.  Each row is a point.

        Returns
        -------
        functions : :class:`numpy.ndarray`
            the integral of each gaussian over these points.
        labels : :class:`numpy.ndarray`
            the labels of the gaussians, for reference.
        """
        def run_gauss(g):
            return gaussian(x, g['mean'], g['std'], g['rotation']).sum()*g['weight']
        functions = [run_gauss(g) for g in self.gaussians]
        labels = [g['label'] for g in self.gaussians]
        return np.array(functions), np.array(labels)

    def plot(self, canvas, style="covertree"):
        r""" Plot a CDER model, using matplotlib or bokeh.

        There are several options to help visualize various aspects of the
        model.  Typically, one wants to overlay this with the plots from the
        underlying :class:`multidim.PointCloud` and
        :class:`multidim.covertree.CoverTree` objects.

        Parameters
        ----------
        canvas : object
            An instance of `bokeh.plotting.figure.Figure` as in
            :code:`canvas = bokeh.plotting.figure()`
            or an instance of :class:`matplotlib.axes._subplots.AxesSubplot`
            as in :code:`figure, canvas = matplotlib.pyplot.subplots()`

        style : str
            options are "gaussians", "covertree", "heatmap", "hulls",
            "entropy", "expatriots", and "predictions".  Colors are
            generally obtained from the label names.

        See Also
        --------
        :func:`multidim.PointCloud.plot`
        :func:`multidim.covertree.CoverTree.plot`
        """
        if self.covertree is None or self.gaussians is None:
            raise RuntimeError("You must 'fit' first, before plotting.")

        # fix the aspect ratio!
        all_xs = self.covertree.pointcloud.coords.values[:, 0]
        all_ys = self.covertree.pointcloud.coords.values[:, 1]
        xmid = (all_xs.max() + all_xs.min())/2.0
        ymid = (all_ys.max() + all_ys.min())/2.0
        span = max([all_xs.max() - xmid,
                    xmid - all_xs.min(),
                    all_ys.max() - ymid,
                    ymid - all_ys.min()])

        if type(canvas).__module__ == 'bokeh.plotting.figure':
            canvas_type = "bokeh"
            import bokeh.plotting
            from bokeh.models import ColumnDataSource, Range1d
            # fix the aspect ratio!
            canvas.x_range = Range1d(xmid-span, xmid+span)
            canvas.y_range = Range1d(ymid-span, ymid+span)
        elif type(canvas).__module__ == 'matplotlib.axes._subplots':
            canvas_type = "pyplot"
            import matplotlib.pyplot as plt
            from matplotlib.collections import PolyCollection, PatchCollection
            from matplotlib.patches import Circle, Ellipse, Polygon
            import matplotlib.colors as colors
            # fix the aspect ratio!
            canvas.set_aspect('equal')
            canvas.set_xlim([xmid-span, xmid+span])
            canvas.set_ylim([ymid-span, ymid+span])
        else:
            raise NotImplementedError(
                "canvas must be a bokeh.plotting.figure() or a matplotlib.pyplot.subplots()[1]. You gave me {}".format(
                    type(canvas))
            )

        if style == "gaussians":
            Xms = []
            Yms = []
            R1s = []
            R2s = []
            As = []
            Cs = []
            Ws = []
            for g in self.gaussians:
                Xms.append(g['mean'][0])
                Yms.append(g['mean'][1])
                R1s.append(g['std'][0]*2/0.75)  # 0.75 bokeh oval distortion
                R2s.append(g['std'][1]*2)
                As.append(np.arctan2(g['rotation'][0, 1], g['rotation'][0, 0]))
                Cs.append(g['label'])
                Ws.append(g['weight'])
            Ws = np.array(Ws)
            # Ws = np.log2(np.array(Ws)+1)
            Wmax = Ws.max()
            if canvas_type == "bokeh":
                canvas.oval(Xms, Yms, angle=As, width=R1s, height=R2s,
                            alpha=Ws/Wmax, color=Cs,
                            line_width=0, line_color='black')
            elif canvas_type == "pyplot":
                patches = []
                rgbas = []
                CC = colors.ColorConverter()
                for i in range(len(Xms)):
                    patches.append(Ellipse(xy=(Xms[i], Yms[i]),
                                           width=R1s[i]*0.75, height=R2s[i],
                                           angle=As[i]*180/np.pi))
                    # have to manually set the alpha value
                    rgba = list(CC.to_rgba(Cs[i]))
                    rgba[3] = Ws[i]/Wmax
                    rgbas.append(tuple(rgba))
                p = PatchCollection(patches, edgecolor='none')
                p.set_facecolors(rgbas)
                canvas.add_collection(p)

        elif style == "heatmap":
            # This could probably use self.runit instead.
            xy = []
            min_x = xmid - span
            max_x = xmid + span
            min_y = ymid - span
            max_y = ymid + span
            dx = (max_x - min_x)/64.
            dy = (max_y - min_y)/64.
            for x in np.arange(min_x, max_x, dx):
                for y in np.arange(min_y, max_y, dy):
                    xy.append((x + dx/2, y + dy/2))
            xy = np.array(xy)
            z_by_label = np.ndarray(shape=(xy.shape[0], self.all_labels.shape[0]))
            for i, point in enumerate(xy):
                integrals, labels = self.evaluate(xy[[i], :])
                for j, label in enumerate(self.all_labels):
                    z_by_label[i, j] = np.linalg.norm(integrals[labels == label], 1)
            # z_by_label = np.log2(z_by_label + 1)
            # z_by_label = z_by_label - z_by_label.min()
            z_by_label = z_by_label/z_by_label.max()
            # z_by_label = z_by_label**0.5
            if canvas_type == "bokeh":
                for j, label in enumerate(self.all_labels):
                    canvas.rect(xy[:, 0]-dx/2, xy[:, 1]-dy/2,
                                width=dx, height=dy,
                                color=label, alpha=z_by_label[:, j])
            elif canvas_type == "pyplot":
                patches = []
                rgbas = []
                CC = colors.ColorConverter()
                for j, label in enumerate(self.all_labels):
                    for i in range(xy.shape[0]):
                        corners = np.array([[xy[i, 0]-dx/2, xy[i, 1]-dy/2],
                                            [xy[i, 0]+dx/2, xy[i, 1]-dy/2],
                                            [xy[i, 0]+dx/2, xy[i, 1]+dy/2],
                                            [xy[i, 0]-dx/2, xy[i, 1]+dy/2]])
                        patches.append(Polygon(corners))
                        # have to manually set the alpha value
                        rgba = list(CC.to_rgba(label))
                        rgba[3] = z_by_label[i, j]
                        rgbas.append(tuple(rgba))
                p = PatchCollection(patches, edgecolor='none')
                p.set_facecolors(rgbas)
                canvas.add_collection(p)

        elif style == "hulls":
            from scipy.spatial import ConvexHull
            if canvas_type == "bokeh":
                raise NotImplementedError("No hulls in Bokeh yet. Use pyplot.")
            elif canvas_type == "pyplot":
                maxwt = max([g['weight'] for g in self.gaussians])
                patches = []
                rgbas = []
                cc = colors.ColorConverter()
                for i, g in enumerate(self.gaussians):
                    if g['count'] > 3:
                        a = g['index']
                        l = g['level']
                        cl = self.covertree[l]
                        label = g['label']
                        children = self.covertree.pointcloud.coords.values[cl.children[a], :]
                        hull = ConvexHull(children).vertices
                        poly_data = children[hull, :]
                        patches.append(Polygon(poly_data))
                        # have to manually set the alpha value
                        rgba = list(cc.to_rgba(label))
                        rgba[3] = g['weight']*1.0/maxwt
                        rgbas.append(tuple(rgba))
                p = PatchCollection(patches, edgecolor='none')
                p.set_facecolors(rgbas)
                canvas.add_collection(p)

        elif style == "entropy":
            N = len(self.covertree)
            C = np.zeros(shape=(N,))
            Q = np.zeros(shape=(N,))
            for g in self.gaussians:
                C[g['level']:] = C[g['level']:] + 1.0
                Q[g['level']:] = Q[g['level']:] + (1.0 - g['entropy'])
            canvas.scatter(range(N), C, color="red", legend="coords", size=4)
            canvas.line(range(N), C, color="red")
            canvas.scatter(range(N), Q, color="blue", legend="bits", size=2)
            canvas.line(range(N), Q, color="blue")
            canvas.legend.location = "top_left"

        elif style == "expatriots":
            Xms = []
            Yms = []
            R1s = []
            R2s = []
            As = []
            Cs = []
            Ws = []
            for g in self.gaussians:
                Xms.append(g['mean'][0])
                Yms.append(g['mean'][1])
                R1s.append(g['std'][0]*2/0.75)  # 0.75 bokeh oval distortion
                R2s.append(g['std'][1]*2)
                As.append(np.arctan2(g['rotation'][0, 1], g['rotation'][0, 0]))
                Cs.append(g['label'])
                Ws.append(g['weight'])
            Ws = np.log2(np.array(Ws)+1)
            Wmax = Ws.max()
            if canvas_type == "bokeh":
                canvas.oval(Xms, Yms, angle=As, width=R1s, height=R2s,
                            alpha=Ws/Wmax, color=Cs,
                            line_width=0, line_color='black')
            elif canvas_type == "pyplot":
                patches = []
                rgbas = []
                CC = colors.ColorConverter()
                for i in range(len(Xms)):
                    patches.append(Ellipse(xy=(Xms[i], Yms[i]),
                                           width=R1s[i]*0.75, height=R2s[i],
                                           angle=As[i]*180/np.pi))
                    # have to manually set the alpha value
                    rgba = list(CC.to_rgba(Cs[i]))
                    rgba[3] = Ws[i]/Wmax
                    rgbas.append(tuple(rgba))
                p = PatchCollection(patches, edgecolor='none')
                p.set_facecolors(rgbas)
                canvas.add_collection(p)

        elif style == "predictions":
            pass

        elif style == "covertree":
            Xs = []
            Ys = []
            Rs = []
            Cs = []
            Ws = []
            for g in self.gaussians:
                Xs.append(g['adult'][0])
                Ys.append(g['adult'][1])
                Rs.append(g['radius'])
                Cs.append(g['label'])
                Ws.append(1.0 - g['entropy'])
            Ws = np.array(Ws)
            # Ws = np.log2(np.array(Ws)+1)
            Wmax = Ws.max()
            if canvas_type == "bokeh":
                canvas.circle(Xs, Ys, radius=Rs,
                              alpha=Ws/Wmax, color=Cs,
                              line_width=0, line_color='black')
            elif canvas_type == "pyplot":
                patches = []
                rgbas = []
                cc = colors.ColorConverter()
                for i in range(len(Xs)):
                    patches.append(Circle(xy=(Xs[i], Ys[i]), radius=Rs[i]))
                    # have to manually set the alpha value
                    rgba = list(cc.to_rgba(Cs[i]))
                    rgba[3] = Ws[i]/Wmax
                    rgbas.append(tuple(rgba))
                p = PatchCollection(patches, edgecolor='none')
                p.set_facecolors(rgbas)
                canvas.add_collection(p)

        else:
            raise ValueError("Wrong plot style.")

    def predict(self, pointclouds):
        r""" Predict labels of given pointclouds, based on previous training
        data fed to :func:`fit`.  This is just a call to :func:`runit`.

        Parameters
        ----------
        pointclouds : list-like
            A list of arrays.  Each array is a pointcloud.  Each row of that
            array is a point.

        Returns
        -------
        array of labels, one for each pointcloud in the input list.
        """
        return self.runit(pointclouds)[2]

    def score(self, pointclouds, labels):
        r""" Score predicted labels against known labels.

        Parameters
        ----------
        pointclouds : list-like
            A list of arrays.  Each array is a pointcloud.  Each row of that
            array is a point.
        labels : list-like
            A corresponding list of labels.

        Returns
        -------
        an array of booleans.  True means the labels match.

        See Also
        --------
        :func:`predict`
        """
        return self.predict(pointclouds) == labels

    def runit(self, pointclouds):
        r""" Evaluate each pointcloud against every Gaussian, and find the
        best-matching label for each pointcloud.

        Returns the raw evaluations, the per-label norm scores, and the
        best-matching labels.
        """
        num_tests = len(pointclouds)
        evaluations = np.ndarray(shape=(num_tests, len(self.gaussians)),
                                 dtype=np.float64)
        norm_scores = np.ndarray(shape=(num_tests, len(self.all_labels)),
                                 dtype=np.float64)
        best_match = np.ndarray(shape=(num_tests,), dtype=self.all_labels.dtype)
        # evaluate each pointcloud against each Gaussian
        for i, X in enumerate(pointclouds):
            v, l = self.evaluate(X)
            evaluations[i, :] = v
            for j, score_label in enumerate(self.all_labels):
                indicator = (l == score_label)
                norm_scores[i, j] = np.linalg.norm(v*indicator)
            best_label_index = norm_scores[i, :].argmax()
            best_match[i] = self.all_labels[best_label_index]
        return evaluations, norm_scores, best_match
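

# --- Illustrative sketch (not part of the upstream library) ------------------
# The docstrings above describe `gausscoords` as the method to override: it
# should return a list of dicts carrying at least the keys 'mean', 'std',
# 'rotation', 'weight', and 'label' that `evaluate` reads.  The toy subclass
# below fits one labelled Gaussian per label to the whole training pointcloud,
# using only attributes that `fit` populates (`self.pointcloud`,
# `self.all_labels`) and the helpers imported at the top of this module.  The
# PointCloud attribute names (`coords`, `labels`, `label_info`) are assumed
# from their usage elsewhere in this file; treat this as a hedged sketch, not
# a tested estimator.
class _OneGaussianPerLabel(GaussianMixtureClassifier):

    def gausscoords(self):
        gaussians = []
        pc = self.pointcloud
        for label in self.all_labels:
            label_index = pc.label_info['int_index'].loc[label]
            mask = np.asarray(pc.labels == label_index)
            sample = pc.coords.values[mask, :]
            if sample.shape[0] <= sample.shape[1]:
                continue  # too few points to fit a covariance
            m, s, v = gaussian_fit(sample)
            gaussians.append({'mean': m, 'std': s, 'rotation': v,
                              'weight': float(sample.shape[0]),
                              'label': label,
                              'adult': None, 'index': None, 'radius': None})
        return gaussians
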

class CDER(GaussianMixtureClassifier):
    r"""The CDER (Cover-Tree Differencing for Entropy Reduction) algorithm
    for supervised machine-learning of labelled cloud collections.

    This uses the "Cover Tree with Friends" algorithm along with an entropy
    computation to build a regional classifier for labelled pointclouds.  It
    relies on the :class:`multidim.covertree.CoverTree` and
    :class:`multidim.covertree.CoverLevel` data structures.  See the paper
    [CDER1]_ and the talk [CDER2]_.

    Parameters
    ----------
    parsimonious : bool
        Whether to use the parsimonious version of CDER, where a region is
        ignored once its entropy reaches a local minimum.  For most datasets,
        :code:`parsimonious=False` provides a better classifier, but it is
        more expensive to evaluate.  In many cases, :code:`parsimonious=True`
        is good enough, and is significantly faster.
        Default: :code:`True`

    Examples
    --------

    We'll make a simple dataset with one "green" pointcloud sampled from a
    uniform distribution on the 1x1 square centered at (-1, 0), and one
    "magenta" pointcloud sampled from a uniform distribution on the 1x1
    square centered at (1, 0).

    >>> import numpy as np
    >>> import multidim
    >>> train_dataL = np.random.rand(100,2) + np.array([-1.5, -0.5])  # for green
    >>> # train_dataL.mean(axis=0) # should be near (-1, 0)
    >>> train_dataR = np.random.rand(200,2) + np.array([0.5, -0.5])  # for magenta
    >>> # train_dataR.mean(axis=0) # should be near (+1, 0)
    >>> cder = CDER(parsimonious=True)  # prepare a classifier
    >>> cder.fit([train_dataL, train_dataR], ["green", "magenta"])  # this runs CoverTree
    >>> for g in cder.gaussians:
    ...     print(sorted(list(g.keys())))
    ...     break
    ['adult', 'count', 'entropy', 'index', 'label', 'level', 'mean', 'radius', 'rotation', 'std', 'weight']
    >>> test_dataL = np.random.rand(50,2) + np.array([-1.5, -0.5])  # should be green
    >>> test_dataR = np.random.rand(50,2) + np.array([0.5, -0.5])  # should be magenta
    >>> cder.predict([test_dataL, test_dataR])  # Guess the labels
    array(['green', 'magenta'], dtype=object)
    >>> cder.score([test_dataL, test_dataR], ["green", "magenta"])  # Correct?
    array([ True, True], dtype=bool)

    A more thorough example is at
    http://nbviewer.jupyter.org/github/geomdata/gda-public/blob/master/examples/example-cder.ipynb

    See Also
    --------
    :class:`multidim.covertree.CoverTree`

    References
    ----------
    .. [CDER1] Supervised Learning of Labeled Pointcloud Differences via
        Cover-Tree Entropy Reduction
        https://arxiv.org/abs/1702.07959

    .. [CDER2] CDER, Learning with Friends
        https://www.ima.umn.edu/2016-2017/DSS9.6.16-5.30.17/26150
    """

    def build_gaussian(self, coverlevel, adult, label, dominant_index):
        r""" Build a Gaussian from the children of this adult in a cover-tree
        (using only a particular label).

        Generally, a user will never call this.  It is called when
        :func:`gausscoords` decides that a CoverTree ball is worth modelling.

        Parameters
        ----------
        coverlevel : `multidim.covertree.CoverLevel`
        adult : int
        label : int
        dominant_index : int

        See Also
        --------
        :func:`gausscoords`
        """
        ct = self.covertree
        pc = ct.pointcloud
        label_index = pc.label_info['int_index'].loc[label]
        assert label == pc.label_info.index[label_index]
        winning_children = np.intersect1d(coverlevel.children[adult],
                                          np.where(pc.labels == label_index))
        count = len(winning_children)
        sample = ct.coords[winning_children, :]
        entropy = coverlevel.entropy[adult]
        ambient_dim = ct.coords.shape[1]
        weight = coverlevel.weights[adult][label_index].sum()
        if count < ambient_dim:
            logging.warning("Too few points {} at {}. Lost {}*{}".format(
                count, adult, 1-entropy, weight))
            return None
        m, s, v = gaussian_fit(sample)
        if np.any(s <= 0.):
            logging.warning("Bad Gaussian {} at {}. Lost {}*{}".format(
                s, adult, 1-entropy, weight))
            return None
        return {"label": label,
                "mean": m,
                "std": s,
                "rotation": v,
                "index": adult,
                "adult": ct.coords[adult, :],
                "level": coverlevel.exponent,
                "radius": coverlevel.radius,
                "entropy": entropy,
                "weight": coverlevel.radius**ambient_dim*weight*(1. - entropy),
                "count": count,
                }

    def gausscoords(self, parsimonious=True):
        r""" This is the Gaussian-building heart of the CDER algorithm.

        Look to elders, and look to successors.  How does the entropy change?
        If the entropy is a local minimum, then build a Gaussian coordinate
        with the dominant label.

        This populates :code:`self.gaussians` for a :class:`CDER` object.
        Generally, a user will never call this.  It is called by :func:`fit`.

        Parameters
        ----------
        parsimonious : bool
            If True, then ignore unlikely regions quickly, to minimize the
            number of Gaussian coordinates.  If False, then check all levels
            of the CoverTree.  (default: True)

        See Also
        --------
        :func:`build_gaussian`
        :func:`fit`
        """
        gaussians = []
        for next_level in self.covertree:
            # next_level is *not* the level we are actually studying!
            # but, we need to pre-compute it to see if it is worth proceeding.
            if next_level.exponent > 16:
                break
            elif next_level.exponent <= 1:
                next_level.adults_to_check = []
                next_level.adults_to_check.extend(next_level.adults)
                for adult in next_level.adults_to_check:
                    next_level.find_entropy(adult)
            else:
                l_plus_1 = next_level.exponent
                prev_level = self.covertree._levels[l_plus_1-2]
                this_level = self.covertree._levels[l_plus_1-1]
                if not this_level.adults_to_check:
                    logging.info("Done at {}".format(this_level.exponent))
                    break  # done with covertree!
                next_level.adults_to_check = []
                logging.info("Gaussians so far: {}".format(len(gaussians)))
                logging.info(this_level)
                logging.info("Checking {} adults".format(len(this_level.adults_to_check)))

                for adult in this_level.adults_to_check:
                    # TODO -- make this more efficient with special ratio.
                    # get union of entropy of elders
                    adult_a = np.array([adult], dtype=np.int64)
                    pre_elders = np.array(prev_level.friends1[this_level.predecessor[adult]])
                    pre_eldersR = distance_cache_None(
                        adult_a, pre_elders, self.covertree.coords).flatten()
                    my_elders = pre_elders[pre_eldersR <= prev_level.radius]
                    prev_weights = np.concatenate([prev_level.weights[e] for e in my_elders])

                    prev_entropy = entropy(prev_weights/prev_weights.sum(axis=0))
                    # union entropy of elders of adult at prev_level
                    assert not np.isnan(prev_entropy),\
                        "nan? prev_weights = {}".format(prev_weights)

                    this_entropy = this_level.find_entropy(adult)
                    # entropy of this_level children of adult
                    assert not np.isnan(this_entropy)

                    next_entropy = next_level.find_entropy(adult)
                    # entropy of next_level children of adult
                    assert not np.isnan(next_entropy)

                    logging.debug("{}: {} {} {}".format(
                        adult, next_entropy, this_entropy, prev_entropy))

                    # We consider all orderings of these three quantities
                    if np.count_nonzero(next_level.children[adult]) <= this_level.covertree.pointcloud.multiplicity[adult]:
                        pass
                    elif np.count_nonzero(this_level.children[adult]) <= this_level.covertree.pointcloud.multiplicity[adult]:
                        pass
                    elif 1.0 > prev_entropy >= this_entropy >= next_entropy:
                        # use it!
                        my_weights = this_level.weights[adult]
                        totweight = my_weights.sum()
                        numlabels = my_weights.shape[0]
                        # find the labels that dominate the low-entropy region.
                        # sort from biggest to least, and make a coord for each.
                        dominant_index = (my_weights > totweight/numlabels)
                        dominant_labels = this_level.covertree.pointcloud.label_info.index[dominant_index]
                        label_ordering = my_weights[dominant_index].argsort()[::-1]
                        for label in dominant_labels[label_ordering]:
                            g = self.build_gaussian(this_level, adult, label, dominant_index)
                            if g is not None:
                                gaussians.append(g)
                        if parsimonious is False:
                            to_check = list(this_level.successors[adult])
                            next_level.adults_to_check.extend(to_check)
                    elif 1.0 > this_entropy >= prev_entropy >= next_entropy:
                        # re-check adult at the next level.
                        if parsimonious is False:
                            to_check = list(this_level.successors[adult])
                            next_level.adults_to_check.extend(to_check)
                        elif parsimonious is True:
                            # only the center is likely to be useful
                            next_level.adults_to_check.append(adult)
                        else:
                            raise ValueError("parsimonious flag must be True or False.")
                    elif 1.0 > prev_entropy >= next_entropy >= this_entropy:
                        to_check = list(this_level.successors[adult])
                        if parsimonious is True:
                            # center is unlikely to be useful.
                            to_check.remove(adult)
                        next_level.adults_to_check.extend(to_check)
                    elif 1.0 > next_entropy >= prev_entropy >= this_entropy:
                        to_check = list(this_level.successors[adult])
                        if parsimonious is True:
                            # center is unlikely to be useful.
                            to_check.remove(adult)
                        next_level.adults_to_check.extend(to_check)
                    elif 1.0 > this_entropy >= next_entropy >= prev_entropy:
                        if parsimonious is False:
                            to_check = list(this_level.successors[adult])
                            next_level.adults_to_check.extend(to_check)
                        pass  # do nothing with this subtree. That is, STOP.
                    elif 1.0 > next_entropy >= this_entropy >= prev_entropy:
                        if parsimonious is False:
                            to_check = list(this_level.successors[adult])
                            next_level.adults_to_check.extend(to_check)
                        pass  # do nothing with this subtree. That is, STOP.
                    else:
                        assert max([prev_entropy, this_entropy, next_entropy]) >= 1.0,\
                            "I see a strange entropy value {}".format(
                                [prev_entropy, this_entropy, next_entropy])
                        to_check = list(this_level.successors[adult])
                        next_level.adults_to_check.extend(to_check)
        return gaussians
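

# --- Usage sketch (illustrative, mirrors the doctest in the CDER docstring) --
# A minimal end-to-end run of CDER: two separated uniform clouds, then fit,
# predict, and score.  Guarded by __main__ so that importing this module stays
# side-effect free; the sample sizes and offsets are illustrative only.
if __name__ == "__main__":
    train_dataL = np.random.rand(100, 2) + np.array([-1.5, -0.5])  # "green" cloud near (-1, 0)
    train_dataR = np.random.rand(200, 2) + np.array([0.5, -0.5])   # "magenta" cloud near (+1, 0)
    cder = CDER(parsimonious=True)
    cder.fit([train_dataL, train_dataR], ["green", "magenta"])

    test_dataL = np.random.rand(50, 2) + np.array([-1.5, -0.5])    # should be green
    test_dataR = np.random.rand(50, 2) + np.array([0.5, -0.5])     # should be magenta
    print(cder.predict([test_dataL, test_dataR]))                  # expect ['green', 'magenta']
    print(cder.score([test_dataL, test_dataR], ["green", "magenta"]))  # expect [True, True]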