# -*- coding: utf-8 -*-
# License: BSD-3-Clause
# Author: LKouadio <etanoyau@gmail.com>
"""
Misc / utility layers & helpers
"""
from __future__ import annotations
from typing import Callable, Union
import numpy as np
from ...api.property import NNLearner
from ...utils.deps_utils import ensure_pkg
from ._config import (
Layer, Dense, Dropout,
Tensor, TensorShape,
Constant,
activations,
tf_autograph,
tf_cast,
tf_float32,
tf_range,
tf_newaxis,
tf_sin,
tf_cos,
tf_shape,
tf_concat,
tf_pow,
tf_floordiv,
register_keras_serializable,
DEP_MSG, KERAS_BACKEND,
)
__all__ = [
"Activation",
"PositionwiseFeedForward",
"PositionalEncoding",
"TSPositionalEncoding",
"MultiModalEmbedding",
]
@register_keras_serializable(
'fusionlab.nn.components', name="Activation"
)
class Activation(Layer, NNLearner):
r"""
Flexible activation layer that transparently delegates to any
built‑in or user‑defined activation function.
Parameters
----------
activation : str or Callable or None, default ``'relu'``
Identifier of the desired activation.
* If *str*, it must be recognised by
:pymeth:`keras.activations.get`.
* If *Callable*, it must follow the signature
``f(tensor) -> tensor``.
* If *None*, the layer acts as the identity mapping
:math:`f(x)=x`.
**kwargs
Additional keyword arguments forwarded to
:class:`keras.layers.Layer` (e.g. ``name`` or ``dtype``).
Notes
-----
Let :math:`\mathbf{x}\in\mathbb{R}^{n}` be the input tensor and
:math:`\phi` the resolved activation function. The layer performs
.. math::
\mathbf{y} = \phi(\mathbf{x}).
Because :pyclass:`Activation` inherits from
:class:`keras.layers.Layer`, it can be freely composed inside a
``tf.keras.Sequential`` or functional graph.
Methods
-------
call(inputs, training=False)
Apply the resolved activation to *inputs*.
get_config()
Return a JSON‑serialisable configuration dictionary.
__repr__()
Nicely formatted string representation—helpful in interactive
sessions.
Examples
--------
>>> import tensorflow as tf
>>> from fusionlab.nn.components import Activation
>>> x = tf.constant([‑2., 0., 1.5])
>>> act = Activation('swish')
>>> act(x).numpy()
array([‑0.238, 0. , 1.273], dtype=float32)
Custom callable:
>>> def leaky_relu(x, alpha=0.1):
... return tf.where(x > 0, x, alpha * x)
...
>>> act = Activation(leaky_relu)
>>> act(x).numpy()
array([‑0.2, 0. , 1.5], dtype=float32)
See Also
--------
keras.activations.get
Canonical resolver used under the hood.
keras.layers.Activation
Native Keras counterpart with fewer conveniences.
References
----------
.. [1] Ramachandran, Prajit, et al. *Searching for Activation
Functions*. arXiv preprint arXiv:1710.05941 (2017).
"""
@ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG)
def __init__(self,
activation: Union[str, Callable, None] = 'relu',
**kwargs):
super().__init__(**kwargs)
# Store original user input for debugging / introspection
self.activation_original = activation
# Resolve activation into (callable, canonical string)
if activation is None:
self.activation_fn = activations.get(None)
self.activation_str = 'linear'
elif isinstance(activation, str):
# Try to get a standard name via serialize,
# fallback to object name
try:
self.activation_fn = activations.get(activation)
self.activation_str = activation
except ValueError as err:
raise ValueError(
f"Unknown activation '{activation}'."
) from err
elif callable(activation):
self.activation_fn = activation
try: # Try serialising
ser = activations.serialize(activation)
# Fallback if serialize doesn't give simple string
self.activation_str = (
ser if isinstance(ser, str)
else getattr(activation, '__name__',
activation.__class__.__name__)
)
except ValueError:
# Fallback if serialize doesn't give simple string
self.activation_str = getattr(
activation, '__name__',
activation.__class__.__name__
)
else:
raise TypeError(
"Parameter 'activation' must be *str*, Callable, or "
"*None*. Received type "
f"{type(activation).__name__!r}."
)
if not callable(self.activation_fn):
raise TypeError(
f"Resolved activation '{self.activation_str}' is not "
"callable."
)
@tf_autograph.experimental.do_not_convert
def call(self, inputs, training: bool = False):
"""
Apply the stored activation to `inputs`.
Parameters
----------
inputs : tf.Tensor
Input tensor of arbitrary shape.
training : bool, default ``False``
Present for API compatibility; ignored because most
activations do not behave differently at training time.
Returns
-------
tf.Tensor
Tensor with identical shape to *inputs* but transformed
element‑wise by the activation.
"""
# A single line keeps Autograph happy
# and maximises performance
return self.activation_fn(inputs)
def get_config(self) -> dict:
"""
Configuration dictionary for model serialization.
Returns
-------
dict
JSON‑friendly mapping that allows
:pyfunc:`keras.layers.deserialize` to recreate the layer.
"""
config = super().get_config()
# Save the CANONICAL STRING NAME for serialization
config.update({
'activation': self.activation_str
})
return config
# String representation
def __repr__(self) -> str: # noqa: D401
"""
Return *repr(self)*.
The canonical activation string is included for clarity.
"""
return (f"{self.__class__.__name__}("
f"activation={self.activation_str!r})")
@register_keras_serializable(
"fusionlab.nn.components",
name='PositionwiseFeedForward')
class PositionwiseFeedForward(Layer, NNLearner):
"""Implements the Position-wise Feed-Forward Network (FFN) layer.
This layer is a core component of a standard Transformer block,
typically applied after the multi-head attention sub-layer. Its
purpose is to process the context-rich output from the attention
mechanism at each position independently, adding non-linearity
and transformative capacity to the model.
The network consists of two fully-connected (Dense) layers with a
non-linear activation function in between. The first layer expands
the input dimensionality, and the second layer projects it back down.
Parameters
----------
embed_dim : int
The input and output dimensionality of the layer. This must match
the embedding dimension of the Transformer, often denoted as
:math:`d_{model}`.
ffn_dim : int
The dimensionality of the inner, expanded hidden layer. It is
common practice in Transformer architectures to set this to four
times the `embed_dim`.
activation : str, optional
The activation function to use in the inner layer. Any valid
Keras activation string is accepted. Defaults to ``"relu"``.
dropout_rate : float, optional
The dropout rate applied for regularization, typically after the
first activation function. Defaults to ``0.1``.
**kwargs
Standard keyword arguments for a Keras ``Layer``.
Notes
-----
The "position-wise" nature of this layer is its defining
characteristic. The same instance of this layer, with the exact
same set of learned weights (:math:`W_1, b_1, W_2, b_2`), is applied
to the feature vector at every single position (e.g., time step)
in the input sequence. It does not mix information between positions;
that task is handled by the preceding self-attention layer.
The mathematical operation for a single position vector :math:`x` is:
.. math::
\text{FFN}(x) = \text{Linear}_2(\text{activation}(\text{Linear}_1(x)))
The residual connection (:math:`x + \text{Dropout}(\text{FFN}(x))`)
is typically applied outside this layer, within the main
Transformer block.
See Also
--------
fusionlab.nn.components.TransformerEncoderLayer : A typical consumer of this layer.
tf.keras.layers.Dense : The core building block of the FFN.
References
----------
.. [1] Vaswani, A., et al. "Attention Is All You Need." *NeurIPS 2017*.
Examples
--------
>>> import tensorflow as tf
>>> # Create a dummy input tensor (batch, sequence_length, embed_dim)
>>> input_tensor = tf.random.normal((32, 50, 128))
...
>>> # Instantiate the FFN layer
>>> ffn_layer = PositionwiseFeedForward(embed_dim=128, ffn_dim=512)
...
>>> # Pass the input through the layer
>>> output_tensor = ffn_layer(input_tensor, training=True)
...
>>> # The output shape remains the same as the input shape
>>> print(f"Input Shape: {input_tensor.shape}")
>>> print(f"Output Shape: {output_tensor.shape}")
Input Shape: (32, 50, 128)
Output Shape: (32, 50, 128)
"""
def __init__(
self,
embed_dim: int,
ffn_dim: int,
activation: str = "relu",
dropout_rate: float = 0.1,
**kwargs
):
super().__init__(**kwargs)
# Store configuration for serialization
self.embed_dim = embed_dim
self.ffn_dim = ffn_dim
self.activation_str = activation
self.dropout_rate = dropout_rate
# Define the internal layers once in the constructor
self.dense_1 = Dense(
units=ffn_dim,
name="ffn_dense_1"
)
self.activation = Activation(activation).activation_fn
self.dense_2 = Dense(
units=embed_dim,
name="ffn_dense_2"
)
self.dropout = Dropout(rate=dropout_rate)
def call(self, x: Tensor, training: bool = False) -> Tensor:
"""Defines the forward pass for the FFN layer."""
# Project to the intermediate dimension
x = self.dense_1(x)
# Apply the non-linear activation function
x = self.activation(x)
# Apply dropout for regularization
x = self.dropout(x, training=training)
# Project back to the original embedding dimension
x = self.dense_2(x)
return x
def get_config(self):
"""Returns the configuration of the layer for serialization."""
config = super().get_config()
config.update({
"embed_dim": self.embed_dim,
"ffn_dim": self.ffn_dim,
"activation": self.activation_str,
"dropout_rate": self.dropout_rate,
})
return config
[docs]
@register_keras_serializable(
"fusionlab.nn.components",
name="PositionalEncoding",
)
class PositionalEncoding(Layer, NNLearner):
r"""
Sinusoidal positional encoding (Transformer-style).
This layer adds a deterministic (non-trainable) sinusoidal table to
the input sequence so the model can distinguish positions.
Key design goals (why this implementation looks “special”)
----------------------------------------------------------
1) **Graph-scope safety**
We build the table with NumPy inside `build()` and store it as a
**non-trainable Keras weight** via `add_weight(...)`.
*Why:* tensors created with TF ops inside `build()` can end up
attached to a temporary FuncGraph during tracing. Later, when the
model is re-traced or used in a different graph context (fit,
SavedModel, etc.), those tensors can become “out of scope” and
crash. A Keras weight is a TF Variable and is safe across graphs.
2) **Serialization safety across Keras 2 and Keras 3**
Older saved checkpoints may have **no positional_encoding weight**
(legacy versions stored a plain tensor attribute).
This class tolerates that during load by overriding:
- `set_weights` (Keras 2 / H5-like paths)
- `load_own_variables` (Keras 3 object-based save paths)
If the weight is missing, we keep the freshly initialized constant.
Parameters
----------
max_length : int, default=2048
Maximum sequence length supported by the precomputed table.
Notes
-----
The output shape matches the input: (B, T, D).
The `training` argument is accepted for API compatibility only.
References
----------
Vaswani et al., 2017, "Attention is All You Need".
"""
[docs]
def __init__(self, max_length: int = 2048, **kwargs):
super().__init__(**kwargs)
# Max supported sequence length for the lookup table.
# Stored as a Python int so it is JSON-serializable in config.
self.max_length = int(max_length)
# Will become a non-trainable weight of shape (1, max_length, D).
# Keeping it as None until `build()` ensures we know D.
self.positional_encoding = None
[docs]
def build(self, input_shape: TensorShape):
"""
Create the fixed sinusoidal table once.
`input_shape` is expected to be (B, T, D).
We only require D (feature dimension) to build the table.
"""
# Unpack shape; only feature_dim matters for the table.
_, _, feature_dim = input_shape
# D must be known at build time to allocate (1, max_length, D).
# If D is None, this layer cannot build a fixed table.
if feature_dim is None:
raise ValueError(
"The feature dimension of the input to "
"PositionalEncoding cannot be `None`."
)
# If Keras calls build multiple times, do not recreate the weight.
if self.positional_encoding is None:
d = int(feature_dim)
# ---------------------------------------------------------
# Build the sinusoidal table with NumPy (not TF ops).
#
# Why NumPy: avoids creating TF tensors inside `build()`
# that can be tied to a temporary FuncGraph during tracing.
# The result is then stored as a TF Variable via add_weight.
# ---------------------------------------------------------
pos = np.arange(self.max_length)[:, np.newaxis] # (L, 1)
i = np.arange(d)[np.newaxis, :] # (1, D)
# rates[j] = 1 / 10000^(2*floor(j/2)/D)
rates = 1.0 / np.power(
10000.0,
(2 * (i // 2)) / np.float32(d),
)
angles = pos * rates # (L, D)
# Interleave sin for even dims and cos for odd dims.
pe = np.zeros((self.max_length, d), dtype=np.float32)
pe[:, 0::2] = np.sin(angles[:, 0::2])
pe[:, 1::2] = np.cos(angles[:, 1::2])
# Add batch axis so broadcasting works: (1, L, D).
pe = pe[np.newaxis, :, :]
# ---------------------------------------------------------
# Store as a non-trainable weight:
# - a TF Variable (safe across graphs)
# - saved/restored by Keras serialization
# - excluded from optimizer updates
# ---------------------------------------------------------
self.positional_encoding = self.add_weight(
name="positional_encoding",
shape=pe.shape,
dtype=tf_float32,
initializer=Constant(pe),
trainable=False,
)
super().build(input_shape)
# -----------------------------------------------------------------
# Compatibility hooks (Keras 2 / Keras 3)
# -----------------------------------------------------------------
[docs]
def set_weights(self, weights):
"""
Keras 2 / H5-style loading hook.
Legacy checkpoints may provide an EMPTY list for this layer
(because old versions had no variables). If so, accept it and
keep the newly-initialized constant weight.
"""
if not weights:
return
return super().set_weights(weights)
[docs]
def load_own_variables(self, store):
"""
Keras 3 object-based loading hook.
In Keras 3, variable loading may use an internal "store" dict.
Legacy saves might not include 'positional_encoding'. If missing,
do nothing and keep the initialized constant.
"""
try:
if not store:
return
v = store.get("positional_encoding", None)
if v is None:
return
# Ensure the weight exists (build should have run).
if self.positional_encoding is not None:
self.positional_encoding.assign(v)
except Exception:
# Never fail deserialization for a deterministic constant.
return
[docs]
def call(self, inputs: Tensor, training=False) -> Tensor:
"""
Add positional encoding to the input.
inputs: (B, T, D)
returns: (B, T, D)
"""
# Current sequence length T (dynamic at runtime).
seq_len = tf_shape(inputs)[1]
# Slice to the required length and broadcast across batch:
# (B, T, D) + (1, T, D) -> (B, T, D)
return inputs + self.positional_encoding[:, :seq_len, :]
[docs]
def get_config(self) -> dict:
"""
Keras serialization config.
Keep config minimal and JSON-serializable.
"""
config = super().get_config()
config.update({"max_length": self.max_length})
return config
@register_keras_serializable(
'fusionlab.nn.components', name='PositionalEncoding')
class _PositionalEncoding(Layer, NNLearner):
r"""Injects positional information into an input tensor.
This layer adds a positional encoding to the input, allowing models
like Transformers to understand the order of the sequence. It uses
the standard sinusoidal encoding from the "Attention Is All You
Need" paper [1]_.
The positional encoding :math:`PE` is defined as:
.. math::
PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right)
.. math::
PE_{(pos, 2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right)
where :math:`pos` is the position in the sequence, :math:`i` is the
dimension index, and :math:`d_{\text{model}}` is the feature dimension.
Parameters
----------
max_length : int, default 2048
The maximum possible sequence length. The encoding matrix will be
pre-calculated up to this length.
**kwargs
Standard Keras Layer keyword arguments.
Examples
--------
>>> import tensorflow as tf
>>> from fusionlab.nn.components import PositionalEncoding
>>> batch_size = 4
>>> sequence_length = 50
>>> feature_dimension = 128
>>> # Create dummy input tensor
>>> input_tensor = tf.random.normal(
... (batch_size, sequence_length, feature_dimension)
... )
>>> # Instantiate and apply the layer
>>> pos_encoding_layer = PositionalEncoding(max_length=5000)
>>> output_tensor = pos_encoding_layer(input_tensor)
>>> print("Input Tensor Shape:", input_tensor.shape)
>>> print("Output Tensor Shape:", output_tensor.shape)
>>> # The shape should be unchanged.
>>> assert input_tensor.shape == output_tensor.shape
>>> # You can visualize the encoding if you wish
>>> import matplotlib.pyplot as plt
>>> pe_matrix = pos_encoding_layer.positional_encoding[0, :, :].numpy()
>>> plt.figure(figsize=(10, 5))
>>> cax = plt.matshow(pe_matrix, fignum=1, aspect='auto', cmap='viridis')
>>> plt.gcf().colorbar(cax)
>>> plt.title("Sinusoidal Positional Encoding Matrix")
>>> plt.xlabel("Feature Dimension")
>>> plt.ylabel("Position in Sequence")
>>> plt.show()
References
----------
.. [1] Vaswani, A., et al. (2017). "Attention is all you need."
*Advances in Neural Information Processing Systems*, 30.
"""
def __init__(self, max_length: int = 2048, **kwargs):
super().__init__(**kwargs)
self.max_length = max_length
self.positional_encoding = None
# def build(self, input_shape: TensorShape):
# """Pre-calculates the positional encoding matrix."""
# # The input shape is (batch, sequence_length, feature_dim)
# _, _, feature_dim = input_shape
# if self.positional_encoding is None:
# # The calculation is done once and stored.
# # Ensure feature_dim is a concrete value for matrix creation.
# if feature_dim is None:
# raise ValueError(
# "The feature dimension of the input to "
# "PositionalEncoding cannot be `None`. Please "
# "ensure the input has a defined feature dimension."
# )
# # Cast to float for calculations
# d_model = tf_cast(feature_dim, tf_float32)
# # Create a matrix of positions (max_length, 1)
# positions = tf_range(
# self.max_length, dtype=tf_float32)[:, tf_newaxis]
# # Create the division term for the sine/cosine functions
# # Shape: (feature_dim / 2)
# div_term = tf_exp(
# tf_range(0, feature_dim, 2, dtype=tf_float32) * \
# (-tf_log(10000.0) / d_model)
# )
# # Calculate sinusoidal values for even and odd indices
# # Shape of each: (max_length, feature_dim / 2)
# pe_sin = tf_sin(positions * div_term)
# pe_cos = tf_cos(positions * div_term)
# # Interleave sin and cos values to get final encoding
# # Resulting shape: (max_length, feature_dim)
# pe_interleaved = tf_reshape(
# tf_stack([pe_sin, pe_cos], axis=-1),
# shape=[self.max_length, feature_dim]
# )
# # Add an extra dimension for broadcasting across the batch
# # Shape: (1, max_length, feature_dim)
# self.positional_encoding = pe_interleaved[tf_newaxis, :, :]
# super().build(input_shape)
def build(self, input_shape: TensorShape):
# `input_shape` is expected to be (B, T, D).
# We only need the feature dimension D to
# construct the sinusoidal table.
_, _, feature_dim = input_shape
# D must be concrete at build time because we
# allocate a fixed (1, max_length, D) tensor.
# If D is None, we cannot create the table.
if feature_dim is None:
raise ValueError(
"The feature dimension of the input to "
"PositionalEncoding cannot be `None`."
)
# Cache: build the encoding only once even if
# `build()` is called multiple times.
if self.positional_encoding is None:
# Convert to a Python int for NumPy ops.
d = int(feature_dim)
# XXX IMPORTANT:
# Build in NumPy (not TF ops) to avoid creating
# graph-tensors during `build()`.
#
# Why: when the model is traced (tf.function /
# Keras training graph), TF tensors created in a
# different FuncGraph can later be "out of scope"
# and crash when reused.
pos = np.arange(self.max_length)[:, np.newaxis]
i = np.arange(d)[np.newaxis, :]
# Compute angle rates:
# rate[j] = 1 / 10000^(2*floor(j/2)/d)
# Use float32 to keep the table compact and to
# match typical model dtype.
rates = 1.0 / np.power(
10000.0,
(2 * (i // 2)) / np.float32(d),
)
angles = pos * rates
# Interleave sin/cos:
# even dims -> sin, odd dims -> cos.
pe = np.zeros(
(self.max_length, d),
dtype=np.float32,
)
pe[:, 0::2] = np.sin(angles[:, 0::2])
pe[:, 1::2] = np.cos(angles[:, 1::2])
# Add a leading batch axis so call() can do:
# inputs + pe[:, :seq_len, :]
# and broadcast across the batch dimension.
pe = pe[np.newaxis, :, :] # (1, max_len, d)
# Store as a non-trainable weight:
# - becomes a TF Variable (safe across graphs)
# - serialized with the layer/model
# - not updated by the optimizer
self.positional_encoding = self.add_weight(
name="positional_encoding",
shape=pe.shape,
dtype=tf_float32,
initializer=Constant(pe),
trainable=False,
)
# Mark the layer as built for Keras bookkeeping.
super().build(input_shape)
def call(self, inputs: Tensor, training=False ) -> Tensor:
r"""Adds positional encoding to the input tensor.
The 'training' argument is accepted but not used.
This ensures API compatibility with Keras.
Parameters
----------
inputs : tf.Tensor
A 3D tensor of shape :math:`(B, T, D)`, where ``B`` is
the batch size, ``T`` is the sequence length, and ``D``
is the feature dimension.
Returns
-------
tf.Tensor
The input tensor with positional encodings added.
Shape: :math:`(B, T, D)`.
Notes
------
The Positional encoding does not depends on training.
The sinusoidal PositionalEncoding layer performs a deterministic
mathematical operation. It calculates a fixed matrix of sine and
cosine values based on position and feature dimension and simply
adds it to the input. This calculation is the same whether you are
training the model or running it for inference. Unlike layers such
as Dropout or BatchNormalization, PositionalEncoding has no different
behavior during training.
"""
# Get the sequence length of the current input batch.
seq_len = tf_shape(inputs)[1]
# Slice the pre-calculated encoding matrix to match the input
# sequence length and add it to the input tensor.
# The broadcasting mechanism will handle the batch dimension.
return inputs + self.positional_encoding[:, :seq_len, :]
def get_config(self) -> dict:
"""Returns the configuration of the layer."""
config = super().get_config()
config.update({
'max_length': self.max_length,
})
return config
@register_keras_serializable(
'fusionlab.nn.components', name="TSPositionalEncoding")
class TSPositionalEncoding(Layer, NNLearner):
"""
Standard Transformer Positional Encoding using sine and cosine functions.
Adds positional information to input embeddings.
Args:
max_position (int): Maximum sequence length that this layer can handle.
embed_dim (int): The dimensionality of the embeddings (and the
positional encoding).
"""
def __init__(self, max_position: int, embed_dim: int, **kwargs):
super().__init__(**kwargs)
self.max_position = max_position
self.embed_dim = embed_dim
# self.pos_encoding is created once and stored.
self.pos_encoding = self._build_positional_encoding(
max_position, embed_dim)
def _build_positional_encoding(
self, position: int, d_model: int) -> Tensor:
"""Builds the positional encoding matrix using NumPy
then converts to Tensor."""
# 1. Calculate angles in NumPy
# 'pos' is for positions (sequence length), 'i' is for dimension
pos_np = np.arange(position)[:, np.newaxis]
i_np = np.arange(d_model)[np.newaxis, :]
angle_rates_np = 1 / np.power(
10000, (2 * (i_np // 2)) / np.float32(d_model)
)
angle_rads_np = pos_np * angle_rates_np
# 2. Apply sin to even indices in the array; 2i
angle_rads_np[:, 0::2] = np.sin(angle_rads_np[:, 0::2])
# 3. Apply cos to odd indices in the array; 2i+1
angle_rads_np[:, 1::2] = np.cos(angle_rads_np[:, 1::2])
# 4. Add a new axis for batch dimension and cast to TensorFlow tensor
# The self.pos_encoding expects (1, max_position, embed_dim)
pos_encoding_tensor = tf_cast(
angle_rads_np[np.newaxis, ...], dtype=tf_float32
)
return pos_encoding_tensor
def _tf_build_positional_encoding(self, position, d_model):
"""Builds the positional encoding matrix."""
angle_rads = self._get_angles(
# Use np.arange for non-Tensor context
# if KERAS_DEPS.arange isn't suitable
tf_range(position)[:, tf_newaxis],
tf_range(d_model)[tf_newaxis, :],
d_model
)
# Apply sin to even indices in the array; 2i
angle_rads[:, 0::2] = tf_sin(angle_rads[:, 0::2])
# Apply cos to odd indices in the array; 2i+1
angle_rads[:, 1::2] = tf_cos(angle_rads[:, 1::2])
pos_encoding_np = angle_rads[tf_newaxis, ...]
return tf_cast(pos_encoding_np, dtype=tf_float32)
def _get_angles(self, pos, i, d_model):
"""Calculates the angle rates for positional encoding."""
# Use np.power for non-Tensor context
angle_rates = 1 / np.power(10000, (
2 * (i // 2)) / np.float32(d_model))
return pos * angle_rates
def _tf_get_angles(self, pos, i, d_model):
"""Calculates the angle rates for positional encoding."""
# cast d_model to float32
d_model_f = tf_cast(d_model, tf_float32)
# compute floor(i/2) as an integer tensor
half_i = tf_floordiv(i, 2)
# build the numerator 2 * (i//2), then cast to float32
numer = tf_cast(2 * half_i, tf_float32)
# now both numer and d_model_f are float32
exponent = numer / d_model_f
# compute the rates with float constants
angle_rates = 1.0 / tf_pow(10000.0, exponent)
# and finally apply to pos (cast pos to float32 if needed)
return tf_cast(pos, tf_float32) * angle_rates
def call(self, x, training=False):
"""Adds positional encoding to the input tensor `x`.
The 'training' argument is accepted but not used.
This ensures API compatibility with Keras.
"""
if not KERAS_BACKEND:
raise RuntimeError(
"PositionalEncodingTF layer requires "
"a Keras backend (TensorFlow)."
)
input_seq_len = tf_shape(x)[1]
# Add positional encoding up to the length of the input sequence.
return x + self.pos_encoding[:, :input_seq_len, :]
def get_config(self):
config = super().get_config()
config.update({
"max_position": self.max_position,
"embed_dim": self.embed_dim,
})
return config
[docs]
@register_keras_serializable(
'fusionlab.nn.components', name="MultiModalEmbedding"
)
class MultiModalEmbedding(Layer, NNLearner):
r"""
MultiModalEmbedding layer for embedding multiple
input modalities into a common feature space and
concatenating them along the last dimension.
This layer takes a list of tensors, each representing
a different modality with the same batch and time
dimensions. It applies a dense projection (with
activation) to each modality, converting them to
the same dimensionality before concatenation.
.. math::
\mathbf{H}_{out} = \text{Concat}\big(
\text{Dense}(\mathbf{M_1}),\,
\text{Dense}(\mathbf{M_2}),\,\dots\big)
where each :math:`\mathbf{M_i}` is a tensor for a
specific modality.
Parameters
----------
embed_dim : int
Dimensionality of the output embedding for
each modality.
Notes
-----
This layer expects each input modality tensor to
have the same batch and time dimensions,
but potentially different feature dimensions.
Methods
-------
call(`inputs`, training=False)
Forward pass that projects each modality
separately, then concatenates.
get_config()
Returns a configuration dictionary for
serialization.
from_config(`config`)
Recreates the layer from a config dict.
Examples
--------
>>> from fusionlab.nn.components import MultiModalEmbedding
>>> import tensorflow as tf
>>> # Suppose we have two modalities:
... # dynamic_modality : (batch, time, dyn_dim)
... # future_modality : (batch, time, fut_dim)
>>> dyn_input = tf.random.normal((32, 10, 16))
>>> fut_input = tf.random.normal((32, 10, 8))
>>> # Instantiate the layer
>>> mm_embed = MultiModalEmbedding(embed_dim=32)
>>> # Forward pass with both modalities
>>> outputs = mm_embed([dyn_input, fut_input])
See Also
--------
LearnedNormalization
Normalizes input features before embedding.
HierarchicalAttention
Another specialized layer that can be used
after embeddings are computed.
"""
[docs]
@ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG)
def __init__(self, embed_dim: int):
super().__init__()
self.embed_dim = embed_dim
# Will hold a separate Dense layer
# for each modality
self.dense_layers = []
[docs]
def build(self, input_shape):
r"""
Build method that creates a Dense layer
for each modality based on input_shape.
Parameters
----------
input_shape : list of tuples
Each tuple corresponds to a modality's
shape, typically (batch_size, time_steps,
feature_dim).
"""
for modality_shape in input_shape:
if modality_shape is not None:
self.dense_layers.append(
Dense(
self.embed_dim,
activation='relu'
)
)
else:
raise ValueError(
"Unsupported modality type."
)
super().build(input_shape)
[docs]
@tf_autograph.experimental.do_not_convert
def call(self, inputs, training=False):
r"""
Forward pass: project each modality
into `embed_dim` and concatenate.
Parameters
----------
``inputs`` : list of tf.Tensor
Each tensor has shape
:math:`(B, T, D_i)` where `D_i` can
vary by modality.
training : bool, optional
Indicates if the layer is in training
mode. Defaults to ``False``.
Returns
-------
tf.Tensor
A concatenated embedding of shape
:math:`(B, T, \sum_{i}(\text{embed_dim}))`.
"""
embeddings = []
for idx, modality in enumerate(inputs):
if isinstance(modality, Tensor):
modality_embed = (
self.dense_layers[idx](
modality
)
)
else:
raise ValueError(
"Unsupported modality type."
)
embeddings.append(modality_embed)
return tf_concat(embeddings, axis=-1)
[docs]
def get_config(self):
r"""
Returns the configuration dictionary
of this layer.
Returns
-------
dict
Configuration including `embed_dim`.
"""
config = super().get_config().copy()
config.update({
'embed_dim': self.embed_dim
})
return config
[docs]
@classmethod
def from_config(cls, config):
r"""
Recreates a MultiModalEmbedding layer from
a config dictionary.
Parameters
----------
``config`` : dict
Configuration as produced by
``get_config``.
Returns
-------
MultiModalEmbedding
A new instance of this layer.
"""
return cls(**config)