# -*- coding: utf-8 -*-
# License: BSD-3-Clause
# Author: LKouadio <etanoyau@gmail.com>
"""
Physics-Informed Hybrid Attentive LSTM Network (PIHALNet).
"""
from __future__ import annotations
from numbers import Integral, Real
from typing import List, Optional, Union, Dict, Tuple
from ..._fusionlog import fusionlog, OncePerMessageFilter
from ...core.handlers import param_deprecated_message
from ...compat.sklearn import validate_params, Interval, StrOptions
from ...params import LearnableC, FixedC, DisabledC
from ...utils.deps_utils import ensure_pkg
from .. import KERAS_BACKEND, KERAS_DEPS, dependency_message
from .._base_attentive import BaseAttentive
if KERAS_BACKEND:
from .._tensor_validation import check_inputs
from .op import process_pinn_inputs, compute_consolidation_residual
from .utils import process_pde_modes
MeanSquaredError = KERAS_DEPS.Adam
Adam =KERAS_DEPS.Adam
Tensor = KERAS_DEPS.Tensor
Constant =KERAS_DEPS.Constant
GradientTape = KERAS_DEPS.GradientTape
tf_exp = KERAS_DEPS.exp
tf_square = KERAS_DEPS.square
tf_reduce_mean = KERAS_DEPS.reduce_mean
tf_zeros_like = KERAS_DEPS.zeros_like
tf_rank =KERAS_DEPS.rank
tf_exp =KERAS_DEPS.exp
tf_constant =KERAS_DEPS.constant
tf_log = KERAS_DEPS.log
tf_constant =KERAS_DEPS.constant
tf_float32=KERAS_DEPS.float32
tf_autograph=KERAS_DEPS.autograph
tf_autograph.set_verbosity(0)
register_keras_serializable =KERAS_DEPS.register_keras_serializable
DEP_MSG = dependency_message('nn.pinn.models')
logger = fusionlog().get_fusionlab_logger(__name__)
logger.addFilter(OncePerMessageFilter())
__all__ = ["PIHALNet"]
[docs]
@register_keras_serializable(
'fusionlab.nn.pinn', name="PIHALNet"
)
@param_deprecated_message(
conditions_params_mappings=[
{
'param': 'pde_mode',
'condition': lambda p_value: (
p_value == 'gw_flow' or
p_value == 'both' or
p_value == 'none' or
(isinstance(p_value, list) and
any(mode in ['gw_flow', 'both', 'none'] for mode in p_value) and
not ('consolidation' in p_value and len(p_value) == 1)
)
),
'message': (
"Warning: The 'pde_mode' parameter received a value that "
"includes options ('gw_flow', 'both', 'none') which are "
"the primary focus of the current physics-informed implementation. "
"This version of PIHALNet is optimized for and defaults to "
"'consolidation' mode to ensure robust physical constraints "
"based on Terzaghi's theory with finite differences. "
"The model will proceed using 'consolidation' mode. Full "
"support for other PDE modes and their specific derivative "
"can be explored in 'fusionlab.nn.pinn.TransFlowSubsNet' instead."
),
'default': 'consolidation',
}
],
warning_category=UserWarning
)
class PIHALNet(BaseAttentive):
"""
Physics-Informed Hybrid Attentive-based LSTM Network (PIHALNet).
This model integrates a data-driven forecasting architecture, based on
LSTMs and multiple attention mechanisms, with physics-informed
constraints derived from the governing equations of land subsidence.
It is designed to produce physically consistent, multi-horizon
probabilistic forecasts for both subsidence and groundwater levels,
while also offering the capability to discover physical parameters
from observational data.
The architecture can operate in two modes for its physical
coefficients:
1. **Parameter Specification:** Use predefined physical constants.
2. **Parameter Discovery:** Treat physical constants as trainable
variables to be learned during training (default).
"""
[docs]
@validate_params({
'output_subsidence_dim': [
Interval(Integral,1, None, closed="left")],
'output_gwl_dim': [
Interval(Integral,1, None, closed="left"),],
"pde_mode": [
StrOptions({'consolidation', 'gw_flow', 'both', 'none'}),
'array-like', None
],
"pinn_coefficient_C": [
str, Real, None, StrOptions({"learnable"}),
LearnableC, FixedC, DisabledC
],
"gw_flow_coeffs": [dict, None],
})
@ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG)
def __init__(
self,
static_input_dim: int,
dynamic_input_dim: int,
future_input_dim: int,
output_subsidence_dim: int = 1,
output_gwl_dim: int = 1,
embed_dim: int = 32,
hidden_units: int = 64,
lstm_units: int = 64,
attention_units: int = 32,
num_heads: int = 4,
dropout_rate: float = 0.1,
forecast_horizon: int = 1,
quantiles: Optional[List[float]] = None,
max_window_size: int = 10,
memory_size: int = 100,
scales: Optional[List[int]] = None,
multi_scale_agg: str = 'last',
final_agg: str = 'last',
activation: str = 'relu',
use_residuals: bool = True,
use_batch_norm: bool = False,
pde_mode: Union[str, List[str], None] = 'consolidation',
pinn_coefficient_C: Union[
LearnableC, FixedC, DisabledC, str, float, None
] = LearnableC(initial_value=0.01),
gw_flow_coeffs: Optional[Dict[str, Union[str, float, None]]] = None,
use_vsn: bool = True,
vsn_units: Optional[int] = None,
mode: Optional[str]=None,
attention_levels:Optional[Union[str, List[str]]]=None,
architecture_config: Optional[Dict] = None,
name: str = "PIHALNet",
**kwargs
):
self._combined_output_target_dim = (
output_subsidence_dim + output_gwl_dim
)
super().__init__(
static_input_dim=static_input_dim,
dynamic_input_dim=dynamic_input_dim,
future_input_dim=future_input_dim,
output_dim= self._combined_output_target_dim,
forecast_horizon=forecast_horizon,
mode=mode,
quantiles=quantiles,
embed_dim=embed_dim,
hidden_units=hidden_units,
lstm_units=lstm_units,
attention_units=attention_units,
num_heads=num_heads,
dropout_rate=dropout_rate,
max_window_size=max_window_size,
memory_size=memory_size,
scales=scales,
multi_scale_agg=multi_scale_agg,
final_agg=final_agg,
activation=activation,
use_residuals=use_residuals,
use_vsn=use_vsn,
use_batch_norm =use_batch_norm,
vsn_units=vsn_units,
attention_levels =attention_levels,
architecture_config=architecture_config,
name=name,
**kwargs
)
# Initialize only PINN-specific attributes
self.output_subsidence_dim = output_subsidence_dim
self.output_gwl_dim = output_gwl_dim
# --- Store PINN Configuration ---
self.pde_modes_active = process_pde_modes(
pde_mode, enforce_consolidation=True,
solo_return=True,
)
# Normalize pinn_coefficient_C into one of our helper classes or legacy:
self.pinn_coefficient_C_config = self._normalize_C_descriptor(
pinn_coefficient_C)
self.gw_flow_coeffs_config = (
gw_flow_coeffs if gw_flow_coeffs is not None else {}
)
# Build the physics-related components
self._build_pinn_components()
[docs]
def split_outputs(
self,
predictions_combined: Tensor,
decoded_outputs_for_mean: Tensor
) -> Tuple[Tensor, Tensor, Tensor, Tensor]:
r"""
Separate the **combined output tensor** into individual
subsidence and groundwater‑level (GWL) components and return
both the *final* and *mean* predictions needed for the two loss
terms used in PIHALNet (data loss and physics/PDE loss).
The method supports two output shapes:
* **Quantile mode**
``(B, H, Q, C)`` where *Q* is the number of quantiles and
*C* = ``output_subsidence_dim + output_gwl_dim``.
* **Deterministic mode**
``(B, H, C)`` when quantiles are disabled.
Parameters
----------
predictions_combined : Tensor
Network output after the
:class:`~fusionlab.nn.pinn.QuantileDistributionModeling`
stage. Shape is ``(B, H, C)`` or ``(B, H, Q, C)``.
decoded_outputs_for_mean : Tensor
Decoder output *before* quantile distribution, used to
compute the PDE residual. Shape is ``(B, H, C)``.
training : bool, optional
*Inherited from the calling context.* Present only in
TensorFlow graph mode; not used explicitly here.
Returns
-------
s_pred_final : Tensor
Subsidence predictions ready for the data‑fidelity loss.
Shape matches ``predictions_combined`` minus the *C* split.
gwl_pred_final : Tensor
GWL predictions ready for the data‑fidelity loss.
s_pred_mean_for_pde : Tensor
Mean (deterministic) subsidence predictions used when
computing physics‑based derivatives.
gwl_pred_mean_for_pde : Tensor
Mean GWL predictions for the PDE residual term.
Notes
-----
* Mean predictions are extracted *only* from
``decoded_outputs_for_mean`` because applying the quantile
mapping first would break the differentiability required for
spatial–temporal derivatives.
* When TensorFlow executes in graph mode and the rank of
*predictions_combined* is dynamic, the function falls back to
:pyfunc:`tf.rank` for shape inspection.
Examples
--------
>>> outputs = model(...) # forward pass
>>> s_final, gwl_final, s_mean, gwl_mean = (
... model.split_outputs(
... predictions_combined=outputs["pred"],
... decoded_outputs_for_mean=outputs["dec_mean"],
... )
... )
>>> s_final.shape
TensorShape([32, 24, 3]) # e.g. B=32, H=24, Q=3
>>> gwl_mean.shape
TensorShape([32, 24, 1]) # deterministic mean
See Also
--------
fusionlab.nn.pinn.QuantileDistributionModeling :
Layer that adds the quantile dimension.
fusionlab.nn.pinn.PIHALNet.run_halnet_core :
Produces ``decoded_outputs_for_mean``.
"""
# --- 1. Extract Mean Predictions (for PDE Loss) ---
# These come from the decoder output *before* quantile distribution.
# This provides a stable point forecast for derivative calculation.
s_pred_mean_for_pde = decoded_outputs_for_mean[
..., :self.output_subsidence_dim
]
gwl_pred_mean_for_pde = decoded_outputs_for_mean[
..., self.output_subsidence_dim:
]
# --- 2. Extract Final Predictions (for Data Loss) ---
# These may or may not include a quantile dimension.
# We check the tensor's rank to decide how to slice.
# Keras may return a known static rank during build time,
# or we can use tf.rank for dynamic graph execution.
if self.quantiles and hasattr(
predictions_combined, 'shape') and len(
predictions_combined.shape) == 4:
# Case: Quantiles are present.
# Shape is (Batch, Horizon, NumQuantiles, CombinedOutputDim)
s_pred_final = predictions_combined[
..., :self.output_subsidence_dim
]
gwl_pred_final = predictions_combined[
..., self.output_subsidence_dim:
]
elif (
hasattr(predictions_combined, 'shape')
and len(predictions_combined.shape) == 3
):
# Case: No quantiles. Shape is (Batch, Horizon, CombinedOutputDim)
s_pred_final = predictions_combined[
..., :self.output_subsidence_dim
]
gwl_pred_final = predictions_combined[
..., self.output_subsidence_dim:
]
else:
# This case handles dynamic shapes during graph execution
# and acts as a fallback.
if self.quantiles and tf_rank(predictions_combined) == 4:
s_pred_final = predictions_combined[..., :self.output_subsidence_dim]
gwl_pred_final = predictions_combined[..., self.output_subsidence_dim:]
elif tf_rank(predictions_combined) == 3:
s_pred_final = predictions_combined[..., :self.output_subsidence_dim]
gwl_pred_final = predictions_combined[..., self.output_subsidence_dim:]
else:
# This case should ideally not be reached if QDM is consistent
raise ValueError(
f"Unexpected shape from QuantileDistributionModeling: "
f"Rank is {tf_rank(predictions_combined)}"
)
return (s_pred_final, gwl_pred_final,
s_pred_mean_for_pde, gwl_pred_mean_for_pde)
[docs]
@tf_autograph.experimental.do_not_convert
def call(
self,
inputs: Dict[str, Optional[Tensor]],
training: bool = False
) -> Dict[str, Tensor]:
r"""
Forward pass for :class:`~fusionlab.nn.pinn.PIHALNet`.
This method orchestrates the full **physics‑informed** workflow:
1. Validate and split the input dictionary into static, dynamic,
future, and coordinate tensors.
2. Run the HALNet encoder–decoder core to obtain a latent
representation.
3. Produce mean forecasts with the multi‑horizon decoder.
4. Optionally expand those means into quantile predictions.
5. Separate combined outputs into subsidence and GWL streams for
both data‑loss and physics‑loss branches.
6. Compute the consolidation PDE residual on the mean series.
7. Return a dictionary ready for the model’s composite loss.
Parameters
----------
inputs : dict[str, Tensor]
Dictionary containing at least the following keys
(created by :pyfunc:`fusionlab.nn.pinn.process_pinn_inputs`):
* ``'coords'`` – tensor ``(B, H, 3)`` with
*(t, x, y)* coordinates.
* ``'static_features'`` – tensor ``(B, S)``.
* ``'dynamic_features'`` – tensor ``(B, T_past, D)``.
* ``'future_features'`` – tensor ``(B, H, F)``.
training : bool, default=False
Standard Keras flag indicating training or inference mode.
Returns
-------
dict[str, Tensor]
* ``'subs_pred'`` – subsidence predictions, shape
``(B, H, Q, O_s)`` or ``(B, H, O_s)``.
* ``'gwl_pred'`` – GWL predictions, same layout for *O_g*.
* ``'pde_residual'`` – physics residual,
shape ``(B, H, 1)`` (all zeros if *H = 1*).
Notes
-----
* Quantile outputs are produced only when the model’s
``quantiles`` attribute is not *None*.
* The PDE residual is based on a discrete‑time consolidation
equation evaluated with finite differences; therefore a
forecast horizon > 1 is required.
Examples
--------
>>> out = pihalnet(
... {
... "coords": coords,
... "static_features": s,
... "dynamic_features": d,
... "future_features": f,
... },
... training=True,
... )
>>> out["subs_pred"].shape
TensorShape([32, 24, 3, 1]) # B=32, H=24, Q=3, O_s=1
>>> out["pde_residual"].shape
TensorShape([32, 24, 1])
See Also
--------
fusionlab.nn.pinn.PIHALNet.run_halnet_core :
Feature‑extraction backbone called internally.
fusionlab.nn.pinn.PIHALNet.split_outputs :
Helper that separates subsidence and GWL channels.
"""
# 1. Unpack and Validate Inputs
# The `process_pinn_inputs` helper unpacks the input dict and
# isolates the coordinate tensors for later use.
logger.debug("PIHALNet call: Processing PINN inputs.")
t, x, y, static_features, dynamic_features, future_features = \
process_pinn_inputs(inputs, mode='as_dict')
# basic tensors checks.
check_inputs(
dynamic_inputs= dynamic_features,
static_inputs= static_features,
future_inputs= future_features,
dynamic_input_dim= self.dynamic_input_dim,
static_input_dim = self.static_input_dim,
future_input_dim= self.future_input_dim,
forecast_horizon= self.forecast_horizon,
verbose=0 # Set to >0 for detailed logging from checks
)
logger.debug(
"Input shapes after validation:"
f" S={getattr(static_features, 'shape', 'None')}, "
f"D={getattr(dynamic_features, 'shape', 'None')},"
f" F={getattr(future_features, 'shape', 'None')}"
)
# 2. Get the data-driven predictions from the base model
# The base call handles validation and the core encoder-decoder
# Get the data-driven predictions from the base model
# The base call handles validation and the core encoder-decoder
# outputs the decoder final predictions
predictions_final_targets = super().call(
[static_features, dynamic_features, future_features],
training=training)
# `base_outputs` contains the combined predictions.
# We need both the final (potentially quantile) predictions for
# data loss and the mean predictions for the PDE residual.
logger.debug(
f"Shape of decoded outputs (means):"
f" {predictions_final_targets.shape}")
# --- 4. Split and Organize Outputs ---
# Use helper to separate subsidence and GWL predictions for both
# data loss (quantiles) and physics loss (mean).
(s_pred_final, gwl_pred_final,
s_pred_mean_for_pde, gwl_pred_mean_for_pde) = self.split_outputs(
predictions_combined=predictions_final_targets,
decoded_outputs_for_mean=self._decoded_outputs
)
# --- 5. Calculate Physics Residual ---
# Let's verify the shape and slice if needed
# (this is a patch, not the ideal fix)
if t.shape[1] != self.forecast_horizon:
# This indicates a data pipeline issue, but we can patch it here
# by assuming the last `forecast_horizon` steps of `t`
# are the correct ones.
# This assumption may be incorrect depending on your sequence prep.
t_for_pde = t[:, -self.forecast_horizon:, :]
# A better approach is to fix the data pipeline so `inputs['coords']`
# has the shape (batch, forecast_horizon, 3) from the start.
else:
t_for_pde = t
# 3. Compute the physics residual
# The PDE residual is calculated on the mean predictions using
# finite differences, which is suitable for sequence outputs.
# This does NOT require a GradientTape in the call method.
logger.debug("Computing PDE residual from mean predictions.")
if self.forecast_horizon > 1:
pde_residual = compute_consolidation_residual(
s_pred=s_pred_mean_for_pde,
h_pred=gwl_pred_mean_for_pde,
time_steps=t_for_pde, # `t` holds the forecast time sequence
C=self.get_pinn_coefficient_C()
)
else:
# Cannot compute discrete time derivative with a single point
logger.warning(
"Forecast horizon is 1, cannot compute finite-difference "
"based PDE residual. Returning zeros."
)
pde_residual = tf_zeros_like(s_pred_mean_for_pde)
logger.debug(f"Shape of PDE residual: {pde_residual.shape}")
# 4. Return the combined dictionary for the loss function
return {
"subs_pred": s_pred_final,
"gwl_pred": gwl_pred_final,
"pde_residual": pde_residual,
}
[docs]
def compile(
self,
optimizer,
loss,
metrics=None,
loss_weights=None,
lambda_pde: float = 1.0,
**kwargs,
):
r"""
Configure PIHALNet for training with a **composite PINN loss**.
The total loss optimised during :pyfunc:`train_step`
is
.. math::
L_\text{total} \;=\;
\sum_i w_i\,L_{\text{data},i}
+ \lambda_\text{pde}\,L_\text{pde}
where *i* indexes the outputs (subsidence, GWL).
Parameters
----------
optimizer : tf.keras.optimizers.Optimizer | str
Optimiser instance or Keras alias (e.g., ``"adam"``).
loss : dict
Mapping ``{"subs_pred": loss_fn, "gwl_pred": loss_fn}``.
Each value can be a Keras loss object or a string such
as ``"mse"``.
metrics : dict, optional
Mapping from output keys to a list of Keras metric objects
(or their aliases) that will be tracked during training and
evaluation.
loss_weights : dict, optional
Scalar weights :math:`w_i` for each *data* loss term.
Defaults to 1 for every output.
lambda_pde : float, default=1.0
Weight applied to :math:`L_\text{pde}` (physics residual).
**kwargs
Additional keywords forwarded to
:pyfunc:`tf.keras.Model.compile`.
Notes
-----
* The physics‑residual term is added manually in
:pyfunc:`train_step`; therefore ``lambda_pde`` is stored as an
attribute rather than passed to ``loss_weights``.
"""
# Call the parent's compile method. It will handle the setup of
# losses, metrics, and weights for our named outputs.
super().compile(
optimizer=optimizer,
loss=loss,
metrics=metrics,
loss_weights=loss_weights,
**kwargs,
)
# Store the PINN-specific loss weight
self.lambda_pde = lambda_pde
[docs]
def train_step(self, data: Tuple[Dict, Dict]) -> Dict[str, Tensor]:
r"""
Single optimisation step implementing the composite loss.
The procedure is:
1. Forward pass → ``self.call`` to obtain
``{"subs_pred", "gwl_pred", "pde_residual"}``.
2. Compute data‑fidelity loss via
:pyfunc:`tf.keras.Model.compute_loss`.
3. Compute physics residual loss
:math:`L_\text{pde} = \langle r^2\rangle` where
``r = outputs["pde_residual"]``.
4. Form
:math:`L_\text{total}=L_\text{data} +
\lambda_\text{pde}\,L_\text{pde}` and back‑propagate.
5. Update Keras metrics and return a results dictionary.
Parameters
----------
data : tuple(dict, dict)
Tuple ``(inputs, targets)`` produced by the data pipeline.
Returns
-------
dict[str, Tensor]
Includes user‑defined metrics plus ``"total_loss"``,
``"data_loss"``, and ``"physics_loss"``.
Raises
------
ValueError
If *data* is not a two‑element tuple of dictionaries.
"""
# Unpack the data. Keras provides it as a tuple of (inputs, targets).
# We expect both to be dictionaries.
if not isinstance(data, tuple) or len(data) != 2:
raise ValueError(
"Expected data to be a tuple of (inputs_dict, targets_dict)."
)
inputs, targets = data
# Open a GradientTape to record operations
# for automatic differentiation.
with GradientTape() as tape:
# 1. Forward pass to get model outputs
# The `call` method returns a dict:
# {'subs_pred', 'gwl_pred', 'pde_residual'}
outputs = self(inputs, training=True)
# Structure predictions to match the
# 'loss' dictionary from compile()
y_pred_for_loss = {
'subs_pred': outputs['subs_pred'],
'gwl_pred': outputs['gwl_pred']
}
# 2. Calculate Data Fidelity Loss (L_data)
# Keras's self.compute_loss handles the dictionary of losses,
# targets, predictions, and loss_weights automatically.
data_loss = self.compute_loss(
x=inputs, y=targets, y_pred=y_pred_for_loss
)
# 3. Calculate Physics Residual Loss (L_pde)
# We penalize the mean of the squared residual to force it to zero.
pde_residual = outputs['pde_residual']
loss_pde = tf_reduce_mean(tf_square(pde_residual))
# 4. Combine losses to form the Total Loss
# This is the final scalar loss that will be differentiated.
total_loss = data_loss + self.lambda_pde * loss_pde
# 5. Compute and Apply Gradients
# This includes the model's layers and the learnable `log_C_coefficient`.
trainable_vars = self.trainable_variables
gradients = tape.gradient(total_loss, trainable_vars)
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# 6. Update and Return Metrics
# Update the metrics passed in compile()
# (e.g., 'mae', 'rmse' for each output)
self.compiled_metrics.update_state(targets, y_pred_for_loss)
# Build a dictionary of results to be displayed by Keras.
results = {m.name: m.result() for m in self.metrics}
results.update({
"total_loss": total_loss, # The main loss we optimized
"data_loss": data_loss, # The part of the loss from data
"physics_loss": loss_pde, # The part of the loss from physics
})
return results
def _normalize_C_descriptor(
self,
raw: Union[LearnableC, FixedC, DisabledC, str, float, None]
):
"""
Internal helper: turn the user‐passed `raw` into exactly one of
our three classes (LearnableC, FixedC, DisabledC).
Raises
------
ValueError
If `raw` is an unrecognized string or negative float.
TypeError
If `raw` is not one of the expected types.
"""
# 1) Already one of our classes?
if isinstance(raw, (LearnableC, FixedC, DisabledC)):
return raw
# 2) If user passed a bare float or int: treat as FixedC(value=raw)
if isinstance(raw, (float, int)):
if raw < 0:
raise ValueError(
"Numeric pinn_coefficient_C must"
f" be non‐negative, got {raw}"
)
# Nonzero means 'fixed to that value'
return FixedC(value=float(raw))
# 3) If user passed a string, allow the legacy
# values "learnable" or "fixed"
if isinstance(raw, str):
low = raw.strip().lower()
if low == "learnable":
# Default initial value = 0.01 is built into LearnableC
return LearnableC(initial_value=0.01)
if low == "fixed":
# Default fixed value = 1.0
return FixedC(value=1.0)
if low in ("none", "disabled", "off"):
return DisabledC()
raise ValueError(
f"Unrecognized pinn_coefficient_C string: '{raw}'. "
"Expected 'learnable', 'fixed', 'none', or use"
" a LearnableC/FixedC/DisabledC instance."
)
# 4) If user passed None, treat as DisabledC()
if raw is None:
return DisabledC()
raise TypeError(
f"pinn_coefficient_C must be LearnableC, FixedC, DisabledC, "
f"str, float, or None; got {type(raw).__name__}."
)
def _build_pinn_components(self):
"""
Instantiates components required for the physics‐informed module.
Specifically, sets up how we obtain C:
- If LearnableC, create a trainable variable log_C.
- If FixedC, store a lambda that returns a fixed tf.constant(value).
- If DisabledC, store a lambda that returns tf.constant(1.0).
"""
# Check which descriptor we have:
desc = self.pinn_coefficient_C_config
if isinstance(desc, LearnableC):
# We learn log(C) so that C = exp(log_C) is always > 0
self.log_C_coefficient = self.add_weight(
name="log_pinn_coefficient_C",
shape=(), # scalar
initializer=Constant(tf_log(desc.initial_value)),
trainable=True
)
self._get_C = lambda: tf_exp(self.log_C_coefficient)
elif isinstance(desc, FixedC):
# Fixed value, non‐trainable
val = desc.value
self._get_C = lambda: tf_constant(val, dtype=tf_float32)
elif isinstance(desc, DisabledC):
# Physics disabled => C internally 1.0 but not used if lambda_pde==0
# in compile()
self._get_C = lambda: tf_constant(1.0, dtype=tf_float32)
else:
# Should never happen if _normalize_C_descriptor is correct
raise RuntimeError(
"Internal error: pinn_coefficient_C_config is not a recognized type."
)
[docs]
def get_pinn_coefficient_C(self) -> Tensor:
"""Returns the physical coefficient C."""
return self._get_C()
[docs]
def get_config(self):
"""Returns the full configuration of the PIHALNet model."""
# Get the config from the base class first
base_config = super().get_config()
# Add PIHALNet-specific parameters
base_config.update({
"output_subsidence_dim": self.output_subsidence_dim,
"output_gwl_dim": self.output_gwl_dim,
"pde_mode": self.pde_modes_active,
"pinn_coefficient_C": self.pinn_coefficient_C_config,
"gw_flow_coeffs": self.gw_flow_coeffs_config,
"name": self.name # for consistency
})
# The base config already contains all other necessary params
return base_config
[docs]
@classmethod
def from_config(cls, config, custom_objects=None):
"""
Instantiates the class from a configuration dictionary.
This method is a factory method for creating an instance of the class
from a dictionary containing the configuration. The `output_dim` is
removed from the configuration since it is computed dynamically based
on other parameters (e.g., `output_subsidence_dim + output_gwl_dim`),
ensuring that it is not redundant in the configuration.
Parameters
----------
config : dict
A dictionary containing the configuration parameters for the class.
The dictionary should contain all the required parameters except for
`output_dim`, which is computed internally.
custom_objects : dict, optional
A dictionary of custom objects that might be needed to interpret
the configuration, such as custom layers, activation functions,
or other components.
Returns
-------
object
An instance of the class, initialized with the parameters from
the `config` dictionary.
Notes
-----
- The method removes the `'output_dim'` key from the configuration
because it is derived from the sum of `output_subsidence_dim` and
`output_gwl_dim`, and including it would result in redundancy.
- This method is typically used for loading a model or initializing
an object from a saved configuration, which is common in model
serialization/deserialization.
Example
-------
>>> config = {
>>> 'output_subsidence_dim': 2,
>>> 'output_gwl_dim': 3,
>>> 'other_param': 42
>>> }
>>> model = MyModel.from_config(config)
>>> print(model.output_dim) # Will be computed as 5, not directly passed.
"""
if 'output_dim' in config:
config.pop('output_dim') # Remove 'output_dim' since it is computed
# from output_subsidence_dim + output_gwl_dim.
return cls(**config)
PIHALNet.__doc__+=r"""\n
The architecture can operate in two modes for its physical coefficient “C”:
1. **Parameter Specification:** Use a user-supplied constant value.
2. **Parameter Discovery:** Treat the coefficient as trainable (default),
learning log(C) during training to ensure positivity.
PIHALNet’s total loss is a weighted sum of the data fidelity loss on
subsidence/GWL predictions and a physics residual loss (PDE loss).
Formulation
~~~~~~~~~~~~
Given inputs :math:`\mathbf{x}_{\text{static}}`, :math:`\mathbf{x}_{\text{dyn}}`,
and (optionally) :math:`\mathbf{x}_{\text{fut}}`, PIHALNet produces multi-horizon
predictions::
:math:`\hat{s}[t+h],\; \hat{h}[t+h] \quad (h=1,\dots,H),`
for subsidence :math:`s` and GWL :math:`h`. The data loss
(:math:`L_{\text{data}}`) is::
.. math::
L_{\text{data}} = \sum_{h=1}^H
\bigl\{ \ell\bigl( \hat{s}[t+h], s[t+h]\bigr)
+ \ell\bigl( \hat{h}[t+h], h[t+h]\bigr) \bigr\},
where :math:`\ell` is typically MSE or MAE. The PDE residual loss
(:math:`L_{\text{pde}}`) for Terzaghi’s consolidation equation is::
.. math::
L_{\text{pde}} =
\frac{1}{H} \sum_{h=1}^H \bigl\| C \, \Delta s[t+h]
- \frac{\partial h}{\partial t}[t+h] \bigr\|^2,
computed via finite differences on the sequence of mean predictions.
The total loss is::
.. math::
L_{\text{total}} = L_{\text{data}}
+ \lambda_{\text{pde}} \, L_{\text{pde}},
where :math:`\lambda_{\text{pde}}` is a user-defined weight.
Parameters
----------
static_input_dim : int
Dimensionality of static (time-invariant) feature vector. Must be
:math:`\geq 0`. If zero, static inputs are omitted.
dynamic_input_dim : int
Dimensionality of the historical dynamic feature vector at each time
step. Must be :math:`\geq 1`.
future_input_dim : int
Dimensionality of known future covariates at each forecast step. Must
be :math:`\geq 0`. If zero, no future covariates are used.
output_subsidence_dim : int, default 1
Number of simultaneous subsidence targets (usually 1). Must be
:math:`\geq 1`.
output_gwl_dim : int, default 1
Number of simultaneous groundwater-level targets (usually 1). Must be
:math:`\geq 1`.
embed_dim : int, default 32
Size of the embedding after initial feature processing (VSN/GRN).
Controls hidden dimension for attention and LSTM inputs.
hidden_units : int, default 64
Number of hidden units in the Gated Residual Networks (GRNs) and
Dense layers. Must be :math:`\geq 1`.
lstm_units : int, default 64
Number of units in each LSTM cell. Must be :math:`\geq 1`.
For multi-scale LSTM, this is the base size at each scale.
attention_units : int, default 32
Number of units in multi-head attention and Hierarchical Attention
layers. Must be :math:`\geq 1`.
num_heads : int, default 4
Number of attention heads in all multi-head attention modules.
Must be :math:`\geq 1`.
dropout_rate : float, default 0.1
Dropout rate applied in various layers (VSN GRNs, attention heads,
final MLP). Must be in :math:`[0.0, 1.0]`.
forecast_horizon : int, default 1
Number of future time steps to predict. Must be :math:`\geq 1`.
Multi-horizon predictions are produced for
:math:`h=1,\dots,\text{forecast_horizon}`.
quantiles : list of float, optional
If provided, PIHALNet will output quantile forecasts at each horizon.
Each quantile dimension produces an additional branch. If *None*, only
mean predictions are used for PDE residual (physics) loss.
max_window_size : int, default 10
Maximum time-window length for DynamicTimeWindow layer. Must be
:math:`\geq 1`. Controls the longest subsequence of historical dynamic
features used at each decoding step.
memory_size : int, default 100
Size of the memory bank for MemoryAugmentedAttention. Must be
:math:`\geq 1`.
scales : list of int or “auto”, optional
Scales used in MultiScaleLSTM. If “auto”, scales are chosen
automatically based on forecast_horizon. Otherwise, each scale must
divide the forecast_horizon. Example: :math:`[1, 3, 6]` for a 6-step
horizon.
multi_scale_agg : {“last”, “average”, “flatten”, “auto”}, default “last”
Aggregation method for multi-scale outputs:
- “last”: take last time-step output from each scale.
- “average”: average outputs over time.
- “flatten”: concatenate outputs over time.
- “auto”: choose “last” by default.
final_agg : {“last”, “average”, “flatten”}, default “last”
Aggregation method after DynamicTimeWindow:
- “last”: use final time-step.
- “average”: average over windows.
- “flatten”: flatten all window outputs.
activation : str or callable, default “relu”
Activation function for all GRNs, Dense layers, and VSNs.
If a string, must be one of Keras built-ins (e.g. “relu”, “gelu”).
use_residuals : bool, default True
If True, apply a residual connection via a Dense layer to embeddings.
use_batch_norm : bool, default False
If True, apply LayerNormalization after each Dense/GRN block.
pde_mode : str or list of str or None, default “consolidation”
Determines which PDE component(s) to include in the physics loss:
- “consolidation”: solve Terzaghi’s consolidation only (1-D vertical).
- “gw_flow”: solve coupled groundwater flow (reserved for future release).
- “both”: include both consolidation and gw_flow (reserved).
- “none”: disable physics loss entirely.
If a list is provided, only those modes are active.
“consolidation” is enforced by default for this release.
pinn_coefficient_C : str, float, or None, default “learnable”
Configuration for consolidation coefficient :math:`C`:
- “learnable”: estimate :math:`\log(C)` as a trainable scalar.
- float (:math:`>0`): use this fixed constant.
- None: disable physics entirely (:math:`C=1` but unused).
gw_flow_coeffs : dict or None, default None
Dictionary of groundwater-flow coefficients:
- “K”: hydraulic conductivity (:math:`>0`), default “learnable”.
- “Ss”: specific storage (:math:`>0`), default “learnable”.
- “Q”: source/sink term, default 0.0.
Only used if “gw_flow” is in pde_mode.
use_vsn : bool, default True
If True, apply VariableSelectionNetwork blocks to static, dynamic,
and future inputs. If False, skip VSN and use simple dense projections.
vsn_units : int or None, default None
Output dimension of each VSN block. If None, defaults to hidden_units.
name : str, default “PIHALNet”
Keras model name.
**kwargs
Additional keyword arguments forwarded to tf.keras.Model initializer.
Attributes
----------
static_vsn : VariableSelectionNetwork or None
VSN block for static features (if use_vsn=True and static_input_dim>0).
Otherwise None.
dynamic_vsn : VariableSelectionNetwork or None
VSN block for dynamic features (if use_vsn=True).
future_vsn : VariableSelectionNetwork or None
VSN block for future features (if use_vsn=True and future_input_dim>0).
multi_modal_embedding : MultiModalEmbedding or None
Fallback embedding layer if VSNs are disabled or incomplete.
multi_scale_lstm : MultiScaleLSTM
LSTM module that operates at multiple temporal scales.
hierarchical_attention : HierarchicalAttention
Performs hierarchical attention over dynamic/future features.
cross_attention : CrossAttention
Cross-attends dynamic features with fused embeddings.
memory_augmented_attention : MemoryAugmentedAttention
Attention mechanism with an external memory bank.
dynamic_time_window : DynamicTimeWindow
Splits attention-fused features into overlapping windows.
multi_decoder : MultiDecoder
Produces final multi-horizon mean forecasts for combined targets.
quantile_distribution_modeling : QuantileDistributionModeling or None
If quantiles is not None, applies quantile modeling over decoder outputs.
positional_encoding : PositionalEncoding
Adds positional embeddings to fused features.
learned_normalization : LearnedNormalization
Optional normalization layer applied to raw inputs or VSN outputs.
log_C_coefficient : tf.Variable or None
If pinn_coefficient_C == “learnable”, stores :math:`\log(C)`. Otherwise None.
log_K_gwflow_var : tf.Variable or None
Trainable log(K) for groundwater flow, if enabled.
log_Ss_gwflow_var : tf.Variable or None
Trainable log(Ss) for groundwater flow, if enabled.
Q_gwflow_var : tf.Variable or None
Trainable or fixed Q term for groundwater flow, if enabled.
Methods
-------
call(inputs, training=False) -> dict[str, tf.Tensor]
Forward pass computing predictions and PDE residual:
1. Process inputs via process_pinn_inputs.
2. Validate tensor shapes (via validate_model_inputs).
3. Extract features with build_halnet_layers and attention/LSTM.
4. Decode multi-horizon mean predictions via multi_decoder.
5. If quantiles is provided, produce quantile outputs.
6. Split outputs into subs_pred, gwl_pred, plus mean for PDE.
7. Compute PDE residual via compute_consolidation_residual.
Returns a dict containing:
- “subs_pred”: subsidence forecasts (with quantiles if requested).
- “gwl_pred”: GWL forecasts (with quantiles if requested).
- “pde_residual”: tensor of physics residuals.
compile(optimizer, loss, metrics=None, loss_weights=None, lambda_pde=1.0, **kwargs)
Configures the model for training with a composite PINN loss.
Args:
optimizer : Keras optimizer instance (e.g. Adam).
loss : dict mapping output names (“subs_pred”, “gwl_pred”) to Keras loss
functions or string identifiers (e.g. “mse”).
metrics : dict mapping output names to lists of metrics to track.
loss_weights : dict mapping output names to scalar weights for data loss.
lambda_pde : float, weight for physics residual loss. Defaults to 1.0.
**kwargs : Additional args for tf.keras.Model.compile.
train_step(data: tuple) -> dict[str, tf.Tensor]
Custom training step to handle the composite PINN loss.
1. Unpack (inputs_dict, targets_dict) from data.
2. Forward pass with self(inputs, training=True).
3. Extract subs_pred, gwl_pred for data loss.
4. Compute data loss via self.compute_loss(x, y, y_pred).
5. Compute physics loss: :math:`\mathrm{MSE}(\text{pde_residual})`.
6. Total loss = data_loss + lambda_pde * physics_loss.
7. Compute/apply gradients via optimizer.
8. Update compiled metrics.
Returns a dict of results:
- “loss” (total PINN loss), “data_loss”, “physics_loss”,
plus any compiled metrics (e.g. “subs_mae”, “gwl_mae”).
get_pinn_coefficient_C() -> tf.Tensor or None
Returns the positive consolidation coefficient :math:`C`:
- If “learnable”, returns :math:`\exp(\log_C_coefficient)`.
- If fixed float was provided, returns that constant tensor.
- If disabled, returns :math:`1.0` if only consolidation is active, else None.
get_K_gwflow(), get_Ss_gwflow(), get_Q_gwflow() -> tf.Tensor or None
Return positive hydraulic conductivity :math:`K`, specific storage
:math:`S_s`, and source/sink term :math:`Q` for groundwater flow PDE,
if “gw_flow” mode is active. If not, return None.
get_config() -> dict
Returns a dict of all initialization arguments (static, dynamic, future
dims; PINN coefficients; architectural HPs). Enables model saving/loading
via tf.keras.models.clone_model.
from_config(config: dict, custom_objects=None) -> PIHALNet
Reconstructs a PIHALNet instance from get_config() output.
run_halnet_core(static_input, dynamic_input, future_input, training) -> tf.Tensor
Executes the core data-driven feature pipeline:
- Applies VSNs (if enabled) or Dense to each input block.
- Aligns future features via align_temporal_dimensions.
- Concatenates dynamic + future embeddings (or uses MME if no VSN).
- Applies positional encoding and optional residual connection.
- Runs MultiScaleLSTM, hierarchical/cross/memory-attention, and fusion.
- Returns final features to feed into MultiDecoder.
split_outputs(predictions_combined, decoded_outputs_for_mean) -> tuple
Separates combined predictions into subsidence and GWL components:
- predictions_combined: may include a quantile dimension (Rank 4) or be
Rank 3 if only mean forecasts. Splits along last axis into subsidence
and GWL for data loss.
- decoded_outputs_for_mean: always Rank 3 (Batch, Horizon, CombinedDim)
before quantile modeling. Splits into s_pred_mean_for_pde and
gwl_pred_mean_for_pde for physics residual calculation.
Notes
-----
- If quantiles is provided, final outputs shape is
(batch_size, horizon, num_quantiles, combined_output_dim).
Otherwise shape is (batch_size, horizon, combined_output_dim).
- Consolidation residual uses finite differences along horizon steps
to approximate :math:`\partial h / \partial t` and :math:`\Delta s`.
- Groundwater-flow PDE is reserved for a future release (“gw_flow” mode).
- VariableSelectionNetworks (VSNs) refine feature selection. If
use_vsn=False, simple Dense projections are used instead.
- MultiScaleLSTM can process temporal patterns at different resolutions.
Scales must divide the forecast horizon if not “auto”.
Examples
--------
# 1) Instantiate PIHALNet for point forecasts (no quantiles):
>>> from fusionlab.nn.pinn.models import PIHALNet
>>> model = PIHALNet(
... static_input_dim=5,
... dynamic_input_dim=3,
... future_input_dim=2,
... output_subsidence_dim=1,
... output_gwl_dim=1,
... embed_dim=32,
... hidden_units=64,
... lstm_units=64,
... attention_units=32,
... num_heads=4,
... dropout_rate=0.1,
... forecast_horizon=1,
... quantiles=None,
... max_window_size=10,
... memory_size=100,
... scales='auto',
... multi_scale_agg='last',
... final_agg='last',
... activation='relu',
... use_residuals=True,
... use_batch_norm=False,
... pde_mode='consolidation',
... pinn_coefficient_C='learnable',
... gw_flow_coeffs=None,
... use_vsn=True,
... vsn_units=None,
... )
>>> model.compile(
... optimizer='adam',
... loss={'subs_pred': 'mse', 'gwl_pred': 'mse'},
... metrics={'subs_pred': ['mae'], 'gwl_pred': ['mae']},
... loss_weights={'subs_pred': 1.0, 'gwl_pred': 0.8},
... lambda_pde=0.5,
... )
>>> import numpy as np
>>> inputs = {
... 'coords': np.zeros((2, 2), dtype='float32'),
... 'static_features': np.zeros((2, 5), dtype='float32'),
... 'dynamic_features': np.zeros((2, 1, 3), dtype='float32'),
... 'future_features': np.zeros((2, 1, 2), dtype='float32'),
... }
>>> targets = {
... 'subs_pred': np.zeros((2, 1, 1), dtype='float32'),
... 'gwl_pred': np.zeros((2, 1, 1), dtype='float32'),
... }
>>> outputs = model(inputs, training=False)
>>> print(outputs['subs_pred'].shape, outputs['gwl_pred'].shape)
(2, 1, 1) (2, 1, 1)
# 2) Instantiate with quantile forecasting (e.g., [0.1, 0.5, 0.9]):
>>> model_q = PIHALNet(
... static_input_dim=5,
... dynamic_input_dim=3,
... future_input_dim=2,
... output_subsidence_dim=1,
... output_gwl_dim=1,
... embed_dim=32,
... hidden_units=64,
... lstm_units=64,
... attention_units=32,
... num_heads=4,
... dropout_rate=0.1,
... forecast_horizon=3,
... quantiles=[0.1, 0.5, 0.9],
... max_window_size=10,
... memory_size=100,
... scales=[1, 3],
... multi_scale_agg='average',
... final_agg='flatten',
... activation='gelu',
... use_residuals=True,
... use_batch_norm=True,
... pde_mode='consolidation',
... pinn_coefficient_C=0.02, # fixed C = 0.02
... gw_flow_coeffs={'K': 1e-4, 'Ss': 1e-5, 'Q': 0.0},
... use_vsn=False,
... vsn_units=None,
... )
>>> model_q.compile(
... optimizer='adam',
... loss={'subs_pred': 'mse', 'gwl_pred': 'mse'},
... metrics={'subs_pred': ['mae'], 'gwl_pred': ['mae']},
... loss_weights={'subs_pred': 1.0, 'gwl_pred': 0.8},
... lambda_pde=1.0,
... )
>>> outputs_q = model_q(inputs, training=False)
>>> # Since quantiles=3, final outputs have shape (2, 3, 3, 2):
>>> print(outputs_q['subs_pred'].shape, outputs_q['gwl_pred'].shape)
(2, 3, 3, 1) (2, 3, 3, 1)
See Also
--------
fusionlab.nn.pinn.tuning.PIHALTuner
Hyperparameter tuner specifically built for PIHALNet.
fusionlab.nn.pinn.utils.process_pinn_inputs
Preprocessing of nested input dict to tensors.
fusionlab.nn.pinn._tensor_validation.validate_model_inputs
Ensures dynamic/static/future tensors match declared dims.
References
----------
.. [1] Raissi, M., Perdikaris, P., & Karniadakis, G.E. (2019).
*Physics-informed neural networks: A deep learning framework
for solving forward and inverse problems involving nonlinear
partial differential equations*. Journal of Computational Physics,
378, 686–707.
.. [2] Karniadakis, G.E., Kevrekidis, I.G., Lu, L., Perdikaris, P.,
Wang, S., & Yang, L. (2021). *Physics-informed machine learning*.
Nature Reviews Physics, 3(6), 422–440.
.. [3] Heng, M.H., Chen, W., & Smith, E.C. (2022). *Joint modeling of
land subsidence and groundwater levels with PINNs*. Environmental
Modelling & Software, 150, 105347.
"""