Source code for fusionlab.nn.components

# -*- coding: utf-8 -*-
#   License: BSD-3-Clause
#   Author: LKouadio <etanoyau@gmail.com>
"""
Provides a collection of specialized Keras-compatible 
layers and components for constructing advanced time series 
forecasting and anomaly detection models. It includes building 
blocks such as attention mechanisms, multi-scale LSTMs, gating 
and normalization layers, and multi-objective loss functions.
"""
from __future__ import annotations 
import warnings  
from numbers import Real, Integral  
from typing import Optional, Union, List, Dict, Tuple, Callable

import numpy as np 

from .._fusionlog import fusionlog 
from ..api.property import  NNLearner 
from ..core.checks import validate_nested_param
from ..core.handlers import param_deprecated_message
from ..compat.sklearn import validate_params, Interval, StrOptions
from ..utils.deps_utils import ensure_pkg

from . import KERAS_DEPS, KERAS_BACKEND, dependency_message
from ..compat.tf import standalone_keras

if KERAS_BACKEND:
    try:
        # Equivalent to: from tensorflow.keras import activations
        activations = KERAS_DEPS.activations  
    except (ImportError, AttributeError) as e: 
        try: 
            activations = standalone_keras('activations')
        except: 
            raise ImportError (str(e))
    except: 
        raise ImportError(
                "Module 'activations' could not be"
                " imported from either tensorflow.keras"
                " or standalone keras. Ensure that TensorFlow "
                "or standalone Keras is installed and the"
                " module exists."
        )

LSTM = KERAS_DEPS.LSTM
LayerNormalization = KERAS_DEPS.LayerNormalization 
TimeDistributed = KERAS_DEPS.TimeDistributed
MultiHeadAttention = KERAS_DEPS.MultiHeadAttention
Model = KERAS_DEPS.Model 
BatchNormalization = KERAS_DEPS.BatchNormalization
Input = KERAS_DEPS.Input
Softmax = KERAS_DEPS.Softmax
Flatten = KERAS_DEPS.Flatten
Dropout = KERAS_DEPS.Dropout 
Dense = KERAS_DEPS.Dense
Embedding =KERAS_DEPS.Embedding 
Concatenate=KERAS_DEPS.Concatenate 
Layer = KERAS_DEPS.Layer 
Loss=KERAS_DEPS.Loss
Tensor=KERAS_DEPS.Tensor
Sequential =KERAS_DEPS.Sequential
TensorShape =KERAS_DEPS.TensorShape 

register_keras_serializable=KERAS_DEPS.register_keras_serializable

tf_Assert= KERAS_DEPS.Assert
tf_TensorShape= KERAS_DEPS.TensorShape
tf_concat = KERAS_DEPS.concat
tf_shape = KERAS_DEPS.shape
tf_reshape=KERAS_DEPS.reshape
tf_repeat =KERAS_DEPS.repeat
tf_add = KERAS_DEPS.add
tf_cast=KERAS_DEPS.cast
tf_maximum = KERAS_DEPS.maximum
tf_reduce_mean = KERAS_DEPS.reduce_mean
tf_add_n = KERAS_DEPS.add_n
tf_float32=KERAS_DEPS.float32
tf_constant=KERAS_DEPS.constant 
tf_square=KERAS_DEPS.square 
tf_transpose=KERAS_DEPS.transpose 
tf_logical_and=KERAS_DEPS.logical_and 
tf_logical_not = KERAS_DEPS.logical_not 
tf_logical_or = KERAS_DEPS.logical_or
tf_get_static_value =KERAS_DEPS.get_static_value
tf_reduce_sum = KERAS_DEPS.reduce_sum
tf_stack = KERAS_DEPS.stack
tf_expand_dims = KERAS_DEPS.expand_dims
tf_tile = KERAS_DEPS.tile
tf_range=KERAS_DEPS.range 
tf_rank=KERAS_DEPS.rank
tf_split = KERAS_DEPS.split
tf_multiply=KERAS_DEPS.multiply
tf_cond=KERAS_DEPS.cond
tf_constant =KERAS_DEPS.constant 
tf_equal =KERAS_DEPS.equal 
tf_int32=KERAS_DEPS.int32 
tf_debugging =KERAS_DEPS.debugging 
tf_autograph=KERAS_DEPS.autograph
tf_pad =KERAS_DEPS.pad 
tf_maximum =KERAS_DEPS.maximum 

tf_newaxis = KERAS_DEPS.newaxis 
tf_pow = KERAS_DEPS.pow
tf_sin = KERAS_DEPS.sin
tf_cos = KERAS_DEPS.cos
tf_exp = KERAS_DEPS.exp 
tf_log = KERAS_DEPS.log
tf_ones = KERAS_DEPS.ones 
tf_linalg = KERAS_DEPS.linalg
tf_floordiv = KERAS_DEPS.floordiv
tf_greater =KERAS_DEPS.greater 
tf_float32 = KERAS_DEPS.float32
    

_logger = fusionlog().get_fusionlab_logger(__name__)

DEP_MSG = dependency_message('components') 

__all__ = [
     'AdaptiveQuantileLoss',
     'AnomalyLoss',
     'CrossAttention',
     'DynamicTimeWindow',
     'ExplainableAttention',
     'GatedResidualNetwork',
     'HierarchicalAttention',
     'LearnedNormalization',
     'MemoryAugmentedAttention',
     'MultiDecoder',
     'MultiModalEmbedding',
     'MultiObjectiveLoss',
     'MultiResolutionAttentionFusion',
     'MultiScaleLSTM',
     'PositionalEncoding',
     'QuantileDistributionModeling',
     'StaticEnrichmentLayer',
     'TemporalAttentionLayer',
     'VariableSelectionNetwork',
     'Activation', 
     'TransformerEncoderLayer', 
     'TransformerDecoderLayer', 
     'TSPositionalEncoding', 
     'aggregate_multiscale', 
     'aggregate_time_window_output', 
     'create_causal_mask',
     'aggregate_multiscale_on_3d'
    ]


@register_keras_serializable(
    'fusionlab.nn.components', name="Activation"
  )
class Activation(Layer, NNLearner):
    r"""
    Flexible activation layer that transparently delegates to any
    built‑in or user‑defined activation function.

    Parameters
    ----------
    activation : str or Callable or None, default ``'relu'``
        Identifier of the desired activation.

        * If *str*, it must be recognised by
          :pymeth:`keras.activations.get`.
        * If *Callable*, it must follow the signature
          ``f(tensor) -> tensor``.
        * If *None*, the layer acts as the identity mapping
          :math:`f(x)=x`.

    **kwargs
        Additional keyword arguments forwarded to
        :class:`keras.layers.Layer` (e.g. ``name`` or ``dtype``).

    Notes
    -----
    Let :math:`\mathbf{x}\in\mathbb{R}^{n}` be the input tensor and
    :math:`\phi` the resolved activation function.  The layer performs

    .. math::

        \mathbf{y} = \phi(\mathbf{x}).

    Because :pyclass:`Activation` inherits from
    :class:`keras.layers.Layer`, it can be freely composed inside a
    ``tf.keras.Sequential`` or functional graph.

    Methods
    -------
    call(inputs, training=False)
        Apply the resolved activation to *inputs*.

    get_config()
        Return a JSON‑serialisable configuration dictionary.

    __repr__()
        Nicely formatted string representation—helpful in interactive
        sessions.

    Examples
    --------
    >>> import tensorflow as tf
    >>> from fusionlab.nn.components import Activation
    >>> x  = tf.constant([‑2., 0., 1.5])
    >>> act = Activation('swish')
    >>> act(x).numpy()
    array([‑0.238, 0.   , 1.273], dtype=float32)

    Custom callable:

    >>> def leaky_relu(x, alpha=0.1):
    ...     return tf.where(x > 0, x, alpha * x)
    ...
    >>> act = Activation(leaky_relu)
    >>> act(x).numpy()
    array([‑0.2, 0. , 1.5], dtype=float32)

    See Also
    --------
    keras.activations.get
        Canonical resolver used under the hood.
    keras.layers.Activation
        Native Keras counterpart with fewer conveniences.

    References
    ----------
    .. [1] Ramachandran, Prajit, et al. *Searching for Activation
       Functions*. arXiv preprint arXiv:1710.05941 (2017).
    """

    @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG)
    def __init__(self,
                 activation: Union[str, Callable, None] = 'relu',
                 **kwargs):
        super().__init__(**kwargs)

        # Store original user input for debugging / introspection
        self.activation_original = activation

        # Resolve activation into (callable, canonical string)
        if activation is None:
            self.activation_fn  = activations.get(None)
            self.activation_str = 'linear'

        elif isinstance(activation, str):
            # Try to get a standard name via serialize,
            # fallback to object name
            try:
                self.activation_fn  = activations.get(activation)
                self.activation_str = activation
            except ValueError as err:
                raise ValueError(
                    f"Unknown activation '{activation}'."
                ) from err

        elif callable(activation):
            self.activation_fn = activation
            try:                                       # Try serialising
                ser = activations.serialize(activation)
                # Fallback if serialize doesn't give simple string
                self.activation_str = (
                    ser if isinstance(ser, str)
                    else getattr(activation, '__name__',
                                  activation.__class__.__name__)
                )
            except ValueError: 
                # Fallback if serialize doesn't give simple string
                self.activation_str = getattr(
                    activation, '__name__',
                    activation.__class__.__name__
                )
        else:
            raise TypeError(
                "Parameter 'activation' must be *str*, Callable, or "
                "*None*. Received type "
                f"{type(activation).__name__!r}."
            )

        if not callable(self.activation_fn):
            raise TypeError(
                f"Resolved activation '{self.activation_str}' is not "
                "callable."
            )

    @tf_autograph.experimental.do_not_convert
    def call(self, inputs, training: bool = False):
        """
        Apply the stored activation to `inputs`.

        Parameters
        ----------
        inputs : tf.Tensor
            Input tensor of arbitrary shape.
        training : bool, default ``False``
            Present for API compatibility; ignored because most
            activations do not behave differently at training time.

        Returns
        -------
        tf.Tensor
            Tensor with identical shape to *inputs* but transformed
            element‑wise by the activation.
        """
        # A single line keeps Autograph happy 
        # and maximises performance
        return self.activation_fn(inputs)


    def get_config(self) -> dict:
        """
        Configuration dictionary for model serialization.

        Returns
        -------
        dict
            JSON‑friendly mapping that allows
            :pyfunc:`keras.layers.deserialize` to recreate the layer.
        """
        config = super().get_config()
        # Save the CANONICAL STRING NAME for serialization
        config.update({
            'activation': self.activation_str
        })
        return config

    # String representation
    def __repr__(self) -> str:                         # noqa: D401
        """
        Return *repr(self)*.

        The canonical activation string is included for clarity.
        """
        return (f"{self.__class__.__name__}("
                f"activation={self.activation_str!r})")
    
# -------------------- Pure Transformers components ------------------------------

@register_keras_serializable(
    'fusionlab.nn.transformers', 
    name="TransformerEncoderLayer"
)
class TransformerEncoderLayer(Layer, NNLearner):
    """
    A single layer of the Transformer Encoder.

    Args:
        embed_dim (int): Dimensionality of the input and output.
        num_heads (int): Number of attention heads.
        ffn_dim (int): Hidden dimensionality of the feed-forward network.
        dropout_rate (float): Dropout rate.
        ffn_activation (str): Activation function for the FFN.
        layer_norm_epsilon (float): Epsilon for LayerNormalization.
    """
    def __init__(
        self, 
        embed_dim: int, 
        num_heads: int, 
        ffn_dim: int, 
        dropout_rate: float = 0.1,
        ffn_activation: str = 'relu',
        layer_norm_epsilon: float = 1e-6,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ffn_dim = ffn_dim
        self.dropout_rate = dropout_rate
        self.ffn_activation = ffn_activation
        self.layer_norm_epsilon = layer_norm_epsilon

        self.mha = MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=dropout_rate
        )
        self.ffn = Sequential([
            Dense(ffn_dim, activation=ffn_activation),
            Dense(embed_dim)
        ], name="encoder_ffn")
        self.layernorm1 = LayerNormalization(epsilon=layer_norm_epsilon)
        self.layernorm2 = LayerNormalization(epsilon=layer_norm_epsilon)
        self.dropout1 = Dropout(dropout_rate) # MHA output dropout is in MHA layer
        self.dropout_ffn = Dropout(dropout_rate)

    def call(
            self, x: Tensor, training: bool = False, 
            attention_mask: Optional[Tensor] = None) -> Tensor:
        attn_output = self.mha(
            query=x, value=x, key=x,
            attention_mask=attention_mask, training=training
        )
        # Dropout after MHA is already handled by MHA layer's dropout param.
        # self.dropout1 is if we want additional dropout on the residual sum.
        out1 = self.layernorm1(x + attn_output) # Post-norm

        ffn_output = self.ffn(out1, training=training)
        ffn_output = self.dropout_ffn(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output) # Post-norm
        return out2
        
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ffn_dim": self.ffn_dim,
            "dropout_rate": self.dropout_rate,
            "ffn_activation": self.ffn_activation,
            "layer_norm_epsilon": self.layer_norm_epsilon,
        })
        return config

@register_keras_serializable(
    'fusionlab.nn.transformers', name="TransformerDecoderLayer")
class TransformerDecoderLayer(Layer, NNLearner):
    """
    A single layer of the Transformer Decoder.
    (Arguments similar to TransformerEncoderLayer)
    """
    def __init__(
        self, 
        embed_dim: int, 
        num_heads: int, 
        ffn_dim: int, 
        dropout_rate: float = 0.1,
        ffn_activation: str = 'relu',
        layer_norm_epsilon: float = 1e-6,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ffn_dim = ffn_dim
        self.dropout_rate = dropout_rate
        self.ffn_activation = ffn_activation
        self.layer_norm_epsilon = layer_norm_epsilon

        self.mha1_self_attn = MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, 
            dropout=dropout_rate
        )
        self.mha2_cross_attn = MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim,
            dropout=dropout_rate
        )
        self.ffn = Sequential([
            Dense(ffn_dim, activation=ffn_activation),
            Dense(embed_dim)
        ], name="decoder_ffn")
        
        self.layernorm1 = LayerNormalization(epsilon=layer_norm_epsilon)
        self.layernorm2 = LayerNormalization(epsilon=layer_norm_epsilon)
        self.layernorm3 = LayerNormalization(epsilon=layer_norm_epsilon)
        
        # Dropout layers if needed beyond MHA's internal dropout
        self.dropout_ffn = Dropout(dropout_rate)

    def call(
        self, 
        x: Tensor, 
        enc_output: Tensor, 
        training: bool = False, 
        look_ahead_mask: Optional[Tensor] = None, 
        # For encoder output in cross-attention
        padding_mask: Optional[Tensor] = None, 
    ) -> Tensor:
        
        # Masked Multi-Head Self-Attention (for decoder inputs)
        attn1_output = self.mha1_self_attn(
            query=x, value=x, key=x, 
            attention_mask=look_ahead_mask, 
            training=training
        )
        out1 = self.layernorm1(x + attn1_output)

        # Multi-Head Cross-Attention (Query=Decoder, Key/Value=Encoder)
        attn2_output = self.mha2_cross_attn(
            query=out1, value=enc_output, key=enc_output,
            attention_mask=padding_mask, training=training
        )
        out2 = self.layernorm2(out1 + attn2_output) 

        # Feed-Forward Network
        ffn_output = self.ffn(out2, training=training)
        ffn_output = self.dropout_ffn(ffn_output, training=training)
        out3 = self.layernorm3(out2 + ffn_output)
        return out3

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ffn_dim": self.ffn_dim,
            "dropout_rate": self.dropout_rate,
            "ffn_activation": self.ffn_activation,
            "layer_norm_epsilon": self.layer_norm_epsilon,
        })
        return config
 

