Source code for skbel.algorithms.statistics

#  Copyright (c) 2021. Robin Thibaut, Ghent University

import math
import warnings

import numpy as np
import pandas as pd
from numpy.random import uniform
from scipy import integrate, ndimage
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity
from sklearn.utils import check_array

from skbel.algorithms.extmath import get_block

__all__ = [
    "KDE",
    "kde_params",
    "posterior_conditional",
    "mvn_inference",
    "it_sampling",
    "normalize",
    "remove_outliers",
]


def tupleset(t, i, value):
    """Set the `i`th element of a tuple to `value`.

    :param t: tuple
    :param i: index
    :param value: value
    """
    l = list(t)
    l[i] = value
    return tuple(l)


def romb(y: np.array, dx: float = 1.0) -> np.array:
    """Romberg's integration using samples of a function.

    :param y: A vector of ``2**k + 1`` equally-spaced samples of a function.
    :param dx: The sample spacing. Default is 1.
    :return: The integral of the function.
    """
    y = np.asarray(y)
    nd = len(y.shape)
    axis = -1
    Nsamps = y.shape[axis]
    Ninterv = Nsamps - 1
    n = 1
    k = 0
    while n < Ninterv:
        n <<= 1
        k += 1
    if n != Ninterv:
        raise ValueError("Number of samples must be one plus a non-negative power of 2.")

    R = {}
    slice_all = (slice(None),) * nd
    slice0 = tupleset(slice_all, axis, 0)
    slicem1 = tupleset(slice_all, axis, -1)
    h = Ninterv * np.asarray(dx, dtype=float)
    R[(0, 0)] = (y[slice0] + y[slicem1]) / 2.0 * h
    slice_R = slice_all
    start = stop = step = Ninterv
    for i in range(1, k + 1):
        start >>= 1
        slice_R = tupleset(slice_R, axis, slice(start, stop, step))
        step >>= 1
        R[(i, 0)] = 0.5 * (R[(i - 1, 0)] + h * y[slice_R].sum(axis=axis))
        for j in range(1, i + 1):
            prev = R[(i, j - 1)]
            R[(i, j)] = prev + (prev - R[(i - 1, j - 1)]) / ((1 << (2 * j)) - 1)
        h /= 2.0
    return R[(k, k)]


