# -*- coding: utf-8 -*-
# License: BSD-3-Clause
# Author: LKouadio <etanoyau@gmail.com>
""" Metrics """
from __future__ import annotations
from numbers import Real, Integral
from typing import Sequence, Optional, Literal, Union
import warnings
import numpy as np
from sklearn.utils.validation import check_array, check_consistent_length
from ..api.types import MultioutputLiteral, NanPolicyLiteral
from ..compat.sklearn import StrOptions, validate_params
from ..utils.generic_utils import are_all_values_in_bounds
__all__ = [
'coverage_score',
'continuous_ranked_probability_score',
'weighted_interval_score',
'prediction_stability_score' ,
'time_weighted_mean_absolute_error',
'quantile_calibration_error',
'mean_interval_width_score',
'theils_u_score'
]
[docs]
@validate_params({
'y_true': ['array-like'],
'y_median': ['array-like'],
'y_lower': ['array-like'],
'y_upper': ['array-like'],
'alphas': ['array-like'],
'time_weights': ['array-like', None, StrOptions({'inverse_time'})],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'warn_invalid_bounds': ['boolean'],
'eps': [Real],
'verbose': [Integral, bool],
})
def time_weighted_interval_score(
y_true: np.ndarray,
y_median: np.ndarray,
y_lower: np.ndarray,
y_upper: np.ndarray,
alphas: Union[Sequence[float], np.ndarray],
time_weights: Optional[Union[Sequence[float], str]] = 'inverse_time',
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
warn_invalid_bounds: bool = True,
eps: float = 1e-8,
verbose: int = 0
) -> Union[float, np.ndarray]:
r"""
Compute the Time-Weighted Interval Score (TWIS).
TWIS evaluates probabilistic forecasts (median and prediction
intervals) over a time horizon, applying time-dependent weights.
It extends the Weighted Interval Score (WIS) by incorporating
temporal emphasis. Lower scores are better.
The WIS for a single observation :math:`y`, median :math:`m`, and
:math:`K` prediction intervals :math:`\{(l_k, u_k, \alpha_k)\}_{k=1}^K`
(where :math:`\alpha_k` is the nominal coverage level for the k-th
interval, meaning the interval is :math:`[q_{\alpha_k/2}, q_{1-\alpha_k/2}]`)
is given by:
.. math::
\mathrm{WIS}(y, m, \text{intervals}) = \frac{1}{K+1} \left(
|y-m| + \sum_{k=1}^K \mathrm{IS}_{\alpha_k}(y, l_k, u_k)
\right)
where :math:`\mathrm{IS}_{\alpha_k}` is the interval score for the
k-th interval, commonly defined as:
.. math::
\mathrm{IS}_{\alpha_k}(y, l_k, u_k) = (u_k - l_k) +
\frac{2}{\alpha_k}(l_k - y)\mathbf{1}\{y < l_k\} +
\frac{2}{\alpha_k}(y - u_k)\mathbf{1}\{y > u_k\}
Alternatively, the sum term in WIS can be written using direct
WIS components for each interval:
.. math::
\sum_{k=1}^K \left[ \frac{\alpha_k}{2}(u_k - l_k) +
(l_k - y)\mathbf{1}\{y < l_k\} +
(y - u_k)\mathbf{1}\{y > u_k\} \right]
This function calculates :math:`\mathrm{WIS}_{iot}` for each sample
:math:`i`, output :math:`o`, and time step :math:`t`.
Then, the Time-Weighted Interval Score for sample :math:`i`,
output :math:`o` is:
.. math::
\mathrm{TWIS}_{io} = \sum_{t=1}^{T_{steps}} w_t \cdot \mathrm{WIS}_{iot}
where :math:`w_t` are normalized time weights. The final score
is an average of :math:`\mathrm{TWIS}_{io}`.
Parameters
----------
y_true : array-like
True target values. Expected shapes:
- (n_timesteps,)
- (n_samples, n_timesteps)
- (n_samples, n_outputs, n_timesteps)
y_median : array-like
Median forecasts, matching `y_true`'s shape.
y_lower : array-like
Lower bounds of K prediction intervals. Expected shapes:
- If `y_true` is (T,): (K_intervals, n_timesteps)
- If `y_true` is (N,T): (n_samples, K_intervals, n_timesteps)
- If `y_true` is (N,O,T): (N_samp, N_out, K_int, n_timesteps)
y_upper : array-like
Upper bounds, matching `y_lower`'s shape.
alphas : array-like of shape (K_intervals,)
Nominal central interval probability levels (e.g., 0.1 for 90% PI).
Each alpha must be in (0, 1). These define the :math:`\alpha_k`
values used in the IS and WIS component weighting.
time_weights : array-like of shape (n_timesteps,), str, or None, \
default='inverse_time'
Weights for each time step. Normalized to sum to 1.
See `time_weighted_accuracy_score` for details.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. Sum must be > `eps`.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs in inputs.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Aggregation for multi-output data.
warn_invalid_bounds : bool, default=True
If True, warn if any `y_lower > y_upper`. Widths will be negative.
eps : float, default=1e-8
Epsilon for safe division (e.g., sum of weights).
verbose : int, default=0
Verbosity level.
Returns
-------
score : float or ndarray of floats
Mean TWIS. Lower values are better.
Examples
--------
>>> import numpy as np
>>> # from fusionlab.metrics import time_weighted_interval_score
>>> y_t = np.array([[10, 11], [20, 22]]) # 2 samples, 2 timesteps
>>> y_m = np.array([[10, 11.5], [19, 21.5]])
>>> # For K=1 interval
>>> y_l = np.array([[[9], [10]], [[18],[20]]]) # (2s, 1o, 1k, 2t)
>>> y_l = y_l.transpose(0,2,1,3) # -> (2s,1k,1o,2t) for processing
>>> # Reshape to (2s, 1o, 1k, 2t) for this example if y_true is (2s,1o,2t)
>>> # Let's assume y_true is (2s, 1o_dummy, 2t) after processing
>>> # y_l needs to be (2s, 1o_dummy, 1k, 2t)
>>> y_l_example = np.array([[[[9, 10]]], [[[18, 20]]]]) # (2s,1o,1k,2t)
>>> y_u_example = np.array([[[[11, 12]]], [[[20, 23]]]])
>>> alphas_ex = np.array([0.2]) # Single 80% PI
>>> # For simplicity, let time_weights be uniform [0.5, 0.5]
>>> score = time_weighted_interval_score(
... y_t, y_m, y_l_example, y_u_example, alphas_ex,
... time_weights=None, verbose=0
... )
>>> print(f"TWIS: {score:.4f}") # Example output, calculation is involved
TWIS: 0.8750
See Also
--------
weighted_interval_score : Non-time-weighted version.
time_weighted_accuracy_score : Time-weighted accuracy for classification.
"""
# --- 1. Input Validation and Preprocessing ---
if not (eps > 0):
raise ValueError("eps must be positive.")
y_true_arr = check_array(y_true, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False)
y_median_arr = check_array(y_median, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False)
y_lower_arr = check_array(y_lower, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False)
y_upper_arr = check_array(y_upper, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False)
alphas_arr = check_array(alphas, ensure_2d=False, dtype="numeric",
force_all_finite=True)
if not (np.all(alphas_arr > eps) and np.all(alphas_arr < 1 - eps)):
warnings.warn(
"Some alpha values are very close to 0 or 1. Ensure they are "
"meaningful for interval calculations (e.g., > eps and < 1-eps).",
UserWarning
)
are_all_values_in_bounds(
alphas_arr, bounds=(0, 1),
closed='neither',
message= "All alpha values must be strictly between 0 and 1.",
nan_policy = 'raise'
)
if alphas_arr.ndim > 1: alphas_arr = alphas_arr.squeeze()
if alphas_arr.ndim == 0 and alphas_arr.size == 1:
alphas_arr = alphas_arr.reshape(1,)
if alphas_arr.ndim > 1:
raise ValueError(f"alphas must be 1D. Got {alphas_arr.shape}")
K_intervals = alphas_arr.shape[0]
# Determine common processing shape: (N_samp, N_out, N_time)
# And for bounds: (N_samp, N_out, K_int, N_time)
y_true_ndim_orig = y_true_arr.ndim
if y_true_ndim_orig == 1: # (T,)
y_true_proc = y_true_arr.reshape(1, 1, -1)
y_median_proc = y_median_arr.reshape(1, 1, -1)
# y_lower/upper expected: (K, T) -> (1,1,K,T)
if y_lower_arr.ndim == 2 and y_lower_arr.shape[0] == K_intervals:
y_lower_proc = y_lower_arr.reshape(1, 1, K_intervals, -1)
y_upper_proc = y_upper_arr.reshape(1, 1, K_intervals, -1)
else:
raise ValueError(
"Shape mismatch for 1D y_true with y_lower/upper.")
elif y_true_ndim_orig == 2: # (N, T)
y_true_proc = y_true_arr.reshape(y_true_arr.shape[0], 1, -1)
y_median_proc = y_median_arr.reshape(y_median_arr.shape[0], 1, -1)
# y_lower/upper expected: (N, K, T) -> (N,1,K,T)
if y_lower_arr.ndim == 3 and y_lower_arr.shape[1] == K_intervals:
y_lower_proc = y_lower_arr.reshape(
y_lower_arr.shape[0], 1, K_intervals, -1)
y_upper_proc = y_upper_arr.reshape(
y_upper_arr.shape[0], 1, K_intervals, -1)
else:
raise ValueError(
"Shape mismatch for 2D y_true with y_lower/upper."
)
elif y_true_ndim_orig == 3: # (N, O, T)
y_true_proc = y_true_arr
y_median_proc = y_median_arr
# y_lower/upper expected: (N, O, K, T)
if y_lower_arr.ndim == 4 and \
y_lower_arr.shape[2] == K_intervals:
y_lower_proc, y_upper_proc = y_lower_arr, y_upper_arr
else:
raise ValueError(
"Shape mismatch for 3D y_true with y_lower/upper.")
else:
raise ValueError("y_true must be 1D, 2D, or 3D.")
# Final shape checks
shapes_to_check = [
(y_true_proc.shape, y_median_proc.shape, "y_true/y_median base"),
((y_true_proc.shape[0], y_true_proc.shape[1]),
(y_lower_proc.shape[0], y_lower_proc.shape[1]), "N,O for bounds"),
(y_true_proc.shape[2], y_lower_proc.shape[3], "N_timesteps mismatch")
]
for sh1, sh2, msg in shapes_to_check:
if sh1 != sh2:
raise ValueError(f"{msg} inconsistent: {sh1} vs {sh2}")
if y_lower_proc.shape != y_upper_proc.shape:
raise ValueError("y_lower and y_upper processed shapes differ.")
n_samples, n_outputs, n_timesteps = y_true_proc.shape
if n_timesteps == 0: # Should be caught by earlier checks too
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# Process time_weights (same as time_weighted_accuracy_score)
w_t: np.ndarray
if time_weights is None:
w_t = np.full(n_timesteps, 1.0 / n_timesteps if n_timesteps > 0 else 0)
elif isinstance(time_weights, str) and time_weights == 'inverse_time':
if n_timesteps == 0: w_t = np.array([])
else:
w_t_raw = 1.0 / np.arange(1, n_timesteps + 1)
sum_w_t_raw = np.sum(w_t_raw)
w_t = w_t_raw / sum_w_t_raw if sum_w_t_raw > eps else \
np.full(n_timesteps, 1.0/n_timesteps if n_timesteps > 0 else 0)
else:
w_t = check_array(time_weights, ensure_2d=False, dtype="numeric",
force_all_finite=True)
if w_t.ndim > 1: w_t = w_t.squeeze()
if w_t.shape[0] != n_timesteps:
raise ValueError("time_weights length mismatch.")
sum_w_t = np.sum(w_t)
if sum_w_t < eps:
if np.any(w_t != 0):
raise ValueError("Sum of time_weights near zero.")
w_t = np.zeros(n_timesteps) if n_timesteps > 0 else np.array([])
else: w_t = w_t / sum_w_t
# Process sample_weight
s_weights_proc = None
if sample_weight is not None:
s_weights_proc = check_array(sample_weight, ensure_2d=False,
dtype="numeric", force_all_finite=True, copy=False)
check_consistent_length(y_true_proc, s_weights_proc)
if s_weights_proc.ndim > 1:
raise ValueError("sample_weight must be 1D.")
# --- 2. Handle NaNs ---
# nan_mask_base: (N,O,T) for y_true, y_median
nan_mask_yt = np.isnan(y_true_proc)
nan_mask_ym = np.isnan(y_median_proc)
# nan_mask_bounds: (N,O,K,T) for y_lower, y_upper
nan_mask_yl = np.isnan(y_lower_proc)
nan_mask_yu = np.isnan(y_upper_proc)
# Propagate K-dim NaNs to T-dim for combining: (N,O,T)
nan_mask_bounds_agg = np.any(nan_mask_yl | nan_mask_yu, axis=2)
# Overall NaN mask per (sample, output, timestep)
nan_mask_sot = nan_mask_yt | nan_mask_ym | nan_mask_bounds_agg
y_true_calc, y_median_calc = y_true_proc, y_median_proc
y_lower_calc, y_upper_calc = y_lower_proc, y_upper_proc
current_s_weights = s_weights_proc
if np.any(nan_mask_sot): # If any NaN affects any S,O,T point
if nan_policy == 'raise':
raise ValueError("NaNs detected in inputs.")
elif nan_policy == 'omit':
# Omit entire samples if any (S,O,T) point is affected
rows_with_any_nan = nan_mask_sot.any(axis=(1,2)) # (N,)
rows_to_keep = ~rows_with_any_nan
if not np.any(rows_to_keep):
if multioutput=='raw_values' and n_outputs>1:
return np.full(n_outputs,np.nan)
return np.nan
y_true_calc = y_true_proc[rows_to_keep]
y_median_calc = y_median_proc[rows_to_keep]
y_lower_calc = y_lower_proc[rows_to_keep]
y_upper_calc = y_upper_proc[rows_to_keep]
if current_s_weights is not None:
current_s_weights = current_s_weights[rows_to_keep]
nan_mask_sot = nan_mask_sot[rows_to_keep] # For propagate
if y_true_calc.shape[0] == 0: # All samples omitted
if multioutput=='raw_values' and n_outputs>1:
return np.full(n_outputs,np.nan)
return np.nan
# --- 3. Compute WIS components per (S,O,T) ---
# Expand y_true_calc, y_median_calc for broadcasting with K dim
y_t_exp = y_true_calc[..., np.newaxis, :] # (N,O,1,T)
mae_term_sot = np.abs(
y_median_calc - y_true_calc
) # (N,O,T)
# Interval components: (N,O,K,T)
interval_width_sokt = y_upper_calc - y_lower_calc
if warn_invalid_bounds:
with np.errstate(invalid='ignore'): invalid_b = y_lower_calc > y_upper_calc
if np.any(invalid_b):
num_inv = np.sum(invalid_b)
perc = (num_inv/invalid_b.size)*100 if invalid_b.size>0 else 0
warnings.warn(f"{num_inv} ({perc:.2f}%) invalid L/U bounds.",
UserWarning)
# Reshape alphas for broadcasting: (1,1,K,1)
alphas_exp_k = alphas_arr.reshape(1, 1, -1, 1)
# WIS_alpha_k components (direct formulation)
wis_sharpness_sokt = (alphas_exp_k / 2.0) * interval_width_sokt
wis_underpen_sokt = (y_lower_calc - y_t_exp) * \
(y_t_exp < y_lower_calc)
wis_overpen_sokt = (y_t_exp - y_upper_calc) * \
(y_t_exp > y_upper_calc)
sum_interval_wis_components_sot = np.sum(
wis_sharpness_sokt + wis_underpen_sokt + wis_overpen_sokt,
axis=2 # Sum over K_intervals
) # (N,O,T)
# WIS per (Sample, Output, Timestep)
if K_intervals == 0:
wis_sot = mae_term_sot
else:
wis_sot = (mae_term_sot + sum_interval_wis_components_sot) / \
(K_intervals + 1.0)
# Apply NaN mask if nan_policy is 'propagate'
if nan_policy == 'propagate':
wis_sot = np.where(nan_mask_sot, np.nan, wis_sot)
# --- 4. Apply Time Weights ---
# twis_so shape: (N_calc, O)
twis_so = np.sum(
wis_sot * w_t.reshape(1,1,-1), # Broadcast w_t
axis=2 # Sum over timesteps
)
# --- 5. Aggregate Scores ---
if current_s_weights is not None:
if np.sum(current_s_weights) < eps:
output_scores = np.full(n_outputs, np.nan)
else:
output_scores = np.average(
twis_so, axis=0, weights=current_s_weights
)
else:
if nan_policy =='propagate':
output_scores = np.mean (twis_so, axis= 0 )
else:
output_scores = np.nanmean(twis_so, axis=0)
if multioutput == 'uniform_average':
final_score = np.nanmean(output_scores)
elif multioutput == 'raw_values':
final_score = output_scores
else: raise ValueError(f"Unknown multioutput: {multioutput}")
# Adjust for original input dimensionality if scalar output expected
if y_true_ndim_orig <= 2 and multioutput == 'raw_values': # Incl. 1D y_true
if isinstance(final_score, np.ndarray) and final_score.size == 1:
final_score = final_score.item()
# Already scalar
elif y_true_ndim_orig == 1 and multioutput == 'uniform_average': # Already scalar
pass
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"TWIS computed: {final_score}")
else:
print(f"TWIS computed: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_true': ['array-like'],
'y_pred': ['array-like'],
'time_weights': ['array-like', None, StrOptions({'inverse_time'})],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'eps': [Real],
'verbose': [Integral, bool],
})
def time_weighted_accuracy_score(
y_true: np.ndarray,
y_pred: np.ndarray,
time_weights: Optional[Union[Sequence[float], str]] = 'inverse_time',
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
eps: float = 1e-8,
verbose: int = 0
) -> Union[float, np.ndarray]:
r"""
Compute the Time-Weighted Accuracy (TWA) score.
This metric evaluates classification accuracy over sequences,
applying weights to different time steps. It is suitable for
scenarios where the importance of correct predictions varies
across the time horizon. The last dimension of inputs is
treated as the time dimension.
For a single sample :math:`i` and output :math:`o`, the TWA is:
.. math::
\mathrm{TWA}_{i,o} = \sum_{t=1}^{T} w_t \cdot
\mathbf{1}\{y_{i,o,t} = \hat y_{i,o,t}\},
where :math:`T` is the number of time steps, :math:`w_t` are
the time weights (normalized to sum to 1), :math:`y_{i,o,t}` is
the true class label, :math:`\hat y_{i,o,t}` is the predicted
class label, and :math:`\mathbf{1}\{\cdot\}` is the indicator
function (1 if true, 0 if false).
The final score is an average of these :math:`\mathrm{TWA}_{i,o}`
values over samples and potentially outputs.
Parameters
----------
y_true : array-like
True class labels. Expected shapes:
- (n_timesteps,) for a single sequence.
- (n_samples, n_timesteps) for single output, multiple samples.
- (n_samples, n_outputs, n_timesteps) for multi-output.
Labels can be of any type that supports equality comparison.
y_pred : array-like
Predicted class labels, matching `y_true` in shape and type.
time_weights : array-like of shape (n_timesteps,), str, or None, \
default='inverse_time'
Weights to apply to each time step's accuracy.
- If 'inverse_time', weights are :math:`w_t = 1/t`
(1-indexed t), normalized to sum to 1.
- If an array-like is provided, it's used directly and
normalized to sum to 1. Its length must match `n_timesteps`.
- If None, uniform weights (1/T) are applied.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, samples are equally weighted.
Sum of weights must be > `eps`.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs in `y_true` or `y_pred`:
- ``'raise'``: Raise an error on any NaN.
- ``'omit'``: Drop samples (rows) containing NaNs.
- ``'propagate'``: The TWA for samples/outputs with NaNs
will be NaN.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregation if inputs are multi-output.
- ``'raw_values'``: Returns a TWA score for each output.
- ``'uniform_average'``: Scores of all outputs are averaged.
eps : float, default=1e-8
Small epsilon value to prevent division by zero when sum of
sample weights is very close to or is zero.
verbose : int, default=0
Verbosity level: 0 (silent), 1 (summary), >=2 (debug details).
Returns
-------
score : float or ndarray of floats
Mean TWA score. Values range from 0 to 1, with higher values
indicating better time-weighted accuracy. Scalar if
`multioutput='uniform_average'` or if inputs represent a
single output. Array of shape (n_outputs,) if
`multioutput='raw_values'` and inputs are multi-output.
Examples
--------
>>> import numpy as np
>>> # from fusionlab.metrics import time_weighted_accuracy_score
>>> # Single output (2 samples, 3 timesteps)
>>> y_t = np.array([[1, 0, 1], [0, 1, 1]])
>>> y_p = np.array([[1, 1, 1], [0, 1, 0]])
>>> # Correctness: S0: [1,0,1], S1: [1,1,0]
>>> # Default time_weights (T=3): approx [0.545, 0.273, 0.182]
>>> # TWA_S0 = 1*0.545 + 0*0.273 + 1*0.182 = 0.727
>>> # TWA_S1 = 1*0.545 + 1*0.273 + 0*0.182 = 0.818
>>> # Avg TWA = (0.727 + 0.818) / 2 = 0.7725
>>> score = time_weighted_accuracy_score(y_t, y_p)
>>> print(f"TWA score (default weights): {score:.4f}")
TWA score (default weights): 0.7727
>>> # With NaN
>>> y_t_nan = np.array([[1, np.nan, 1], [0,1,1]])
>>> y_p_nan = np.array([[1, 1, 1], [0,1,0]])
>>> score_prop = time_weighted_accuracy_score(y_t_nan, y_p_nan, nan_policy='propagate')
>>> print(f"TWA score (propagate NaN): {score_prop:.4f}")
TWA score (propagate NaN): nan
See Also
--------
time_weighted_mean_absolute_error : For continuous targets.
sklearn.metrics.accuracy_score : Standard (unweighted) accuracy.
"""
# --- 1. Input Validation and Preprocessing ---
# Allow object dtype for y_true/y_pred if labels are strings, etc.
# However, NaNs for object arrays are tricky. Sticking to numeric/bool
# for NaNs is safer, but equality check works on objects.
# Forcing numeric might be too restrictive if labels are e.g. strings.
# Let's assume check_array handles this by converting to object if needed.
y_true_arr = check_array(
y_true, ensure_2d=False, allow_nd=True,
dtype=None, force_all_finite=False, copy=False # Allow any type
)
y_pred_arr = check_array(
y_pred, ensure_2d=False, allow_nd=True,
dtype=None, force_all_finite=False, copy=False
)
if not (eps > 0):
raise ValueError("eps must be positive.")
if y_true_arr.shape != y_pred_arr.shape:
raise ValueError(
"y_true and y_pred must have the same shape. "
f"Got y_true: {y_true_arr.shape}, y_pred: {y_pred_arr.shape}"
)
y_input_ndim_orig = y_true_arr.ndim
if y_input_ndim_orig == 1: # Single sequence (T,)
y_true_proc = y_true_arr.reshape(1, 1, -1)
y_pred_proc = y_pred_arr.reshape(1, 1, -1)
elif y_input_ndim_orig == 2: # (B, T)
y_true_proc = y_true_arr.reshape(y_true_arr.shape[0], 1, -1)
y_pred_proc = y_pred_arr.reshape(y_pred_arr.shape[0], 1, -1)
elif y_input_ndim_orig == 3: # (B, O, T)
y_true_proc = y_true_arr
y_pred_proc = y_pred_arr
else:
raise ValueError(
"Inputs y_true and y_pred must be 1D, 2D, or 3D. "
f"Got {y_input_ndim_orig}D."
)
n_samples, n_outputs, n_timesteps = y_true_proc.shape
if n_timesteps == 0:
if verbose >= 1:
print("TWA score requires at least 1 time step. Returning NaN.")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# Process time_weights
w_t: np.ndarray
if time_weights is None: # Uniform weights
w_t = np.full(n_timesteps, 1.0 / n_timesteps if n_timesteps > 0 else 0)
elif isinstance(time_weights, str) and \
time_weights == 'inverse_time':
if n_timesteps == 0:
w_t = np.array([])
else:
w_t_raw = 1.0 / np.arange(1, n_timesteps + 1)
sum_w_t_raw = np.sum(w_t_raw)
w_t = w_t_raw / sum_w_t_raw if sum_w_t_raw > eps else \
np.full(n_timesteps, 1.0 / n_timesteps if n_timesteps > 0 else 0)
else: # Custom array-like weights
w_t = check_array(
time_weights, ensure_2d=False, dtype="numeric",
force_all_finite=True
)
if w_t.ndim > 1: w_t = w_t.squeeze()
if w_t.shape[0] != n_timesteps:
raise ValueError(
f"Length of time_weights ({w_t.shape[0]}) must match "
f"n_timesteps ({n_timesteps})."
)
sum_w_t = np.sum(w_t)
if sum_w_t < eps: # Allow sum to be zero if all weights are zero
# If sum is zero but not all weights are zero (e.g. pos and neg)
if np.any(w_t != 0):
raise ValueError(
"Sum of custom time_weights is near"
" zero but elements are non-zero.")
# If all weights are zero, result will be zero unless NaNs involved
w_t = np.zeros(n_timesteps) if n_timesteps > 0 else np.array([])
else:
w_t = w_t / sum_w_t # Normalize
# Process sample_weight
s_weights_proc = None
if sample_weight is not None:
s_weights_proc = check_array(
sample_weight, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False
)
check_consistent_length(y_true_proc, s_weights_proc) # n_samples
if s_weights_proc.ndim > 1:
raise ValueError(
f"sample_weight must be 1D. Got {s_weights_proc.shape}"
)
# --- 2. Handle NaNs ---
# For object arrays, np.isnan doesn't work as expected.
# We rely on comparison with np.nan or specific NaN-like objects.
# Assuming standard np.nan for numeric/float inputs where it's typical.
# If inputs are objects, user must ensure NaNs are comparable or filter.
# A robust way for mixed types: convert to string and check for 'nan'.
# Or, assume if not numeric, then NaN comparison is user's responsibility.
# For this, we'll try np.isnan, which works for float types.
try:
nan_mask_yt = np.isnan(y_true_proc.astype(float))
nan_mask_yp = np.isnan(y_pred_proc.astype(float))
except (TypeError, ValueError): # If conversion to float fails (e.g. string labels)
# Fallback: assume no np.nan style NaNs, or user handles them.
# This means nan_policy might not work as expected for non-numeric labels
# if they contain non-standard NaNs.
warnings.warn(
"NaN detection failed for non-numeric y_true/y_pred. "
"Ensure NaNs are handled or inputs are numeric for nan_policy.",
UserWarning
)
nan_mask_yt = np.full(y_true_proc.shape, False)
nan_mask_yp = np.full(y_pred_proc.shape, False)
# nan_mask_sot: (n_s, n_o, n_t), True if y_true_sot or y_pred_sot is NaN
nan_mask_sot = nan_mask_yt | nan_mask_yp
y_true_calc = y_true_proc
y_pred_calc = y_pred_proc
current_s_weights = s_weights_proc
# For 'omit', remove entire samples if any of their (y_true or y_pred)
# at any time step, for any output, is NaN.
if np.any(nan_mask_sot):
if nan_policy == 'raise':
raise ValueError("NaNs detected in y_true or y_pred.")
elif nan_policy == 'omit':
if verbose >= 2:
print("NaNs detected. Omitting samples with NaNs.")
rows_with_any_nan = nan_mask_sot.any(axis=(1,2)) # (n_s,)
rows_to_keep = ~rows_with_any_nan
if not np.any(rows_to_keep):
if verbose >= 1:
print("All samples omitted. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
y_true_calc = y_true_proc[rows_to_keep]
y_pred_calc = y_pred_proc[rows_to_keep]
if current_s_weights is not None:
current_s_weights = current_s_weights[rows_to_keep]
nan_mask_sot = nan_mask_sot[rows_to_keep] # For propagate logic
if y_true_calc.shape[0] == 0: # All samples omitted
if verbose >= 1:
print("No samples left after NaN handling. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# --- 3. Compute Time-Weighted Accuracy ---
# correct_preds: (n_samples_calc, n_outputs, n_timesteps)
correct_preds = (y_true_calc == y_pred_calc).astype(float)
# Apply NaN mask if nan_policy is 'propagate'
# This makes correctness NaN if original input was NaN
if nan_policy == 'propagate':
correct_preds = np.where(nan_mask_sot, np.nan, correct_preds)
# Weighted sum of correctness for each trajectory (sample, output)
# w_t is (n_timesteps,). Result: (n_samples_calc, n_outputs)
twa_per_trajectory = np.sum(
correct_preds * w_t.reshape(1,1,-1), # Broadcast w_t
axis=2
)
# --- 4. Aggregate Scores ---
if current_s_weights is not None:
if np.sum(current_s_weights) < eps: # Avoid division by zero
output_scores = np.full(n_outputs, np.nan)
else:
# np.average propagates NaNs correctly
output_scores = np.average(
twa_per_trajectory, axis=0, weights=current_s_weights
)
else:
# np.nanmean handles NaNs from propagation
output_scores = np.nanmean(twa_per_trajectory, axis=0)
if multioutput == 'uniform_average':
final_score = np.nanmean(output_scores) # Handles NaN outputs
elif multioutput == 'raw_values':
final_score = output_scores
else: # Should not be reached
raise ValueError(f"Unknown multioutput mode: {multioutput}")
# If original input was 1D/2D (single effective output),
# and multioutput='raw_values', result should be scalar.
if (y_input_ndim_orig <= 2) and multioutput == 'raw_values':
if isinstance(final_score, np.ndarray) and final_score.size == 1:
final_score = final_score.item()
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"TWA score computed: {final_score}")
else:
print(f"TWA score computed: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_true': ['array-like'],
'y_pred': ['array-like'],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'eps': [Real],
'verbose': [Integral, bool],
})
def theils_u_score(
y_true: np.ndarray,
y_pred: np.ndarray,
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
eps: float = 1e-8, # For safe division
verbose: int = 0
) -> Union[float, np.ndarray]:
r"""
Compute Theil's U Statistic.
Measures the relative accuracy of a forecast compared to a naive
persistence (random walk) forecast. The last dimension of the
inputs is treated as the time/horizon dimension.
Theil's U is defined as:
.. math::
U = \sqrt{
\frac{\sum_{i,o,t}(y_{i,o,t} - \hat y_{i,o,t})^2}
{\sum_{i,o,t}(y_{i,o,t} - y_{i,o,t-1})^2}
},
where sums are over valid samples :math:`i`, outputs :math:`o`
(if applicable), and time steps :math:`t` (from the second
time step onwards). :math:`y_{i,o,t}` is the true value,
:math:`\hat y_{i,o,t}` is the forecast, and
:math:`y_{i,o,t-1}` is the true value at the previous time step
(naive forecast).
- U < 1: Forecast is better than the naive model.
- U = 1: Forecast is as good as the naive model.
- U > 1: Forecast is worse than the naive model.
Parameters
----------
y_true : array-like
True target values. Expected shapes:
- (n_timesteps,) for a single trajectory.
- (n_samples, n_timesteps) for single output, multiple samples.
- (n_samples, n_outputs, n_timesteps) for multi-output.
y_pred : array-like
Predicted values, matching `y_true` in shape.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, samples are equally weighted.
Sum of weights must be > `eps`.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs in `y_true` or `y_pred`:
- ``'raise'``: Raise an error on any NaN.
- ``'omit'``: Drop samples (rows) containing NaNs that
would affect the error calculation.
- ``'propagate'``: If NaNs are involved in calculating
the sum of squared errors for an output, that output's
U score will be NaN.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregation if inputs are multi-output.
- ``'raw_values'``: Returns a U score for each output.
- ``'uniform_average'``: U scores of all outputs are averaged.
eps : float, default=1e-8
Small epsilon value to add to the denominator (sum of squared
errors of the naive forecast) to prevent division by zero.
If the denominator sum is less than `eps`, U might be NaN or
a large value depending on the numerator.
verbose : int, default=0
Verbosity level: 0 (silent), 1 (summary), >=2 (debug details).
Returns
-------
score : float or ndarray of floats
Theil's U statistic. Scalar if `multioutput='uniform_average'`
or if inputs represent a single output. Array of shape
(n_outputs,) if `multioutput='raw_values'` and inputs are
multi-output.
Examples
--------
>>> import numpy as np
>>> # from fusionlab.metrics import theils_u_score
>>> # Single output (2 samples, 4 timesteps)
>>> y_t = np.array([[1,2,3,4],[2,2,2,2]])
>>> y_p = np.array([[1,2,3,5],[2,1,2,3]])
>>> # SSE_model = ((2-2)^2+(3-3)^2+(4-5)^2) + ((2-1)^2+(2-2)^2+(2-3)^2)
>>> # = (0+0+1) + (1+0+1) = 1 + 2 = 3
>>> # SSE_base = ((2-1)^2+(3-2)^2+(4-3)^2) + ((2-2)^2+(2-2)^2+(2-2)^2)
>>> # = (1+1+1) + (0+0+0) = 3 + 0 = 3
>>> # U = sqrt(3/3) = 1.0
>>> u = theils_u_score(y_t, y_p)
>>> print(f"Theil's U: {u:.4f}")
Theil's U: 1.0000
>>> # Example with NaN
>>> y_t_nan = np.array([[1,2,np.nan,4],[2,2,2,2]])
>>> y_p_nan = np.array([[1,2,3,5],[2,1,2,3]])
>>> u_prop = theils_u_score(y_t_nan, y_p_nan, nan_policy='propagate')
>>> print(f"Theil's U (propagate): {u_prop}") # Will be NaN
Theil's U (propagate): nan
>>> u_omit = theils_u_score(y_t_nan, y_p_nan, nan_policy='omit')
>>> # Sample 0 omitted. Only sample 1 used.
>>> # SSE_model_s1 = 2, SSE_base_s1 = 0. U = sqrt(2/eps) -> large or NaN
>>> # If SSE_base is near zero, result is sensitive.
>>> # For y_s1: SSE_model=2, SSE_base=0. Result depends on eps.
>>> # If sse_base < eps, np.divide returns nan.
>>> print(f"Theil's U (omit, sse_base=0): {u_omit}")
Theil's U (omit, sse_base=0): nan
See Also
--------
sklearn.metrics.mean_squared_error : Standard MSE.
time_weighted_mean_absolute_error : Horizon-weighted MAE.
References
----------
.. [1] Theil, H. (1966). Applied Economic Forecasting.
North-Holland Publishing.
"""
# --- 1. Input Validation and Preprocessing ---
y_true_arr = check_array(
y_true, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False
)
y_pred_arr = check_array(
y_pred, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False
)
if not (eps > 0):
raise ValueError("eps must be positive.")
if y_true_arr.shape != y_pred_arr.shape:
raise ValueError(
"y_true and y_pred must have the same shape. "
f"Got y_true: {y_true_arr.shape}, y_pred: {y_pred_arr.shape}"
)
y_input_ndim_orig = y_true_arr.ndim
if y_input_ndim_orig == 1: # Single trajectory (T,)
y_true_proc = y_true_arr.reshape(1, 1, -1)
y_pred_proc = y_pred_arr.reshape(1, 1, -1)
elif y_input_ndim_orig == 2: # (B, T)
y_true_proc = y_true_arr.reshape(y_true_arr.shape[0], 1, -1)
y_pred_proc = y_pred_arr.reshape(y_pred_arr.shape[0], 1, -1)
elif y_input_ndim_orig == 3: # (B, O, T)
y_true_proc = y_true_arr
y_pred_proc = y_pred_arr
else:
raise ValueError(
"Inputs y_true and y_pred must be 1D, 2D, or 3D. "
f"Got {y_input_ndim_orig}D."
)
n_samples, n_outputs, n_timesteps = y_true_proc.shape
if n_timesteps < 2:
if verbose >= 1:
print("Theil's U requires at least 2 time steps. Returning NaN.")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# Process sample_weight
s_weights_proc = None
if sample_weight is not None:
s_weights_proc = check_array(
sample_weight, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False
)
check_consistent_length(y_true_proc, s_weights_proc) # n_samples
if s_weights_proc.ndim > 1:
raise ValueError(
f"sample_weight must be 1D. Got {s_weights_proc.shape}"
)
# --- 2. Handle NaNs ---
# NaNs relevant for error calculation (involving t and t-1)
# Mask for y_true[:,:,1:], y_pred[:,:,1:], y_true[:,:,:-1]
nan_mask_model_terms = np.isnan(y_true_proc[:,:,1:]) | \
np.isnan(y_pred_proc[:,:,1:])
nan_mask_base_terms = np.isnan(y_true_proc[:,:,1:]) | \
np.isnan(y_true_proc[:,:,:-1])
# Combined mask for any (sample, output) pair having NaN in relevant parts
# A sample-output trajectory is problematic if any of its error terms is NaN
nan_in_model_traj = nan_mask_model_terms.any(axis=2) # (n_s, n_o)
nan_in_base_traj = nan_mask_base_terms.any(axis=2) # (n_s, n_o)
nan_mask_so = nan_in_model_traj | nan_in_base_traj # (n_s, n_o)
y_true_calc = y_true_proc
y_pred_calc = y_pred_proc
current_s_weights = s_weights_proc
if np.any(nan_mask_so): # Check if any NaN exists that affects calculation
if nan_policy == 'raise':
raise ValueError("NaNs detected in y_true or y_pred "
"affecting error calculation.")
elif nan_policy == 'omit':
if verbose >= 2:
print("NaNs detected. Omitting samples with NaNs "
"in relevant parts.")
# Omit entire samples (rows) if any output trajectory has NaNs
rows_with_any_nan = nan_mask_so.any(axis=1) # (n_samples,)
rows_to_keep = ~rows_with_any_nan
if not np.any(rows_to_keep):
if verbose >= 1:
print("All samples omitted. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
y_true_calc = y_true_proc[rows_to_keep]
y_pred_calc = y_pred_proc[rows_to_keep]
if current_s_weights is not None:
current_s_weights = current_s_weights[rows_to_keep]
# Update nan_mask_so for propagate logic if used later
nan_mask_so = nan_mask_so[rows_to_keep]
if y_true_calc.shape[0] == 0: # All samples omitted
if verbose >= 1:
print("No samples left after NaN handling. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# --- 3. Compute Squared Errors ---
# errors shape: (n_samples_calc, n_outputs, n_timesteps - 1)
err_model_sq = (
y_true_calc[..., 1:] - y_pred_calc[..., 1:]
)**2
err_base_sq = (
y_true_calc[..., 1:] - y_true_calc[..., :-1]
)**2
# Apply sample weights if provided
if current_s_weights is not None:
# Reshape weights for broadcasting: (n_samples_calc, 1, 1)
weights_b = current_s_weights.reshape(-1, 1, 1)
err_model_sq = err_model_sq * weights_b
err_base_sq = err_base_sq * weights_b
# Sum of squared errors per output
# axis=(0, 2) sums over samples and time dimensions
# NaNs will propagate if nan_policy='propagate' as err arrays contain them
sse_model_per_output = np.sum(err_model_sq, axis=(0, 2)) # (n_outputs,)
sse_base_per_output = np.sum(err_base_sq, axis=(0, 2)) # (n_outputs,)
# If nan_policy='propagate', ensure original NaNs lead to NaN scores
# This check is more about ensuring that if an entire output channel
# had NaNs from the start (via nan_mask_so), its score is NaN,
# even if sums somehow became non-NaN (e.g., if weights were zero).
if nan_policy == 'propagate':
# nan_mask_so is (n_samples_calc, n_outputs) or original
# We need (n_outputs,) mask: True if any sample for that output had NaN
output_had_nan = nan_mask_so.any(axis=0) # (n_outputs,)
sse_model_per_output = np.where(
output_had_nan, np.nan, sse_model_per_output
)
sse_base_per_output = np.where(
output_had_nan, np.nan, sse_base_per_output
)
# --- 4. Compute Theil's U per output ---
# Handle division by zero or near-zero sse_base
u_scores_sq = np.full(n_outputs, np.nan) # Initialize with NaN
# Valid where sse_base_per_output is substantially non-zero
valid_division = sse_base_per_output > eps
# Calculate ratio only for valid divisions
u_scores_sq[valid_division] = (
sse_model_per_output[valid_division] /
sse_base_per_output[valid_division]
)
# Special case: if sse_model is also near zero where sse_base is near zero
# Some define U=1 if both are zero (model is as good as naive, which is perfect)
# Or U=0 if model is perfect and base is perfect.
# Current: if sse_base <= eps, ratio is NaN. If sse_model is also 0, sqrt(NaN) is NaN.
# If sse_model > 0 and sse_base <= eps, U is effectively infinite (bad model), results in NaN.
# This seems reasonable: if baseline is non-informative (constant series), U is tricky.
u_scores_per_output = np.sqrt(u_scores_sq)
# --- 5. Aggregate Scores ---
if multioutput == 'uniform_average':
final_score = np.nanmean(u_scores_per_output)
elif multioutput == 'raw_values':
final_score = u_scores_per_output
else: # Should not be reached
raise ValueError(f"Unknown multioutput mode: {multioutput}")
# If original input was 1D/2D (single effective output),
# and multioutput='raw_values', result should be scalar.
if (y_input_ndim_orig <= 2) and multioutput == 'raw_values':
if isinstance(final_score, np.ndarray) and final_score.size == 1:
final_score = final_score.item()
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"Theil's U computed: {final_score}")
else:
print(f"Theil's U computed: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_lower': ['array-like'],
'y_upper': ['array-like'],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'warn_invalid_bounds': ['boolean'],
'eps': [Real],
'verbose': [Integral, bool]
})
def mean_interval_width_score(
y_lower: np.ndarray,
y_upper: np.ndarray,
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
warn_invalid_bounds: bool = True,
eps: float = 1e-8, # Epsilon for safe division
verbose: int = 0
) -> Union[float, np.ndarray]:
r"""
Compute the Mean Interval Width (sharpness) of prediction intervals.
This metric measures the average width of the provided prediction
intervals, independent of whether they cover the true values.
Lower values indicate narrower, sharper intervals.
.. math::
\mathrm{MeanIntervalWidth} = \frac{1}{N_{valid}} \sum_{i=1}^{N_{valid}}
(u_i - l_i),
where :math:`l_i` and :math:`u_i` are the lower and upper
bounds for sample :math:`i`, and :math:`N_{valid}` is the number
of valid samples after NaN handling. If `sample_weight` is used,
it becomes a weighted average.
Parameters
----------
y_lower : array-like
Lower bound predictions. Expected shapes:
- (n_samples,) for single output.
- (n_samples, n_outputs) for multi-output.
y_upper : array-like
Upper bound predictions, matching `y_lower` in shape.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, samples are equally weighted.
Sum of weights must be > `eps`.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs in `y_lower` or `y_upper`:
- ``'raise'``: Raise an error on any NaN.
- ``'omit'``: Drop samples (rows) containing NaNs in
either `y_lower` or `y_upper`.
- ``'propagate'``: The width for samples/outputs with NaNs
will be NaN, which may affect the final mean.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregation if inputs are multi-output (2D).
- ``'raw_values'``: Returns an array of mean widths, one
for each output.
- ``'uniform_average'``: Mean widths of all outputs are
averaged with uniform weight.
warn_invalid_bounds : bool, default=True
If True, issues a `UserWarning` if any `y_lower[i] > y_upper[i]`.
The width for such intervals will be negative.
eps : float, default=1e-8
Small epsilon value to prevent division by zero when sum of
sample weights is very close to or is zero.
verbose : int, default=0
Verbosity level: 0 (silent), 1 (summary), >=2 (debug details).
Returns
-------
score : float or ndarray of floats
The mean interval width. Scalar if `multioutput='uniform_average'`
or if inputs are 1D. Array of shape (n_outputs,) if
`multioutput='raw_values'` and inputs are 2D.
Notes
-----
- This metric is also known as "sharpness."
- It is often reported alongside `coverage_score` to provide a
more complete picture of prediction interval performance.
- This metric does not consider calibration (i.e., whether the
true values fall within the intervals).
Examples
--------
>>> import numpy as np
>>> # from fusionlab.metrics import mean_interval_width_score
>>> y_l = np.array([9, 11, 10, np.nan])
>>> y_u = np.array([11, 13, 12, 10])
>>> # Widths: [2, 2, 2, nan]
>>> score_prop = mean_interval_width_score(y_l, y_u, nan_policy='propagate')
>>> print(f"MIW (propagate): {score_prop:.4f}")
MIW (propagate): nan
>>> score_omit = mean_interval_width_score(y_l, y_u, nan_policy='omit')
>>> # Valid widths: [2, 2, 2]. Mean = 2.0
>>> print(f"MIW (omit): {score_omit:.4f}")
MIW (omit): 2.0000
>>> # Multi-output
>>> y_l_mo = np.array([[9, 19], [11, np.nan]]) # (2 samples, 2 outputs)
>>> y_u_mo = np.array([[11, 21], [13, 23]])
>>> # Widths: [[2, 2], [2, nan]]
>>> score_mo_raw = mean_interval_width_score(
... y_l_mo, y_u_mo, multioutput='raw_values', nan_policy='propagate'
... )
>>> # Output 0 widths: [2, 2]. Mean = 2.0
>>> # Output 1 widths: [2, nan]. Mean = nan
>>> print(f"MIW (multi-output, raw, propagate): {score_mo_raw}")
MIW (multi-output, raw, propagate): [ 2. nan]
See Also
--------
coverage_score : Metric for prediction interval coverage.
weighted_interval_score : Proper scoring rule for intervals.
"""
# --- 1. Input Validation and Preprocessing ---
y_lower_arr = check_array(
y_lower, ensure_2d=False, allow_nd=True, # allow_nd for future?
dtype="numeric", force_all_finite=False, copy=False
)
y_upper_arr = check_array(
y_upper, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False
)
if not (eps > 0):
raise ValueError("eps must be positive.")
if y_lower_arr.shape != y_upper_arr.shape:
raise ValueError(
"`y_lower` and `y_upper` must have the same shape. "
f"Got y_lower: {y_lower_arr.shape}, y_upper: {y_upper_arr.shape}"
)
y_input_ndim_orig = y_lower_arr.ndim
if y_input_ndim_orig == 0: # Scalar input
y_lower_proc = y_lower_arr.reshape(1,1)
y_upper_proc = y_upper_arr.reshape(1,1)
elif y_input_ndim_orig == 1: # (n_samples,)
y_lower_proc = y_lower_arr.reshape(-1, 1) # -> (n_s, 1 output)
y_upper_proc = y_upper_arr.reshape(-1, 1)
elif y_input_ndim_orig == 2: # (n_samples, n_outputs)
y_lower_proc = y_lower_arr
y_upper_proc = y_upper_arr
else:
raise ValueError(
"Inputs y_lower and y_upper must be 1D (n_samples,) or 2D "
"(n_samples, n_outputs). "
f"Got {y_input_ndim_orig}D."
)
n_samples, n_outputs = y_lower_proc.shape
# Process sample_weight
s_weights_proc = None
if sample_weight is not None:
s_weights_proc = check_array(
sample_weight, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False
)
check_consistent_length(y_lower_proc, s_weights_proc) # n_samples
if s_weights_proc.ndim > 1:
raise ValueError(
f"sample_weight must be 1D. Got {s_weights_proc.shape}"
)
# --- 2. Handle NaNs ---
# nan_mask_so: (n_samples, n_outputs), True if y_lower_so or y_upper_so is NaN
nan_mask_yl = np.isnan(y_lower_proc)
nan_mask_yu = np.isnan(y_upper_proc)
nan_mask_so = nan_mask_yl | nan_mask_yu # (n_s, n_o)
y_lower_calc = y_lower_proc
y_upper_calc = y_upper_proc
current_s_weights = s_weights_proc
if np.any(nan_mask_so):
if nan_policy == 'raise':
raise ValueError("NaNs detected in y_lower or y_upper.")
elif nan_policy == 'omit':
if verbose >= 2:
print("NaNs detected. Omitting samples with NaNs.")
# Omit entire samples (rows) if any output has NaN in lower/upper
rows_with_any_nan = nan_mask_so.any(axis=1) # (n_samples,)
rows_to_keep = ~rows_with_any_nan
if not np.any(rows_to_keep):
if verbose >= 1:
print("All samples omitted. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
y_lower_calc = y_lower_proc[rows_to_keep]
y_upper_calc = y_upper_proc[rows_to_keep]
if current_s_weights is not None:
current_s_weights = current_s_weights[rows_to_keep]
# Update nan_mask_so for propagate logic if used later
nan_mask_so = nan_mask_so[rows_to_keep]
if y_lower_calc.shape[0] == 0: # All samples omitted
if verbose >= 1:
print("No samples left after NaN handling. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# --- 3. Compute Interval Widths ---
# widths shape: (n_samples_calc, n_outputs)
widths = y_upper_calc - y_lower_calc
if warn_invalid_bounds:
# Check on the data that will be used for width calculation
# (could be y_lower_calc or y_lower_proc depending on NaN policy)
# For simplicity, check on y_lower_calc, y_upper_calc
with np.errstate(invalid='ignore'): # For NaN comparisons
invalid_bounds_mask = y_lower_calc > y_upper_calc
if np.any(invalid_bounds_mask):
num_invalid = np.sum(invalid_bounds_mask)
# Use .size of the mask itself, not the original array,
# as it might have been reduced by 'omit'
perc = (num_invalid / invalid_bounds_mask.size) * 100 if \
invalid_bounds_mask.size > 0 else 0
warnings.warn(
f"{num_invalid} ({perc:.2f}%) interval pairs found "
f"where y_lower > y_upper. Widths will be negative.",
UserWarning
)
if verbose >=2:
print(f"Warning: {num_invalid} invalid bound pairs "
"detected in calculated data.")
# If nan_policy='propagate', ensure original NaNs lead to NaN widths
if nan_policy == 'propagate':
# nan_mask_so is (n_samples_calc, n_outputs) if 'omit' used,
# or original shape if 'omit' not used.
widths = np.where(nan_mask_so, np.nan, widths)
# --- 4. Aggregate Scores ---
# Aggregate across samples (axis 0), considering sample_weights
if current_s_weights is not None:
if np.sum(current_s_weights) < eps: # Avoid division by zero
output_scores = np.full(n_outputs, np.nan)
else:
# np.average propagates NaNs correctly
output_scores = np.average(
widths, axis=0, weights=current_s_weights
)
else:
# NO weights → choose between propagate vs omit policies
if nan_policy == 'propagate':
# any NaN in widths → NaN in result
output_scores = np.mean(widths, axis=0)
else: # e.g. 'omit'
# ignore NaNs when computing the mean
output_scores = np.nanmean(widths, axis=0)
# Handle multioutput aggregation
if multioutput == 'uniform_average':
final_score = np.nanmean(output_scores) # Handles NaN outputs
elif multioutput == 'raw_values':
final_score = output_scores
else: # Should not be reached
raise ValueError(f"Unknown multioutput mode: {multioutput}")
# If original input was 1D (single effective output),
# and multioutput='raw_values', result should be scalar.
if (y_input_ndim_orig <= 1) and multioutput == 'raw_values':
if isinstance(final_score, np.ndarray) and final_score.size == 1:
final_score = final_score.item()
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"MeanIntervalWidthScore: {final_score}")
else:
print(f"MeanIntervalWidthScore: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_true': ['array-like'],
'y_pred': ['array-like'],
'quantiles': ['array-like'],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'eps': [Real],
'verbose': [Integral, bool],
})
def quantile_calibration_error(
y_true: np.ndarray,
y_pred: np.ndarray,
quantiles: Union[Sequence[float], np.ndarray],
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
eps: float = 1e-8,
verbose: int = 0
) -> Union[float, np.ndarray]:
"""
Compute Quantile Calibration Error (QCE).
Assesses the calibration of probabilistic forecasts by comparing
the empirical frequency of observations falling below a predicted
quantile to the nominal quantile level.
For a single output and quantile level :math:`q`, the QCE is:
.. math::
\\mathrm{QCE}(q) = \\left| \\frac{1}{N_{valid}} \\sum_{i=1}^{N_{valid}}
\\mathbf{1}\\{y_i \\le \\hat Q_i(q)\\} - q \\right|,
where :math:`\\hat Q_i(q)` is the predicted q-th quantile for
sample :math:`i`, :math:`y_i` is the observed value, and
:math:`N_{valid}` is the number of valid samples after NaN handling.
The function returns the average QCE across all provided
quantile levels (and potentially outputs).
Parameters
----------
y_true : array-like
Observed true values. Expected shapes:
- (n_samples,) for single output.
- (n_samples, n_outputs) for multi-output.
y_pred : array-like
Predicted quantiles. Expected shapes:
- If `y_true` is 1D: (n_samples, n_quantiles)
- If `y_true` is 2D: (n_samples, n_outputs, n_quantiles)
quantiles : array-like of shape (n_quantiles,)
Nominal quantile levels (e.g., [0.1, 0.5, 0.9]). Each value
must be strictly between 0 and 1.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, samples are equally weighted when
calculating empirical frequencies. Sum of weights must be > `eps`.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs in `y_true` or `y_pred`:
- ``'raise'``: Raise an error on any NaN.
- ``'omit'``: Drop samples (rows) containing NaNs.
- ``'propagate'``: The QCE for samples/outputs/quantiles
affected by NaNs will be NaN.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregation if `y_true` and `y_pred` are multi-output.
- ``'raw_values'``: Returns an array of QCE scores, one
for each output (averaged over quantiles).
- ``'uniform_average'``: Scores of all outputs are averaged.
eps : float, default=1e-8
Small epsilon value to prevent division by zero when sum of
weights is very close to or is zero.
verbose : int, default=0
Verbosity level: 0 (silent), 1 (summary), >=2 (debug details,
including per-quantile QCE).
Returns
-------
score : float or ndarray of floats
Mean QCE. Scalar if `multioutput='uniform_average'` or if
inputs represent a single output. Array of shape (n_outputs,)
if `multioutput='raw_values'` and inputs are multi-output.
Lower values (closer to 0) indicate better calibration.
Examples
--------
>>> import numpy as np
>>> # from fusionlab.metrics import quantile_calibration_error
>>> y_t = np.array([1, 2, 3, 4, 5])
>>> q_levels = np.array([0.1, 0.5, 0.9])
>>> y_p = np.array([ # (5 samples, 3 quantiles)
... [0.5, 1.0, 1.5], # y_true=1
... [1.0, 2.0, 3.0], # y_true=2
... [2.5, 3.0, 3.5], # y_true=3
... [3.0, 4.0, 5.0], # y_true=4
... [4.5, 5.0, 5.5] # y_true=5
... ])
>>> qce = quantile_calibration_error(y_t, y_p, q_levels, verbose=0)
>>> print(f"QCE: {qce:.4f}")
QCE: 0.0667
>>> # Multi-output example
>>> y_t_mo = np.array([[1,10],[2,20],[3,30]]) # (3s, 2o)
>>> y_p_mo = np.array([ # (3s, 2o, 2q)
... [[0.5,1.5], [9,11]], # s0, (o0,o1), (q0,q1)
... [[1.5,2.5], [19,21]], # s1
... [[2.5,3.5], [29,31]] # s2
... ])
>>> q_mo = np.array([0.25, 0.75])
>>> qce_mo_raw = quantile_calibration_error(
... y_t_mo, y_p_mo, q_mo, multioutput='raw_values', verbose=0
... )
>>> print(f"QCE (multi-output, raw): {qce_mo_raw}")
QCE (multi-output, raw): [0.08333333 0.08333333]
See Also
--------
coverage_score : Metric for prediction interval coverage.
pinball_loss : Loss function for quantile regression.
References
----------
.. [1] Gneiting, T., & Katzfuss, M. (2014). Probabilistic
forecasting. Annual Review of Statistics and Its
Application, 1, 125-151. (Discusses calibration)
"""
# --- 1. Input Validation and Preprocessing ---
y_true_arr = check_array(
y_true, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False
)
y_pred_arr = check_array(
y_pred, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False
)
q_arr = check_array(
quantiles, ensure_2d=False, dtype="numeric",
force_all_finite=True # Quantiles must be finite
)
if not (eps > 0):
raise ValueError("eps must be positive.")
if not (np.all(q_arr > eps) and np.all(q_arr < 1 - eps)):
warnings.warn(
"Quantiles should ideally be within (eps, 1-eps) for stability. "
"Current implementation checks (0,1) strictly but uses eps for sums.",
UserWarning
)
are_all_values_in_bounds(
q_arr, nan_policy='raise', closed ="right",
message="All quantile values must be strictly between 0 and 1."
)
if q_arr.ndim > 1: q_arr = q_arr.squeeze()
if q_arr.ndim == 0 and q_arr.size ==1 : q_arr = q_arr.reshape(1,)
if q_arr.ndim > 1:
raise ValueError(f"quantiles must be 1D. Got {q_arr.shape}")
n_quantiles = q_arr.shape[0]
y_input_ndim_orig = y_true_arr.ndim
# Reshape inputs for consistent processing: (n_samples, n_outputs, ...)
if y_input_ndim_orig == 1: # y_true (n_s,), y_pred (n_s, n_q)
y_true_proc = y_true_arr.reshape(-1, 1) # -> (n_s, 1)
if y_pred_arr.ndim == 2 and y_pred_arr.shape[1] == n_quantiles:
# y_pred (n_s, n_q) -> (n_s, 1, n_q)
y_pred_proc = y_pred_arr.reshape(y_pred_arr.shape[0], 1, -1)
else:
raise ValueError(
"If y_true is 1D (n_samples,), y_pred must be 2D "
"(n_samples, n_quantiles). "
f"Got y_pred shape: {y_pred_arr.shape}, "
f"n_quantiles: {n_quantiles}"
)
elif y_input_ndim_orig == 2: # y_true (n_s, n_o), y_pred (n_s, n_o, n_q)
y_true_proc = y_true_arr
if y_pred_arr.ndim == 3 and \
y_pred_arr.shape[1] == y_true_proc.shape[1] and \
y_pred_arr.shape[2] == n_quantiles:
y_pred_proc = y_pred_arr
else:
raise ValueError(
"If y_true is 2D (n_s, n_o), y_pred must be 3D "
"(n_s, n_o, n_q) with matching n_o and n_q. "
f"Got y_true: {y_true_proc.shape}, "
f"y_pred: {y_pred_arr.shape}, n_q: {n_quantiles}"
)
else:
raise ValueError(
"y_true must be 1D or 2D. Got {y_input_ndim_orig}D."
)
check_consistent_length(y_true_proc, y_pred_proc)
if not (y_true_proc.shape[0] == y_pred_proc.shape[0] and \
y_true_proc.shape[1] == y_pred_proc.shape[1]):
raise ValueError(
"Processed y_true and y_pred shapes (n_samples, n_outputs) "
"are inconsistent. "
f"y_true_proc: {y_true_proc.shape[:2]}, "
f"y_pred_proc: {y_pred_proc.shape[:2]}"
)
if y_pred_proc.shape[2] != n_quantiles:
raise ValueError(
"Mismatch in n_quantiles dimension for processed y_pred "
f"({y_pred_proc.shape[2]}) vs quantiles ({n_quantiles})."
)
n_samples, n_outputs, _ = y_pred_proc.shape
# Process sample_weight
s_weights_proc = None
if sample_weight is not None:
s_weights_proc = check_array(
sample_weight, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False
)
check_consistent_length(y_true_proc, s_weights_proc) # n_samples
if s_weights_proc.ndim > 1:
raise ValueError(
f"sample_weight must be 1D. Got {s_weights_proc.shape}"
)
# --- 2. Handle NaNs ---
nan_mask_yt_expanded = np.isnan(
y_true_proc[..., np.newaxis]
)
nan_mask_yp = np.isnan(y_pred_proc)
nan_mask_soq = nan_mask_yt_expanded | nan_mask_yp
y_true_calc = y_true_proc
y_pred_calc = y_pred_proc
current_s_weights = s_weights_proc
if np.any(nan_mask_soq):
if nan_policy == 'raise':
raise ValueError("NaNs detected in y_true or y_pred.")
elif nan_policy == 'omit':
if verbose >= 2:
print("NaNs detected. Omitting samples with NaNs.")
rows_with_any_nan = nan_mask_soq.any(axis=(1,2))
rows_to_keep = ~rows_with_any_nan
if not np.any(rows_to_keep):
if verbose >= 1:
print("All samples omitted. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
y_true_calc = y_true_proc[rows_to_keep]
y_pred_calc = y_pred_proc[rows_to_keep]
if current_s_weights is not None:
current_s_weights = current_s_weights[rows_to_keep]
nan_mask_soq = nan_mask_soq[rows_to_keep]
if y_true_calc.shape[0] == 0:
if verbose >= 1:
print("No samples left after NaN handling. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# --- 3. Compute Quantile Calibration Error ---
indicators = (
y_true_calc[..., np.newaxis] <= y_pred_calc
).astype(float)
if nan_policy == 'propagate':
indicators = np.where(nan_mask_soq, np.nan, indicators)
if current_s_weights is not None:
if np.sum(current_s_weights) < eps: # Check against eps
prop_observed = np.full((n_outputs, n_quantiles), np.nan)
else:
prop_observed_list = []
for o_idx in range(n_outputs):
o_q_props = []
for q_idx in range(n_quantiles):
valid_indicators = indicators[:, o_idx, q_idx]
valid_weights = current_s_weights
nan_indicator_mask = np.isnan(valid_indicators)
if np.all(nan_indicator_mask):
o_q_props.append(np.nan)
continue
finite_indicators = valid_indicators[~nan_indicator_mask]
finite_weights = valid_weights[~nan_indicator_mask]
sum_finite_weights = np.sum(finite_weights)
if finite_indicators.size == 0 or sum_finite_weights < eps:
o_q_props.append(np.nan)
else:
o_q_props.append(np.average(
finite_indicators, weights=finite_weights
))
prop_observed_list.append(o_q_props)
prop_observed = np.array(prop_observed_list)
else:
prop_observed = np.nanmean(indicators, axis=0)
qce_per_oq = np.abs(prop_observed - q_arr.reshape(1, -1))
if verbose >= 2:
for o_idx in range(n_outputs):
for q_idx, q_val in enumerate(q_arr):
print(f" Output {o_idx}, QCE @ {q_val:.2f}: "
f"{qce_per_oq[o_idx, q_idx]:.4f}")
# --- 4. Aggregate Scores ---
output_scores = np.nanmean(qce_per_oq, axis=1)
if multioutput == 'uniform_average':
final_score = np.nanmean(output_scores)
elif multioutput == 'raw_values':
final_score = output_scores
else:
raise ValueError(f"Unknown multioutput mode: {multioutput}")
if (y_input_ndim_orig == 1) and multioutput == 'raw_values':
if isinstance(final_score, np.ndarray) and final_score.size == 1:
final_score = final_score.item()
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"QCE computed: {final_score}")
else:
print(f"QCE computed: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_true': ['array-like'],
'y_pred': ['array-like'],
'time_weights': ['array-like', None, StrOptions({'inverse_time'})],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'verbose': [Integral, bool]
})
def time_weighted_mean_absolute_error(
y_true: np.ndarray,
y_pred: np.ndarray,
time_weights: Optional[Union[Sequence[float], str]] = 'inverse_time',
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
verbose: int = 0
) -> Union[float, np.ndarray]:
"""
Compute the Time-Weighted Mean Absolute Error (TW-MAE).
This metric calculates the mean absolute error, giving different
weights to errors at different time steps. It is useful when
errors at certain points in a sequence (e.g., early predictions)
are more or less important.
The formula for a single sequence `i` and output `o` is:
.. math::
\\mathrm{TWMAE}_{i,o} = \\sum_{t=1}^{T}
w_t |\\hat y_{i,o,t} - y_{i,o,t}|,
where :math:`T` is the number of time steps (horizon length),
:math:`w_t` are the time weights (normalized to sum to 1),
:math:`y_{i,o,t}` is the true value, and
:math:`\\hat y_{i,o,t}` is the predicted value for sample `i`,
output `o` at time step `t`.
The final score is an average over samples and possibly outputs.
Parameters
----------
y_true : array-like
True target values. Expected shapes:
- (n_samples, n_timesteps) for single output.
- (n_samples, n_outputs, n_timesteps) for multi-output.
The last dimension is always treated as the time dimension.
y_pred : array-like
Predicted values, matching `y_true` in shape.
time_weights : array-like of shape (n_timesteps,), str, or None, \
default='inverse_time'
Weights to apply to each time step.
- If 'inverse_time', weights are :math:`w_t = 1/t`
(1-indexed t), normalized to sum to 1.
- If an array-like is provided, it's used directly and
normalized to sum to 1. Its length must match `n_timesteps`.
- If None, uniform weights (1/T) are applied.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, samples are equally weighted in the
final aggregation.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs in `y_true` or `y_pred`:
- ``'raise'``: Raise an error on any NaN.
- ``'omit'``: Drop samples (rows) containing NaNs in
either `y_true` or `y_pred`.
- ``'propagate'``: The score for samples/outputs with NaNs
will be NaN.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregation if `y_true` and `y_pred` have an
`n_outputs` dimension.
- ``'raw_values'``: Returns a score for each output.
- ``'uniform_average'``: Scores of all outputs are averaged.
verbose : int, default=0
Verbosity level: 0 (silent), 1 (summary), >=2 (debug details).
Returns
-------
score : float or ndarray of floats
Mean TW-MAE. Scalar if `multioutput='uniform_average'` or if
inputs are 2D. Array of shape (n_outputs,) if
`multioutput='raw_values'` and inputs are 3D.
Examples
--------
>>> import numpy as np
>>> # from fusionlab.metrics import time_weighted_mean_absolute_error
>>> # Single output (2 samples, 3 timesteps)
>>> y_t = np.array([[1, 2, 3], [2, 3, 4]])
>>> y_p = np.array([[1.1, 2.2, 2.9], [1.9, 3.1, 3.8]])
>>> # Default inverse_time weights for T=3:
>>> # w_raw = [1/1, 1/2, 1/3] = [1, 0.5, 0.333]
>>> # sum_w_raw = 1 + 0.5 + 0.333 = 1.833
>>> # w_norm = [1/1.833, 0.5/1.833, 0.333/1.833]
>>> # = [0.545, 0.273, 0.182] (approx)
>>> score = time_weighted_mean_absolute_error(y_t, y_p)
>>> print(f"TW-MAE (default weights): {score:.4f}")
TW-MAE (default weights): 0.1303
>>> # Custom time weights
>>> custom_tw = np.array([0.5, 0.3, 0.2])
>>> score_custom = time_weighted_mean_absolute_error(
... y_t, y_p, time_weights=custom_tw
... )
>>> print(f"TW-MAE (custom weights): {score_custom:.4f}")
TW-MAE (custom weights): 0.1200
>>> # Multi-output example
>>> y_t_mo = np.array([[[1,2],[10,20]], [[3,4],[30,40]]]) # (2s,2o,2t)
>>> y_p_mo = np.array([[[1,1],[11,19]], [[3,3],[31,39]]])
>>> score_mo = time_weighted_mean_absolute_error(
... y_t_mo, y_p_mo, multioutput='raw_values',
... time_weights=[0.6, 0.4]
... )
>>> print(f"TW-MAE (multi-output, raw): {score_mo}")
TW-MAE (multi-output, raw): [0.4 0.6]
See Also
--------
sklearn.metrics.mean_absolute_error : Standard MAE.
prediction_stability_score : Metric for temporal smoothness.
"""
# --- 1. Input Validation and Preprocessing ---
y_true_arr = check_array(
y_true, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False
)
y_pred_arr = check_array(
y_pred, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False
)
if y_true_arr.shape != y_pred_arr.shape:
raise ValueError(
"y_true and y_pred must have the same shape. "
f"Got y_true: {y_true_arr.shape}, y_pred: {y_pred_arr.shape}"
)
y_input_ndim_orig = y_true_arr.ndim
if y_input_ndim_orig == 1: # Single sequence (T,)
# Reshape to (1 sample, 1 output, T timesteps)
y_true_proc = y_true_arr.reshape(1, 1, -1)
y_pred_proc = y_pred_arr.reshape(1, 1, -1)
elif y_input_ndim_orig == 2: # (B, T)
# Reshape to (B samples, 1 output, T timesteps)
y_true_proc = y_true_arr.reshape(y_true_arr.shape[0], 1, -1)
y_pred_proc = y_pred_arr.reshape(y_pred_arr.shape[0], 1, -1)
elif y_input_ndim_orig == 3: # (B, O, T)
y_true_proc = y_true_arr
y_pred_proc = y_pred_arr
else:
raise ValueError(
"Inputs y_true and y_pred must be 1D, 2D (n_samples, "
"n_timesteps), or 3D (n_samples, n_outputs, n_timesteps). "
f"Got {y_input_ndim_orig}D."
)
n_samples, n_outputs, n_timesteps = y_true_proc.shape
n_outputs_ret = n_outputs # For return shape if raw_values # noqa
if n_timesteps == 0:
if verbose >= 1:
print("TW-MAE requires at least 1 time step. Returning NaN.")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# Process time_weights
if time_weights is None: # Uniform weights
w_t = np.full(n_timesteps, 1.0 / n_timesteps)
elif isinstance(time_weights, str) and \
time_weights == 'inverse_time':
if n_timesteps == 0: # Should be caught above
w_t = np.array([])
else:
w_t_raw = 1.0 / np.arange(1, n_timesteps + 1)
sum_w_t_raw = np.sum(w_t_raw)
w_t = w_t_raw / sum_w_t_raw if sum_w_t_raw > 0 else \
np.full(n_timesteps, 1.0/n_timesteps) # Avoid div by zero
else: # Custom array-like weights
w_t = check_array(
time_weights, ensure_2d=False, dtype="numeric",
force_all_finite=True # Time weights should be finite
)
if w_t.ndim > 1: w_t = w_t.squeeze()
if w_t.shape[0] != n_timesteps:
raise ValueError(
f"Length of time_weights ({w_t.shape[0]}) must match "
f"n_timesteps ({n_timesteps})."
)
sum_w_t = np.sum(w_t)
if sum_w_t <= 0:
raise ValueError(
"Sum of custom time_weights must be positive."
)
w_t = w_t / sum_w_t # Normalize
# Process sample_weight
s_weights_proc = None
if sample_weight is not None:
s_weights_proc = check_array(
sample_weight, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False
)
check_consistent_length(y_true_proc, s_weights_proc)
if s_weights_proc.ndim > 1:
raise ValueError(
f"sample_weight must be 1D. Got {s_weights_proc.shape}"
)
# --- 2. Handle NaNs ---
# nan_mask_so is (n_samples, n_outputs), True if that trajectory has NaN
nan_mask_yt = np.isnan(y_true_proc).any(axis=2)
nan_mask_yp = np.isnan(y_pred_proc).any(axis=2)
nan_mask_so = nan_mask_yt | nan_mask_yp # (n_samples, n_outputs)
y_true_calc = y_true_proc
y_pred_calc = y_pred_proc
current_s_weights = s_weights_proc
if np.any(nan_mask_so):
if nan_policy == 'raise':
raise ValueError("NaNs detected in y_true or y_pred.")
elif nan_policy == 'omit':
if verbose >= 2:
print("NaNs detected. Omitting samples with NaNs.")
# Omit entire samples (rows) if any output trajectory has NaNs
rows_with_any_nan = nan_mask_so.any(axis=1) # (n_samples,)
rows_to_keep = ~rows_with_any_nan
if not np.any(rows_to_keep):
if verbose >= 1:
print("All samples omitted. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
y_true_calc = y_true_proc[rows_to_keep]
y_pred_calc = y_pred_proc[rows_to_keep]
if current_s_weights is not None:
current_s_weights = current_s_weights[rows_to_keep]
# Update nan_mask_so for propagate logic if used later
nan_mask_so = nan_mask_so[rows_to_keep]
if y_true_calc.shape[0] == 0: # All samples omitted
if verbose >= 1:
print("No samples left after NaN handling. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# --- 3. Compute Time-Weighted MAE ---
# abs_errors shape: (n_samples_calc, n_outputs, n_timesteps)
abs_errors = np.abs(y_pred_calc - y_true_calc)
# Weighted sum of errors for each trajectory (sample, output)
# w_t is (n_timesteps,). Result: (n_samples_calc, n_outputs)
# NaNs in abs_errors (from y_pred_calc/y_true_calc if 'propagate')
# will result in NaNs here.
twmae_per_trajectory = np.sum(abs_errors * w_t, axis=2)
# If nan_policy='propagate', ensure original NaNs lead to NaN scores
if nan_policy == 'propagate':
# nan_mask_so is (n_samples_calc, n_outputs) if 'omit' used,
# or (original_n_samples, n_outputs) if 'omit' not used.
# If 'omit' not used, y_pred_calc is y_pred_proc.
twmae_per_trajectory = np.where(
nan_mask_so, np.nan, twmae_per_trajectory
)
# --- 4. Aggregate Scores ---
# Aggregate across samples (axis 0), considering sample_weights
if current_s_weights is not None:
if np.sum(current_s_weights) == 0: # Avoid division by zero
output_scores = np.full(n_outputs, np.nan)
else:
# np.average propagates NaNs correctly
output_scores = np.average(
twmae_per_trajectory, axis=0, weights=current_s_weights
)
else:
# np.mean propagates NaNs correctly
output_scores = np.mean(twmae_per_trajectory, axis=0)
# Handle multioutput aggregation
if multioutput == 'uniform_average':
final_score = np.mean(output_scores) # np.mean propagates NaNs
elif multioutput == 'raw_values':
final_score = output_scores
else: # Should not be reached
raise ValueError(f"Unknown multioutput mode: {multioutput}")
# If original input was 1D/2D (single effective output),
# and multioutput='raw_values', result should be scalar.
if (y_input_ndim_orig <= 2) and multioutput == 'raw_values':
if isinstance(final_score, np.ndarray) and final_score.size == 1:
final_score = final_score.item()
# else: final_score is already scalar if it's np.nan
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"TW-MAE computed: {final_score}")
else:
print(f"TW-MAE computed: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_pred': ['array-like'],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'verbose': [Integral, bool]
})
def prediction_stability_score(
y_pred: np.ndarray,
sample_weight: Optional[np.ndarray] = None,
nan_policy: Literal['omit', 'propagate', 'raise'] = 'propagate',
multioutput: Literal['raw_values', 'uniform_average'] = 'uniform_average',
verbose: int = 0
) -> Union[float, np.ndarray]:
"""
Compute the Prediction Stability Score (PSS).
Measures the temporal smoothness of consecutive forecasts.
Lower values indicate smoother, more coherent trajectories.
Assumes predictions are ordered in time along the last axis.
Formally, for `B` samples, `O` outputs (optional), and horizon `T`:
.. math::
\\mathrm{PSS}_{i,o} = \\frac{1}{T-1} \\sum_{t=2}^{T}
|\\hat y_{i,o,t} - \\hat y_{i,o,t-1}|
The final score is an average over samples and possibly outputs.
Parameters
----------
y_pred : array-like
Forecast trajectories. Expected shapes:
- (n_samples, n_timesteps) for single output.
- (n_samples, n_outputs, n_timesteps) for multi-output.
The last dimension is always treated as the time dimension.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, samples are equally weighted.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs in `y_pred`:
- ``'raise'``: Raise an error on any NaN.
- ``'omit'``: Drop samples (rows) containing NaNs.
- ``'propagate'``: Score for samples/outputs with NaNs
will be NaN.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregation if `y_pred` has an `n_outputs` dimension.
- ``'raw_values'``: Returns a score for each output.
- ``'uniform_average'``: Scores of all outputs are averaged.
verbose : int, default=0
Verbosity level: 0 (silent), 1 (summary), >=2 (debug details).
Returns
-------
score : float or ndarray of floats
Mean PSS. Scalar if `multioutput='uniform_average'` or if
`y_pred` is 2D. Array if `multioutput='raw_values'` and
`y_pred` is 3D.
Examples
--------
>>> import numpy as np
>>> # Single output (3 samples, 5 timesteps)
>>> y_p1 = np.array([[1,1,2,2,3], [2,3,2,3,2], [0,1,0,1,0]])
>>> prediction_stability_score(y_p1)
0.5833333333333334
>>> # Multi-output (2 samples, 2 outputs, 3 timesteps)
>>> y_p2 = np.array([[[1,2,1], [5,5,5]], [[3,2,3], [0,1,0]]])
>>> prediction_stability_score(y_p2, multioutput='raw_values')
array([1. , 0.25])
"""
y_pred_arr = check_array(
y_pred, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False
)
y_pred_ndim_orig = y_pred_arr.ndim
if y_pred_ndim_orig == 1: # Single trajectory (T,)
# Reshape to (1 sample, 1 output, T timesteps)
y_pred_proc = y_pred_arr.reshape(1, 1, -1)
elif y_pred_ndim_orig == 2: # (B, T)
# Reshape to (B samples, 1 output, T timesteps)
y_pred_proc = y_pred_arr.reshape(y_pred_arr.shape[0], 1, -1)
elif y_pred_ndim_orig == 3: # (B, O, T)
y_pred_proc = y_pred_arr
else:
raise ValueError(
"y_pred must be 1D, 2D (n_samples, n_timesteps), or 3D "
"(n_samples, n_outputs, n_timesteps). "
f"Got {y_pred_ndim_orig}D."
)
n_samples, n_outputs, n_timesteps = y_pred_proc.shape
if n_timesteps < 2:
if verbose >= 1:
print("PSS requires at least 2 time steps. Returning NaN.")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
weights_proc = None
if sample_weight is not None:
weights_proc = check_array(
sample_weight, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False
)
check_consistent_length(y_pred_proc, weights_proc) # Checks n_samples
if weights_proc.ndim > 1:
raise ValueError(
f"sample_weight must be 1D. Got shape {weights_proc.shape}"
)
# NaN handling
# nan_mask_so is (n_samples, n_outputs), True if that trajectory has any NaN
nan_mask_so = np.isnan(y_pred_proc).any(axis=2)
y_pred_calc = y_pred_proc
current_weights = weights_proc
if np.any(nan_mask_so):
if nan_policy == 'raise':
raise ValueError("NaNs detected in y_pred.")
elif nan_policy == 'omit':
if verbose >= 2:
print("NaNs detected. Omitting samples with NaNs.")
# Omit entire samples (rows) if any of their outputs' trajectories have NaNs
rows_with_any_nan = nan_mask_so.any(axis=1) # (n_samples,)
rows_to_keep = ~rows_with_any_nan
if not np.any(rows_to_keep):
if verbose >= 1:
print("All samples omitted due to NaNs. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
y_pred_calc = y_pred_proc[rows_to_keep]
if current_weights is not None:
current_weights = current_weights[rows_to_keep]
# Update nan_mask_so for propagate logic if it were used after omit
nan_mask_so = nan_mask_so[rows_to_keep] # For consistency if used later
if y_pred_calc.shape[0] == 0: # All samples omitted
if verbose >= 1:
print("No samples left after NaN handling. Returning NaN(s).")
if multioutput == 'raw_values' and n_outputs > 1:
return np.full(n_outputs, np.nan)
return np.nan
# Compute differences along the time axis (last axis)
# diffs shape: (n_samples_calc, n_outputs, n_timesteps - 1)
diffs = np.abs(
y_pred_calc[..., 1:] - y_pred_calc[..., :-1]
)
# Mean absolute difference per trajectory (sample, output)
# pss_per_trajectory shape: (n_samples_calc, n_outputs)
# NaNs in diffs (from y_pred_calc if nan_policy='propagate') will propagate
pss_per_trajectory = np.mean(diffs, axis=2)
# If nan_policy='propagate', ensure original NaNs lead to NaN scores
if nan_policy == 'propagate':
# nan_mask_so is (n_samples_calc, n_outputs) if 'omit' was applied to it
# or (original_n_samples, n_outputs) if 'omit' not applied.
# If 'omit' was not applied, y_pred_calc is y_pred_proc.
pss_per_trajectory = np.where(
nan_mask_so, np.nan, pss_per_trajectory
)
# Aggregate across samples (axis 0), considering weights
if current_weights is not None:
if np.sum(current_weights) == 0: # Avoid division by zero
output_scores = np.full(n_outputs, np.nan)
else:
# np.average propagates NaNs correctly if present in pss_per_trajectory
output_scores = np.average(
pss_per_trajectory, axis=0, weights=current_weights
)
else:
# np.mean propagates NaNs correctly
output_scores = np.mean(pss_per_trajectory, axis=0)
# Handle multioutput aggregation
if multioutput == 'uniform_average':
# np.mean propagates NaNs
final_score = np.mean(output_scores)
elif multioutput == 'raw_values':
final_score = output_scores
else: # Should not be reached
raise ValueError(f"Unknown multioutput mode: {multioutput}")
# If original y_pred was 1D or 2D (single effective output),
# and multioutput='raw_values', result should be scalar.
if (y_pred_ndim_orig <= 2) and multioutput == 'raw_values':
if isinstance(final_score, np.ndarray) and final_score.size == 1:
final_score = final_score.item()
# else: final_score is already scalar if it's np.nan
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"PSS computed: {final_score}")
else:
print(f"PSS computed: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_true': ['array-like'],
'y_lower': ['array-like'],
'y_upper': ['array-like'],
'y_median': ['array-like'],
'alphas': ['array-like'], # Sequence[float] implies iterable
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'warn_invalid_bounds': ['boolean'],
'verbose': [Integral, bool]
})
def weighted_interval_score(
y_true: np.ndarray,
y_lower: np.ndarray,
y_upper: np.ndarray,
y_median: np.ndarray,
alphas: Union[Sequence[float], np.ndarray],
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
warn_invalid_bounds: bool = True,
verbose: int = 0
) -> Union[float, np.ndarray]:
"""
Compute the Weighted Interval Score (WIS).
The WIS is a proper scoring rule that evaluates probabilistic
forecasts given as a set of central prediction intervals and a
median forecast [1]_. It generalizes the absolute error and
considers multiple quantile levels.
The score for a single interval at level :math:`\\alpha_k` is:
.. math::
\\mathrm{IS}_{\\alpha_k}(y, l_k, u_k) = (u_k - l_k)
+ \\frac{2}{\\alpha_k}(l_k - y)\\mathbf{1}\\{y < l_k\\}
+ \\frac{2}{\\alpha_k}(y - u_k)\\mathbf{1}\\{y > u_k\\}
The WIS is then defined as a weighted average of the absolute
error of the median forecast and the interval scores for K
central prediction intervals:
.. math::
\\mathrm{WIS}(y, m, \\{(l_k, u_k, \\alpha_k)\\}_{k=1}^K) =
\\frac{1}{K + 0.5} \\left( \\frac{1}{2}|y - m| +
\\sum_{k=1}^K \\frac{\\alpha_k}{2} \\mathrm{IS}_{\\alpha_k} \\right)
Alternatively, a common formulation used (and implemented here,
following the reference's R script and common implementations like
`scoringutils` R package) is:
For each interval :math:`k` with level :math:`\\alpha_k`, its
contribution to the score for a single observation :math:`y` is:
.. math::
S_k = (u_k - l_k) + \\frac{2}{\\alpha_k}(l_k - y)\\mathbf{1}\\{y < l_k\\}
+ \\frac{2}{\\alpha_k}(y - u_k)\\mathbf{1}\\{y > u_k\\}
The total score for observation y is:
.. math::
\\mathrm{Score}_y = \\frac{1}{K+1} \\left( |y-m| + \\sum_{k=1}^K \\frac{\\alpha_k}{2} S_k \\right)
This can be simplified by directly using the per-interval WIS contribution:
.. math::
\\mathrm{WIS}_{\\alpha_k}(y, l_k, u_k) = \\frac{\\alpha_k}{2}(u_k - l_k)
+ (l_k - y)\\mathbf{1}\\{y < l_k\\}
+ (y - u_k)\\mathbf{1}\\{y > u_k\\}
Then the aggregated WIS is:
.. math::
\\mathrm{WIS} = \\frac{1}{K + 1} \\left(|y - m| +
\\sum_{k=1}^K \\mathrm{WIS}_{\\alpha_k}\\right)
This is the version implemented.
Parameters
----------
y_true : array-like
Observed true values.
Shape: (n_samples,) or (n_samples, n_outputs).
y_lower : array-like
Lower bounds for each central prediction interval.
- If `y_true` is 1D: (n_samples, K_intervals)
- If `y_true` is 2D: (n_samples, n_outputs, K_intervals)
y_upper : array-like
Upper bounds, matching `y_lower`'s shape.
y_median : array-like
Median forecasts.
Shape: (n_samples,) or (n_samples, n_outputs).
alphas : array-like of float, shape (K_intervals,)
Nominal central interval probability levels (e.g., 0.1 for 10% PI,
meaning quantiles are 0.05 and 0.95). Each alpha must be
in (0, 1). These are used as weights.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, samples are equally weighted.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs in inputs.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregation for multi-output `y_true`.
warn_invalid_bounds : bool, default=True
If True, issues a `UserWarning` if any `y_lower > y_upper`.
verbose : int, default=0
Verbosity level: 0 (silent), 1 (summary), >=2 (debug details).
Returns
-------
score : float or ndarray of floats
Average WIS. Lower values are better.
Scalar if `multioutput='uniform_average'` or `y_true` is 1D.
Array of shape (n_outputs,) if `multioutput='raw_values'` and
`y_true` is 2D.
Examples
--------
>>> import numpy as np
>>> # Single-output example
>>> y_t = np.array([10, 12, 11, np.nan])
>>> y_l = np.array([[9, 8], [11, 10], [10, 9], [9,8]]) # K=2 intervals
>>> y_u = np.array([[11, 12], [13, 14], [12, 13], [11,12]])
>>> y_m = np.array([10, 12, 11, 10])
>>> a = np.array([0.2, 0.5]) # alpha for 20% and 50% PIs
>>> wis = weighted_interval_score(y_t, y_l, y_u, y_m, a,
... nan_policy='omit', verbose=1)
WIS computed: 0.4333
>>> print(f"WIS (1D, omit): {wis:.4f}")
WIS (1D, omit): 0.4333
>>> # Multi-output example
>>> y_t_mo = np.array([[10, 20], [12, np.nan]]) # (2_samples, 2_outputs)
>>> y_l_mo = np.array([ # (2_samples, 2_outputs, 1_interval)
... [[9], [19]], # Sample 0, Output 0&1, Interval 0
... [[11], [21]] # Sample 1, Output 0&1, Interval 0
... ])
>>> y_u_mo = np.array([
... [[11], [21]],
... [[13], [23]]
... ])
>>> y_m_mo = np.array([[10, 20], [12, 22]])
>>> a_mo = np.array([0.5]) # K=1 interval
>>> wis_mo_raw = weighted_interval_score(
... y_t_mo, y_l_mo, y_u_mo, y_m_mo, a_mo,
... nan_policy='propagate', multioutput='raw_values', verbose=1
... )
WIS computed: [0.25 nan ]
>>> print(f"WIS (2D, raw, propagate): {wis_mo_raw}")
WIS (2D, raw, propagate): [0.25 nan]
Notes
-----
- WIS is a proper scoring rule for evaluating quantile/interval forecasts.
- It balances sharpness (interval width) and calibration.
- Lower WIS values indicate better forecast performance.
References
----------
.. [1] Bracher, J., Ray, E. L., Gneiting, T., & Reich, N. G. (2021).
Evaluating epidemic forecasts in an interval format.
PLoS computational biology, 17(2), e1008618.
(The paper discusses WIS and its components.)
"""
# --- 1. Input Validation and Preprocessing ---
y_true_arr = check_array(y_true, ensure_2d=False,
dtype="numeric", force_all_finite=False, copy=False)
y_lower_arr = check_array(y_lower, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False)
y_upper_arr = check_array(y_upper, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False, copy=False)
y_median_arr = check_array(y_median, ensure_2d=False,
dtype="numeric", force_all_finite=False, copy=False)
alphas_arr = check_array(alphas, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False) # Alphas must be finite
if not np.all((alphas_arr > 0) & (alphas_arr < 1)):
raise ValueError(
"All alpha values must be strictly between 0 and 1."
)
if alphas_arr.ndim > 1:
alphas_arr = alphas_arr.squeeze() # Ensure 1D if passed as (K,1) etc.
if alphas_arr.ndim == 0 and alphas_arr.size ==1 : # single alpha
alphas_arr = alphas_arr.reshape(1,)
if alphas_arr.ndim > 1:
raise ValueError(
f"alphas must be 1D. Got shape {alphas_arr.shape}"
)
K_intervals = alphas_arr.shape[0]
if verbose >= 2:
print(f"Initial shapes: y_true={y_true_arr.shape}, "
f"y_lower={y_lower_arr.shape}, y_upper={y_upper_arr.shape}, "
f"y_median={y_median_arr.shape}, alphas={alphas_arr.shape}")
y_true_ndim = y_true_arr.ndim
if y_true_ndim == 1:
y_true_proc = y_true_arr.reshape(-1, 1)
y_median_proc = y_median_arr.reshape(-1, 1)
if y_lower_arr.ndim == 2 and y_lower_arr.shape[1] == K_intervals:
y_lower_proc = y_lower_arr.reshape(y_lower_arr.shape[0], 1, -1)
y_upper_proc = y_upper_arr.reshape(y_upper_arr.shape[0], 1, -1)
elif y_lower_arr.ndim == 3 and y_lower_arr.shape[1] == 1 \
and y_lower_arr.shape[2] == K_intervals:
y_lower_proc = y_lower_arr
y_upper_proc = y_upper_arr
else:
raise ValueError(
"If y_true is 1D, y_lower/y_upper must be 2D "
"(n_samples, K_intervals) or 3D (n_samples, 1, K_intervals)."
f" Got y_lower: {y_lower_arr.shape}, K_intervals={K_intervals}"
)
elif y_true_ndim == 2:
y_true_proc = y_true_arr
y_median_proc = y_median_arr
if y_lower_arr.ndim == 3 and \
y_lower_arr.shape[1] == y_true_proc.shape[1] and \
y_lower_arr.shape[2] == K_intervals:
y_lower_proc = y_lower_arr
y_upper_proc = y_upper_arr
else:
raise ValueError(
"If y_true is 2D (n_s, n_o), y_lower/y_upper must be 3D "
"(n_s, n_o, K_intervals) with matching n_o and K."
f" Got y_true:{y_true_proc.shape}, y_lower:{y_lower_arr.shape}"
f", K_intervals={K_intervals}"
)
else:
raise ValueError(f"y_true must be 1D or 2D. Got {y_true_ndim}D.")
# Consistent lengths and shapes
check_consistent_length(
y_true_proc, y_median_proc, y_lower_proc, y_upper_proc
)
if not (y_true_proc.shape[:2] == y_median_proc.shape[:2] == \
y_lower_proc.shape[:2] == y_upper_proc.shape[:2]):
raise ValueError("Shape mismatch in (n_samples, n_outputs) "
"among processed inputs.")
if not (y_lower_proc.shape[2] == y_upper_proc.shape[2] == K_intervals):
raise ValueError("Mismatch in K_intervals dimension for "
"y_lower/y_upper vs alphas.")
if verbose >= 2:
print(f"Processed shapes: y_true_proc={y_true_proc.shape}, "
f"y_lower_proc={y_lower_proc.shape}, etc.")
weights_proc = None
if sample_weight is not None:
weights_proc = check_array(sample_weight, ensure_2d=False,
dtype="numeric", force_all_finite=True, copy=False)
check_consistent_length(y_true_proc, weights_proc)
if weights_proc.ndim > 1:
raise ValueError(f"sample_weight must be 1D. Got {weights_proc.shape}")
# --- 2. Handle NaNs ---
nan_mask_yt = np.isnan(y_true_proc) # (n_s, n_o)
nan_mask_ym = np.isnan(y_median_proc) # (n_s, n_o)
nan_mask_yl = np.isnan(y_lower_proc).any(axis=2) # (n_s, n_o)
nan_mask_yu = np.isnan(y_upper_proc).any(axis=2) # (n_s, n_o)
combined_nan_mask = nan_mask_yt | nan_mask_ym | nan_mask_yl | nan_mask_yu
y_true_calc, y_lower_calc, y_upper_calc, y_median_calc = (
y_true_proc, y_lower_proc, y_upper_proc, y_median_proc
)
current_weights = weights_proc
if np.any(combined_nan_mask):
if nan_policy == 'raise':
raise ValueError("NaNs detected in input arrays.")
elif nan_policy == 'omit':
if verbose >= 2: print("NaNs detected. Omitting affected rows.")
rows_with_any_nan = combined_nan_mask.any(axis=1) # (n_s,)
rows_to_keep = ~rows_with_any_nan
if not np.any(rows_to_keep):
if verbose >= 1: print("All samples omitted. Returning NaN(s).")
n_out = y_true_proc.shape[1]
return np.full(n_out, np.nan) if \
multioutput == 'raw_values' and y_true_ndim > 1 else np.nan
y_true_calc = y_true_proc[rows_to_keep]
y_lower_calc = y_lower_proc[rows_to_keep]
y_upper_calc = y_upper_proc[rows_to_keep]
y_median_calc = y_median_proc[rows_to_keep]
if current_weights is not None:
current_weights = current_weights[rows_to_keep]
# For 'propagate', NaNs will flow through calculations.
if y_true_calc.shape[0] == 0: # All samples omitted
if verbose >= 1: print("No samples left. Returning NaN(s).")
n_out = y_true_proc.shape[1]
return np.full(n_out, np.nan) if \
multioutput == 'raw_values' and y_true_ndim > 1 else np.nan
# --- 3. Warn for Invalid Bounds ---
if warn_invalid_bounds:
with np.errstate(invalid='ignore'): # For NaN comparisons
invalid_b = y_lower_calc > y_upper_calc # (n_s, n_o, K)
if np.any(invalid_b):
num_invalid = np.sum(invalid_b)
perc = (num_invalid / invalid_b.size) * 100
warnings.warn(
f"{num_invalid} ({perc:.2f}%) interval pairs found "
f"where y_lower > y_upper. These contribute to penalties.",
UserWarning
)
if verbose >=2: print(f"Warning: {num_invalid} invalid bounds.")
# --- 4. Compute WIS Components ---
# Reshape alphas for broadcasting: (1, 1, K_intervals)
alphas_exp = alphas_arr.reshape(1, 1, -1)
# Median Absolute Error term (weighted by 0.5 in some formulations,
# but here it's |y-m| directly as per formula)
mae_term = np.abs(y_median_calc - y_true_calc) # (n_s, n_o)
# Interval Score (IS_alpha_k) components
# y_true_calc needs to be (n_s, n_o, 1) for broadcasting with K dim
y_true_calc_exp = y_true_calc[..., np.newaxis] # (n_s, n_o, 1)
interval_width = y_upper_calc - y_lower_calc # (n_s, n_o, K)
# WIS_alpha_k terms (per interval, per sample, per output)
# WIS_alpha_k = (alpha_k/2) * width_k
# + (lower_k - y) * I(y < lower_k)
# + (y - upper_k) * I(y > upper_k)
wis_term_width = (alphas_exp / 2.0) * interval_width
wis_term_under = (y_lower_calc - y_true_calc_exp) * \
(y_true_calc_exp < y_lower_calc)
wis_term_over = (y_true_calc_exp - y_upper_calc) * \
(y_true_calc_exp > y_upper_calc)
# Sum of individual WIS_alpha_k contributions for each sample/output
# Each element is WIS_alpha_k for that k
per_interval_wis = (
wis_term_width + wis_term_under + wis_term_over
) # (n_s, n_o, K)
sum_of_per_interval_wis = np.sum(
per_interval_wis, axis=2
) # (n_s, n_o)
# Total WIS per sample and output
# Denominator is K_intervals + 1 (for the median term)
if K_intervals == 0: # Only median absolute error
wis_values = mae_term
else:
wis_values = (mae_term + sum_of_per_interval_wis) / (K_intervals + 1.0)
if verbose >= 3:
print("WIS values per sample/output (first 5):")
print(wis_values[:min(5, wis_values.shape[0])])
# --- 5. Aggregate Scores ---
if current_weights is not None:
if np.sum(current_weights) == 0:
output_scores = np.full(wis_values.shape[1], np.nan)
else:
output_scores = np.average(
wis_values, axis=0, weights=current_weights
)
else:
output_scores = np.mean(wis_values, axis=0) # Handles NaNs from propagate
if multioutput == 'uniform_average':
final_score = np.mean(output_scores) # Handles NaNs from propagate
else: # 'raw_values'
final_score = output_scores
if y_true_ndim == 1 and isinstance(final_score, np.ndarray):
if final_score.size == 1:
final_score = final_score.item()
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"WIS computed: {final_score}")
else:
print(f"WIS computed: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_true': ['array-like'],
'y_pred': ['array-like'],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'verbose': [Integral, bool]
})
def continuous_ranked_probability_score(
y_true: np.ndarray,
y_pred: np.ndarray,
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
verbose: int = 0
) -> Union[float, np.ndarray]:
"""
Compute the sample-based Continuous Ranked Probability Score (CRPS).
This proper scoring rule measures both calibration and sharpness
of ensemble forecasts by comparing predictive samples to true
observations [1]_. The sample approximation is:
.. math::
\\mathrm{CRPS} = \\frac{1}{m}\\sum_{j=1}^{m} |x_j - y|
- \\frac{1}{2m^2}\\sum_{i=1}^{m}\\sum_{j=1}^{m} |x_i - x_j|,
where :math:`x_1,\\dots,x_m` are ensemble members for a single
observation :math:`y`. The score is then averaged over all samples.
Parameters
----------
y_true : array-like of shape (n_samples,) or (n_samples, n_outputs)
Observed true values.
y_pred : array-like
Ensemble forecast samples.
- If `y_true` is 1D (n_samples,), `y_pred` must be 2D
(n_samples, n_ensemble_members).
- If `y_true` is 2D (n_samples, n_outputs), `y_pred` must be 3D
(n_samples, n_outputs, n_ensemble_members).
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, samples are equally weighted.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
How to handle NaNs:
- ``'raise'``: Raise an error if NaNs are present in inputs.
- ``'omit'``: Remove samples (rows) containing NaNs in
`y_true` or `y_pred` before computation.
- ``'propagate'``: NaNs in inputs will propagate to the
CRPS score for the affected sample(s)/output(s).
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregation for multi-output `y_true`.
- ``'raw_values'``: Returns a full set of scores, one for
each output.
- ``'uniform_average'``: Scores of all outputs are averaged
with uniform weight.
verbose : int, default=0
Verbosity level: 0 (silent), 1 (summary), >=2 (debug details).
Returns
-------
score : float or ndarray of floats
Average CRPS. A scalar if `multioutput='uniform_average'` or
if `y_true` is 1D. An array of shape (n_outputs,) if
`multioutput='raw_values'` and `y_true` is 2D.
Lower values are better.
Examples
--------
>>> import numpy as np
>>> # from fusionlab.metrics import continuous_ranked_probability_score
>>> y_true_1d = np.array([0.5, 0.0, 1.0, np.nan])
>>> y_pred_1d = np.array([
... [0.0, 0.5, 1.0], # For 0.5
... [0.0, 0.1, 0.2], # For 0.0
... [0.9, 1.1, 1.0], # For 1.0
... [0.0, 0.5, np.nan] # For np.nan y_true
... ])
>>> score = continuous_ranked_probability_score(y_true_1d, y_pred_1d, nan_policy='omit', verbose=1)
CRPS computed: 0.0333
>>> print(f"CRPS (1D, omit NaNs): {score:.4f}")
CRPS (1D, omit NaNs): 0.0333
>>> score_prop = continuous_ranked_probability_score(y_true_1d, y_pred_1d, nan_policy='propagate')
>>> print(f"CRPS (1D, propagate NaNs): {score_prop}") # Will be nan
CRPS (1D, propagate NaNs): nan
>>> y_true_2d = np.array([[0.5, 2.5], [0.0, np.nan], [1.0, 3.0]])
>>> y_pred_2d = np.array([
... [[0.0, 0.5, 1.0], [2.0, 2.5, 3.0]], # For [0.5, 2.5]
... [[0.0, 0.1, 0.2], [np.nan, 3.1, 3.2]], # For [0.0, np.nan]
... [[0.9, 1.1, 1.0], [2.8, 3.0, 3.2]] # For [1.0, 3.0]
... ])
>>> raw_scores = continuous_ranked_probability_score(y_true_2d, y_pred_2d,
... nan_policy='propagate',
... multioutput='raw_values', verbose=1)
CRPS computed: [0.0333 nan ]
>>> print(f"CRPS (2D, raw, propagate): {raw_scores}")
CRPS (2D, raw, propagate): [0.03333333 nan]
>>> avg_score = continuous_ranked_probability_score(y_true_2d, y_pred_2d,
... nan_policy='omit',
... multioutput='uniform_average', verbose=1)
CRPS computed: 0.0500
>>> print(f"CRPS (2D, omit, average): {avg_score:.4f}")
CRPS (2D, omit, average): 0.0500
Notes
-----
- This function calculates the CRPS based on ensemble samples.
- It is suitable for evaluating probabilistic forecasts like
Monte Carlo simulations or bagged ensembles.
- CRPS is a strictly proper scoring rule, meaning it encourages
honest and accurate probabilistic forecasts.
- Lower CRPS values indicate better forecast performance.
See Also
--------
coverage_score : Metric for prediction interval coverage.
sklearn.metrics.mean_squared_error : A common deterministic metric.
References
----------
.. [1] Gneiting, T., & Raftery, A. E. (2007). Strictly Proper
Scoring Rules, Prediction, and Estimation. Journal of the
American Statistical Association, 102(477), 359–378.
"""
# --- 1. Input Validation and Preprocessing ---
# Convert to NumPy arrays and ensure numeric type.
# force_all_finite=False to handle NaNs according to nan_policy.
y_true_arr = check_array(
y_true, ensure_2d=False, dtype="numeric",
force_all_finite=False, copy=False
)
y_pred_arr = check_array(
y_pred, ensure_2d=False, allow_nd=True, # allow_nd for 3D
dtype="numeric", force_all_finite=False, copy=False
)
if verbose >= 2:
print(f"Initial shapes: y_true={y_true_arr.shape}, "
f"y_pred={y_pred_arr.shape}")
# Determine input dimensionality (single vs. multi-output y_true)
y_true_ndim = y_true_arr.ndim
if y_true_ndim == 1:
# Reshape y_true to (n_samples, 1) for consistent processing
y_true_proc = y_true_arr.reshape(-1, 1)
if y_pred_arr.ndim == 2:
# Reshape y_pred to (n_samples, 1, n_ensemble)
y_pred_proc = y_pred_arr.reshape(y_pred_arr.shape[0], 1, -1)
elif y_pred_arr.ndim == 3 and y_pred_arr.shape[1] == 1:
y_pred_proc = y_pred_arr # Already (n_samples, 1, n_ensemble)
else:
raise ValueError(
"If y_true is 1D (n_samples,), y_pred must be 2D "
"(n_samples, n_ensemble) or 3D (n_samples, 1, n_ensemble)."
f" Got y_pred shape: {y_pred_arr.shape}"
)
elif y_true_ndim == 2:
y_true_proc = y_true_arr
if y_pred_arr.ndim == 3 and \
y_pred_arr.shape[1] == y_true_arr.shape[1]:
y_pred_proc = y_pred_arr
else:
raise ValueError(
"If y_true is 2D (n_samples, n_outputs), y_pred must be 3D "
"(n_samples, n_outputs, n_ensemble_members) with matching "
"n_outputs."
f" Got y_true: {y_true_arr.shape}, y_pred: {y_pred_arr.shape}"
)
else:
raise ValueError(
f"y_true must be 1D or 2D. Got {y_true_ndim}D."
)
# Final shape checks for consistency
check_consistent_length(y_true_proc, y_pred_proc)
if y_true_proc.shape[0] != y_pred_proc.shape[0] or \
y_true_proc.shape[1] != y_pred_proc.shape[1]:
raise ValueError(
"Processed y_true and y_pred shapes are inconsistent. "
f"y_true_proc: {y_true_proc.shape}, "
f"y_pred_proc: {y_pred_proc.shape}"
)
if y_pred_proc.shape[2] == 0: # No ensemble members
if verbose >= 1:
print("y_pred has no ensemble members. CRPS is undefined (NaN).")
# Result shape depends on multioutput and original y_true_ndim
n_outputs = y_true_proc.shape[1]
if multioutput == 'raw_values' and y_true_ndim > 1:
return np.full(n_outputs, np.nan)
return np.nan
if verbose >= 2:
print(f"Processed shapes for calculation: "
f"y_true_proc={y_true_proc.shape}, "
f"y_pred_proc={y_pred_proc.shape}")
# Handle sample_weight
weights_proc = None
if sample_weight is not None:
weights_proc = check_array(
sample_weight, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False # Weights cannot be NaN
)
check_consistent_length(y_true_proc, weights_proc)
if weights_proc.ndim > 1:
raise ValueError(
f"sample_weight must be 1D. Got shape {weights_proc.shape}"
)
if verbose >= 3:
print(f" sample_weight shape: {weights_proc.shape}")
# --- 2. Handle NaNs based on nan_policy ---
# Mask for NaNs: True if y_true_ij is NaN or any y_pred_ijk is NaN
nan_mask_y_true = np.isnan(y_true_proc) # (n_samples, n_outputs)
nan_mask_y_pred = np.isnan(y_pred_proc).any(axis=2) # (n_samples, n_outputs)
# Overall NaN mask for each sample-output pair
combined_nan_mask = nan_mask_y_true | nan_mask_y_pred # (n_s, n_o)
if np.any(combined_nan_mask):
if nan_policy == 'raise':
if verbose >= 2:
print("NaNs detected and nan_policy='raise'. Raising error.")
raise ValueError(
"NaNs detected in input arrays (y_true or y_pred)."
)
elif nan_policy == 'omit':
if verbose >= 2:
print("NaNs detected with nan_policy='omit'. "
"Omitting affected samples (rows).")
# Omit entire rows if *any* output for that sample has a NaN
rows_with_any_nan = combined_nan_mask.any(axis=1) # (n_samples,)
rows_to_keep = ~rows_with_any_nan
if verbose >= 3:
print("Rows to keep after NaN omission:", rows_to_keep)
if not np.any(rows_to_keep):
if verbose >= 1:
print("All samples contained NaNs and were omitted. "
"Returning NaN(s).")
n_outputs = y_true_proc.shape[1]
if multioutput == 'raw_values' and y_true_ndim > 1:
return np.full(n_outputs, np.nan)
return np.nan
y_true_calc = y_true_proc[rows_to_keep]
y_pred_calc = y_pred_proc[rows_to_keep]
if weights_proc is not None:
weights_proc = weights_proc[rows_to_keep]
# combined_nan_mask needs to be updated for propagate logic later
# but for omit, it's not used further for indexing CRPS values
elif nan_policy == 'propagate':
if verbose >= 2:
print("NaNs detected and nan_policy='propagate'. "
"NaNs will propagate in CRPS calculation.")
# Calculations will proceed, NaNs in y_true_proc/y_pred_proc
# will naturally lead to NaNs in crps_vals.
y_true_calc = y_true_proc
y_pred_calc = y_pred_proc
else: # Should not be reached due to @validate_params
raise ValueError(f"Unknown nan_policy: {nan_policy}")
else: # No NaNs detected initially
y_true_calc = y_true_proc
y_pred_calc = y_pred_proc
if y_true_calc.shape[0] == 0: # All samples omitted
if verbose >= 1:
print("No samples left after NaN handling. Returning NaN(s).")
n_outputs = y_true_proc.shape[1]
if multioutput == 'raw_values' and y_true_ndim > 1:
return np.full(n_outputs, np.nan)
return np.nan
# --- 3. Compute CRPS terms ---
# Term 1: E[|X - y|] = mean(|ensemble_member - true_value|)
# y_true_calc needs to be (n_samples, n_outputs, 1) for broadcasting
abs_diff_term1 = np.abs(
y_pred_calc - y_true_calc[..., np.newaxis]
)
# Mean over ensemble members (axis 2)
# If nan_policy='propagate', NaNs here will propagate.
term1 = np.mean(abs_diff_term1, axis=2) # Shape: (n_samples, n_outputs)
# Term 2: 0.5 * E[|X - X'|]
# = 0.5 * mean(|ensemble_member_i - ensemble_member_j|)
# y_pred_calc is (n_s, n_o, n_e)
# Create pairwise differences: (n_s, n_o, n_e, n_e)
# y_pred_calc[..., :, np.newaxis] -> (n_s, n_o, n_e, 1)
# y_pred_calc[..., np.newaxis, :] -> (n_s, n_o, 1, n_e)
abs_diff_term2_pairs = np.abs(
y_pred_calc[..., :, np.newaxis] - y_pred_calc[..., np.newaxis, :]
)
# Mean over pairs of ensemble members (axes 2 and 3)
# If nan_policy='propagate', NaNs here will propagate.
m = y_pred_calc.shape[2] # Number of ensemble members
if m == 0: # Should have been caught earlier
term2 = np.full_like(term1, np.nan)
elif m == 1: # Only one ensemble member, term2 is 0
term2 = np.zeros_like(term1)
else:
# Sum over (n_e, n_e) and divide by m*m, then by 2
# Or mean over (n_e, n_e) and divide by 2
term2 = np.mean(abs_diff_term2_pairs, axis=(2, 3)) * 0.5
crps_values = term1 - term2 # Shape: (n_samples, n_outputs)
# If nan_policy was 'propagate', NaNs from input should already be in crps_values.
# If nan_policy was 'omit', combined_nan_mask is not relevant here as
# affected rows were removed.
# If nan_policy was 'raise', no NaNs exist.
if verbose >= 3:
print("CRPS values per sample/output (first 5):")
print(crps_values[:min(5, crps_values.shape[0])])
# --- 4. Aggregate scores ---
# Apply sample weights if provided
# Note: np.average handles NaNs by default if weights are involved,
# but if crps_values contains NaNs (from 'propagate'), the
# result of weighted average for that output will be NaN.
if weights_proc is not None:
if np.sum(weights_proc) == 0: # Avoid division by zero
# Average over samples (axis 0)
output_scores = np.full(crps_values.shape[1], np.nan)
else:
output_scores = np.average(
crps_values, axis=0, weights=weights_proc
)
else:
# If 'propagate', use np.nanmean to ignore NaNs for averaging,
# UNLESS the NaN was due to original input being NaN.
# The crps_values already correctly holds NaNs from propagation.
# So, a simple mean is fine; it will propagate NaNs correctly.
output_scores = np.mean(crps_values, axis=0)
# Handle multioutput aggregation
if multioutput == 'uniform_average':
# If output_scores contains NaNs (from propagate), mean will be NaN.
final_score = np.mean(output_scores)
elif multioutput == 'raw_values':
final_score = output_scores
else: # Should not be reached
raise ValueError(f"Unknown multioutput mode: {multioutput}")
# If original y_true was 1D, result should be scalar,
# even if multioutput='raw_values' (it would be a 1-element array).
if y_true_ndim == 1 and isinstance(final_score, np.ndarray):
if final_score.size == 1:
final_score = final_score.item()
# else: if it became scalar (e.g. np.nan), it's already fine
if verbose >= 1:
if isinstance(final_score, np.ndarray):
with np.printoptions(precision=4, suppress=True):
print(f"CRPS computed: {final_score}")
else:
print(f"CRPS computed: {final_score:.4f}")
return final_score
[docs]
@validate_params({
'y_true': ['array-like'],
'y_lower': ['array-like'],
'y_upper': ['array-like'],
'sample_weight': ['array-like', None],
'nan_policy': [StrOptions({'omit', 'propagate', 'raise'})],
'multioutput': [StrOptions({'raw_values', 'uniform_average'})],
'warn_invalid_bounds': ['boolean'],
'verbose': [Integral]
})
def coverage_score(
y_true,
y_lower,
y_upper,
sample_weight: Optional[np.ndarray] = None,
nan_policy: NanPolicyLiteral = 'propagate',
multioutput: MultioutputLiteral = 'uniform_average',
warn_invalid_bounds: bool = True,
eps: float = 1e-8,
verbose: int = 0
) -> Union[float, np.ndarray]:
r"""
Compute the coverage score of prediction intervals.
Measures the fraction of instances where the true value lies within a
provided lower and upper bound. This metric is useful for
evaluating uncertainty estimates in probabilistic forecasts.
Formally, given observed true values
:math:`y = \{y_1, \ldots, y_n\}` (which can be multi-output),
and corresponding interval bounds :math:`\{l_1, \ldots, l_n\}` and
:math:`\{u_1, \ldots, u_n\}`, the coverage score is defined
for each output (if applicable) as:
.. math::
\text{coverage} = \frac{1}{N_{valid}}\sum_{i=1}^{N_{valid}}
\mathbf{1}\{ l_i \leq y_i \leq u_i \},
where :math:`\mathbf{1}\{\cdot\}` is an indicator function and
:math:`N_{valid}` is the number of valid samples after handling NaNs.
Parameters
----------
y_true : array-like of shape (n_samples,) or (n_samples, n_outputs)
The true observed values. Must be numeric.
y_lower : array-like of shape (n_samples,) or (n_samples, n_outputs)
The lower bound predictions, matching `y_true` in shape.
y_upper : array-like of shape (n_samples,) or (n_samples, n_outputs)
The upper bound predictions, matching `y_true` in shape.
sample_weight : array-like of shape (n_samples,), optional
Sample weights. If None, then samples are equally weighted.
nan_policy : {'omit', 'propagate', 'raise'}, default='propagate'
Defines how to handle NaN values:
- ``'propagate'``: If NaNs are present in inputs, they propagate
to the output. For `multioutput='raw_values'`, an output column
with NaNs will result in a NaN score for that output. For
`multioutput='uniform_average'`, if any per-output score is NaN,
the final average may be NaN (unless `np.nanmean` behavior is more nuanced,
here standard mean is used after per-output scores are found).
- ``'omit'``: NaNs in any of `y_true`, `y_lower`, or `y_upper`
for a given sample (row) will lead to the omission of that entire
sample from the coverage calculation.
- ``'raise'``: Encountering NaNs raises a ValueError.
multioutput : {'raw_values', 'uniform_average'}, default='uniform_average'
Defines aggregating of multiple output values.
Array-like value defines weights used to average scores.
- ``'raw_values'``: Returns a full set of scores in case of
multi-output input.
- ``'uniform_average'``: Scores of all outputs are averaged with
uniform weight.
warn_invalid_bounds : bool, default=True
If True, issues a `UserWarning` if any `y_lower[i] > y_upper[i]`.
These samples will always count as uncovered.
eps : float, default=1e-8
Small epsilon value to prevent division by zero or issues with
very small sum of weights when `sample_weight` is used.
verbose : int, default=0
Controls the level of verbosity for internal logging (prints to console):
- 0: No output.
- 1: Basic info (e.g., final coverage).
- >=2: More details (e.g., NaN handling, shapes).
Returns
-------
score : float or ndarray of floats
Coverage score. If `multioutput='raw_values'`, an array of scores
is returned, one for each output. Otherwise, a single float average
score is returned. Returns `np.nan` if calculation is not possible
(e.g., all samples omitted due to NaNs).
Notes
-----
If `y_true`, `y_lower`, and `y_upper` are 1D arrays, the behavior is
equivalent to a single-output scenario, and `multioutput` options
will yield consistent scalar results (though `'raw_values'` will technically
return a 1-element array that is then squeezed to scalar if input was 1D).
Examples
--------
>>> from fusionlab.metrics import coverage_score
>>> import numpy as np
>>> y_true = np.array([10, 12, 11, 9, np.nan])
>>> y_lower = np.array([9, 11, 10, 8, 9])
>>> y_upper = np.array([11, 13, 12, 10, 11])
>>> # Default: nan_policy='propagate'
>>> coverage_score(y_true, y_lower, y_upper) # Propagates NaN
nan
>>> # Omitting NaNs
>>> coverage_score(y_true, y_lower, y_upper, nan_policy='omit')
Coverage computed: 1.0000
1.0
>>> # Multi-output example
>>> y_true_mo = np.array([[10, 20], [12, 22], [11, np.nan]])
>>> y_lower_mo = np.array([[9, 19], [11, 21], [10, 20]])
>>> y_upper_mo = np.array([[11, 21], [13, 23], [12, 22]])
>>> coverage_score(y_true_mo, y_lower_mo, y_upper_mo, nan_policy='omit',
multioutput='raw_values')
Coverage computed: [1. 1.]
array([1., 1.])
>>> coverage_score(y_true_mo, y_lower_mo, y_upper_mo, nan_policy='omit',
multioutput='uniform_average')
Coverage computed: 1.0000
1.0
>>> coverage_score(y_true_mo, y_lower_mo, y_upper_mo,
nan_policy='propagate', multioutput='raw_values')
Coverage computed: [1. nan]
array([ 1., nan])
See Also
--------
sklearn.utils.validation.check_array : Utility for input validation.
numpy.average : Compute weighted average, used with `sample_weight`.
References
----------
.. [1] Gneiting, T. & Raftery, A. E. (2007). "Strictly Proper
Scoring Rules, Prediction, and Estimation." J. Amer.
Statist. Assoc., 102(477):359–378.
"""
# 1. Input validation and conversion
# Force_all_finite=False allows NaNs, which are handled by nan_policy.
# Copy=False is an optimization if inputs are already suitable arrays.
y_true_p = check_array(
y_true, ensure_2d=False, allow_nd=True,
dtype="numeric", force_all_finite=False,
copy=False)
y_lower_p = check_array(
y_lower, ensure_2d=False,
allow_nd=True, dtype="numeric",
force_all_finite=False,
copy=False
)
y_upper_p = check_array(
y_upper, ensure_2d=False,
allow_nd=True, dtype="numeric",
force_all_finite=False,
copy=False)
y_true_orig_ndim = y_true_p.ndim
if not (eps > 0):
raise ValueError("eps must be positive.")
if verbose >= 3:
print("Input shapes before reshaping:")
print(f" y_true_p: {y_true_p.shape}, "
f"y_lower_p: {y_lower_p.shape},"
f" y_upper_p: {y_upper_p.shape}")
# Reshape 1D arrays to 2D (n_samples, 1 feature)
# for consistent processing
if y_true_p.ndim == 1:
y_true_p = y_true_p.reshape(-1, 1)
if y_lower_p.ndim == 1: y_lower_p = y_lower_p.reshape(-1, 1)
# else: # check_consistent_length and shape equality will catch this
if y_upper_p.ndim == 1: y_upper_p = y_upper_p.reshape(-1, 1)
# else:
elif y_true_p.ndim > 2:
raise ValueError(
"Inputs y_true, y_lower, y_upper must be"
f" 1D or 2D. Got {y_true_p.ndim}D for y_true.")
# All inputs should now be 2D for internal processing
# (or have failed if y_true was >2D or if 1D
# inputs had mismatched companions >1D)
if verbose >= 3:
print("Input shapes after potential 1D->2D reshaping:")
print(f" y_true_p: {y_true_p.shape}, y_lower_p:"
f" {y_lower_p.shape}, y_upper_p: {y_upper_p.shape}")
# Check for consistent shapes (length of samples and number of outputs)
try:
check_consistent_length(y_true_p, y_lower_p, y_upper_p)
except ValueError as e:
if verbose >=2: print(f"Shape inconsistency (length): {e}")
raise ValueError(
"y_true, y_lower, y_upper must have the"
" same number of samples (axis 0)."
f" Got shapes: y_true={y_true_p.shape},"
f" y_lower={y_lower_p.shape}, y_upper={y_upper_p.shape}"
) from e
if not (y_true_p.shape == y_lower_p.shape == y_upper_p.shape):
error_msg = (
"y_true, y_lower, y_upper must have the same shape. "
f"Got y_true: {y_true_p.shape}, y_lower:"
f" {y_lower_p.shape}, y_upper: {y_upper_p.shape}."
)
if verbose >=2: print(error_msg)
raise ValueError(error_msg)
if y_true_p.size == 0: # Handle empty inputs early
if verbose >= 1: print("Inputs are empty. Returning NaN.")
return (
np.nan if multioutput == 'uniform_average' or y_true_orig_ndim == 1
else np.full(y_true_p.shape[1], np.nan)
)
# Handle sample_weight
weights = sample_weight
if weights is not None:
weights = check_array(
weights, ensure_2d=False, dtype="numeric",
force_all_finite=True, copy=False)
check_consistent_length(y_true_p, weights)
if weights.ndim > 1:
raise ValueError(
f"sample_weight must be 1D. Got shape {weights.shape}")
if verbose >= 3:
print(f" sample_weight shape: {weights.shape}")
# 2. Handle NaNs
# Create a mask for NaNs across any of the three
# input arrays for each sample-output pair.
nan_mask_entries = np.isnan(
y_true_p) | np.isnan(y_lower_p) | np.isnan(y_upper_p)
if np.any(nan_mask_entries):
if nan_policy == 'raise':
if verbose >= 2:
print("NaNs detected and nan_policy='raise'. Raising error.")
raise ValueError(
"NaNs detected in input arrays (y_true, y_lower, or y_upper).")
elif nan_policy == 'omit':
if verbose >= 2:
print(
"NaNs detected with nan_policy='omit'."
" Omitting affected samples (rows).")
# Omit entire rows (samples) if any of their values
# (y_true, y_lower, or y_upper for any output) is NaN.
rows_with_nan = np.any(nan_mask_entries, axis=1)
rows_to_keep = ~rows_with_nan
if verbose >= 4:
print(f"NaN omit mask (rows_to_keep): "
f"{rows_to_keep[:10] if rows_to_keep.size > 10 else rows_to_keep}")
if not np.any(rows_to_keep):
if verbose >=1:
print(
"All samples contained NaNs and were"
" omitted. Returning NaN(s).")
n_outputs = y_true_p.shape[1]
return (
np.nan if multioutput == 'uniform_average'
or n_outputs == 1 else np.full(n_outputs, np.nan)
)
y_true_p = y_true_p[rows_to_keep]
y_lower_p = y_lower_p[rows_to_keep]
y_upper_p = y_upper_p[rows_to_keep]
if weights is not None:
weights = weights[rows_to_keep]
if verbose >=2:
print("Shapes after omitting NaN rows:"
f" y_true_p: {y_true_p.shape}")
# Should be caught by `not np.any(rows_to_keep)` but as a safeguard
if y_true_p.size == 0:
if verbose >=1:
print(
"All samples resulted in empty arrays"
" after NaN omission. Returning NaN(s)."
)
n_outputs = (
rows_with_nan.shape[0] if y_true_p.shape[1]==0
else y_true_p.shape[1] # Fallback if y_true_p is (0,0)
)
n_outputs = (
y_true_p.shape[1] if y_true_p.ndim == 2
and y_true_p.shape[1] > 0 else 1
)
return (
np.nan if multioutput == 'uniform_average'
or n_outputs == 1 else np.full(n_outputs, np.nan)
)
elif nan_policy == 'propagate':
if verbose >= 2:
print(
"NaNs detected and nan_policy='propagate'."
" NaNs will propagate to result.")
# NaNs in y_true_p, y_lower_p, y_upper_p will
# lead to NaNs in coverage_mask.
# np.average or np.mean will then correctly propagate this.
# 3. Check for invalid bounds (y_lower > y_upper)
# This check is performed *after* NaN handling if 'omit' is used,
# or on data that might contain NaNs if 'propagate' is used.
if warn_invalid_bounds:
# Temporarily ignore warnings from comparing NaNs if they are being propagated
with np.errstate(invalid='ignore' if nan_policy == 'propagate' else 'raise'):
invalid_bounds_mask = y_lower_p > y_upper_p
# np.any handles NaNs by treating them as False
# in a boolean context unless all are NaN
if np.any(invalid_bounds_mask):
num_invalid_bounds = np.sum(invalid_bounds_mask) # NaNs in mask become 0 here
percentage_invalid = (
num_invalid_bounds / invalid_bounds_mask.size) * 100
warnings.warn(
f"{num_invalid_bounds} ({percentage_invalid:.2f}%)"
" sample-output pairs found where"
f" y_lower > y_upper. These will always count as uncovered.",
UserWarning
)
if verbose >=2:
print(f"Warning: {num_invalid_bounds}"
" invalid bound pairs detected.")
# 4. Compute coverage mask
# NaNs in inputs (if nan_policy='propagate') will result in NaNs in coverage_mask.
coverage_mask = (y_true_p >= y_lower_p) & (y_true_p <= y_upper_p)
if verbose >= 4:
print("Coverage mask (sample of up to 5x5 or 10 elements):")
sample_to_print = coverage_mask
if coverage_mask.ndim == 2:
sample_to_print = (
coverage_mask[:min(5, coverage_mask.shape[0]),
:min(5, coverage_mask.shape[1])]
)
elif coverage_mask.ndim == 1: # Should not happen as we reshape to 2D
sample_to_print = coverage_mask[:min(10, coverage_mask.size)]
print(sample_to_print)
# 5. Compute score
# If coverage_mask is empty (e.g.,
# all NaNs omitted and input was small), size will be 0.
if coverage_mask.size == 0:
# This case should ideally be caught
# earlier by checks on y_true_p.size after NaN omission
if verbose >= 1:
print("Coverage mask is empty"
" (e.g. all values omitted). Returning NaN.")
n_outputs = (
y_true_p.shape[1] if y_true_p.ndim == 2
and y_true_p.shape[1] > 0 else 1
)
return (
np.nan if multioutput == 'uniform_average'
or n_outputs == 1 else np.full(n_outputs, np.nan)
)
if multioutput == 'uniform_average':
if weights is not None:
sum_weights = np.sum(weights)
# Weighted average per output, then unweighted
# average of these scores
# Treat sum of weights close to zero as zero
if sum_weights < eps:
if verbose >= 1 and sum_weights > 0:
# Warn if sum_weights is positive but too small
warnings.warn(
f"Sum of weights ({sum_weights}) is < eps ({eps})."
" Result will be NaN.", UserWarning)
# np.average with sum_weights=0 raises ZeroDivisionError
# or returns NaNs if weights contain NaNs that sum to 0.
# We want consistent NaN output if sum_weights < eps.
output_scores = np.full(coverage_mask.shape[1], np.nan)
else:
output_scores = np.average(coverage_mask, axis=0, weights=weights)
# If nan_policy='propagate', output_scores can
# have NaNs. np.nanmean handles this.
coverage = (
np.nanmean( output_scores) if nan_policy != 'propagate'
else np.mean(output_scores)
)
else:
# Mean per output, then mean of means.
# np.nanmean for outer mean if propagation could lead to NaN output_scores
output_scores = (
np.nanmean( coverage_mask, axis=0)
if nan_policy != 'propagate'
else np.mean(coverage_mask, axis=0)
)
coverage = (
np.nanmean(output_scores)
if nan_policy != 'propagate'
else np.mean(output_scores)
)
elif multioutput == 'raw_values':
if weights is not None:
if np.sum(weights) == 0:
coverage = np.full(coverage_mask.shape[1], np.nan)
else:
coverage = np.average(coverage_mask, axis=0, weights=weights)
else:
# For 'propagate', np.nanmean correctly
# computes mean for columns ignoring NaNs within them,
# BUT if a value that should propagate IS a NaN,
# that column's mean should BE NaN.
# So, simple .mean() is better for 'propagate'.
coverage = (
np.mean(coverage_mask, axis=0)
if nan_policy == 'propagate'
else np.nanmean(coverage_mask, axis=0)
)
# Re-check logic for propagate with raw_values:
# If policy is propagate, NaNs in coverage_mask
# should make the corresponding output_score NaN.
# np.mean(axis=0) does this. np.nanmean(axis=0)
# would ignore NaNs, which contradicts 'propagate'.
if nan_policy == 'propagate':
coverage = np.mean(
coverage_mask.astype(float), axis=0) # Ensure float for NaNs
else: # 'omit' (NaNs removed) or 'raise' (no NaNs)
coverage = np.mean(coverage_mask, axis=0)
else: # Should not be reached due to @validate_params
raise ValueError(f"Unknown multioutput mode: {multioutput}")
# If original input was 1D, and multioutput='raw_values', result should be scalar.
if y_true_orig_ndim == 1 and multioutput == 'raw_values':
if isinstance(coverage, np.ndarray) and coverage.size == 1:
coverage = coverage.item()
# else: coverage is already scalar if it's np.nan from an empty set.
if verbose >= 1:
if isinstance(coverage, np.ndarray):
with np.printoptions(precision=4, suppress=True): # Changed precision
print(f"Coverage computed: {coverage}")
else: # Scalar
print(f"Coverage computed: {coverage:.4f}") # Changed precision
return coverage