# -------------------- TFT components ----------------------------------------

[docs] @register_keras_serializable( "fusionlab.nn.components", name='PositionwiseFeedForward') class PositionwiseFeedForward(Layer, NNLearner): """Implements the Position-wise Feed-Forward Network (FFN) layer. This layer is a core component of a standard Transformer block, typically applied after the multi-head attention sub-layer. Its purpose is to process the context-rich output from the attention mechanism at each position independently, adding non-linearity and transformative capacity to the model. The network consists of two fully-connected (Dense) layers with a non-linear activation function in between. The first layer expands the input dimensionality, and the second layer projects it back down. Parameters ---------- embed_dim : int The input and output dimensionality of the layer. This must match the embedding dimension of the Transformer, often denoted as :math:`d_{model}`. ffn_dim : int The dimensionality of the inner, expanded hidden layer. It is common practice in Transformer architectures to set this to four times the `embed_dim`. activation : str, optional The activation function to use in the inner layer. Any valid Keras activation string is accepted. Defaults to ``"relu"``. dropout_rate : float, optional The dropout rate applied for regularization, typically after the first activation function. Defaults to ``0.1``. **kwargs Standard keyword arguments for a Keras ``Layer``. Notes ----- The "position-wise" nature of this layer is its defining characteristic. The same instance of this layer, with the exact same set of learned weights (:math:`W_1, b_1, W_2, b_2`), is applied to the feature vector at every single position (e.g., time step) in the input sequence. It does not mix information between positions; that task is handled by the preceding self-attention layer. The mathematical operation for a single position vector :math:`x` is: .. math:: \text{FFN}(x) = \text{Linear}_2(\text{activation}(\text{Linear}_1(x))) The residual connection (:math:`x + \text{Dropout}(\text{FFN}(x))`) is typically applied outside this layer, within the main Transformer block. See Also -------- fusionlab.nn.components.TransformerEncoderLayer : A typical consumer of this layer. tf.keras.layers.Dense : The core building block of the FFN. References ---------- .. [1] Vaswani, A., et al. "Attention Is All You Need." *NeurIPS 2017*. Examples -------- >>> import tensorflow as tf >>> # Create a dummy input tensor (batch, sequence_length, embed_dim) >>> input_tensor = tf.random.normal((32, 50, 128)) ... >>> # Instantiate the FFN layer >>> ffn_layer = PositionwiseFeedForward(embed_dim=128, ffn_dim=512) ... >>> # Pass the input through the layer >>> output_tensor = ffn_layer(input_tensor, training=True) ... >>> # The output shape remains the same as the input shape >>> print(f"Input Shape: {input_tensor.shape}") >>> print(f"Output Shape: {output_tensor.shape}") Input Shape: (32, 50, 128) Output Shape: (32, 50, 128) """
[docs] def __init__( self, embed_dim: int, ffn_dim: int, activation: str = "relu", dropout_rate: float = 0.1, **kwargs ): super().__init__(**kwargs) # Store configuration for serialization self.embed_dim = embed_dim self.ffn_dim = ffn_dim self.activation_str = activation self.dropout_rate = dropout_rate # Define the internal layers once in the constructor self.dense_1 = Dense( units=ffn_dim, name="ffn_dense_1" ) self.activation = Activation(activation).activation_fn self.dense_2 = Dense( units=embed_dim, name="ffn_dense_2" ) self.dropout = Dropout(rate=dropout_rate)
[docs] def call(self, x: Tensor, training: bool = False) -> Tensor: """Defines the forward pass for the FFN layer.""" # Project to the intermediate dimension x = self.dense_1(x) # Apply the non-linear activation function x = self.activation(x) # Apply dropout for regularization x = self.dropout(x, training=training) # Project back to the original embedding dimension x = self.dense_2(x) return x
[docs] def get_config(self): """Returns the configuration of the layer for serialization.""" config = super().get_config() config.update({ "embed_dim": self.embed_dim, "ffn_dim": self.ffn_dim, "activation": self.activation_str, "dropout_rate": self.dropout_rate, }) return config
[docs] @register_keras_serializable( 'fusionlab.nn.components', name='PositionalEncoding') class PositionalEncoding(Layer, NNLearner): r"""Injects positional information into an input tensor. This layer adds a positional encoding to the input, allowing models like Transformers to understand the order of the sequence. It uses the standard sinusoidal encoding from the "Attention Is All You Need" paper [1]_. The positional encoding :math:`PE` is defined as: .. math:: PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right) .. math:: PE_{(pos, 2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right) where :math:`pos` is the position in the sequence, :math:`i` is the dimension index, and :math:`d_{\text{model}}` is the feature dimension. Parameters ---------- max_length : int, default 2048 The maximum possible sequence length. The encoding matrix will be pre-calculated up to this length. **kwargs Standard Keras Layer keyword arguments. Examples -------- >>> import tensorflow as tf >>> from fusionlab.nn.components import PositionalEncoding >>> batch_size = 4 >>> sequence_length = 50 >>> feature_dimension = 128 >>> # Create dummy input tensor >>> input_tensor = tf.random.normal( ... (batch_size, sequence_length, feature_dimension) ... ) >>> # Instantiate and apply the layer >>> pos_encoding_layer = PositionalEncoding(max_length=5000) >>> output_tensor = pos_encoding_layer(input_tensor) >>> print("Input Tensor Shape:", input_tensor.shape) >>> print("Output Tensor Shape:", output_tensor.shape) >>> # The shape should be unchanged. >>> assert input_tensor.shape == output_tensor.shape >>> # You can visualize the encoding if you wish >>> import matplotlib.pyplot as plt >>> pe_matrix = pos_encoding_layer.positional_encoding[0, :, :].numpy() >>> plt.figure(figsize=(10, 5)) >>> cax = plt.matshow(pe_matrix, fignum=1, aspect='auto', cmap='viridis') >>> plt.gcf().colorbar(cax) >>> plt.title("Sinusoidal Positional Encoding Matrix") >>> plt.xlabel("Feature Dimension") >>> plt.ylabel("Position in Sequence") >>> plt.show() References ---------- .. [1] Vaswani, A., et al. (2017). "Attention is all you need." *Advances in Neural Information Processing Systems*, 30. """
[docs] def __init__(self, max_length: int = 2048, **kwargs): super().__init__(**kwargs) self.max_length = max_length self.positional_encoding = None
[docs] def build(self, input_shape: TensorShape): """Pre-calculates the positional encoding matrix.""" # The input shape is (batch, sequence_length, feature_dim) _, _, feature_dim = input_shape if self.positional_encoding is None: # The calculation is done once and stored. # Ensure feature_dim is a concrete value for matrix creation. if feature_dim is None: raise ValueError( "The feature dimension of the input to " "PositionalEncoding cannot be `None`. Please " "ensure the input has a defined feature dimension." ) # Cast to float for calculations d_model = tf_cast(feature_dim, tf_float32) # Create a matrix of positions (max_length, 1) positions = tf_range( self.max_length, dtype=tf_float32)[:, tf_newaxis] # Create the division term for the sine/cosine functions # Shape: (feature_dim / 2) div_term = tf_exp( tf_range(0, feature_dim, 2, dtype=tf_float32) * \ (-tf_log(10000.0) / d_model) ) # Calculate sinusoidal values for even and odd indices # Shape of each: (max_length, feature_dim / 2) pe_sin = tf_sin(positions * div_term) pe_cos = tf_cos(positions * div_term) # Interleave sin and cos values to get final encoding # Resulting shape: (max_length, feature_dim) pe_interleaved = tf_reshape( tf_stack([pe_sin, pe_cos], axis=-1), shape=[self.max_length, feature_dim] ) # Add an extra dimension for broadcasting across the batch # Shape: (1, max_length, feature_dim) self.positional_encoding = pe_interleaved[tf_newaxis, :, :] super().build(input_shape)
[docs] def call(self, inputs: Tensor, training=False ) -> Tensor: r"""Adds positional encoding to the input tensor. The 'training' argument is accepted but not used. This ensures API compatibility with Keras. Parameters ---------- inputs : tf.Tensor A 3D tensor of shape :math:`(B, T, D)`, where ``B`` is the batch size, ``T`` is the sequence length, and ``D`` is the feature dimension. Returns ------- tf.Tensor The input tensor with positional encodings added. Shape: :math:`(B, T, D)`. Notes ------ The Positional encoding does not depends on training. The sinusoidal PositionalEncoding layer performs a deterministic mathematical operation. It calculates a fixed matrix of sine and cosine values based on position and feature dimension and simply adds it to the input. This calculation is the same whether you are training the model or running it for inference. Unlike layers such as Dropout or BatchNormalization, PositionalEncoding has no different behavior during training. """ # Get the sequence length of the current input batch. seq_len = tf_shape(inputs)[1] # Slice the pre-calculated encoding matrix to match the input # sequence length and add it to the input tensor. # The broadcasting mechanism will handle the batch dimension. return inputs + self.positional_encoding[:, :seq_len, :]
[docs] def get_config(self) -> dict: """Returns the configuration of the layer.""" config = super().get_config() config.update({ 'max_length': self.max_length, }) return config
@register_keras_serializable( 'fusionlab.nn.components', name="TSPositionalEncoding") class TSPositionalEncoding(Layer, NNLearner): """ Standard Transformer Positional Encoding using sine and cosine functions. Adds positional information to input embeddings. Args: max_position (int): Maximum sequence length that this layer can handle. embed_dim (int): The dimensionality of the embeddings (and the positional encoding). """ def __init__(self, max_position: int, embed_dim: int, **kwargs): super().__init__(**kwargs) self.max_position = max_position self.embed_dim = embed_dim # self.pos_encoding is created once and stored. self.pos_encoding = self._build_positional_encoding( max_position, embed_dim) def _build_positional_encoding( self, position: int, d_model: int) -> Tensor: """Builds the positional encoding matrix using NumPy then converts to Tensor.""" # 1. Calculate angles in NumPy # 'pos' is for positions (sequence length), 'i' is for dimension pos_np = np.arange(position)[:, np.newaxis] i_np = np.arange(d_model)[np.newaxis, :] angle_rates_np = 1 / np.power( 10000, (2 * (i_np // 2)) / np.float32(d_model) ) angle_rads_np = pos_np * angle_rates_np # 2. Apply sin to even indices in the array; 2i angle_rads_np[:, 0::2] = np.sin(angle_rads_np[:, 0::2]) # 3. Apply cos to odd indices in the array; 2i+1 angle_rads_np[:, 1::2] = np.cos(angle_rads_np[:, 1::2]) # 4. Add a new axis for batch dimension and cast to TensorFlow tensor # The self.pos_encoding expects (1, max_position, embed_dim) pos_encoding_tensor = tf_cast( angle_rads_np[np.newaxis, ...], dtype=tf_float32 ) return pos_encoding_tensor def _tf_build_positional_encoding(self, position, d_model): """Builds the positional encoding matrix.""" angle_rads = self._get_angles( # Use np.arange for non-Tensor context # if KERAS_DEPS.arange isn't suitable tf_range(position)[:, tf_newaxis], tf_range(d_model)[tf_newaxis, :], d_model ) # Apply sin to even indices in the array; 2i angle_rads[:, 0::2] = tf_sin(angle_rads[:, 0::2]) # Apply cos to odd indices in the array; 2i+1 angle_rads[:, 1::2] = tf_cos(angle_rads[:, 1::2]) pos_encoding_np = angle_rads[tf_newaxis, ...] return tf_cast(pos_encoding_np, dtype=tf_float32) def _get_angles(self, pos, i, d_model): """Calculates the angle rates for positional encoding.""" # Use np.power for non-Tensor context angle_rates = 1 / np.power(10000, ( 2 * (i // 2)) / np.float32(d_model)) return pos * angle_rates def _tf_get_angles(self, pos, i, d_model): """Calculates the angle rates for positional encoding.""" # cast d_model to float32 d_model_f = tf_cast(d_model, tf_float32) # compute floor(i/2) as an integer tensor half_i = tf_floordiv(i, 2) # build the numerator 2 * (i//2), then cast to float32 numer = tf_cast(2 * half_i, tf_float32) # now both numer and d_model_f are float32 exponent = numer / d_model_f # compute the rates with float constants angle_rates = 1.0 / tf_pow(10000.0, exponent) # and finally apply to pos (cast pos to float32 if needed) return tf_cast(pos, tf_float32) * angle_rates def call(self, x, training=False): """Adds positional encoding to the input tensor `x`. The 'training' argument is accepted but not used. This ensures API compatibility with Keras. """ if not KERAS_BACKEND: raise RuntimeError( "PositionalEncodingTF layer requires " "a Keras backend (TensorFlow)." ) input_seq_len = tf_shape(x)[1] # Add positional encoding up to the length of the input sequence. return x + self.pos_encoding[:, :input_seq_len, :] def get_config(self): config = super().get_config() config.update({ "max_position": self.max_position, "embed_dim": self.embed_dim, }) return config @register_keras_serializable('fusionlab.nn.components', name='PositionalEncoding') class LinearPositionalEncoding(Layer, NNLearner): r""" Positional Encoding layer that incorporates temporal positions into an input sequence by adding positional information to each time step. This helps models, especially those based on attention mechanisms, to capture the order of time steps [1]_. .. math:: \mathbf{Z} = \mathbf{X} + \text{PositionEncoding} where :math:`\mathbf{X}` is the original input and :math:`\mathbf{Z}` is the output with positional encodings added. Parameters ---------- None This layer does not define additional constructor parameters beyond the standard Keras ``Layer``. Notes ----- - This class adds a positional index to each feature across time steps, effectively encoding the temporal position. - Because attention-based models do not inherently encode sequence ordering, positional encoding is crucial for sequence awareness. Methods ------- call(`inputs`) Perform the forward pass, adding positional encoding to the input tensor. get_config() Return the configuration of this layer for serialization. Examples -------- >>> from fusionlab.nn.components import PositionalEncoding >>> import tensorflow as tf >>> # Create random input of shape ... # (batch_size, time_steps, feature_dim) >>> inputs = tf.random.normal((32, 10, 64)) >>> # Instantiate the positional encoding layer >>> pe = PositionalEncoding() >>> # Forward pass >>> outputs = pe(inputs) See Also -------- TemporalFusionTransformer Combines positional encoding in dynamic features for time series. References ---------- .. [1] Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., Kaiser, Ł., & Polosukhin, I. (2017). "Attention is all you need." In *Advances in Neural Information Processing Systems* (pp. 5998-6008). """ @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass that adds positional encoding to ``inputs``. Parameters ---------- inputs : tf.Tensor A 3D tensor of shape :math:`(B, T, D)`, where ``B`` is batch size, ``T`` is time steps, and ``D`` is feature dimension. training : bool, optional Boolean flag indicating whether the layer is in training mode. Not used in this layer but included for Keras API compatibility. Returns ------- tf.Tensor A 3D tensor of the same shape :math:`(B, T, D)`, where each time step has been augmented with its position index. Notes ----- 1. Construct position indices :math:`p = [0, 1, 2, \dots, T - 1]`. 2. Tile and broadcast across features. 3. Add positional index to inputs. """ # Extract shapes dynamically batch_size = tf_shape(inputs)[0] seq_len = tf_shape(inputs)[1] feature_dim = tf_shape(inputs)[2] # Create position indices position_indices = tf_range( 0, seq_len, dtype='float32' ) position_indices = tf_expand_dims( position_indices, axis=0 ) position_indices = tf_expand_dims( position_indices, axis=-1 ) # Tile to match input shape position_encoding = tf_tile( position_indices, [batch_size, 1, feature_dim] ) # Return input plus positional encoding return inputs + position_encoding def get_config(self): r""" Return the configuration of this layer for serialization. Returns ------- dict Dictionary of layer configuration. """ config = super().get_config().copy() return config
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="GatedResidualNetwork" ) @param_deprecated_message( conditions_params_mappings=[ { 'param': 'use_time_distributed', 'condition': lambda v: v is not None and v is not False, 'message': ( "The 'use_time_distributed' parameter in GatedResidualNetwork " "is deprecated and has no effect.\n" "The layer automatically handles time dimensions based on " "input rank.\n" "If using within VariableSelectionNetwork, control time " "distribution via the VSN's own 'use_time_distributed' parameter." ), } ], warning_category=DeprecationWarning ) class GatedResidualNetwork(Layer): """Gated Residual Network applying transformations with optional context.""" _COMMON_ACTIVATIONS = { "relu", "tanh", "sigmoid", "elu", "selu", "gelu", "linear", None }
[docs] @validate_params({ "units": [Interval(Integral, 0, None, closed='left')], "dropout_rate": [Interval(Real, 0, 1, closed="both")], "use_batch_norm": [bool], "activation": [StrOptions(_COMMON_ACTIVATIONS)], "output_activation": [StrOptions(_COMMON_ACTIVATIONS), None], "use_time_distributed": [bool, None], }) @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, units: int, dropout_rate: float = 0.0, activation: str = 'elu', output_activation: Optional[str] = None, use_batch_norm: bool = False, use_time_distributed: Optional[bool] = None, **kwargs ): """Initializes the GatedResidualNetwork layer.""" super().__init__(**kwargs) self.units = units self.dropout_rate = dropout_rate self.use_batch_norm = use_batch_norm self.activation_str = activation self.output_activation_str = output_activation # The use_time_distributed parameter is stored only to allow # the decorator to check its value. It is NOT used in the # layer's logic anymore. self._deprecated_use_td = use_time_distributed # --- Convert activation strings to callable functions --- try: self.activation_fn = activations.get(activation) self.output_activation_fn = activations.get(output_activation) \ if output_activation is not None else None except Exception as e: # Catch potential errors during activation lookup raise ValueError( f"Failed to get activation function '{activation}' or " f"'{output_activation}'. Error: {e}" ) from e # --- Define Internal Layers --- # Dense layer processing input (x + optional context) # Activation is applied *after* this layer manually self.input_dense = Dense(self.units, activation=None, name="input_dense") # Dense layer projecting context (if provided) # No bias as per original paper often; no activation needed here self.context_dense = Dense( self.units, use_bias=False, name="context_dense" ) # Optional Batch Normalization (applied after main activation) self.batch_norm = BatchNormalization( name="batch_norm" ) if self.use_batch_norm else None # Dropout Layer (applied after activation/norm) self.dropout = Dropout( self.dropout_rate, name="grn_dropout" ) # Dense layer for main transformation path (after dropout) self.output_dense = Dense( self.units, activation=None, name="output_dense" ) # Dense layer for gating mechanism applied to input projection self.gate_dense = Dense( self.units, activation='sigmoid', name="gate_dense" ) # Final Layer Normalization (standard in GRN) self.layer_norm = LayerNormalization( name="output_layer_norm" ) # Projection layer for residual # connection (created in build) self.projection = None
[docs] def build(self, input_shape): """Builds the residual projection layer if needed.""" # Use TensorShape object directly if available if not isinstance(input_shape, tf_TensorShape): # Attempt conversion, handles tuples, lists, TensorShape try: input_shape = tf_TensorShape(input_shape) except TypeError: raise ValueError( f"Could not convert input_shape to TensorShape:" f" {input_shape}" ) # Check rank using the TensorShape object property input_rank = input_shape.rank # This returns None if rank is unknown # Check minimum rank requirement only if rank is known if input_rank is not None and input_rank < 2: raise ValueError( "Input shape must have at least 2 dimensions " f"(Batch, Features). Received rank: {input_rank}" f", shape: {input_shape}" ) input_dim = None # Only try to get last dimension if rank is known if input_rank is not None: input_dim = input_shape[-1] # Further check if last dimension itself is known (is an integer) if not isinstance(input_dim, int) or input_dim <= 0 : # Last dimension is unknown or invalid warnings.warn( f"Input shape {input_shape} has unknown or invalid " "last dimension in GRN build. Cannot check " "if projection layer is needed.", RuntimeWarning ) input_dim = None # Treat as unknown if not valid int # Create projection layer only if dimensions are known and differ if (input_dim is not None) and (input_dim != self.units): if self.projection is None: # Avoid recreating self.projection = Dense(self.units, name="residual_projection") # Build projection layer using the full input shape object self.projection.build(input_shape) # Comment: Residual projection created and built. elif input_dim == self.units: # Set projection to None explicitly if dims match self.projection = None # context_dense builds lazily on first call # Call the build method of the parent class super().build(input_shape)
[docs] def call(self, x, context=None, training=False): """Forward pass implementing GRN with optional context.""" # Input x shape (B, ..., F_in) # Context shape (if provided) (B, ..., Units) after projection """Forward pass implementing GRN with optional context.""" _logger.debug( f"DEBUG_GRN: Entering call. x shape: {tf_shape(x)}," f" context provided: {context is not None}") # DEBUG # --- 1. Residual Connection Setup --- shortcut = x if self.projection is not None: _logger.debug("DEBUG_GRN: Applying projection.") # DEBUG shortcut = self.projection(shortcut) # Shape (B, ..., Units) # --- 2. Process Input and Context --- # Project input features to 'units' dimension _logger.debug( f"DEBUG_GRN: Applying input_dense to x shape: {tf_shape(x)}") # DEBUG projected_input = self.input_dense(x) # Shape (B, ..., Units) input_plus_context = projected_input # No context added; Default # Add processed context if provided if context is not None: _logger.debug("DEBUG_GRN: Applying context_dense" f" to context shape: {tf_shape(context)}") # DEBUG context_proj = self.context_dense(context) # Shape (B, ..., Units) # Ensure context can be added (handle broadcasting) # x_rank = tf_rank(projected_input) # Use standard Python len() on shapes now, # Use standard Python len() on shapes now, x_rank = len(projected_input.shape) ctx_rank = len(context_proj.shape) # x_rank = projected_input.shape.rank # #ctx_rank = tf_rank(context_proj) # ctx_rank = context_proj.shape.rank _logger.debug( f"DEBUG_GRN: x_rank={x_rank}, ctx_rank={ctx_rank}") # DEBUG if x_rank == 3 and ctx_rank == 2:# e.g., x=(B,T,U), ctx=(B,U) # Add time dimension for broadcasting: (B,U) -> (B,1,U) context_proj_expanded = tf_expand_dims(context_proj, axis=1) # Now shapes should be broadcast-compatible _logger.debug("DEBUG_GRN: Adding context.") # DEBUG input_plus_context = tf_add(projected_input, context_proj_expanded) elif x_rank == ctx_rank: # Ranks match, add directly _logger.debug( "DEBUG_GRN: Ranks match, Adding context directly." ) # DEBUG input_plus_context = tf_add(projected_input, context_proj) else: # Raise error for incompatible ranks raise ValueError( f"Incompatible ranks GRN input ({x_rank})" f" and context ({ctx_rank}). Cannot broadcast/add." ) # --- 3. Apply Activation and Regularization --- _logger.debug("Applying activation_fn.") # DEBUG activated_features = self.activation_fn(input_plus_context) if self.batch_norm is not None: # Apply BN after activation activated_features = self.batch_norm(activated_features, training=training) _logger.debug("Applying dropout.") # DEBUG regularized_features = self.dropout(activated_features, training=training) # --- 4. Main Transformation Path --- _logger.debug("Applying output_dense.") # DEBUG transformed_output = self.output_dense(regularized_features) # --- 5. Gating Path --- _logger.debug("Applying gate_dense.") # DEBUG # Gate depends on input+context projection *before* main activation gate_values = self.gate_dense(input_plus_context) # --- 6. Apply Gate --- _logger.debug("Applying gate multiplication.") # DEBUG gated_output = tf_multiply(transformed_output, gate_values) # --- 7. Add Residual --- _logger.debug("Adding residual connection.") # DEBUG residual_output = tf_add(shortcut, gated_output) # --- 8. Final Normalization & Optional Activation --- _logger.debug("Applying layer_norm.") # DEBUG normalized_output = self.layer_norm(residual_output) final_output = normalized_output if self.output_activation_fn is not None: _logger.debug("Applying output_activation_fn.") # DEBUG final_output = self.output_activation_fn(normalized_output) # Applied final output activation. _logger.debug("Exiting call successfully.") # DEBUG return final_output
[docs] def get_config(self): """Returns the layer configuration.""" config = super().get_config() config.update({ 'units': self.units, 'dropout_rate': self.dropout_rate, # 'use_time_distributed' removed from config 'activation': self.activation_str, # Use original string 'output_activation': self.output_activation_str, # Use original string 'use_batch_norm': self.use_batch_norm, }) return config
[docs] @classmethod def from_config(cls, config): """Creates layer from its config.""" return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="VariableSelectionNetwork" ) class VariableSelectionNetwork(Layer, NNLearner): """Applies GRN to each variable and learns importance weights."""
[docs] @validate_params({ "num_inputs": [Interval(Integral, 0, None, closed='left')], "units": [Interval(Integral, 1, None, closed='left')], "dropout_rate": [Interval(Real, 0, 1, closed="both")], "use_time_distributed": [bool], "use_batch_norm": [bool], "activation": [StrOptions( {"elu", "relu", "tanh", "sigmoid", "linear", "gelu", None} )] }) @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, num_inputs: int, units: int, dropout_rate: float = 0.0, use_time_distributed: bool = False, activation: str = 'elu', use_batch_norm: bool = False, **kwargs ): super().__init__(**kwargs) self.num_inputs = num_inputs self.units = units self.dropout_rate = dropout_rate self.use_time_distributed = use_time_distributed self.use_batch_norm = use_batch_norm # Store original activation string for config _Activation = Activation(activation) self.activation_str = _Activation.activation_str self.activation_fn = _Activation.activation_fn # --- Layers --- # 1. GRN for each individual input variable # GRN's __init__ should handle converting activation string self.single_variable_grns = [ GatedResidualNetwork( units=units, dropout_rate=dropout_rate, activation=self.activation_str, # Pass string use_batch_norm=use_batch_norm, name=f"single_var_grn_{i}" ) for i in range(num_inputs) ] # 2. Dense layer to compute variable importances (applied later) # Output units = 1 per variable for the original weighting method self.variable_importance_dense = Dense( 1, name="variable_importance_dense" ) # 3. Softmax for normalizing weights across variables (N dimension) # Axis -2 assumes stacked_outputs shape (B, [T,] N, units) self.softmax = Softmax(axis=-2, name="variable_weights_softmax") # 4. Optional context projection layer (created in build) # Projects external context to 'units' for GRNs self.context_projection = None # Attribute to store weights self.variable_importances_ = None
[docs] @tf_autograph.experimental.do_not_convert def build(self, input_shape): """Builds internal GRNs and projection layers with explicit shapes.""" # Use TensorShape object for robust handling if not isinstance(input_shape, tf_TensorShape): input_shape = tf_TensorShape(input_shape) input_rank = input_shape.rank expected_min_rank = 3 if self.use_time_distributed else 2 # Check if rank is known and sufficient if input_rank is None or input_rank < expected_min_rank: # If rank unknown or too low at build time, # we cannot proceed reliably. # This indicates an issue upstream or # requires dynamic shapes throughout. raise ValueError( f"VSN build requires input rank >= {expected_min_rank}" f" with known rank. Received shape: {input_shape}" ) # Determine shape of input slices passed to single_variable_grns # Add feature dim F=1 if missing # Add feature dimension if missing inferred_input_shape = tf_cond( tf_equal(input_rank, expected_min_rank), lambda: input_shape.as_list() + [1], lambda: input_shape.as_list() ) # Shape: (B, N, F=1) or (B, T, N, F=1) # Ensure dimensions (except batch) are # known for building sub-layers if any(d is None for d in inferred_input_shape[1:]): # This should ideally not happen if # input comes from previous layers # but handle defensively. raise ValueError( f"VSN build received unknown non-batch dimensions in shape " f"{inferred_input_shape}. Cannot reliably build sub-layers." ) # Calculate the expected shape for a single variable slice if self.use_time_distributed: # Input (B, T, N, F) -> Slice is (B, T, F) single_var_input_shape = tf_TensorShape( [inferred_input_shape[0], # Batch (can be None) inferred_input_shape[1], # Time (should be known) inferred_input_shape[3]] # Features (should be known) ) else: # Input (B, N, F) -> Slice is (B, F) single_var_input_shape = tf_TensorShape( [inferred_input_shape[0], # Batch (can be None) inferred_input_shape[2]] # Features (should be known) ) # --- Explicitly build each single_variable_grn --- # Use the calculated slice shape for grn in self.single_variable_grns: if not grn.built: try: grn.build(single_var_input_shape) # Comment: Built internal GRN with calculated shape. except Exception as e: # Add more context if GRN build fails raise RuntimeError( f"Failed to build internal GRN {grn.name} with shape " f"{single_var_input_shape} derived from VSN input " f"{input_shape}. Original error: {e}" ) from e # Build context projection layer lazily (or here if context shape known) if self.context_projection is None: self.context_projection = Dense( self.units, name="context_projection", # Pass string, Dense handles activation resolution activation=self.activation_str ) # Let Keras build context_projection on first call with context # Build other internal layers like weighting_grn if needed here super().build(input_shape) # Call parent build last
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, context=None, training=False): """Execute the forward pass with optional context.""" _logger.debug(f"VSN '{self.name}': Entering call method.") _logger.debug( f" Initial input shape: {getattr(inputs, 'shape', 'N/A')}") _logger.debug(f" Context provided: {context is not None}") _logger.debug(f" Training mode: {training}") # --- Input Validation and Reshaping --- # Use Python len() on shape - works reliably with decorator try: actual_rank = len(inputs.shape) except Exception as e: _logger.error(f"VSN '{self.name}': Failed to get input rank." f" Input type: {type(inputs)}. Error: {e}") raise TypeError(f"Could not determine rank of input with shape" f" {getattr(inputs, 'shape', 'N/A')}") from e expected_min_rank = 3 if self.use_time_distributed else 2 _logger.debug(f" Input rank: actual={actual_rank}, expected_min=" f"{expected_min_rank}") if actual_rank < expected_min_rank: # Raise error if rank is insufficient raise ValueError( f"VSN '{self.name}': Input rank must be >= " f"{expected_min_rank}. Got rank {actual_rank} for " f"shape {inputs.shape}." ) # Add feature dimension if missing (e.g., B,N -> B,N,1 or B,T,N -> B,T,N,1) if actual_rank == expected_min_rank: _logger.debug( f" Input rank matches minimum expected ({actual_rank})." " Expanding feature dimension." ) inputs = tf_expand_dims(inputs, axis=-1) _logger.debug( f" Input shape after expansion: {inputs.shape}" ) # Input shape is now (B, N, F) or (B, T, N, F) # --- Context Processing --- processed_context = None if context is not None: _logger.debug( f" Processing provided context. Shape: {context.shape}" ) # Ensure context projection layer is created (lazily if needed) if self.context_projection is None: _logger.warning( f"VSN '{self.name}': Context projection layer" " not built in build method. Building lazily." ) self.context_projection = Dense( self.units, name="context_projection", activation=self.activation_str # Use string ) processed_context = self.context_projection(context) _logger.debug( f" Processed context shape: {processed_context.shape}") # Note: GRN's call method handles broadcasting this context else: _logger.debug(" No context provided.") # --- Apply GRN to each variable --- var_outputs = [] _logger.debug( f" Applying single_variable_grns to {self.num_inputs}" " inputs..." ) # Python loop - should execute as Python code due to decorator for i in range(self.num_inputs): _logger.debug( f" Processing variable index {i}") # Slice input for the i-th variable if self.use_time_distributed: # Slice variable i: (B, T, N, F) -> (B, T, F) var_input = inputs[:, :, i, :] _logger.debug( " Sliced var_input shape (TD):" f" {var_input.shape}") else: # Slice variable i: (B, N, F) -> (B, F) var_input = inputs[:, i, :] _logger.debug( " Sliced var_input shape (non-TD):" f" {var_input.shape}") # Apply the i-th GRN, passing the (potentially None) context # GRN's call method should also have @do_not_convert if needed grn_output = self.single_variable_grns[i]( var_input, context=processed_context, # Pass processed context training=training ) var_outputs.append(grn_output) _logger.debug( " GRN output shape for var {i}:" f" {grn_output.shape}") # Output shape: (B, T, units) or (B, units) # --- Stack GRN outputs along variable dimension (N) --- # axis=-2 places N before the 'units' dimension stacked_outputs = tf_stack(var_outputs, axis=-2) _logger.debug( f" Stacked GRN outputs shape: {stacked_outputs.shape}") # Shape: (B, T, N, units) or (B, N, units) # --- Calculate Variable Importance Weights (Original Simple Logic) --- # 1. Apply Dense layer (output units = 1) to stacked outputs # Acts on the last dimension ('units') _logger.debug(" Calculating importance logits...") importance_logits = self.variable_importance_dense(stacked_outputs) _logger.debug( f" Importance logits shape: {importance_logits.shape}" ) # Shape: (B, [T,] N, 1) # 2. Apply Softmax across the variable dimension (N, axis=-2) _logger.debug(" Calculating importance weights (softmax)...") weights = self.softmax(importance_logits) _logger.debug(f" Importance weights shape: {weights.shape}") # Shape: (B, [T,] N, 1) self.variable_importances_ = weights # Store weights # --- Weighted Combination --- # Multiply stacked GRN outputs by weights and sum across N _logger.debug(" Performing weighted sum...") weighted_sum = tf_reduce_sum( tf_multiply(stacked_outputs, weights), axis=-2 # Sum across the variable dimension (N) ) _logger.debug( f" Final weighted sum output shape: {weighted_sum.shape}" ) # Final output shape: (B, T, units) or (B, units) _logger.debug(f"VSN '{self.name}': Exiting call method.") return weighted_sum
[docs] def get_config(self): """Returns the layer configuration.""" config = super().get_config() config.update({ 'num_inputs': self.num_inputs, 'units': self.units, 'dropout_rate': self.dropout_rate, 'use_time_distributed': self.use_time_distributed, 'activation': self.activation_str, 'use_batch_norm': self.use_batch_norm, }) return config
[docs] @classmethod def from_config(cls, config): """Creates layer from its config.""" return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="TemporalAttentionLayer" ) class TemporalAttentionLayer(Layer): """Temporal Attention Layer conditioning query with context."""
[docs] @validate_params({ "units": [Interval(Integral, 0, None, closed='left')], "num_heads": [Interval(Integral, 0, None, closed='left')], "dropout_rate": [Interval(Real, 0, 1, closed="both")], "use_batch_norm": [bool], }) @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, units: int, num_heads: int, dropout_rate: float = 0.0, activation: str = 'elu', use_batch_norm: bool = False, **kwargs ): """Initializes the TemporalAttentionLayer.""" super().__init__(**kwargs) self.units = units self.num_heads = num_heads self.dropout_rate = dropout_rate self.use_batch_norm = use_batch_norm self.activation_str = Activation(activation).activation_str # --- Define Internal Layers --- self.multi_head_attention = MultiHeadAttention( num_heads=num_heads, key_dim=units, dropout=dropout_rate, name="mha" ) self.dropout = Dropout(dropout_rate, name="attn_dropout") self.layer_norm1 = LayerNormalization(name="layer_norm_1") # GRN to process the input context_vector # Ensure this is a single instance, passing the activation string self.context_grn = GatedResidualNetwork( units=units, # Output matches main path 'units' dropout_rate=dropout_rate, activation=self.activation_str, use_batch_norm=self.use_batch_norm, name="context_grn" # Note: GRN's internal activation handling should be fixed ) # Final GRN (position-wise feedforward) # Ensure this is also a single instance self.output_grn = GatedResidualNetwork( units=units, dropout_rate=dropout_rate, activation=self.activation_str, use_batch_norm=self.use_batch_norm, name="output_grn" )
[docs] def build(self, input_shape): """Builds internal layers, especially GRNs.""" # input_shape corresponds to the main 'inputs' tensor (B, T, U) if not isinstance(input_shape, (list, tuple)): # If only main input shape is passed (common) main_input_shape = tuple(input_shape) elif len(input_shape) == 2: # [inputs_shape, context_shape] rarelly happended main_input_shape = tuple(input_shape[0]) # Optionally build context_grn if context_shape is known context_shape = tuple(input_shape[1]) if not self.context_grn.built: self.context_grn.build(context_shape) else: raise ValueError( "Unexpected input_shape format for build.") if len(main_input_shape) < 3: raise ValueError( "TemporalAttentionLayer expects input rank >= 3") # Define expected input shape for output_grn # It receives output from layer_norm1, which has same shape as input output_grn_input_shape = main_input_shape # Explicitly build the output GRN if not already built if not self.output_grn.built: self.output_grn.build(output_grn_input_shape) # Developer comment: Explicitly built output_grn. # Build context_grn lazily during call or here # Call the parent build method AFTER building sub-layers super().build(input_shape)
# Developer comment: Layer built status should now be True.
[docs] def call(self, inputs, context_vector=None, training=False): """Forward pass of the temporal attention layer.""" # Input shapes: inputs=(B, T, U), context_vector=(B, U_ctx) query = inputs # Default query processed_context = None # --- Process Context Vector (if provided) --- if context_vector is not None: # Pass context_vector as the main input 'x' to context_grn processed_context = self.context_grn( x=context_vector, context=None, # No nested context for the context_grn itself training=training ) # Output shape: (B, units) # Expand context across time: (B, units) -> (B, 1, units) context_expanded = tf_expand_dims(processed_context, axis=1) # Add to inputs (broadcasting handles time dimension) query = tf_add(inputs, context_expanded) # Comment: Query now incorporates static context. # --- Multi-Head Self-Attention --- attn_output = self.multi_head_attention( query=query, value=inputs, key=inputs, training=training ) # Shape: (B, T, units) # --- Add & Norm (First Residual Connection) --- attn_output_dropout = self.dropout(attn_output, training=training) # Residual connection uses original 'inputs' x_attn = self.layer_norm1(tf_add(inputs, attn_output_dropout)) # Shape: (B, T, units) # --- Position-wise Feedforward (Final GRN) --- # This GRN takes the output of the attention block as input 'x' # It does not receive the external 'context_vector' here. # --- DEBUG lines --- _logger.debug("\nDEBUG>> About to call self.output_grn") _logger.debug( "DEBUG>> Type of self.output_grn:" f" {type(self.output_grn)}") _logger.debug( "DEBUG>> Is self.output_grn callable:" f" {callable(self.output_grn)}") try: # Try accessing an attribute expected on a Keras layer _logger.debug( "DEBUG>> self.output_grn name:" f" {self.output_grn.name}") _logger.debug( "DEBUG>> self.output_grn built status:" f" {self.output_grn.built}") except AttributeError as ae: _logger.debug( "DEBUG>> Failed to access attributes" f" of self.output_grn: {ae}") _logger.debug( f"DEBUG>> Input x_attn shape: {tf_shape(x_attn)}\n") # --- End DEBUG lines --- output = self.output_grn( x=x_attn, context=None, # No external context for the final GRN training=training ) # Shape: (B, T, units) return output
[docs] def get_config(self): """Returns the layer configuration.""" config = super().get_config() config.update({ 'units': self.units, 'num_heads': self.num_heads, 'dropout_rate': self.dropout_rate, 'activation': self.activation_str, 'use_batch_norm': self.use_batch_norm, }) return config
[docs] @classmethod def from_config(cls, config): """Creates layer from its config.""" return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="StaticEnrichmentLayer" ) class StaticEnrichmentLayer(Layer, NNLearner): r""" Static Enrichment Layer for combining static and temporal features [1]_. This layer enriches temporal features with static context, enabling the model to modulate temporal dynamics based on static information. It concatenates a tiled static context vector to temporal features and processes them through a :class:`GatedResidualNetwork`, yielding an enriched feature map that combines both static and temporal information. .. math:: \mathbf{Z} = \text{GRN}\big([\mathbf{C}, \mathbf{X}]\big) where :math:`\mathbf{C}` is a static context vector tiled over the time dimension, and :math:`\mathbf{X}` are the temporal features. Parameters ---------- units : int Number of hidden units within the internally used `GatedResidualNetwork`. activation : str, optional Activation function used in the GRN. Must be one of {'elu', 'relu', 'tanh', 'sigmoid', 'linear'}. Defaults to ``'elu'``. use_batch_norm : bool, optional Whether to apply batch normalization within the GRN. Defaults to ``False``. **kwargs : Additional arguments passed to the parent Keras ``Layer``. Notes ----- This layer performs the following: 1. Expand static context from shape :math:`(B, U)` to :math:`(B, T, U)`. 2. Concatenate with temporal features :math:`(B, T, D)` along the last dimension. 3. Pass the combined tensor through a `GatedResidualNetwork`. Methods ------- call(`static_context_vector`, `temporal_features`, training=False) Forward pass of the static enrichment layer. get_config() Returns the configuration dictionary for serialization. from_config(`config`) Instantiates the layer from a configuration dictionary. Examples -------- >>> from fusionlab.nn.components import StaticEnrichmentLayer >>> import tensorflow as tf >>> # Define static context of shape (batch_size, units) ... # and temporal features of shape ... # (batch_size, time_steps, units) >>> static_context_vector = tf.random.normal((32, 64)) >>> temporal_features = tf.random.normal((32, 10, 64)) >>> # Instantiate the static enrichment layer >>> sel = StaticEnrichmentLayer( ... units=64, ... activation='relu', ... use_batch_norm=True ... ) >>> # Forward pass >>> outputs = sel( ... static_context_vector, ... temporal_features, ... training=True ... ) See Also -------- GatedResidualNetwork Used within the static enrichment layer to combine static and temporal features. TemporalFusionTransformer Incorporates the static enrichment mechanism. References ---------- .. [1] Lim, B., & Zohren, S. (2021). "Time-series forecasting with deep learning: a survey." *Philosophical Transactions of the Royal Society A*, 379(2194), 20200209. """
[docs] @validate_params({ "units": [Interval(Integral, 1, None, closed='left')], "use_batch_norm": [bool], }) @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, units, activation='elu', use_batch_norm=False, **kwargs ): r""" Initialize the StaticEnrichmentLayer. Parameters ---------- units : int Number of hidden units in the internal :class:`GatedResidualNetwork`. activation : str, optional Activation function for the GRN. Defaults to ``'elu'``. use_batch_norm : bool, optional Whether to apply batch normalization in the GRN. Defaults to ``False``. **kwargs : Additional arguments passed to the parent Keras ``Layer``. """ super().__init__(**kwargs) self.units = units self.use_batch_norm = use_batch_norm # Create the activation object self.activation = activation # GatedResidualNetwork instance self.grn = GatedResidualNetwork( units=units, activation=self.activation, use_batch_norm=use_batch_norm )
[docs] @tf_autograph.experimental.do_not_convert def call( self, temporal_features, context_vector, training=False ): r""" Forward pass of the static enrichment layer. Parameters ---------- ``static_context_vector`` : tf.Tensor Static context of shape :math:`(B, U)`. ``temporal_features`` : tf.Tensor Temporal features of shape :math:`(B, T, D)`. training : bool, optional Whether the layer is in training mode. Defaults to ``False``. Returns ------- tf.Tensor Enriched temporal features of shape :math:`(B, T, U)`, assuming ``units = U``. Notes ----- 1. Expand and tile `static_context_vector` over time steps. 2. Concatenate with `temporal_features`. 3. Pass through internal GRN for final transformation. """ # Expand the static context to align # with temporal features along T static_context_expanded = tf_expand_dims( context_vector, axis=1 ) # Tile across the time dimension static_context_expanded = tf_tile( static_context_expanded, [ 1, tf_shape(temporal_features)[1], 1 ] ) # Concatenate static context # with temporal features combined = tf_concat( [static_context_expanded, temporal_features], axis=-1 ) # Transform with GRN output = self.grn(combined, training=training) return output
[docs] def get_config(self): r""" Return the layer configuration for serialization. Returns ------- dict Configuration dictionary containing initialization parameters. """ config = super().get_config().copy() config.update({ 'units': self.units, 'activation': self.activation, 'use_batch_norm': self.use_batch_norm, }) return config
[docs] @classmethod def from_config(cls, config): r""" Create a new instance from a config dictionary. Parameters ---------- ``config`` : dict Configuration as returned by ``get_config``. Returns ------- StaticEnrichmentLayer Instantiated layer object. """ return cls(**config)
# -------------------- XTFT components ----------------------------------------
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="LearnedNormalization" ) class LearnedNormalization(Layer, NNLearner): r""" Learned Normalization layer that learns mean and standard deviation parameters for normalizing input features. This layer can be used to replace or augment standard data preprocessing steps by allowing the model to learn the optimal scaling dynamically. Parameters ---------- None This layer does not define additional initialization parameters besides standard Keras `Layer`. Notes ----- This layer maintains two trainable weights: 1) mean: shape :math:`(D,)` 2) stddev: shape :math:`(D,)` where ``D`` is the last dimension of the input (feature dimension). Methods ------- call(`inputs`, training=False) Forward pass. Normalizes the input by subtracting the learned mean and dividing by the learned standard deviation plus a small epsilon. get_config() Returns the configuration dictionary for serialization. from_config(`config`) Instantiates the layer from a config dictionary. Examples -------- >>> from fusionlab.nn.components import LearnedNormalization >>> import tensorflow as tf >>> # Create input of shape (batch_size, features) >>> x = tf.random.normal((32, 10)) >>> # Instantiate the learned normalization layer >>> norm_layer = LearnedNormalization() >>> # Forward pass >>> x_norm = norm_layer(x) See Also -------- MultiModalEmbedding An embedding layer that can be used alongside learned normalization in a pipeline. HierarchicalAttention Another specialized layer for attention mechanisms. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, **kws): super().__init__(**kws)
[docs] def build(self, input_shape): r""" Build method that creates trainable weights for mean and stddev according to the last dimension of the input. Parameters ---------- input_shape : tuple Shape of the input, typically (batch_size, ..., feature_dim). """ self.mean = self.add_weight( "mean", shape=(input_shape[-1],), initializer="zeros", trainable=True ) self.stddev = self.add_weight( "stddev", shape=(input_shape[-1],), initializer="ones", trainable=True ) super().build(input_shape)
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass of the LearnedNormalization layer. Subtracts the learned `mean` from ``inputs`` and divides by ``stddev + 1e-6`` to avoid division by zero. Parameters ---------- ``inputs`` : tf.Tensor Input tensor of shape :math:`(B, ..., D)`. training : bool, optional Flag indicating if the layer is in training mode. Defaults to ``False``. Returns ------- tf.Tensor Normalized tensor of the same shape as ``inputs``. """ return (inputs - self.mean) / (self.stddev + 1e-6)
[docs] def get_config(self): r""" Returns the configuration dictionary for this layer. Returns ------- dict Configuration dictionary. """ config = super().get_config().copy() return config
[docs] @classmethod def from_config(cls, config): r""" Instantiates the layer from a config dictionary. Parameters ---------- ``config`` : dict Configuration dictionary. Returns ------- LearnedNormalization A new instance of this layer. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="MultiModalEmbedding" ) class MultiModalEmbedding(Layer, NNLearner): r""" MultiModalEmbedding layer for embedding multiple input modalities into a common feature space and concatenating them along the last dimension. This layer takes a list of tensors, each representing a different modality with the same batch and time dimensions. It applies a dense projection (with activation) to each modality, converting them to the same dimensionality before concatenation. .. math:: \mathbf{H}_{out} = \text{Concat}\big( \text{Dense}(\mathbf{M_1}),\, \text{Dense}(\mathbf{M_2}),\,\dots\big) where each :math:`\mathbf{M_i}` is a tensor for a specific modality. Parameters ---------- embed_dim : int Dimensionality of the output embedding for each modality. Notes ----- This layer expects each input modality tensor to have the same batch and time dimensions, but potentially different feature dimensions. Methods ------- call(`inputs`, training=False) Forward pass that projects each modality separately, then concatenates. get_config() Returns a configuration dictionary for serialization. from_config(`config`) Recreates the layer from a config dict. Examples -------- >>> from fusionlab.nn.components import MultiModalEmbedding >>> import tensorflow as tf >>> # Suppose we have two modalities: ... # dynamic_modality : (batch, time, dyn_dim) ... # future_modality : (batch, time, fut_dim) >>> dyn_input = tf.random.normal((32, 10, 16)) >>> fut_input = tf.random.normal((32, 10, 8)) >>> # Instantiate the layer >>> mm_embed = MultiModalEmbedding(embed_dim=32) >>> # Forward pass with both modalities >>> outputs = mm_embed([dyn_input, fut_input]) See Also -------- LearnedNormalization Normalizes input features before embedding. HierarchicalAttention Another specialized layer that can be used after embeddings are computed. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, embed_dim: int): super().__init__() self.embed_dim = embed_dim # Will hold a separate Dense layer # for each modality self.dense_layers = []
[docs] def build(self, input_shape): r""" Build method that creates a Dense layer for each modality based on input_shape. Parameters ---------- input_shape : list of tuples Each tuple corresponds to a modality's shape, typically (batch_size, time_steps, feature_dim). """ for modality_shape in input_shape: if modality_shape is not None: self.dense_layers.append( Dense( self.embed_dim, activation='relu' ) ) else: raise ValueError( "Unsupported modality type." ) super().build(input_shape)
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass: project each modality into `embed_dim` and concatenate. Parameters ---------- ``inputs`` : list of tf.Tensor Each tensor has shape :math:`(B, T, D_i)` where `D_i` can vary by modality. training : bool, optional Indicates if the layer is in training mode. Defaults to ``False``. Returns ------- tf.Tensor A concatenated embedding of shape :math:`(B, T, \sum_{i}(\text{embed_dim}))`. """ embeddings = [] for idx, modality in enumerate(inputs): if isinstance(modality, Tensor): modality_embed = ( self.dense_layers[idx]( modality ) ) else: raise ValueError( "Unsupported modality type." ) embeddings.append(modality_embed) return tf_concat(embeddings, axis=-1)
[docs] def get_config(self): r""" Returns the configuration dictionary of this layer. Returns ------- dict Configuration including `embed_dim`. """ config = super().get_config().copy() config.update({ 'embed_dim': self.embed_dim }) return config
[docs] @classmethod def from_config(cls, config): r""" Recreates a MultiModalEmbedding layer from a config dictionary. Parameters ---------- ``config`` : dict Configuration as produced by ``get_config``. Returns ------- MultiModalEmbedding A new instance of this layer. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="HierarchicalAttention" ) class HierarchicalAttention(Layer, NNLearner): r""" Hierarchical Attention layer that processes short-term and long-term sequences separately using multi-head attention, then combines their outputs [1]_. This allows the model to focus on different aspects of the data in short-term and long-term contexts and aggregate the attention outputs for a more comprehensive representation. .. math:: \mathbf{Z} = \text{MHA}(\mathbf{X}_{s}) + \text{MHA}(\mathbf{X}_{l}) where :math:`\mathbf{X}_{s}` and :math:`\mathbf{X}_{l}` are the short- and long-term sequences, respectively. Parameters ---------- units : int Dimensionality of the projection for the attention keys, queries, and values. num_heads : int Number of attention heads to use in each multi-head attention sub-layer. Notes ----- The output shape depends on the last dimension in the short and long sequences, projected to `units`. The final output is the sum of the short-term attention output and the long-term attention output. Methods ------- call(`inputs`, training=False) Forward pass. Expects a list `[short_term, long_term]` with shapes (B, T, D_s) and (B, T, D_l). get_config() Returns configuration dictionary for serialization. from_config(`config`) Recreates the layer from a config dict. Examples -------- >>> from fusionlab.nn.components import HierarchicalAttention >>> import tensorflow as tf >>> # Suppose short_term and long_term have ... # shape (batch_size, time_steps, features). >>> short_term = tf.random.normal((32, 10, 64)) >>> long_term = tf.random.normal((32, 10, 64)) >>> # Instantiate hierarchical attention >>> ha = HierarchicalAttention(units=64, num_heads=4) >>> # Forward pass >>> outputs = ha([short_term, long_term]) See Also -------- MultiModalEmbedding Can precede attention by embedding multiple sources of input. LearnedNormalization Can be applied to short_term and long_term sequences prior to attention. References ---------- .. [1] Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., Kaiser, L., & Polosukhin, I. (2017). "Attention is all you need." In *Advances in Neural Information Processing Systems* (pp. 5998-6008). """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, units: int, num_heads: int): super().__init__() self.units = units # Dense layers for short/long sequences self.short_term_dense = Dense(units) self.long_term_dense = Dense(units) # Multi-head attention for short/long self.short_term_attention = MultiHeadAttention( num_heads=num_heads, key_dim=units ) self.long_term_attention = MultiHeadAttention( num_heads=num_heads, key_dim=units )
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass of the HierarchicalAttention. Parameters ---------- ``inputs`` : list of tf.Tensor A list `[short_term, long_term]`. Each tensor should have shape :math:`(B, T, D)`. training : bool, optional Indicates whether the layer is in training mode. Defaults to ``False``. Returns ------- tf.Tensor A tensor of shape :math:`(B, T, U)`, where `U = units`, representing the combined attention outputs. """ short_term, long_term = inputs # Linear projections to unify # dimensionality short_term = self.short_term_dense( short_term ) long_term = self.long_term_dense( long_term ) # Multi-head attention on short_term short_term_attention = ( self.short_term_attention( short_term, short_term ) ) # Multi-head attention on long_term long_term_attention = ( self.long_term_attention( long_term, long_term ) ) # Combine return short_term_attention + long_term_attention
[docs] def get_config(self): r""" Returns a dictionary of config parameters for serialization. Returns ------- dict Dictionary with 'units', 'short_term_dense' config, and 'long_term_dense' config. """ config = super().get_config().copy() config.update({ 'units': self.units, 'short_term_dense': self.short_term_dense.get_config(), 'long_term_dense': self.long_term_dense.get_config() }) return config
[docs] @classmethod def from_config(cls, config): r""" Recreates the HierarchicalAttention layer from a config dictionary. Parameters ---------- ``config`` : dict Configuration dictionary. Returns ------- HierarchicalAttention A new instance with the specified configuration. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="CrossAttention" ) class CrossAttention(Layer, NNLearner): r""" CrossAttention layer that attends one source sequence to another [1]_. This layer transforms two input sources, ``source1`` and ``source2``, into a shared dimensionality via separate dense layers, then applies multi-head attention using ``source1`` as the query and ``source2`` as both key and value. The output shape depends on the specified ``units``. .. math:: \mathbf{H}_{\text{out}} = \text{MHA}( \mathbf{W}_{1}\,\mathbf{S}_1,\, \mathbf{W}_{2}\,\mathbf{S}_2,\, \mathbf{W}_{2}\,\mathbf{S}_2 ) where :math:`\mathbf{S}_1` and :math:`\mathbf{S}_2` are the two source sequences. Parameters ---------- units : int Dimensionality for the internal projections of the query/key/value in multi-head attention. num_heads : int Number of attention heads. Notes ----- Cross attention is particularly useful when focusing on how one sequence (the query) relates to another (the key/value). For example, in multi-modal time series settings, one might attend dynamic covariates to static ones or vice versa. Methods ------- call(`inputs`, training=False) Forward pass of the cross-attention layer. get_config() Returns the configuration dictionary for serialization. from_config(`config`) Creates a new layer from the given config. Examples -------- >>> from fusionlab.nn.components import CrossAttention >>> import tensorflow as tf >>> # Two sequences of shape (batch_size, time_steps, features) >>> source1 = tf.random.normal((32, 10, 64)) >>> source2 = tf.random.normal((32, 10, 64)) >>> # Instantiate the CrossAttention layer >>> cross_attn = CrossAttention(units=64, num_heads=4) >>> # Forward pass >>> outputs = cross_attn([source1, source2]) See Also -------- HierarchicalAttention Another attention-based layer focusing on short/long-term sequences. MemoryAugmentedAttention Uses a learned memory matrix to enhance representations. References ---------- .. [1] Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., Kaiser, L., & Polosukhin, I. (2017). "Attention is all you need." In *Advances in Neural Information Processing Systems* (pp. 5998-6008). """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, units: int, num_heads: int): r""" Initialize the CrossAttention layer. Parameters ---------- units : int Number of output units for the internal Dense projections and multi-head attention dimension. num_heads : int Number of attention heads to use in the multi-head attention module. """ super().__init__() self.units = units # Dense layers to project each source self.source1_dense = Dense(units) self.source2_dense = Dense(units) # Multi-head attention self.cross_attention = MultiHeadAttention( num_heads=num_heads, key_dim=units )
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass of CrossAttention. Parameters ---------- ``inputs`` : list of tf.Tensor A list [source1, source2], each of shape (batch_size, time_steps, features). training : bool, optional Indicates if the layer is in training mode (for dropout, if any). Defaults to ``False``. Returns ------- tf.Tensor A tensor of shape (batch_size, time_steps, units) representing cross-attended features. """ source1, source2 = inputs # Project each source source1 = self.source1_dense(source1) source2 = self.source2_dense(source2) # Apply cross attention return self.cross_attention( query=source1, value=source2, key=source2 )
[docs] def get_config(self): r""" Returns configuration dictionary for this layer. Returns ------- dict Configuration dictionary, including 'units'. """ config = super().get_config().copy() config.update({'units': self.units}) return config
[docs] @classmethod def from_config(cls, config): r""" Create a new CrossAttention layer from the given config dictionary. Parameters ---------- ``config`` : dict Configuration as returned by ``get_config``. Returns ------- CrossAttention A new instance of CrossAttention. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="MemoryAugmentedAttention" ) class MemoryAugmentedAttention(Layer, NNLearner): r""" Memory-Augmented Attention layer that uses a learned memory matrix to enhance temporal representation [1]_. This layer maintains a trainable memory of shape :math:`(\text{memory_size}, \text{units})` and attends over it with the input serving as the query. The resulting context is added back to the input as a residual connection, giving a memory-augmented feature. .. math:: \mathbf{Z} = \mathbf{X} + \text{MHA}(\mathbf{X}, \mathbf{M}, \mathbf{M}) where :math:`\mathbf{M}` is the learned memory. Parameters ---------- units : int Dimensionality for the memory and the multi-head attention projections. memory_size : int Number of slots in the learned memory matrix. num_heads : int Number of attention heads in the multi-head attention. Notes ----- The learned memory is a trainable parameter of shape (memory_size, units). It is expanded at each forward pass to match the batch size. Methods ------- call(`inputs`, training=False) Forward pass of the memory-augmented attention layer. get_config() Returns the configuration for serialization. from_config(`config`) Instantiates the layer from the given config dictionary. Examples -------- >>> from fusionlab.nn.components import MemoryAugmentedAttention >>> import tensorflow as tf >>> # Suppose we have an input of shape (batch_size, time_steps, units) >>> x = tf.random.normal((32, 10, 64)) >>> # Instantiate with a memory size of 20 >>> maa = MemoryAugmentedAttention( ... units=64, ... memory_size=20, ... num_heads=4 ... ) >>> # Forward pass >>> outputs = maa(x) See Also -------- CrossAttention Another specialized attention mechanism focusing on cross-sequence interactions. HierarchicalAttention Combines short/long-term sequences with attention. References ---------- .. [1] Graves, A., Wayne, G., & Danihelka, I. (2014). Neural Turing Machines. *arXiv preprint arXiv:1410.5401*. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, units: int, memory_size: int, num_heads: int ): super().__init__() self.units = units self.memory_size = memory_size self.attention = MultiHeadAttention( num_heads=num_heads, key_dim=units )
[docs] def build(self, input_shape): r""" Build method that creates the trainable memory matrix of shape (memory_size, units). Parameters ---------- input_shape : tuple Shape of the input, e.g. (batch_size, time_steps, units). """ self.memory = self.add_weight( "memory", shape=(self.memory_size, self.units), initializer="zeros", trainable=True ) super().build(input_shape)
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass of MemoryAugmentedAttention. Parameters ---------- ``inputs`` : tf.Tensor A 3D tensor of shape (batch_size, time_steps, units). training : bool, optional Indicates whether the layer is in training mode. Defaults to ``False``. Returns ------- tf.Tensor A tensor of the same shape as inputs: (batch_size, time_steps, units), augmented by the learned memory. """ # Expand memory to match batch dimension batch_size = tf_shape(inputs)[0] memory_expanded = tf_expand_dims(self.memory, axis=0) memory_expanded = tf_tile( memory_expanded, [batch_size, 1, 1] ) # Attend memory with inputs as query memory_attended = self.attention( query=inputs, value=memory_expanded, key=memory_expanded ) # Residual connection return memory_attended + inputs
[docs] def get_config(self): r""" Returns configuration of this layer. Returns ------- dict Dictionary including 'units' and 'memory_size'. """ config = super().get_config().copy() config.update({ 'units': self.units, 'memory_size': self.memory_size }) return config
[docs] @classmethod def from_config(cls, config): r""" Creates a new instance from a given config dictionary. Parameters ---------- ``config`` : dict Configuration dictionary as returned by ``get_config``. Returns ------- MemoryAugmentedAttention A new instance of this layer. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="AdaptiveQuantileLoss" ) class AdaptiveQuantileLoss(Loss, NNLearner): r""" Adaptive Quantile Loss layer that computes quantile loss for given quantiles [1]_. The layer expects ``y_true`` of shape :math:`(B, H, O)`, where ``B`` is batch size, ``H`` is horizon, and ``O`` is output dimension, and ``y_pred`` of shape :math:`(B, H, Q, O)`, where ``Q`` is the number of quantiles if they are specified. .. math:: \text{QuantileLoss}(\hat{y}, y) = \max(q \cdot (y - \hat{y}),\, (q - 1) \cdot (y - \hat{y})) The final loss is the mean across batch, time, quantiles, and output dimension. Parameters ---------- quantiles : list of float, optional A list of quantiles used to compute quantile loss. If set to ``'auto'``, defaults to [0.1, 0.5, 0.9]. If ``None``, the loss returns 0.0 (no quantile loss). Notes ----- For quantile regression, each quantile penalizes under- and over-estimates differently, encouraging a robust modeling of the distribution of possible outcomes. Methods ------- call(`y_true`, `y_pred`, training=False) Compute the quantile loss. get_config() Returns configuration for serialization. from_config(`config`) Creates a new instance from config dict. Examples -------- >>> from fusionlab.nn.components import AdaptiveQuantileLoss >>> import tensorflow as tf >>> # Suppose y_true is (B, H, O) ... # y_pred is (B, H, Q, O) >>> y_true = tf.random.normal((32, 10, 1)) >>> y_pred = tf.random.normal((32, 10, 3, 1)) >>> # Instantiate with custom quantiles >>> aq_loss = AdaptiveQuantileLoss([0.2, 0.5, 0.8]) >>> # Forward pass (loss calculation) >>> loss_value = aq_loss(y_true, y_pred) See Also -------- MultiObjectiveLoss Can combine this quantile loss with an anomaly loss. AnomalyLoss Computes anomaly-based loss, complementary to quantile loss. References ---------- .. [1] Lim, B., & Zohren, S. (2021). "Time-series forecasting with deep learning: a survey." *Philosophical Transactions of the Royal Society A*, 379(2194), 20200209. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, quantiles: Optional[List[float]], name="AdaptiveQuantileLoss"): super().__init__(name=name) if quantiles == 'auto': quantiles = [0.1, 0.5, 0.9] self.quantiles = quantiles
[docs] @tf_autograph.experimental.do_not_convert def call(self, y_true, y_pred): r""" Compute quantile loss. Parameters ---------- ``y_true`` : tf.Tensor Ground truth of shape (B, H, O). ``y_pred`` : tf.Tensor Predicted values of shape (B, H, Q, O) if quantiles is not None. training : bool, optional Unused parameter, included for consistency. Defaults to ``False``. Returns ------- tf.Tensor A scalar representing the mean quantile loss. 0.0 if ``quantiles`` is None. """ if self.quantiles is None: return 0.0 # Expand y_true to match y_pred's quantile # dimension y_true_expanded = tf_expand_dims( y_true, axis=2 ) # => (B, H, 1, O) error = y_true_expanded - y_pred quantiles = tf_constant( self.quantiles, dtype=tf_float32 ) quantiles = tf_reshape( quantiles, [1, 1, len(self.quantiles), 1] ) # quantile loss quantile_loss = tf_maximum( quantiles * error, (quantiles - 1) * error ) return tf_reduce_mean(quantile_loss)
[docs] def get_config(self): r""" Configuration for serialization. Returns ------- dict Dictionary with 'quantiles'. """ config = super().get_config().copy() config.update({'quantiles': self.quantiles}) return config
[docs] @classmethod def from_config(cls, config): r""" Creates a new instance from a config dict. Parameters ---------- ``config`` : dict Configuration dictionary. Returns ------- AdaptiveQuantileLoss A new instance of the layer. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="AnomalyLoss" ) class AnomalyLoss(Loss, NNLearner): r""" Anomaly Loss layer computing mean squared anomaly scores. This layer expects anomaly scores of shape :math:`(B, H, D)` and multiplies their mean squared value by a weight factor. .. math:: \text{AnomalyLoss}(\mathbf{a}) = w \cdot \frac{1}{BHD} \sum (\mathbf{a})^2 where :math:`\mathbf{a}` is the anomaly score, and :math:`w` is the weight. Parameters ---------- weight : float, optional Scalar multiplier for the computed mean squared anomaly scores. Defaults to 1.0. Notes ----- Anomaly loss is often combined with other losses in a multi-task setting where predictive performance and anomaly detection performance are both important. Methods ------- call(`anomaly_scores`) Compute mean squared anomaly loss. get_config() Return configuration for serialization. from_config(`config`) Instantiates a new instance from config. Examples -------- >>> from fusionlab.nn.components import AnomalyLoss >>> import tensorflow as tf >>> # Suppose anomaly_scores is (B, H, D) >>> anomaly_scores = tf.random.normal((32, 10, 8)) >>> # Instantiate anomaly loss >>> anomaly_loss_fn = AnomalyLoss(weight=2.0) >>> # Compute anomaly loss >>> loss_value = anomaly_loss_fn(anomaly_scores) See Also -------- AdaptiveQuantileLoss Another specialized loss that can be combined for multi-objective optimization. MultiObjectiveLoss Demonstrates how anomaly and quantile loss can be merged. References ---------- .. [1] Lim, B., & Zohren, S. (2021). "Time-series forecasting with deep learning: a survey." *Philosophical Transactions of the Royal Society A*, 379(2194), 20200209. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, weight: float = 1.0, name="AnomalyLoss"): super().__init__(name=name) self.weight = weight
[docs] @tf_autograph.experimental.do_not_convert def call(self, anomaly_scores: Tensor, y_pred=None): r""" Forward pass that computes the mean squared anomaly score multiplied by `weight`. Parameters ---------- ``anomaly_scores`` : tf.Tensor Tensor of shape (B, H, D) representing anomaly scores. ``y_pred``: Optional Does nothing, just for API consistency. Returns ------- tf.Tensor A scalar loss value representing the weighted mean squared anomaly. """ return self.weight * tf_reduce_mean( tf_square(anomaly_scores) )
[docs] def get_config(self): r""" Return configuration dictionary for this layer. Returns ------- dict Includes 'weight'. """ config = super().get_config().copy() config.update({'weight': self.weight}) return config
[docs] @classmethod def from_config(cls, config): r""" Recreates an AnomalyLoss layer from a config. Parameters ---------- ``config`` : dict Configuration containing 'weight'. Returns ------- AnomalyLoss A new instance of this layer. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="MultiObjectiveLoss" ) class MultiObjectiveLoss(Loss, NNLearner): r""" Multi-Objective Loss layer combining quantile loss and anomaly loss [1]_. This layer expects: 1. ``y_true``: :math:`(B, H, O)` 2. ``y_pred``: :math:`(B, H, Q, O)`, if quantiles are used (or (B, H, 1, O) for a single quantile). 3. ``anomaly_scores``: :math:`(B, H, D)`, optional. .. math:: \text{Loss} = \text{QuantileLoss} + \text{AnomalyLoss} If ``anomaly_scores`` is None, only quantile loss is returned. Parameters ---------- quantile_loss_fn : Layer A callable implementing quantile loss, e.g. :class:`AdaptiveQuantileLoss`. anomaly_loss_fn : Layer A callable implementing anomaly loss, e.g. :class:`AnomalyLoss`. Notes ----- This layer allows multi-task learning by combining two objectives: forecasting accuracy (quantile loss) and anomaly detection (anomaly loss). Methods ------- call(`y_true`, `y_pred`, `anomaly_scores`=None, training=False) Compute the combined loss. get_config() Returns configuration for serialization. from_config(`config`) Rebuilds the layer from a config dict. Examples -------- >>> from fusionlab.nn.components import ( ... MultiObjectiveLoss, ... AdaptiveQuantileLoss, ... AnomalyLoss ... ) >>> import tensorflow as tf >>> # Suppose y_true is (B, H, O), ... # and y_pred is (B, H, Q, O). >>> y_true = tf.random.normal((32, 10, 1)) >>> y_pred = tf.random.normal((32, 10, 3, 1)) >>> anomaly_scores = tf.random.normal((32, 10, 8)) >>> # Instantiate loss components >>> q_loss_fn = AdaptiveQuantileLoss([0.2, 0.5, 0.8]) >>> a_loss_fn = AnomalyLoss(weight=2.0) >>> # Combine them >>> mo_loss = MultiObjectiveLoss(q_loss_fn, a_loss_fn) >>> # Compute the combined loss >>> total_loss = mo_loss(y_true, y_pred, anomaly_scores) See Also -------- AdaptiveQuantileLoss Implements quantile loss to handle uncertainty in predictions. AnomalyLoss Computes anomaly-based MSE for anomaly detection tasks. References ---------- .. [1] Lim, B., & Zohren, S. (2021). "Time-series forecasting with deep learning: a survey." *Philosophical Transactions of the Royal Society A*, 379(2194), 20200209. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, quantile_loss_fn, anomaly_loss_fn, anomaly_scores =None, name="MultiObjectiveLoss" ): super().__init__(name=name) self.quantile_loss_fn = quantile_loss_fn self.anomaly_loss_fn = anomaly_loss_fn self.anomaly_scores = anomaly_scores
[docs] @tf_autograph.experimental.do_not_convert def call(self, y_true, y_pred): r""" Compute combined quantile and anomaly loss. Parameters ---------- y_true : tf.Tensor Ground truth of shape (B, H, O). y_pred : tf.Tensor Predictions of shape (B, H, Q, O) (or (B, H, 1, O) if Q=1). anomaly_scores : tf.Tensor or None, optional Tensor of shape (B, H, D). If None, anomaly loss is omitted. training : bool, optional Indicates training mode. Defaults to ``False``. Returns ------- tf.Tensor A scalar representing the sum of quantile loss and anomaly loss (if anomaly_scores is provided). """ quantile_loss = self.quantile_loss_fn( y_true, y_pred ) if self.anomaly_scores is not None: anomaly_loss = self.anomaly_loss_fn( # Avoid Keras signature to raise error then # pass y_pred as None, self.anomaly_scores, y_pred =None, ) return quantile_loss + anomaly_loss return quantile_loss
[docs] def get_config(self): r""" Returns configuration dictionary, including configs of the sub-layers. Returns ------- dict Contains serialized configs of quantile_loss_fn and anomaly_loss_fn. """ config = super().get_config().copy() config.update({ 'quantile_loss_fn': self.quantile_loss_fn.get_config(), 'anomaly_loss_fn': self.anomaly_loss_fn.get_config() }) return config
[docs] @classmethod def from_config(cls, config): r""" Creates a new MultiObjectiveLoss from the config dictionary. Parameters ---------- ``config`` : dict Configuration dictionary with sub-layer configs. Returns ------- MultiObjectiveLoss A new instance combining quantile and anomaly losses. """ # Rebuild sub-layers from their configs quantile_loss_fn = AdaptiveQuantileLoss.from_config( config['quantile_loss_fn'] ) anomaly_loss_fn = AnomalyLoss.from_config( config['anomaly_loss_fn'] ) return cls( quantile_loss_fn=quantile_loss_fn, anomaly_loss_fn=anomaly_loss_fn )
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="ExplainableAttention" ) class ExplainableAttention(Layer, NNLearner): r""" ExplainableAttention layer that returns attention scores from multi-head attention [1]_. This layer is useful for interpretability, providing insight into how the attention mechanism focuses on different time steps. .. math:: \mathbf{A} = \text{MHA}(\mathbf{X},\,\mathbf{X}) \rightarrow \text{attention\_scores} Here, :math:`\mathbf{X}` is an input tensor, and ``attention_scores`` is the matrix capturing attention weights. Parameters ---------- num_heads : int Number of heads for multi-head attention. key_dim : int Dimensionality of the query/key projections. Notes ----- Unlike standard layers that return the transformation output, this layer specifically returns the attention score matrix for interpretability. Methods ------- call(`inputs`, training=False) Forward pass that outputs only the attention scores. get_config() Returns the configuration for serialization. from_config(`config`) Creates a new instance from the given config. Examples -------- >>> from fusionlab.nn.components import ExplainableAttention >>> import tensorflow as tf >>> # Suppose we have input of shape (batch_size, time_steps, features) >>> x = tf.random.normal((32, 10, 64)) >>> # Instantiate explainable attention >>> ea = ExplainableAttention(num_heads=4, key_dim=64) >>> # Forward pass returns attention scores: (B, num_heads, T, T) >>> scores = ea(x) See Also -------- CrossAttention Another attention variant for cross-sequence contexts. MultiResolutionAttentionFusion For fusing features via multi-head attention. References ---------- .. [1] Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., Kaiser, L., & Polosukhin, I. (2017). "Attention is all you need." In *Advances in Neural Information Processing Systems* (pp. 5998-6008). """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, num_heads: int, key_dim: int): r""" Initialize the ExplainableAttention layer. Parameters ---------- num_heads : int Number of attention heads. key_dim : int Dimensionality of query/key projections in multi-head attention. """ super().__init__() self.num_heads = num_heads self.key_dim = key_dim # MultiHeadAttention, focusing on returning # the attention scores self.attention = MultiHeadAttention( num_heads=num_heads, key_dim=key_dim )
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass that returns only the attention scores. Parameters ---------- ``inputs`` : tf.Tensor Tensor of shape (B, T, D). training : bool, optional Indicates training mode; not used in this layer. Defaults to ``False``. Returns ------- tf.Tensor Attention scores of shape (B, num_heads, T, T). """ _, attention_scores = self.attention( inputs, inputs, return_attention_scores=True ) return attention_scores
[docs] def get_config(self): r""" Returns the layer configuration. Returns ------- dict Dictionary containing 'num_heads' and 'key_dim'. """ config = super().get_config().copy() config.update({ 'num_heads': self.num_heads, 'key_dim': self.key_dim }) return config
[docs] @classmethod def from_config(cls, config): r""" Creates a new instance from the config dictionary. Parameters ---------- ``config`` : dict Configuration dictionary. Returns ------- ExplainableAttention A new instance of this layer. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="MultiDecoder" ) class MultiDecoder(Layer, NNLearner): r""" MultiDecoder for multi-horizon forecasting [1]_. This layer takes a single feature vector per example of shape :math:`(B, F)` and produces a separate output for each horizon step, resulting in :math:`(B, H, O)`. .. math:: \mathbf{Y}_h = \text{Dense}_h(\mathbf{x}),\, h \in [1..H] Each horizon has its own decoder layer. Parameters ---------- output_dim : int Number of output features for each horizon. num_horizons : int Number of forecast horizons. Notes ----- This layer is particularly useful when you want separate parameters for each horizon, instead of a single shared head. Methods ------- call(`x`, training=False) Forward pass that produces horizon-specific outputs. get_config() Returns configuration for serialization. from_config(`config`) Builds a new instance from config. Examples -------- >>> from fusionlab.nn.components import MultiDecoder >>> import tensorflow as tf >>> # Input of shape (batch_size, feature_dim) >>> x = tf.random.normal((32, 128)) >>> # Instantiate multi-horizon decoder >>> decoder = MultiDecoder(output_dim=1, num_horizons=3) >>> # Output shape => (32, 3, 1) >>> y = decoder(x) See Also -------- MultiModalEmbedding Provides feature embeddings that can be fed into MultiDecoder. QuantileDistributionModeling Projects deterministic outputs into multiple quantiles per horizon. References ---------- .. [1] Lim, B., & Zohren, S. (2021). "Time-series forecasting with deep learning: a survey." *Philosophical Transactions of the Royal Society A*, 379(2194), 20200209. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, output_dim: int, num_horizons: int): r""" Initialize the MultiDecoder. Parameters ---------- output_dim : int Number of features each horizon decoder should output. num_horizons : int Number of horizons to predict, each with its own Dense layer. """ super().__init__() self.output_dim = output_dim self.num_horizons = num_horizons # Create a Dense decoder for each horizon self.decoders = [ Dense(output_dim) for _ in range(num_horizons) ]
[docs] @tf_autograph.experimental.do_not_convert def call(self, x, training=False): r""" Forward pass: each horizon has a separate Dense layer. Parameters ---------- ``x`` : tf.Tensor A 2D tensor (B, F). training : bool, optional Unused in this layer. Defaults to ``False``. Returns ------- tf.Tensor A 3D tensor of shape (B, H, O). """ outputs = [ decoder(x) for decoder in self.decoders ] return tf_stack(outputs, axis=1)
[docs] def get_config(self): r""" Returns layer configuration for serialization. Returns ------- dict Dictionary containing 'output_dim' and 'num_horizons'. """ config = super().get_config().copy() config.update({ 'output_dim': self.output_dim, 'num_horizons': self.num_horizons }) return config
[docs] @classmethod def from_config(cls, config): r""" Create a new MultiDecoder from the config. Parameters ---------- ``config`` : dict Contains 'output_dim', 'num_horizons'. Returns ------- MultiDecoder A new instance. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="MultiResolutionAttentionFusion" ) class MultiResolutionAttentionFusion(Layer, NNLearner): r""" MultiResolutionAttentionFusion layer applying multi-head attention fusion over features [1]_. This layer merges or fuses features at different resolutions or sources via multi-head attention. The input is projected to shape `(B, T, D)`, and the output shares the same shape. .. math:: \mathbf{Z} = \text{MHA}(\mathbf{X}, \mathbf{X}) Parameters ---------- units : int Dimension of the key, query, and value projections. num_heads : int Number of attention heads. Notes ----- Typically used in multi-resolution contexts where time steps or multiple feature sets are merged. Methods ------- call(`inputs`, training=False) Forward pass of the multi-head attention layer. get_config() Returns config for serialization. from_config(`config`) Reconstructs the layer from a config. Examples -------- >>> from fusionlab.nn.components import MultiResolutionAttentionFusion >>> import tensorflow as tf >>> x = tf.random.normal((32, 10, 64)) >>> # Instantiate multi-resolution attention >>> mraf = MultiResolutionAttentionFusion( ... units=64, ... num_heads=4 ... ) >>> # Forward pass => (32, 10, 64) >>> y = mraf(x) See Also -------- HierarchicalAttention Combines short and long-term sequences with attention. ExplainableAttention Another attention layer returning attention scores. References ---------- .. [1] Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., Kaiser, L., & Polosukhin, I. (2017). "Attention is all you need." In *Advances in Neural Information Processing Systems* (pp. 5998-6008). """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, units: int, num_heads: int): r""" Initialize the MultiResolutionAttentionFusion layer. Parameters ---------- units : int Dimensionality for the attention projections. num_heads : int Number of heads for multi-head attention. """ super().__init__() self.units = units self.num_heads = num_heads # MultiHeadAttention instance self.attention = MultiHeadAttention( num_heads=num_heads, key_dim=units )
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass applying multi-head attention to fuse features. Parameters ---------- ``inputs`` : tf.Tensor Tensor of shape (B, T, D). training : bool, optional Indicates training mode. Defaults to ``False``. Returns ------- tf.Tensor Tensor of shape (B, T, D), representing fused features. """ return self.attention(inputs, inputs)
[docs] def get_config(self): r""" Returns configuration dictionary with 'units' and 'num_heads'. Returns ------- dict Configuration for serialization. """ config = super().get_config().copy() config.update({ 'units': self.units, 'num_heads': self.num_heads }) return config
[docs] @classmethod def from_config(cls, config): r""" Instantiate a new MultiResolutionAttentionFusion layer from config. Parameters ---------- ``config`` : dict Configuration dictionary. Returns ------- MultiResolutionAttentionFusion A new instance of this layer. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="DynamicTimeWindow" ) class DynamicTimeWindow(Layer, NNLearner): r""" DynamicTimeWindow layer that slices the last `max_window_size` steps from the input sequence. This helps in focusing on the most recent time steps if the sequence is longer than `max_window_size`. .. math:: \mathbf{Z} = \mathbf{X}[:, -W:, :] where `W` = `max_window_size`. Parameters ---------- max_window_size : int Number of time steps to keep from the end of the sequence. Notes ----- This can be used for models that only need the last few time steps instead of the entire sequence. Methods ------- call(`inputs`, training=False) Slice the last `max_window_size` steps. get_config() Returns configuration dictionary. from_config(`config`) Recreates the layer from config. Examples -------- >>> from fusionlab.nn.components import DynamicTimeWindow >>> import tensorflow as tf >>> x = tf.random.normal((32, 50, 64)) >>> # Keep last 10 time steps >>> dtw = DynamicTimeWindow(max_window_size=10) >>> y = dtw(x) >>> y.shape TensorShape([32, 10, 64]) See Also -------- MultiResolutionAttentionFusion Another layer that can be used after slicing to fuse temporal features. References ---------- .. [1] Lim, B., & Zohren, S. (2021). "Time-series forecasting with deep learning: a survey." *Philosophical Transactions of the Royal Society A*, 379(2194), 20200209. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__(self, max_window_size: int): r""" Initialize the DynamicTimeWindow layer. Parameters ---------- max_window_size : int Number of steps to slice from the end of the sequence. """ super().__init__() self.max_window_size = max_window_size
[docs] def call(self, inputs, training=False): r""" Forward pass that slices the last `max_window_size` steps. Parameters ---------- ``inputs`` : tf.Tensor Tensor of shape :math:`(B, T, D)`. training : bool, optional Unused. Defaults to ``False``. Returns ------- tf.Tensor A sliced tensor of shape :math:`(B, W, D)` where W = `max_window_size`. """ return inputs[:, -self.max_window_size:, :]
[docs] def get_config(self): r""" Returns configuration dictionary. Returns ------- dict Contains 'max_window_size'. """ config = super().get_config().copy() config.update({ 'max_window_size': self.max_window_size }) return config
[docs] @classmethod def from_config(cls, config): r""" Creates a new DynamicTimeWindow layer from config. Parameters ---------- ``config`` : dict Must include 'max_window_size'. Returns ------- DynamicTimeWindow A new instance of this layer. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name="QuantileDistributionModeling" ) class QuantileDistributionModeling(Layer, NNLearner): r""" QuantileDistributionModeling layer projects deterministic outputs into quantile predictions [1]_. Depending on whether `quantiles` is specified, this layer: - Returns (B, H, O) if `quantiles` is None. - Returns (B, H, Q, O) otherwise, where Q is the number of quantiles. .. math:: \mathbf{Y}_q = \text{Dense}_q(\mathbf{X}), \forall q \in \text{quantiles} Parameters ---------- quantiles : list of float or str or None List of quantiles. If `'auto'`, defaults to [0.1, 0.5, 0.9]. If ``None``, no extra quantile dimension is added. output_dim : int Output dimension per quantile or in the deterministic case. Notes ----- This layer is often used after a decoder to provide probabilistic forecasts via quantile outputs. Methods ------- call(`inputs`, training=False) Projects inputs into desired quantile shape. get_config() Returns configuration dictionary. from_config(`config`) Instantiates from config. Examples -------- >>> from fusionlab.nn.components import QuantileDistributionModeling >>> import tensorflow as tf >>> x = tf.random.normal((32, 10, 64)) # (B, H, O) >>> # Instantiate with quantiles >>> qdm = QuantileDistributionModeling([0.25, 0.5, 0.75], output_dim=1) >>> # Forward pass => (B, H, Q, O) => (32, 10, 3, 1) >>> y = qdm(x) See Also -------- MultiDecoder Outputs multi-horizon predictions that can be further turned into quantiles. AdaptiveQuantileLoss Computes quantile losses for outputs generated by this layer. References ---------- .. [1] Lim, B., & Zohren, S. (2021). "Time-series forecasting with deep learning: a survey." *Philosophical Transactions of the Royal Society A*, 379(2194), 20200209. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, quantiles: Optional[Union[str, List[float]]], output_dim: int ): r""" Initialize the QuantileDistributionModeling layer. Parameters ---------- quantiles : list of float or str or None If `'auto'`, defaults to [0.1, 0.5, 0.9]. If None, returns deterministic output. output_dim : int Output dimension for each quantile or the deterministic case. """ super().__init__() if quantiles == 'auto': quantiles = [0.1, 0.5, 0.9] self.quantiles = quantiles self.output_dim = output_dim # Create Dense layers if quantiles specified if self.quantiles is not None: self.output_layers = [ Dense(output_dim) for _ in self.quantiles ] else: self.output_layer = Dense(output_dim)
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass projecting to quantile outputs or deterministic outputs. Parameters ---------- ``inputs`` : tf.Tensor A 3D tensor of shape (B, H, O). training : bool, optional Unused in this layer. Defaults to ``False``. Returns ------- tf.Tensor - If `quantiles` is None: (B, H, O) - Else: (B, H, Q, O) """ # No quantiles => deterministic if self.quantiles is None: return self.output_layer(inputs) # Quantile predictions => (B, H, Q, O) outputs = [] for output_layer in self.output_layers: quantile_output = output_layer(inputs) outputs.append(quantile_output) return tf_stack(outputs, axis=2)
[docs] def get_config(self): r""" Configuration dictionary for layer serialization. Returns ------- dict Contains 'quantiles' and 'output_dim'. """ config = super().get_config().copy() config.update({ 'quantiles': self.quantiles, 'output_dim': self.output_dim }) return config
[docs] @classmethod def from_config(cls, config): r""" Creates a new instance from the given config dict. Parameters ---------- ``config`` : dict Configuration dictionary with 'quantiles' and 'output_dim'. Returns ------- QuantileDistributionModeling A new instance. """ return cls(**config)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name='MultiScaleLSTM' ) class MultiScaleLSTM(Layer, NNLearner): r""" MultiScaleLSTM layer applying multiple LSTMs at different sampling scales and concatenating their outputs [1]_. Each LSTM can either return the full sequence or only the last hidden state, controlled by `return_sequences`. The user specifies `scales` to sub-sample the time dimension. For example, a scale of 2 processes every 2nd time step. Parameters ---------- lstm_units : int Number of units in each LSTM. scales : list of int or str or None, optional List of scale factors. If `'auto'` or None, defaults to `[1]` (no sub-sampling). return_sequences : bool, optional If True, each LSTM returns the entire sequence. Otherwise, it returns only the last hidden state. Defaults to False. **kwargs Additional arguments passed to the parent Keras `Layer`. Notes ----- - If `return_sequences=False`, the output is concatenated along features: :math:`(B, \text{units} \times \text{num\_scales})`. - If `return_sequences=True`, a list of sequence outputs is returned. Each may have a different time dimension if scales differ. Methods ------- call(`inputs`, training=False) Forward pass, applying each LSTM at the specified scale. get_config() Returns the layer's configuration dict. from_config(`config`) Builds the layer from the config dict. Examples -------- >>> from fusionlab.nn.components import MultiScaleLSTM >>> import tensorflow as tf >>> x = tf.random.normal((32, 20, 16)) # (B, T, D) >>> # Instantiating a multi-scale LSTM >>> mslstm = MultiScaleLSTM(lstm_units=32, ... scales=[1, 2], return_sequences=False) >>> y = mslstm(x) # shape => (32, 64) >>> # because scale=1 and scale=2 each produce 32 units, ... # which are concatenated => 64 See Also -------- DynamicTimeWindow For slicing sequences before applying multi-scale LSTMs. TemporalFusionTransformer A complex model that can incorporate multi-scale modules. References ---------- .. [1] Lim, B., & Zohren, S. (2021). "Time-series forecasting with deep learning: a survey." *Philosophical Transactions of the Royal Society A*, 379(2194), 20200209. """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, lstm_units: int, scales: Union[str, List[int], None] = None, return_sequences: bool = False, **kwargs ): super().__init__(**kwargs) if scales is None or scales == 'auto': scales = [1] # Validate that scales is a list of int scales = validate_nested_param( scales, List[int], 'scales' ) self.lstm_units = lstm_units self.scales = scales self.return_sequences = return_sequences # Create an LSTM for each scale self.lstm_layers = [ LSTM( lstm_units, return_sequences=return_sequences ) for _ in scales ]
[docs] @tf_autograph.experimental.do_not_convert def call(self, inputs, training=False): r""" Forward pass that processes the input at multiple scales. Parameters ---------- ``inputs`` : tf.Tensor Shape (B, T, D). training : bool, optional Training mode. Defaults to ``False``. Returns ------- tf.Tensor or list of tf.Tensor - If `return_sequences=False`, returns a single 2D tensor of shape (B, lstm_units * len(scales)). - If `return_sequences=True`, returns a list of 3D tensors, each with shape (B, T', lstm_units), where T' depends on the scale sub-sampling. """ outputs = [] for scale, lstm in zip(self.scales, self.lstm_layers): scaled_input = inputs[:, ::scale, :] lstm_output = lstm( scaled_input, training=training ) outputs.append(lstm_output) # If return_sequences=False: # => (B, units) from each sub-lstm # -> concat => (B, units*len(scales)) if not self.return_sequences: return tf_concat(outputs, axis=-1) else: # return a list of sequences return outputs
[docs] def get_config(self): r""" Returns a config dictionary containing 'lstm_units', 'scales', and 'return_sequences'. Returns ------- dict Configuration dictionary. """ config = super().get_config().copy() config.update({ 'lstm_units': self.lstm_units, 'scales': self.scales, 'return_sequences': self.return_sequences }) return config
[docs] @classmethod def from_config(cls, config): r""" Builds MultiScaleLSTM from the given config dictionary. Parameters ---------- ``config`` : dict Must include 'lstm_units', 'scales', 'return_sequences'. Returns ------- MultiScaleLSTM A new instance of this layer. """ return cls(**config)
@register_keras_serializable( 'fusionlab.nn.components', name='CategoricalEmbeddingProcessor' ) class CategoricalEmbeddingProcessor(Layer, NNLearner): """Embeds multiple categorical features and concatenates them.""" def __init__( self, categorical_embedding_info: Dict[int, Tuple[int, int]], # Dict mapping feature index to (vocab_size, embedding_dim) **kwargs ): super().__init__(**kwargs) self.categorical_embedding_info = categorical_embedding_info self.embedding_layers = {} # Create Embedding layers based on info provided for index, (vocab_size, embed_dim) in \ self.categorical_embedding_info.items(): self.embedding_layers[index] = Embedding( input_dim=vocab_size, output_dim=embed_dim, name=f"cat_embed_idx_{index}" ) def call(self, inputs): """Applies embedding to specified indices and concatenates.""" ## XXX TODO # Input shape: (Batch, [TimeSteps,] NumCatFeatures) # Note: Assumes input `inputs` ONLY contains the categorical features # in the correct order corresponding to the indices in the info dict. # A more robust version might take the full feature tensor and indices. embeddings = [] # Check rank to handle static (2D) vs dynamic/future (3D) input_rank = len(inputs.shape) num_cat_features_provided = inputs.shape[-1] # Validate number of features matches expected keys if num_cat_features_provided != len(self.categorical_embedding_info): raise ValueError( f"Number of input categorical features ({num_cat_features_provided})" f" does not match number of embedding layers" f" ({len(self.categorical_embedding_info)})." ) feature_index_counter = 0 for original_index in sorted(self.embedding_layers.keys()): # Assume the input tensor columns are ordered corresponding # to the sorted original indices. if input_rank == 2: # Static: (Batch, NumCatFeatures) feature_tensor = inputs[:, feature_index_counter] elif input_rank == 3: # Dynamic/Future: (Batch, Time, NumCatFeatures) feature_tensor = inputs[:, :, feature_index_counter] else: raise ValueError(f"Unsupported input rank: {input_rank}") # Apply embedding layer corresponding to the original index embed_layer = self.embedding_layers[original_index] embeddings.append(embed_layer(feature_tensor)) feature_index_counter += 1 # Concatenate embeddings along the last dimension if not embeddings: return None # Or handle appropriately if no categoricals return tf_concat(embeddings, axis=-1) def get_config(self): config = super().get_config() config.update({ "categorical_embedding_info": self.categorical_embedding_info, }) return config @classmethod def from_config(cls, config): # Keras serialization handles nested layers like Embedding return cls(**config) # -----functions --------------------------------------------------------------
[docs] @register_keras_serializable( 'fusionlab.nn.components', name='aggregate_multiscale' ) def aggregate_multiscale(lstm_output, mode="auto"): r"""Aggregate multi-scale LSTM outputs using specified temporal fusion strategy. This function implements multiple strategies for combining outputs from multi-scale LSTMs operating at different temporal resolutions. Supports six aggregation modes: ``average``, ``sum``, ``flatten``, ``concat``, ``last`` (default fallback), and ``auto``[1]_. Designed for compatibility with ``MultiScaleLSTM`` layer outputs. See more in :ref:`User Guide <user_guide>`. Parameters ---------- lstm_output : list of tf.Tensor or tf.Tensor Input features from multi-scale processing: - List of 3D tensors [(B, T', U), ...] when ``mode`` != 'auto' - Single 2D tensor (B, U*S) when ``mode=None`` where: B = Batch size T' = Variable time dimension (scale-dependent) U = LSTM units per scale S = Number of scales (len(scales)) mode : {'auto', 'sum', 'average', 'flatten', 'concat', 'last'}, optional Aggregation strategy: - ``auto`` : (Default) Concatenate last timesteps from each scale - ``sum`` : Temporal summation per scale + feature concatenation - ``average`` : Temporal mean per scale + feature concatenation - ``flatten`` : Flatten all time-feature dimensions (requires equal T') - ``concat`` : Feature concatenation + last global timestep - ``last`` : Alias for ``auto`` (backward compatibility) Returns ------- tf.Tensor Aggregated features with shape: - (B, U*S) for modes: ``average``, ``sum``, ``last`` - (B, T'*U*S) for ``flatten`` mode - (B, U*S) for ``concat`` mode (last timestep only) - (B, U*S) for ``auto`` mode In sum: - (B, U*S) for ``auto``/``last``, ``sum``, ``average``, ``concat`` - (B, T'*U*S) for ``flatten`` mode. Notes ----- * Mode Comparison Table: +------------+---------------------+---------------------+-------------------+ | Mode | Temporal Handling | Requirements | Typical Use Case | +============+=====================+=====================+===================+ | ``auto`` | Last step per scale | None | Default choice | | (last) | | | for variable T' | +------------+---------------------+---------------------+-------------------+ | ``sum`` | Full sequence sum | None | Emphasize temporal| | | per scale | | accumulation | +------------+---------------------+---------------------+-------------------+ | ``average``| Full sequence mean | None | Smooth temporal | | | per scale | | patterns | +------------+---------------------+---------------------+-------------------+ | ``flatten``| Preserve all time | Equal T' across | Fixed-length | | | steps | scales | sequence models | +------------+---------------------+---------------------+-------------------+ | ``concat`` | Last global step | Equal T' across | Specialized | | | of concatenated | scales | architectures | | | features | | with aligned T' | +------------+---------------------+---------------------+-------------------+ Mathematical Formulation: For S scales with outputs :math:`\{\mathbf{X}_s \in \mathbb{R}^{B \times T'_s \times U}\}_{s=1}^S`: .. math:: \text{auto} &: \bigoplus_{s=1}^S \mathbf{X}_s^{(:, T'_s, :)} \quad \text{(Last step concatenation)} \text{sum} &: \bigoplus_{s=1}^S \sum_{t=1}^{T'_s} \mathbf{X}_s^{(:, t, :)} \text{average} &: \bigoplus_{s=1}^S \frac{1}{T'_s} \sum_{t=1}^{T'_s} \mathbf{X}_s^{(:, t, :)} \text{flatten} &: \text{vec}\left( \bigoplus_{s=1}^S \mathbf{X}_s \right) \text{concat} &: \left( \bigoplus_{s=1}^S \mathbf{X}_s \right)^{(:, T', :)} where :math:`\bigoplus` = feature concatenation, :math:`\text{vec}` = flatten. * Critical differences between key modes ``'concat'`` and ``'last'``: +------------------+---------------------+-----------------------+ | Aspect | ``concat`` | ``last`` (default) | +==================+=====================+=======================+ | Time alignment | Requires equal T' | Handles variable T' | +------------------+---------------------+-----------------------+ | Feature mixing | Cross-scale mixing | Scale-independent | +------------------+---------------------+-----------------------+ | Scale validity | Only valid when | Robust to arbitrary | | | scales=[1,1,...] | scale configurations | +------------------+---------------------+-----------------------+ Examples -------- >>> from fusionlab.nn.components import aggregate_multiscale >>> import tensorflow as tf # Three scales with different time dimensions >>> outputs = [ ... tf.random.normal((32, 10, 64)), # Scale 1: T'=10 ... tf.random.normal((32, 5, 64)), # Scale 2: T'=5 ... tf.random.normal((32, 2, 64)) # Scale 3: T'=2 ... ] # Default auto mode (last timesteps) >>> agg_auto = aggregate_multiscale(outputs, mode='auto') >>> agg_auto.shape (32, 192) # 64 units * 3 scales # Last timestep aggregation (default) >>> agg_last = aggregate_multiscale(outputs, mode='last') >>> print(agg_last.shape) (32, 192) # Flatten mode (requires manual padding for equal T') >>> padded_outputs = [tf.pad(o, [[0,0],[0,3],[0,0]]) for o in outputs[:2]] >>> padded_outputs.append(outputs[2]) >>> agg_flat = aggregate_multiscale(padded_outputs, mode='flatten') >>> agg_flat.shape (32, 1280) # (10+3)*64*3 = 13*192 = 2496? Wait need to check dimensions See Also -------- MultiScaleLSTM : Base layer producing multi-scale LSTM outputs TemporalFusionTransformer : Advanced temporal fusion architecture HierarchicalAttention : Alternative temporal aggregation approach References ---------- .. [1] Lim, B., & Zohren, S. (2021). Time-series forecasting with deep learning: a survey. Philosophical Transactions of the Royal Society A, 379(2194), 20200209. https://doi.org/10.1098/rsta.2020.0209 """ # "auto", use the last LastStep-First Approach if mode is None: # No additional aggregation needed lstm_features = lstm_output # (B, units * len(scales)) # Apply chosen aggregation to full sequences elif mode == "average": # Average over time dimension for each scale and then concatenate averaged_outputs = [ tf_reduce_mean(o, axis=1) for o in lstm_output ] # Each is (B, units) lstm_features = tf_concat( averaged_outputs, axis=-1 ) # (B, units * len(scales)) elif mode== "flatten": # Flatten time and feature dimensions for all scales # Assume equal time lengths for all scales concatenated = tf_concat( lstm_output, axis=-1 ) # (B, T', units*len(scales)) shape = tf_shape(concatenated) (batch_size, time_dim, feat_dim) = shape[0], shape[1], shape[2] lstm_features = tf_reshape( concatenated, [batch_size, time_dim * feat_dim] ) elif mode =='sum': # Sum over time dimension for each scale and concatenate summed_outputs = [ tf_reduce_sum(o, axis=1) for o in lstm_output ] lstm_features = tf_concat( summed_outputs, axis=-1) elif mode=="concat": # Concatenate along the feature dimension for each # time step and take the last time step concatenated = tf_concat( lstm_output, axis=-1) # (B, T', units * len(scales)) last_output = concatenated[:, -1, :] # (B, units * len(scales)) lstm_features = last_output else: # "last" or "auto" # Default fallback: take the last time step from each scale # and concatenate last_outputs = [ o[:, -1, :] for o in lstm_output ] # (B, units) lstm_features = tf_concat( last_outputs, axis=-1 ) # (B, units * len(scales)) return lstm_features
[docs] def aggregate_multiscale_on_3d( lstm_output: Union[Tensor, List[Tensor]], mode: str = "auto" ) -> Tensor: r"""Aggregate multi-scale LSTM outputs using a specified strategy. This function combines outputs from `MultiScaleLSTM`. It is designed to either produce a single 3D sequence tensor (for attention mechanisms) or a single 2D context vector (by collapsing the time dimension). Parameters ---------- lstm_output : list of tf.Tensor or tf.Tensor The output from `MultiScaleLSTM`. - If a list: Expected to be from an LSTM with `return_sequences=True`. Each element is a 3D tensor `(B, T_scale, U)` where `T_scale` can vary. - If a single tensor: Assumed to be from an LSTM with `return_sequences=False`, shape `(B, U * num_scales)`. In this case, it's returned as is. mode : {'auto', 'sum', 'average', 'flatten', 'concat', 'last'}, optional Aggregation strategy: - **'concat'**: (For 3D output) Pads sequences to the max length and concatenates along the feature axis. This is the primary mode for creating a rich sequence representation for downstream attention layers. Result shape: `(B, T_max, U*S)`. - **'last'** or **'auto'**: (For 2D output) Takes the last time step from each sequence in the list and concatenates them. Result shape: `(B, U*S)`. - **'average'**: (For 2D output) Averages each sequence over its time dimension and concatenates the results. - **'sum'**: (For 2D output) Sums each sequence over its time dimension and concatenates the results. - **'flatten'**: (For 2D output) Concatenates and flattens all dimensions except the batch. Requires sequences to have the same length. Returns ------- tf.Tensor The aggregated feature tensor, either 2D or 3D depending on the mode. """ if not isinstance(lstm_output, list): # Input is likely already a 2D tensor, return as is. return lstm_output if not lstm_output: raise ValueError("Input `lstm_output` list cannot be empty.") # --- New 'concat' behavior to produce a single 3D tensor --- if mode == "concat": # This mode pads sequences to the same length and concatenates # on the feature axis, preserving the time dimension. # 1. Find the maximum sequence length in the list of tensors. max_len = 0 for tensor in lstm_output: if tensor.shape.ndims != 3: raise ValueError( "For 'concat' mode, all items in `lstm_output` must be " f"3D tensors, but found shape {tensor.shape}" ) max_len = tf_maximum(max_len, tf_shape(tensor)[1]) # 2. Pad each tensor to the max length. padded_tensors = [] for tensor in lstm_output: current_len = tf_shape(tensor)[1] # Paddings format: [[dim1_before, dim1_after], [dim2_before, dim2_after], ...] paddings = [[0, 0], [0, max_len - current_len], [0, 0]] padded_tensors.append(tf_pad(tensor, paddings, "CONSTANT")) # 3. Concatenate along the feature axis (-1). return tf_concat(padded_tensors, axis=-1) # --- Existing modes that reduce to a 2D tensor --- elif mode == "average": averaged_outputs = [ tf_reduce_mean(o, axis=1) for o in lstm_output ] return tf_concat(averaged_outputs, axis=-1) elif mode == "sum": summed_outputs = [ tf_reduce_sum(o, axis=1) for o in lstm_output ] return tf_concat(summed_outputs, axis=-1) elif mode == "flatten": # This mode requires all sequences to have the same length. concatenated = tf_concat(lstm_output, axis=-1) shape = tf_shape(concatenated) batch_size, time_dim, feat_dim = shape[0], shape[1], shape[2] return tf_reshape(concatenated, [batch_size, time_dim * feat_dim]) else: # Default for "last" or "auto" # Takes the last time step from each sequence and concatenates. last_outputs = [o[:, -1, :] for o in lstm_output] return tf_concat(last_outputs, axis=-1)
[docs] @register_keras_serializable( 'fusionlab.nn.components', name='aggregate_time_window_output' ) def aggregate_time_window_output( time_window_output:Tensor, mode: Optional[str]=None ): """ Aggregates time window output features based on the specified aggregation method. This function performs the final aggregation on a 3D tensor representing temporal features. The aggregation can be done by selecting the last time step, computing the average across time, or flattening the temporal and feature dimensions into a single vector per sample. The aggregation methods are defined as follows: .. math:: \text{last: } F = T[:, -1, :] .. math:: \text{average: } F = \frac{1}{T_{dim}} \sum_{i=1}^{T_{dim}} T[:, i, :] .. math:: \text{flatten: } F = \text{reshape}(T, (batch\_size, time\_dim \times feat\_dim)) where :math:`T` is the input tensor with shape :math:`(batch\_size, time\_dim, feat\_dim)` and :math:`F` is the aggregated output. Parameters ---------- time_window_output : tf.Tensor A 3D tensor of shape :math:`(batch\_size, time\_dim, feat\_dim)` representing the output features over time. mode : str, optional Aggregation method to apply. Supported values are: - ``"last"``: Selects the features from the last time step. - ``"average"``: Computes the mean of features across the time dimension. - ``"flatten"``: Flattens the time and feature dimensions into a single vector per sample. If ``mode`` is `None`, the function falls back to the ``flatten`` aggregation method. Returns ------- tf.Tensor The aggregated features tensor after applying the specified aggregation method. Raises ------ ValueError If an unsupported aggregation method is provided in the ``mode`` argument. Examples -------- >>> from fusionlab.nn.components import aggregate_time_window_output >>> import tensorflow as tf >>> # Create a dummy tensor with shape (2, 3, 4) >>> dummy = tf.random.uniform((2, 3, 4)) >>> # Apply average aggregation >>> result = aggregate_time_window_output(dummy, ... mode="average") Notes ----- - The function uses TensorFlow operations to ensure compatibility with TensorFlow's computation graph. - It is recommended to use this function as part of a larger neural network pipeline [1]_. See Also -------- tf.reduce_mean TensorFlow operation to compute mean along axes. References ---------- .. [1] Author Name, "Title of the reference", Journal/Conference, Year. """ mode = mode or 'flatten' if mode == "last": # Select the features corresponding to the last time step for # each sample. final_features = time_window_output[:, -1, :] elif mode == "average": # Compute the mean of the features across the time dimension. final_features = tf_reduce_mean(time_window_output, axis=1) elif mode == "flatten": # Retrieve the dynamic shape of the input tensor. shape = tf_shape(time_window_output) batch_size, time_dim, feat_dim = ( shape[0], shape[1], shape[2] ) # Flatten the time and feature dimensions into a single vector # per sample. final_features = tf_reshape( time_window_output, [batch_size, time_dim * feat_dim] ) else: # Raise an error if an unsupported aggregation method is provided. raise ValueError( f"Unsupported mode value: '{mode}'. Supported values are " f"'last', 'average', or 'flatten'." ) return final_features
[docs] def create_causal_mask(size: Union[int,Tensor]) -> Tensor: """ Creates a causal attention mask of shape [1,1,seq_len,seq_len] where mask[0,0,i,j] = 1.0 if j > i else 0.0. """ # Make sure size is a 0-D int32 Tensor size = tf_cast(size, tf_int32) # Build a vector [0,1,2,...,size-1] idxs = tf_range(size) # shape: [size] # Compare row < col for every pair (i,j) # row_idxs: [size,1], col_idxs: [1,size] row_idxs = tf_expand_dims(idxs, 1) # [size,1] col_idxs = tf_expand_dims(idxs, 0) # [1,size] # mask2d[i,j] = True if j > i, else False mask2d = tf_greater(col_idxs, row_idxs) # [size,size], dtype=bool # Cast to float (1.0 for masked positions, 0.0 elsewhere) mask2d = tf_cast(mask2d, tf_float32) # [size,size] # Expand to [1,1,size,size] so it broadcasts over (batch, heads) mask = tf_expand_dims(tf_expand_dims(mask2d, 0), 1) return mask
# def create_causal_mask(size: Union[int, Tensor]) -> Tensor: # """Creates a causal attention mask of shape [1,1,seq_len,seq_len].""" # # ensure `size` is an int32 Tensor # size = tf_cast(size, tf_int32) # # build shape as a Tensor so we don't capture Python tuples of Tensors # shape = tf_stack([size, size]) # ones = tf_ones(shape, dtype=tf_float32) # # make the [seq_len, seq_len] causal matrix # mask2d = 1.0 - tf_linalg.band_part(ones, -1, 0) # # now expand batch dim then head dim → [1,1,seq_len,seq_len] # mask = tf_expand_dims(mask2d, 0) # → [1, seq_len, seq_len] # mask = tf_expand_dims(mask, 1) # → [1, 1, seq_len, seq_len] # return mask def _create_causal_mask(size: Union[int, Tensor]) -> Tensor: """Creates a causal attention mask for the decoder.""" mask = 1 - tf_linalg.band_part(tf_ones((size, size)), -1, 0) # Add batch and head dimensions for broadcasting return mask[tf_expand_dims(tf_range(size), 0), :] # (1, 1, seq_len, seq_len) -> Keras MHA expects (B, T, T) or (B, N_heads, T, T) # TF MHA expects (B, N_heads, T, T) # Let's make it (1,1,T,T) for TF MHA layer, it will broadcast # # Keras MHA expects mask shape (batch_size, num_heads, query_length, key_length) # # or (batch_size, query_length, key_length) # # For causal, query_length == key_length == size # return tf_expand_dims(tf_expand_dims( # 1 - tf_linalg.band_part(tf_ones((size, size)), -1, 0), axis=0), axis=0)