[docs] class KDE: """Uni/Bi-variate kernel density estimator. This class is adapted from the class of the same name in the package Seaborn 0.11.1 https://seaborn.pydata.org/generated/seaborn.kdeplot.html """
[docs] def __init__( self, *, kernel_type: str = None, bandwidth: float = None, grid_search: bool = True, bandwidth_space: np.array = None, gridsize: int = 200, cut: float = 1, clip: list = None, ): """Initialize the estimator with its parameters. :param kernel_type: kernel type, one of 'gaussian', 'tophat', 'epanechnikov', 'exponential', 'linear', 'cosine' :param bandwidth: bandwidth :param grid_search: perform a grid search for the bandwidth :param bandwidth_space: array of bandwidths to try :param gridsize: number of points on each dimension of the evaluation grid. :param cut: Factor, multiplied by the smoothing bandwidth, that determines how far the evaluation grid extends past the extreme datapoints. When set to 0, truncate the curve at the data limits. :param clip: A list of two elements, the lower and upper bounds for the support of the density. If None, the support is the range of the data. """ if clip is None: clip = None, None self.kernel_type = kernel_type if kernel_type is None: self.kernel_type = "gaussian" # default self.bw = bandwidth self.grid_search = grid_search if bandwidth_space is None: self.bandwidth_space = np.logspace(-2, 2, 50) # default bandwidths else: self.bandwidth_space = bandwidth_space self.gridsize = gridsize self.cut = cut self.clip = clip self.support = None
[docs] @staticmethod def _define_support_grid(x: np.array, bandwidth: float, cut: float, clip: list, gridsize: int): """Create the grid of evaluation points depending for vector x. :param x: vector of values :param bandwidth: bandwidth :param cut: factor, multiplied by the smoothing bandwidth, that determines how far the evaluation grid extends past the extreme datapoints. When set to 0, truncate the curve at the data limits. :param clip: pair of numbers None, or a pair of such pairs Do not evaluate the density outside of these limits. :param gridsize: number of points on each dimension of the evaluation grid. :return: evaluation grid """ clip_lo = -np.inf if clip[0] is None else clip[0] clip_hi = +np.inf if clip[1] is None else clip[1] bw = 1 if bandwidth is None else bandwidth gridmin = max(x.min() - bw * cut, clip_lo) gridmax = min(x.max() + bw * cut, clip_hi) return np.linspace(gridmin, gridmax, gridsize)
[docs] def _define_support_univariate(self, x: np.array): """Create a 1D grid of evaluation points. :param x: 1D array of data :return: 1D array of evaluation points """ grid = self._define_support_grid( x, self.bw, self.cut, self.clip, self.gridsize ) # define grid return grid
[docs] def _define_support_bivariate(self, x1: np.array, x2: np.array): """Create a 2D grid of evaluation points. :param x1: 1st dimension of the evaluation grid :param x2: 2nd dimension of the evaluation grid :return: 2D grid of evaluation points """ clip = self.clip if clip[0] is None or np.isscalar(clip[0]): # if clip is a single number clip = (clip, clip) grid1 = self._define_support_grid( x1, self.bw, self.cut, clip[0], self.gridsize ) # define grid for x1 grid2 = self._define_support_grid( x2, self.bw, self.cut, clip[1], self.gridsize ) # define grid for x2 return grid1, grid2
[docs] def define_support( self, x1: np.array, x2: np.array = None, cache: bool = True, ): """Create the evaluation grid for a given data set. :param x1: 1D array of data :param x2: 2D array of data :param cache: if True, cache the support grid :return: grid of evaluation points """ if x2 is None: support = self._define_support_univariate(x1) # 1D else: support = self._define_support_bivariate(x1, x2) # 2D if cache: self.support = support # cache the support grid return support
[docs] def _fit(self, fit_data: np.array): """Fit the scikit-learn KDE. :param fit_data: Data to fit the KDE to. :return: fitted KDE object """ bw = 1 if self.bw is None else self.bw # bandwidth fit_kws = { "bandwidth": bw, "algorithm": "auto", # kdtree or ball_tree "kernel": self.kernel_type, "metric": "euclidean", # default "atol": 1e-4, # tolerance for convergence "rtol": 0, # "breadth_first": True, # "leaf_size": 40, "metric_params": None, } # define the kernel density estimator parameters kde = KernelDensity(**fit_kws) # initiate the estimator if self.grid_search and not self.bw: # GridSearchCV maximizes the total log probability density under the model. # The data X will be divided into train-test splits based on folds defined in cv param # For each combination of parameters that you specified in param_grid, the model # will be trained on the train part from the step above and then scoring will be used on test part. # The scores for each parameter combination will be combined for all the folds and averaged. # Highest performing parameter will be selected. grid = GridSearchCV( kde, {"bandwidth": self.bandwidth_space} ) # Grid search on bandwidth grid.fit(fit_data) # Fit the grid search self.bw = grid.best_params_["bandwidth"] # Set the bandwidth to the best bandwidth fit_kws["bandwidth"] = self.bw # Update the bandwidth in the fit_kws kde.set_params( **{"bandwidth": self.bw} ) # Update the bandwidth in the scikit-learn model kde.fit(fit_data) # Fit the KDE return kde
[docs] def _eval_univariate(self, x: np.array): """Fit and evaluate on univariate data. :param x: Data to evaluate. :return: (density, support) """ support = self.support if support is None: support = self.define_support(x, cache=True) kde = self._fit(x.reshape(-1, 1)) density = np.exp(kde.score_samples(support.reshape(-1, 1))) # evaluate the KDE return density, support
[docs] def _eval_bivariate(self, x1: np.array, x2: np.array): """Fit and evaluate on bivariate data. :param x1: First data set. :param x2: Second data set. :return: (density, support) """ support = self.support if support is None: support = self.define_support(x1, x2, cache=False) X_train = np.vstack([x1, x2]).T kde = self._fit(X_train) X, Y = np.meshgrid(*support) grid = np.vstack([X.ravel(), Y.ravel()]).T density = np.exp(kde.score_samples(grid)) # evaluate the KDE density = density.reshape(X.shape) return density, support
[docs] def __call__(self, x1, x2=None): """Fit and evaluate on univariate or bivariate data.""" if x2 is None: return self._eval_univariate(x1) else: return self._eval_bivariate(x1, x2)
def _univariate_density( data_variable: pd.DataFrame, estimate_kws: dict, ): """Estimate the density of a single variable. :param data_variable: DataFrame with a single variable. :param estimate_kws: Keyword arguments for the density estimator. :return: (density, support, bandwidth) """ # Initialize the estimator object estimator = KDE(**estimate_kws) sub_data = data_variable.dropna() # # Extract the data points from this sub set and remove nulls observations = sub_data["x"].to_numpy() observation_variance = observations.var() if math.isclose(observation_variance, 0) or np.isnan(observation_variance): msg = "Dataset has 0 variance; skipping density estimate." warnings.warn(msg, UserWarning, stacklevel=2) # Estimate the density of observations at this level density, support = estimator(observations) return density, support, estimator.bw def _bivariate_density( data: pd.DataFrame, estimate_kws: dict, ): """Estimate bivariate KDE. :param data: DataFrame containing (x, y) data :param estimate_kws: KDE parameters :return: (density, support, bandwidth) """ estimator = KDE(**estimate_kws) # Extract the data points from this sub set and remove nulls sub_data = data.dropna() observations = sub_data[["x", "y"]] # Check that KDE will not error out variance = observations[["x", "y"]].var() if any(math.isclose(x, 0) for x in variance) or variance.isna().any(): msg = "Dataset has 0 variance; skipping density estimate." warnings.warn(msg, UserWarning, stacklevel=2) # Estimate the density of observations at this level observations = observations["x"], observations["y"] density, support = estimator(*observations) return density, support, estimator.bw
[docs] def kde_params( x: np.array = None, y: np.array = None, bw: float = None, bandwidth_space=None, gridsize: int = 200, cut: float = 1, clip=None, ): """Computes the kernel density estimate (KDE) of one or two data sets. :param x: The x-coordinates of the input data. :param y: The y-coordinates of the input data. :param gridsize: Number of discrete points in the evaluation grid. :param bw: The bandwidth of the kernel. :param bandwidth_space: The space to search for the bandwidth. :param cut: Draw the estimate to cut * bw from the extreme data points. :param clip: Lower and upper bounds for datapoints used to fit KDE. Can provide a pair of (low, high) bounds for bivariate plots. :return: (density: The estimated probability density function evaluated at the support, support: The support of the density function, the x-axis of the KDE.) """ # Pack the kwargs for KDE estimate_kws = dict( bandwidth=bw, bandwidth_space=bandwidth_space, gridsize=gridsize, cut=cut, clip=clip, ) if y is None: data = {"x": x} frame = pd.DataFrame(data=data) density, support, bw = _univariate_density(data_variable=frame, estimate_kws=estimate_kws) else: data = {"x": x, "y": y} frame = pd.DataFrame(data=data) density, support, bw = _bivariate_density( data=frame, estimate_kws=estimate_kws, ) return density, support, bw
def _pixel_coordinate(line: list, x_1d: np.array, y_1d: np.array, k: int = None): """Gets the pixel coordinate of the value x or y, in order to get posterior conditional probability given a KDE. :param line: Coordinates of the line we'd like to sample along [(x1, y1), (x2, y2)] :param x_1d: List of x coordinates along the axis :param y_1d: List of y coordinates along the axis :param k: Used to set number of rows/columns :return: (rows, columns) """ if k is None: num = 200 else: num = k # https://stackoverflow.com/questions/18920614/plot-cross-section-through-heat-map # Convert the line to pixel/index coordinates x_world, y_world = np.array(list(zip(*line, strict=False))) col = y_1d.shape * (x_world - min(x_1d)) / np.ptp(x_1d) row = x_1d.shape * (y_world - min(y_1d)) / np.ptp(y_1d) # Interpolate the line at "num" points... row, col = [np.linspace(item[0], item[1], num) for item in [row, col]] return row, col def _conditional_distribution( kde_array: np.array, x_array: np.array, y_array: np.array, x: float = None, y: float = None, k: int = None, ): """Compute the conditional posterior distribution p(x_array|y_array) given x or y. Provide only one observation ! Either x or y. Perform a cross- section in the KDE along the y axis. :param kde_array: KDE of the prediction :param x_array: X grid (1D) :param y_array: Y grid (1D) :param x: Observed data (horizontal axis) :param y: Observed data (vertical axis) :param k: Used to set number of rows/columns :return: (cross_section: The cross-section of the KDE, line: The line of the KDE) """ # Coordinates of the line we'd like to sample along if x is not None: line = [(x, min(y_array)), (x, max(y_array))] elif y is not None: line = [(min(x_array), y), (max(x_array), y)] else: msg = "No observation point included." warnings.warn(msg, UserWarning, stacklevel=2) return 0 # Convert line to row/column row, col = _pixel_coordinate(line=line, x_1d=x_array, y_1d=y_array, k=k) # Extract the values along the line, using cubic interpolation zi = ndimage.map_coordinates(kde_array, np.vstack((row, col))) if x is not None: line = np.linspace(min(y_array), max(y_array), k) elif y is not None: line = np.linspace(min(x_array), max(x_array), k) return zi, line def _scale_distribution(post: np.array, support: np.array) -> np.array: """Scale the distribution to have a maximum of 1, and a minimum of 0. :param post: Values of the KDE cross-section :param support: Support of the KDE cross-section :return: The scaled distribution """ post[np.abs(post) < 1e-8] = 0 # Rule of thumb if post.any(): # If there is any value a = integrate.simpson(y=np.abs(post), x=support) # Integrate the absolute values post *= 1 / a # Scale the distribution return post
[docs] def posterior_conditional( X_obs: float = None, Y_obs: float = None, dens: np.array = None, support: np.array = None, k: int = None, ) -> (np.array, np.array): """Computes the posterior distribution p(y|x_obs) or p(x|y_obs) by doing a cross-section of the KDE of (d, h). :param X_obs: Observation (predictor, x-axis) :param Y_obs: Observation (target, y-axis) :param dens: The density values of the KDE of (X, Y). :param support: The support grid of the KDE of (X, Y). :param k: Used to set number of rows/columns :return: The posterior distribution p(y|x_obs) or p(x|y_obs) and the support grid of the cross-section. """ # Grid parameters xg, yg = support if X_obs is not None: # Extract the density values along the line, using cubic interpolation if isinstance(X_obs, list) or isinstance(X_obs, np.ndarray): X_obs = X_obs[0] post, line = _conditional_distribution(x=X_obs, x_array=xg, y_array=yg, kde_array=dens, k=k) elif Y_obs is not None: # Extract the density values along the line, using cubic interpolation if isinstance(Y_obs, list) or isinstance(Y_obs, np.ndarray): Y_obs = Y_obs[0] post, line = _conditional_distribution(y=Y_obs, x_array=xg, y_array=yg, kde_array=dens, k=k) else: msg = "No observation point included." warnings.warn(msg, UserWarning, stacklevel=2) return 0 post = _scale_distribution(post, line) return post, line
[docs] def mvn_inference(X: np.array, Y: np.array, X_obs: np.array, **kwargs) -> (np.array, np.array): """Estimates the posterior mean and covariance of the target. Note that in this implementation, n_samples must be = 1. .. [1] A. Tarantola. Inverse Problem Theory and Methods for Model Parameter Estimation. SIAM, 2005. Pages: 70-71 :param X: Canonical Variate of the training data :param Y: Canonical Variate of the training target, gaussian-distributed :param X_obs: Canonical Variate of the observation (n_samples, n_features). :return: y_posterior_mean, y_posterior_covariance """ Y = check_array(Y, copy=True, ensure_2d=False) X = check_array(X, copy=True, ensure_2d=False) X_obs = check_array(X_obs, copy=True, ensure_2d=False) # Size of the set n_training = X.shape[0] # Computation of the posterior mean in Canonical space y_mean = np.mean(Y, axis=0) # (n_comp_CCA, 1) # noqa # Mean is 0, as expected. y_mean = np.where(np.abs(y_mean) < 1e-8, 0, y_mean) # Evaluate the covariance in h (in Canonical space) # Very close to the Identity matrix # (n_comp_CCA, n_comp_CCA) y_cov = np.cov(Y.T) # noqa if "x_cov" in kwargs.keys(): x_cov = kwargs["x_cov"] else: x_cov = np.zeros(shape=y_cov.shape) # Linear modeling h to d (in canonical space) with least-square criterion. # Pay attention to the transpose operator. # Computes the vector g that approximately solves the equation y @ g = x. g = np.linalg.lstsq(Y, X, rcond=None)[0].T # Replace values below threshold by 0. g = np.where(np.abs(g) < 1e-8, 0, g) # (n_comp_CCA, n_comp_CCA) # Modeling error due to deviations from theory # (n_components_CCA, n_training) x_ls_predicted = np.matmul(Y, g.T) # noqa x_modeling_mean_error = np.mean(X - x_ls_predicted, axis=0) # (n_comp_CCA, 1) x_modeling_error = X - x_ls_predicted - np.tile(x_modeling_mean_error, (n_training, 1)) # (n_comp_CCA, n_training) # Information about the covariance of the posterior distribution in Canonical space. x_modeling_covariance = np.cov(x_modeling_error.T) # (n_comp_CCA, n_comp_CCA) # Build block matrix s11 = y_cov if y_cov.ndim == 0: y_cov = [y_cov] s12 = y_cov @ g.T s21 = g @ y_cov s22 = g @ y_cov @ g.T + x_cov + x_modeling_covariance block = np.block([[s11, s12], [s21, s22]]) # Inverse delta = np.linalg.pinv(block) # Partition block d11 = get_block(delta, 1) d12 = get_block(delta, 2) # Observe that posterior covariance does not depend on observed x. y_posterior_covariance = np.linalg.pinv(d11) # (n_comp_CCA, n_comp_CCA) # Computing the posterior mean is simply a linear operation, given precomputed posterior covariance. y_posterior_mean = y_posterior_covariance @ ( d11 @ y_mean - d12 @ (X_obs[0] - x_modeling_mean_error - y_mean @ g.T) # noqa ) # (n_comp_CCA,) return y_posterior_mean, y_posterior_covariance
[docs] def normalize(pdf): """Normalize a non-normalized PDF. :param pdf: The probability density function (not necessarily normalized). Must take floats or ints as input, and return floats as an output. :return: pdf_norm: Function with same signature as pdf, but normalized so that the integral between lower_bd and upper_bd is close to 1. Maps nicely over iterables. """ dx = np.abs(pdf.x[1] - pdf.x[0]) # Assume uniform spacing quadrature = romb(pdf.y, dx) # Integrate using Romberg's method A = quadrature # Normalization constant def pdf_normed(x): """Normalized PDF. :param x: Input to the pdf. :return: pdf(x) / A. """ b = np.interp(x=x, xp=pdf.x, fp=pdf.y) # Evaluate the PDF at x if A < 1e-3: # Rule of thumb return 0 if b / A < 1e-3: # If the PDF is very small, return 0 return 0 else: return b / A return pdf_normed
def get_cdf(pdf): """Generate a CDF from a (possibly not normalized) pdf. :param pdf: The probability density function (not necessarily normalized). Must take floats or ints as input, and return floats as an output. :return: cdf: The cumulative density function of the (normalized version of the) provided pdf. Will return a float if provided with a float or int; will return a numpy array if provided with an iterable. """ pdf_norm = normalize(pdf) # Calculate the normalized pdf lower_bound = np.min(pdf.x) upper_bound = np.max(pdf.x) def cdf_number(x): """Numerical cdf. :param x: The value to evaluate the cdf at. :return: The value of the cdf at x. """ if x <= lower_bound: return 0 elif x >= upper_bound: return 1 else: d = np.abs(x - lower_bound) if d > 1e-4: # Check that spacing isn't too small samples = np.linspace(lower_bound, x, 2**7 + 1) dx = np.abs(samples[1] - samples[0]) y = np.array([pdf_norm(s) for s in samples]) return romb(y, dx) else: return 0 def cdf_vector(x): """Vectorized cdf. :param x: The values to evaluate the cdf at. :return: The values of the cdf at x. """ try: return np.array([cdf_number(xi) for xi in x]) except AttributeError: return cdf_number(x) return cdf_vector
[docs] def it_sampling( pdf, num_samples: int = 1, lower_bd=-np.inf, upper_bd=np.inf, k: int = None, cdf_y: np.array = None, return_cdf: bool = False, ): """Sample from an arbitrary, un-normalized PDF. :param pdf: function, float -> float The probability density function (not necessarily normalized). Must take floats or ints as input, and return floats as an output. :param num_samples: The number of samples to be generated. :param lower_bd: Lower bound of the support of the pdf. This parameter allows one to manually establish cutoffs for the density. :param upper_bd: Upper bound of the support of the pdf. :param k: Step number between lower_bd and upper_bd :param cdf_y: precomputed values of the CDF :param return_cdf: Option to return the computed CDF values :return: samples: An array of samples from the provided PDF, with support between lower_bd and upper_bd. """ if k is None: k = 200 # Default step size if cdf_y is None: cdf = get_cdf(pdf) # CDF of the pdf cdf_y = cdf(np.linspace(lower_bd, upper_bd, k)) # CDF values if return_cdf: return cdf_y else: if cdf_y.any(): seeds = uniform(0, 1, num_samples) # Uniformly distributed seeds simple_samples = np.interp(x=seeds, xp=cdf_y, fp=pdf.x) # Samples else: simple_samples = np.zeros(num_samples) # Samples return simple_samples
[docs] def remove_outliers(data): """Removes outliers from the data. :param data: array-like :return: data without outliers """ q25 = np.quantile(data, 0.25, axis=0) q75 = np.quantile(data, 0.75, axis=0) iqr = q75 - q25 lower_bound = q25 - 1.5 * iqr upper_bound = q75 + 1.5 * iqr # if one element is out of bounds, delete its row data = data[(data > lower_bound).all(axis=1) & (data < upper_bound).all(axis=1)] return data