# Copyright (c) 2022. Robin Thibaut, Ghent University
"""Bayesian Evidential Learning Framework.
Currently, the common practice is to first transform predictor and target variables
through PCA, and then apply CCA.
Alternative blueprints could be written in the same style as the BEL class implementing the classic scheme.
"""
import numpy as np
from scipy import interpolate, stats
from sklearn.base import (
BaseEstimator,
MultiOutputMixin,
TransformerMixin,
)
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.utils import check_array
from sklearn.utils.validation import (
check_is_fitted,
)
from ..algorithms import it_sampling, kde_params, mvn_inference, posterior_conditional
from ..tmaps import TransportMap
__all__ = ["BEL"]
[docs]
class BEL(TransformerMixin, MultiOutputMixin, BaseEstimator):
"""Heart of the framework.
Inherits from scikit-learn base classes.
BEL stands for Bayesian Evidential Learning.
"""
[docs]
def __init__(
self,
mode: str = "tm",
copy: bool = True,
*,
X_pre_processing=None,
Y_pre_processing=None,
X_post_processing=None,
Y_post_processing=None,
regression_model=None,
n_comp_cca=None,
x_dim=None,
y_dim=None,
random_state=None,
):
"""Initialize the BEL class.
:param mode: How to infer the posterior distribution (if CCA is used). "kde", "mvn" or "tm".
:param copy: Whether to copy arrays or not (default is True).
:param X_pre_processing: sklearn pipeline for pre-processing the predictor.
:param Y_pre_processing: sklearn pipeline for pre-processing the target.
:param X_post_processing: sklearn pipeline for post-processing the predictor.
:param X_post_processing: sklearn pipeline for post-processing the target.
:param regression_model: The regression model to use. Default is Canonical Correlation Analysis.
:param n_comp_cca: Number of components to keep in CCA (only if CCA is used).
:param x_dim: Predictor original dimensions.
:param y_dim: Target original dimensions.
:param random_state: Seed to reproduce the same samples.
"""
self.copy = copy
# How to infer the posterior parameters
self.mode = mode
# Processing pipelines
if X_pre_processing is None:
X_pre_processing = Pipeline([("nothing", "passthrough")])
if Y_pre_processing is None:
Y_pre_processing = Pipeline([("nothing", "passthrough")])
if X_post_processing is None:
X_post_processing = Pipeline([("nothing", "passthrough")])
if Y_post_processing is None:
Y_post_processing = Pipeline([("nothing", "passthrough")])
if regression_model is None:
regression_model = Pipeline([("nothing", "passthrough")])
self.X_pre_processing = X_pre_processing
self.Y_pre_processing = Y_pre_processing
self.X_post_processing = X_post_processing
self.Y_post_processing = Y_post_processing
self.regression_model = regression_model
self.n_comp_cca = n_comp_cca
# Parameters for sampling
self._seed = random_state
# Original dataset dimensions
self.x_dim, self.y_dim = x_dim, y_dim
# Pre-processed predictor training data, possibly dimension reduced
self._x_pre_processed, self._y_pre_processed = None, None
# Pre-processed observation predictor, possibly dimension reduced
self._x_obs_pre_processed = None
# The following properties are central to the BEL framework
@property
def X_shape(self):
"""Predictor original shape."""
return self._X_shape
@X_shape.setter
def X_shape(self, x_shape):
self._X_shape = x_shape
@property
def Y_shape(self):
"""Target original shape."""
return self._Y_shape
@Y_shape.setter
def Y_shape(self, y_shape):
self._Y_shape = y_shape
@property
def n_posts(self):
"""Number of sample to extract from the posterior multivariate
distribution after post-processing."""
return self._n_posts
@n_posts.setter
def n_posts(self, n_p):
self._n_posts = n_p
@property
def seed(self):
"""Seed a.k.a.
random state to reproduce the same samples
"""
return self._seed
@seed.setter
def seed(self, s):
self._seed = s
np.random.seed(self._seed)
@property
def random_state(self):
"""Seed a.k.a.
random state to reproduce the same samples
"""
return self._seed
@random_state.setter
def random_state(self, s):
self._seed = s
@property
def x_pre_processed(self):
"""Pre-processed predictor."""
return self._x_pre_processed
@x_pre_processed.setter
def x_pre_processed(self, x_pre_processed):
self._x_pre_processed = x_pre_processed
@property
def y_pre_processed(self):
"""Pre-processed target."""
return self._y_pre_processed
@y_pre_processed.setter
def y_pre_processed(self, y_pre_processed):
self._y_pre_processed = y_pre_processed
@property
def x_observation(self):
"""Pre-processed observation."""
return self._x_obs_pre_processed
@x_observation.setter
def x_observation(self, x_observation):
self._x_obs_pre_processed = x_observation
[docs]
def fit(self, X: np.array = None, Y: np.array = None):
"""Fit all pipelines.
:param X: Predictor array.
:param Y: Target array.
:return: self
"""
if self.x_pre_processed is None:
_xt = self.X_pre_processing.fit_transform(X)
else:
_xt = self.x_pre_processed
if self.y_pre_processed is None:
_yt = self.Y_pre_processing.fit_transform(Y)
else:
_yt = self.y_pre_processed
# Regression
try:
if self.n_comp_cca is None: # If not specified, use all components
self.regression_model.n_components = min(_xt.shape[1], _yt.shape[1])
else:
self.regression_model.n_components = self.n_comp_cca
_xc, _yc = self.regression_model.fit_transform(X=_xt, y=_yt) # Learning
except ValueError: # If no CCA
_xc, _yc = _xt, _yt
# CV Normalized
_xf, _yf = (
self.X_post_processing.fit_transform(_xc),
self.Y_post_processing.fit_transform(_yc),
) # Post-processing
self.X_f, self.Y_f = _xf, _yf # At the moment, we have to save these.
return self
[docs]
def predict(
self,
X_obs: np.array = None,
n_posts: int = None,
mode: str = None,
noise: float = None,
return_samples: bool = True,
inverse_transform: bool = True,
precomputed_kde: np.array = None,
dtype: str = "float64",
) -> np.array:
"""Predict the posterior distribution of the target variable.
:param X_obs: The observed data.
:param n_posts: The number of posterior samples to draw.
:param mode: The mode of inference to use. Default is "tm".
:param noise: The noise level of the model (only if mode == 'mvn').
:param return_samples: Option to return samples or not. Default=True.
:param inverse_transform: Option to return the samples in the original space. If the dimensionality of the
original space is very high, this can be memory-consuming. It can be set to False to return the samples in the
reduced space, which is much faster, so that the samples can be back-transformed later. Default=True.
:param precomputed_kde: (if mode="kde) Precomputed KDE functions. Computing the KDEs can be time-consuming.
If the KDEs are precomputed, they can be passed as an argument.
:param dtype: The data type of the samples. Default=float64.
:return: The posterior samples in the original space or in the transformed space.
"""
if mode is not None: # If mode is provided
self.mode = mode
if noise is None:
self.noise = 0.01
if n_posts is not None: # If n_posts is provided
self.n_posts = n_posts # Set the number of posterior samples
# Project observed data into canonical space.
if self._x_obs_pre_processed is None:
X_obs_pc = self.X_pre_processing.transform(X_obs)
else:
X_obs_pc = self._x_obs_pre_processed
X_obs_c = self.regression_model.transform(
X_obs_pc
) # Project observed data into Canonical space.
X_obs_f = self.X_post_processing.transform(X_obs_c)
self.X_obs_f = X_obs_f
# Estimate the posterior mean and covariance
n_obs = X_obs_f.shape[0] # Number of observations
n_cca = self.regression_model.n_components # Number of canonical variables
if self.mode == "mvn": # If mode is mvn
self.posterior_mean, self.posterior_covariance = (
np.zeros((n_obs, n_cca)),
np.zeros((n_obs, n_cca, n_cca)),
)
for n, dp in enumerate(X_obs_f): # For each observation point
# Evaluate the covariance in d (here we assume no data error, so C is identity times a given factor)
# Number of PCA components for the curves
x_dim = self.X_pre_processing["pca"].n_components # Number of PCA components
# I matrix. (n_comp_PCA, n_comp_PCA)
x_cov = (
np.eye(x_dim) * self.noise
) # Noise level. We assume that the data is noisy with a given level of noise.
# (n_comp_CCA, n_comp_CCA)
# Get the rotation matrices
x_rotations = self.regression_model.x_rotations_
x_cov = x_rotations.T @ x_cov @ x_rotations
dict_args = {"x_cov": x_cov}
X, Y = self.X_f, self.Y_f
# mvn_inference is designed to accept 1 observation at a time.
post_mean, post_cov = mvn_inference(
X=X,
Y=Y,
X_obs=dp.reshape(1, -1),
**dict_args,
) # Posterior mean and covariance
self.posterior_mean[n] = post_mean
self.posterior_covariance[n] = post_cov
elif self.mode == "kde": # KDE
self.kde_functions = np.zeros((n_obs, n_cca), dtype="object") # KDE functions
if precomputed_kde is not None: # If precomputed KDE functions are provided
self.kde_functions = precomputed_kde
if not np.all(self.kde_functions): # If KDE functions are not provided
# KDE inference
for comp_n in range(n_cca):
# If the relation is almost perfectly linear, it doesn't make sense to perform a
# KDE estimation.
corr = np.corrcoef(self.X_f.T[comp_n], self.Y_f.T[comp_n]).diagonal(offset=1)[0]
# If the Pearson's correlation coefficient is > 0.999, linear regression is used instead of KDE.
if corr >= 0.999: # If the relation is almost perfectly linear
kind = "linear"
fun = LinearRegression().fit(
self.X_f.T[comp_n].reshape(-1, 1),
self.Y_f.T[comp_n].reshape(-1, 1),
) # Linear regression
bw = 0 # Bandwidth is not used in this case.
# The KDE inference method can be hybrid - the returned functions are saved as a dictionary
sample_fun = {"kind": kind, "function": fun, "bandwidth": bw}
functions = [sample_fun] * n_obs
else: # If the relation is not perfectly linear
# Compute KDE
dens, support, bw = kde_params(x=self.X_f.T[comp_n], y=self.Y_f.T[comp_n])
# Rule of thumb:
dens[dens < 1e-8] = 0 # Remove the small values
functions = []
for dp in X_obs_f: # For each observation point
# Conditional:
hp, sup = posterior_conditional(
X_obs=dp.T[comp_n],
dens=dens,
support=support,
k=2**7 + 1,
)
hp[np.abs(hp) < 1e-8] = 0 # Set very small values to 0.
kind = "pdf"
fun = interpolate.interp1d(sup, hp, kind="linear") # Interpolate
# The KDE inference method can be hybrid - the returned functions are saved as a dictionary
sample_fun = {
"kind": kind,
"function": fun,
"bandwidth": bw,
}
functions.append(sample_fun) # Save the function
# Shape = (n_obs, n_comp_CCA)
self.kde_functions[:, comp_n] = functions # noqa
elif self.mode == "tm":
nonmonotone = [
[
[],
[0],
[0, 0, "HF"],
[0, 0, 0, "HF"],
[0, 0, 0, 0, "HF"],
[0, 0, 0, 0, 0, "HF"],
[0, 0, 0, 0, 0, 0, "HF"],
[0, 0, 0, 0, 0, 0, 0, "HF"],
]
]
monotone = [
[
[1],
"iRBF 1",
"iRBF 1",
"iRBF 1",
"iRBF 1",
"iRBF 1",
"iRBF 1",
"iRBF 1",
"iRBF 1",
]
]
self.tm_functions = np.zeros((n_obs, n_cca), dtype="object")
for comp_n in range(n_cca):
X = np.vstack((self.X_f.T[comp_n], self.Y_f.T[comp_n])).T
# Initialize a transport map object
tm = TransportMap(
monotone=monotone,
nonmonotone=nonmonotone,
X=X,
polynomial_type="probabilist's hermite",
monotonicity="separable monotonicity",
standardize_samples=True,
workers=1,
) # Number of workers for the parallel optimization; 1 means no parallelization
tm.optimize()
kind = "tm"
sample_fun = {"kind": kind, "function": tm, "X": X}
functions = [sample_fun] * n_obs
# Shape = (n_obs, n_comp_CCA)
self.tm_functions[:, comp_n] = functions # noqa
if return_samples:
samples = self.random_sample(
X_obs_f=X_obs_f,
n_posts=n_posts,
mode=mode,
init_kde=precomputed_kde,
) # Samples from the posterior
if inverse_transform:
return self.inverse_transform(samples, dtype=dtype) # Inverse transform
else:
return samples # Return samples
[docs]
def random_sample(
self,
X_obs_f: None,
obs_n: int = None,
n_posts: int = None,
mode: str = None,
init_kde: np.array = None,
) -> np.array:
"""Random sample the inferred posterior distribution. It can be used to
generate samples from the posterior.
:param X_obs_f: Observed data points in the feature space. Shape = (n_obs, n_comp_CCA)
:param obs_n: If we want to generate samples from the posterior of a specific observation point, obs_n is the
index of the observation point.
:param n_posts: Number of posterior samples
:param mode: How to sample the posterior distribution
:param init_kde: Initial KDE function. If None, the KDE function is computed from the observed data.
:return: Samples from the posterior distribution (n_obs, n_posts, n_comp_CCA)
"""
if mode is not None:
self.mode = mode
# Set the seed for later use
if self.seed is None:
self.seed = np.random.randint(2**32 - 1, dtype="uint32")
if X_obs_f is None:
X_obs_f = self.X_obs_f
check_is_fitted(self.regression_model)
if n_posts is None:
n_posts = self.n_posts
else:
self.n_posts = n_posts
# Draw n_posts random samples from the multivariate normal distribution :
# Pay attention to the transpose operator
np.random.seed(self.seed)
if self.mode == "mvn": # Multivariate normal distribution
if obs_n is not None: # If we have a specific observation
post_mn = self.posterior_mean[obs_n].reshape(1, -1)
post_cv = self.posterior_covariance[obs_n].reshape(1, -1)
else:
post_mn = self.posterior_mean
post_cv = self.posterior_covariance
Y_samples = [] # Samples from the posterior
for _n, (mean, cov) in enumerate(zip(post_mn, post_cv, strict=False)):
Y_samples.append(
np.random.multivariate_normal(mean=mean, cov=cov, size=n_posts)
) # Draw n_posts samples from the multivariate normal distribution
if self.mode == "kde": # Kernel density estimation
n_obs = X_obs_f.shape[0] # Number of observations
Y_samples = np.zeros(
(n_obs, self.n_posts, self.kde_functions.shape[1])
) # Shape = (n_obs, n_posts, n_comp_CCA)
if obs_n is not None: # If we have a specific observation
kde_fn = self.kde_functions[obs_n].reshape(1, -1) # Shape = (1, n_comp_CCA)
else:
kde_fn = self.kde_functions # Shape = (n_obs, n_comp_CCA)
if init_kde is None:
# Parses the functions dict
for i, fun_per_comp in enumerate(kde_fn):
for j, fun in enumerate(fun_per_comp):
if fun["kind"] == "pdf": # If the function is a pdf
pdf = fun["function"]
uniform_samples = it_sampling( # Sample from the pdf
pdf=pdf,
num_samples=self.n_posts,
lower_bd=pdf.x.min(),
upper_bd=pdf.x.max(),
k=2**7 + 1,
)
elif fun["kind"] == "linear": # If the function is a linear interpolation
rel1d = fun["function"]
uniform_samples = np.ones(self.n_posts) * rel1d.predict(
np.array(X_obs_f[i][j].reshape(1, -1)) # check this line
) # Shape X_obs_f = (n_obs, n_components)
Y_samples[i, :, j] = uniform_samples # noqa
else: # If the KDE is already initialized
for i, fun_per_comp in enumerate(kde_fn): # Parses the function dict
for j, fun in enumerate(fun_per_comp):
pv = init_kde[i, j]
if fun["kind"] == "pdf":
pdf = fun["function"]
uniform_samples = it_sampling(
pdf=pdf,
num_samples=self.n_posts,
lower_bd=pdf.x.min(),
upper_bd=pdf.x.max(),
k=2**7 + 1,
cdf_y=pv,
)
elif fun["kind"] == "linear":
uniform_samples = np.ones(self.n_posts) * pv
Y_samples[i, :, j] = uniform_samples # noqa
if self.mode == "tm":
n_obs = X_obs_f.shape[0] # Number of observations
Y_samples = np.zeros(
(n_obs, self.n_posts, self.tm_functions.shape[1])
) # Shape = (n_obs, n_posts, n_comp_CCA)
if obs_n is not None: # If we have a specific observation
tm_fn = self.tm_functions[obs_n].reshape(1, -1) # Shape = (1, n_comp_CCA)
else:
tm_fn = self.tm_functions # Shape = (n_obs, n_comp_CCA)
# Parses the functions dict
for i, fun_per_comp in enumerate(tm_fn):
for j, fun in enumerate(fun_per_comp):
tm = fun["function"]
X = fun["X"]
N = X.shape[0]
# Map the target samples to the reference distribution; we only get a N-by-1
# vector because we only defined the map for the second dimension/column of X
if self.n_posts == N:
norm_samples = tm.map(X)
else:
norm_samples = stats.norm.rvs(size=(self.n_posts, 1))
N = self.n_posts
# Now define the value we wish to condition on
x1_obs = X_obs_f[i][j].reshape(-1)[0] # our 'observed' value
# In the inversion, we pretend that we have already inverted the first dimension
# of X and obtained x1_obs, so we create fake, pre-calculated values for it
X_precalc = np.ones((N, 1)) * x1_obs
# Now invert the map conditionally. X_star are the posterior samples.
X_star = tm.inverse_map(
X_precalc=X_precalc, Y=norm_samples
) # Only necessary when heuristic is deactivated
Y_samples[i, :, j] = X_star.reshape(-1) # noqa
return np.array(Y_samples) # noqa
[docs]
def kde_init(self, X_obs_f: np.array, obs_n: int = None):
"""Initialize the KDEs, i.e. the functions that will be used to sample
from the posterior distribution.
:param X_obs_f: Observed data points
:param obs_n: Observation number
:return: The initialized KDEs
"""
n_obs = X_obs_f.shape[0] # Number of observations
n_comp = X_obs_f.shape[1] # Number of components
init_samples = np.zeros((n_obs, n_comp), dtype="object") # Shape = (n_obs, n_comp)
if obs_n is not None: # If we have a specific observation
kde_fn = self.kde_functions[obs_n].reshape(1, -1) # Shape = (1, n_comp_CCA)
else:
kde_fn = self.kde_functions # Shape = (n_obs, n_comp_CCA)
# Parses the functions dict
for i, fun_per_comp in enumerate(kde_fn):
for j, fun in enumerate(fun_per_comp):
if fun["kind"] == "pdf":
pdf = fun["function"]
pv = it_sampling( # Sample from the pdf
pdf=pdf,
lower_bd=pdf.x.min(),
upper_bd=pdf.x.max(),
k=2**7
+ 1, # Number of samples. It is a power of 2 + 1 because Romberg integration will be used
return_cdf=True,
)
elif fun["kind"] == "linear":
rel1d = fun["function"]
pv = rel1d.predict( # Sample from the linear interpolation
np.array(X_obs_f[i][j].reshape(1, -1))
)
init_samples[i, j] = pv # noqa
return init_samples