# -*- coding: utf-8 -*-
# License: BSD-3-Clause
# Author: LKouadio <etanoyau@gmail.com>
"""
Legacy Physics-Informed Hybrid Attentive LSTM Network (PiHALNet).
"""
from textwrap import dedent # noqa
from numbers import Real, Integral
from typing import List, Optional, Union, Dict, Tuple
import warnings
from ..._fusionlog import fusionlog, OncePerMessageFilter
from ...api.property import NNLearner
from ...core.handlers import param_deprecated_message
from ...compat.sklearn import validate_params, Interval, StrOptions
from ...params import LearnableC, FixedC, DisabledC
from ...utils.deps_utils import ensure_pkg
from ...utils.generic_utils import select_mode, rename_dict_keys
from .. import KERAS_DEPS, KERAS_BACKEND, dependency_message
if KERAS_BACKEND:
from .._tensor_validation import validate_model_inputs
from .._tensor_validation import check_inputs
from ..utils import set_default_params
from ..components import (
Activation,
CrossAttention,
DynamicTimeWindow,
GatedResidualNetwork,
HierarchicalAttention,
MemoryAugmentedAttention,
MultiDecoder,
MultiResolutionAttentionFusion,
MultiScaleLSTM,
QuantileDistributionModeling,
VariableSelectionNetwork,
PositionalEncoding,
aggregate_time_window_output,
aggregate_multiscale_on_3d
)
from .op import process_pinn_inputs, compute_consolidation_residual
from .utils import process_pde_modes
LSTM = KERAS_DEPS.LSTM
Dense = KERAS_DEPS.Dense
LayerNormalization = KERAS_DEPS.LayerNormalization
Model= KERAS_DEPS.Model
Tensor=KERAS_DEPS.Tensor
Variable =KERAS_DEPS.Variable
Add =KERAS_DEPS.Add
Constant =KERAS_DEPS.Constant
GradientTape =KERAS_DEPS.GradientTape
register_keras_serializable=KERAS_DEPS.register_keras_serializable
tf_zeros_like= KERAS_DEPS.zeros_like
tf_zeros =KERAS_DEPS.zeros
tf_reduce_mean =KERAS_DEPS.reduce_mean
tf_square =KERAS_DEPS.square
tf_constant =KERAS_DEPS.constant
tf_log = KERAS_DEPS.log
tf_expand_dims = KERAS_DEPS.expand_dims
tf_tile = KERAS_DEPS.tile
tf_concat = KERAS_DEPS.concat
tf_shape = KERAS_DEPS.shape
tf_float32=KERAS_DEPS.float32
tf_exp =KERAS_DEPS.exp
tf_rank =KERAS_DEPS.rank
tf_assert_equal = KERAS_DEPS.assert_equal
tf_convert_to_tensor =KERAS_DEPS.convert_to_tensor
tf_autograph=KERAS_DEPS.autograph
tf_autograph.set_verbosity(0)
DEP_MSG = dependency_message('nn.pinn.models')
logger = fusionlog().get_fusionlab_logger(__name__)
logger.addFilter(OncePerMessageFilter())
__all__ =["PiHALNet"]
[docs]
@register_keras_serializable('fusionlab.nn.pinn', name="PIHALNet")
@param_deprecated_message(
conditions_params_mappings=[
{
'param': 'pde_mode', # The __init__ parameter name
'condition': lambda p_value: (
p_value == 'gw_flow' or
p_value == 'both' or
p_value == 'none' or
(isinstance(p_value, list) and
any(mode in ['gw_flow', 'both', 'none'] for mode in p_value) and
not ('consolidation' in p_value and len(p_value) == 1)
)
),
'message': (
"Warning: The 'pde_mode' parameter received a value that "
"includes options ('gw_flow', 'both', 'none') which are "
"intended for future development or are not the primary "
"focus of the current physics-informed implementation. "
"This version of PiHALNet is optimized for and defaults to "
"'consolidation' mode to ensure robust physical constraints "
"based on Terzaghi's theory with finite differences. "
"The model will proceed using 'consolidation' mode. Full "
"support for other PDE modes and their specific derivative "
"requirements will be explored in future releases."
),
'default': 'consolidation',
}
],
warning_category=UserWarning
)
class PiHALNet(Model, NNLearner):
"""
Physics-Informed Hybrid Attentive LSTM Network (PiHALNet).
This model integrates a data-driven forecasting architecture, based on
LSTMs and multiple attention mechanisms, with physics-informed
constraints derived from the governing equations of land subsidence.
It is designed to produce physically consistent, multi-horizon
probabilistic forecasts for both subsidence and groundwater levels,
while also offering the capability to discover physical parameters
from observational data.
The architecture can operate in two modes for its physical
coefficients:
1. **Parameter Specification:** Use predefined physical constants.
2. **Parameter Discovery:** Treat physical constants as trainable
variables to be learned during training (default).
"""
[docs]
@validate_params({
"static_input_dim": [Interval(Integral, 0, None, closed='left')],
"dynamic_input_dim": [Interval(Integral, 1, None, closed='left')],
"future_input_dim": [Interval(Integral, 0, None, closed='left')],
"output_subsidence_dim": [Interval(Integral, 1, None, closed='left')],
"output_gwl_dim": [Interval(Integral, 1, None, closed='left')],
"embed_dim": [Interval(Integral, 1, None, closed='left')],
"hidden_units": [Interval(Integral, 1, None, closed='left')],
"lstm_units": [Interval(Integral, 1, None, closed='left'), None],
"attention_units": [Interval(Integral, 1, None, closed='left')],
"num_heads": [Interval(Integral, 1, None, closed='left')],
"dropout_rate": [Interval(Real, 0, 1, closed="both")],
"forecast_horizon": [Interval(Integral, 1, None, closed='left')],
"quantiles": ['array-like', StrOptions({'auto'}), None],
"max_window_size": [Interval(Integral, 1, None, closed='left')],
"memory_size": [Interval(Integral, 1, None, closed='left')],
"scales": ['array-like', StrOptions({'auto'}), None],
"multi_scale_agg": [StrOptions({
"last", "average", "flatten", "auto", "sum", "concat"}), None],
"final_agg": [StrOptions({"last", "average", "flatten"})],
"activation": [str, callable],
"use_residuals": [bool],
"use_batch_norm": [bool],
"pde_mode": [
StrOptions({'consolidation', 'gw_flow', 'both', 'none'}),
'array-like', None
],
"pinn_coefficient_C": [
str, Real, None, StrOptions({"learnable"}),
LearnableC, FixedC, DisabledC
],
"mode": [
StrOptions({'tft', 'pihal', 'tft_like', 'pihal_like',
"tft-like", "pihal-like"}),
None
],
"gw_flow_coeffs": [dict, type(None)],
'use_vsn': [bool, int],
'vsn_units': [Interval(Integral, 0, None, closed="left"), None]
})
@ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG)
def __init__(
self,
static_input_dim: int,
dynamic_input_dim: int,
future_input_dim: int,
output_subsidence_dim: int = 1,
output_gwl_dim: int = 1,
embed_dim: int = 32,
hidden_units: int = 64,
lstm_units: int = 64,
attention_units: int = 32,
num_heads: int = 4,
dropout_rate: float = 0.1,
forecast_horizon: int = 1,
quantiles: Optional[List[float]] = None,
max_window_size: int = 10,
memory_size: int = 100,
scales: Optional[List[int]] = None,
multi_scale_agg: str = 'last',
final_agg: str = 'last',
activation: str = 'relu',
use_residuals: bool = True,
use_batch_norm: bool = False,
pde_mode: Union[str, List[str], None] = 'consolidation',
pinn_coefficient_C: Union[
LearnableC, FixedC, DisabledC, str, float, None
] = LearnableC(initial_value=0.01),
gw_flow_coeffs: Optional[Dict[str, Union[str, float, None]]] = None,
use_vsn: bool = True,
vsn_units: Optional[int] = None,
mode: Optional[str]=None,
name: str = "PiHALNet",
**kwargs
):
super().__init__(name=name, **kwargs)
self.static_input_dim = static_input_dim
self.dynamic_input_dim = dynamic_input_dim
self.future_input_dim = future_input_dim
self.output_subsidence_dim = output_subsidence_dim
self.output_gwl_dim = output_gwl_dim
self._combined_output_target_dim = (
output_subsidence_dim + output_gwl_dim
)
self.embed_dim = embed_dim
self.hidden_units = hidden_units
self.lstm_units = lstm_units
self.attention_units = attention_units
self.num_heads = num_heads
self.dropout_rate = dropout_rate
self.forecast_horizon = forecast_horizon
self.max_window_size = max_window_size
self.memory_size = memory_size
self.final_agg = final_agg
self.activation_fn_str = Activation(activation).activation_str
self.use_residuals = use_residuals
self.use_batch_norm = use_batch_norm
self.mode = select_mode (mode ) # default to pihal-like
self.use_vsn = use_vsn
self.vsn_units = vsn_units if vsn_units is not None else self.hidden_units
(self.quantiles, self.scales,
self.lstm_return_sequences) = set_default_params(
quantiles, scales, multi_scale_agg
)
self.multi_scale_agg_mode = multi_scale_agg
# --- Store PINN Configuration ---
self.pde_modes_active = process_pde_modes(
pde_mode, enforce_consolidation=True,
solo_return=True,
)
# Normalize pinn_coefficient_C into one of our helper classes or legacy:
self.pinn_coefficient_C_config = self._normalize_C_descriptor(
pinn_coefficient_C)
self.gw_flow_coeffs_config = (
gw_flow_coeffs if gw_flow_coeffs is not None else {}
)
# ------------------------------------------------------------------
# PIHALNet is designed for 1-D consolidation only. If the user
# supplies coefficients for the transient 2-D/3-D groundwater-flow
# equation (K, Ss, Q) they probably meant to instantiate
# TransFlowSubsNet, which couples consolidation and groundwater flow.
# ------------------------------------------------------------------
if self.gw_flow_coeffs_config:
warnings.warn(
"PIHALNet ignores groundwater-flow coefficients "
"(K, Ss, Q). The model minimises only the 1-D "
"consolidation residual. For joint head–subsidence "
"modelling use `TransFlowSubsNet` instead.",
UserWarning,
stacklevel=2,
)
self._build_halnet_layers()
self._build_pinn_components()
[docs]
def run_halnet_core(
self,
static_input: Tensor,
dynamic_input: Tensor,
future_input: Tensor,
training: bool
) -> Tensor:
r"""
Execute the core **encoder–decoder** data‑driven pipeline of
:class:`~fusionlab.nn.pinn.PIHALNet`.
Executes data-driven pipeline with flexible encoder-decoder logic.
The method ingests *static*, *dynamic* (past), and *future*
covariates, passes them through Variable Selection Networks
(VSNs) or dense projections, and then processes them with a
multi‑scale LSTM encoder and a hierarchical attention‑augmented
decoder to produce a single latent vector per sample. This
vector is subsequently fed to the model’s task‑specific output
head (not shown here).
Parameters
----------
static_input : Tensor
Tensor of shape ``(B, S)`` containing time‑invariant
features such as lithology or well depth, where *B* is the
batch size and *S* is ``static_input_dim``.
dynamic_input : Tensor
Past time‑series of length :math:`T_\mathrm{past}` with
shape ``(B, T_past, D_in)``. Typical examples are
historical groundwater levels or precipitation.
future_input : Tensor
Known future covariates of length
:pyattr:`forecast_horizon` with shape
``(B, T_future, F_in)``.
training : bool
Flag forwarded to Keras layers to enable dropout and other
training‑only behaviour.
Returns
-------
Tensor
A 2‑D tensor of shape ``(B, A)``, where *A* is
``attention_units``. This latent representation encodes the
fused historical context, static descriptors, and known
future information.
Notes
-----
* If :pyattr:`use_vsn` is *True*, each input type is first passed
through a Variable Selection Network that outputs both
feature‑wise importance weights and transformed features.
* Duplicate temporal resolutions produced by
:pyclass:`~fusionlab.layers.MultiScaleLSTM` are aggregated with
:pyfunc:`fusionlab.ops.aggregate_multiscale_on_3d`.
* Duplicate residual connections follow the original TFT design
but employ GRN‑based :math:`\mathrm{Add}\!\!+\!\!
\mathrm{LayerNorm}` blocks for improved stability.
"""
time_steps = tf_shape(dynamic_input)[1]
# 1. Initial Feature Processing
static_context, dyn_proc, fut_proc = None, dynamic_input, future_input
if self.use_vsn:
if self.static_vsn is not None:
vsn_static_out = self.static_vsn(
static_input, training=training)
static_context = self.static_vsn_grn(
vsn_static_out, training=training)
if self.dynamic_vsn is not None:
dyn_context = self.dynamic_vsn(
dynamic_input, training=training
)
dyn_proc = self.dynamic_vsn_grn(
dyn_context, training=training
)
if self.future_vsn is not None:
fut_context = self.future_vsn(
future_input, training=training
)
fut_proc = self.future_vsn_grn(
fut_context, training=training
)
else: # Non-VSN path
if self.static_dense is not None:
processed_static = self.static_dense(static_input)
# Note: here the GRN's output dim might differ from the
# VSN path. This is handled by the decoder_input_projection.
static_context = self.grn_static_non_vsn(
processed_static, training=training)
dyn_proc = self.dynamic_dense(dynamic_input)
fut_proc = self.future_dense(future_input)
logger.debug(f"Shape after VSN/initial processing: "
f"Dynamic={getattr(dyn_proc, 'shape', 'N/A')}, "
f"Future={getattr(fut_proc, 'shape', 'N/A')}")
logger.debug(f"Initial processed shapes: Dynamic={dyn_proc.shape}, "
f"Future={fut_proc.shape}")
# 2. Encoder Path
encoder_input_parts = [dyn_proc]
if self.mode == 'tft_like':
# For TFT mode, slice historical part of future features
# and add to the encoder input.
fut_enc_proc = fut_proc[:, :time_steps, :]
encoder_input_parts.append(fut_enc_proc)
encoder_raw = tf_concat(encoder_input_parts, axis=-1)
encoder_input = self.encoder_positional_encoding(encoder_raw)
lstm_out = self.multi_scale_lstm(
encoder_input, training=training
)
encoder_sequences = aggregate_multiscale_on_3d(
lstm_out, mode='concat')
if self.dynamic_time_window is not None:
encoder_sequences = self.dynamic_time_window(
encoder_sequences, training=training)
logger.debug(f"Encoder sequences shape: {encoder_sequences.shape}")
# 3. Decoder Path
if self.mode == 'tft_like':
# For TFT mode, slice the forecast part of future features.
fut_dec_proc = fut_proc[:, time_steps:, :]
else: # For pihal_like mode, use the whole future tensor.
fut_dec_proc = fut_proc
decoder_parts = []
if static_context is not None:
static_expanded = tf_expand_dims(static_context, 1)
static_expanded = tf_tile(
static_expanded, [1, self.forecast_horizon, 1])
decoder_parts.append(static_expanded)
if self.future_input_dim > 0:
future_with_pos = self.decoder_positional_encoding(
fut_dec_proc)
decoder_parts.append(future_with_pos)
if not decoder_parts:
batch_size = tf_shape(dynamic_input)[0]
raw_decoder_input = tf_zeros(
(batch_size, self.forecast_horizon, self.attention_units))
else:
raw_decoder_input = tf_concat(decoder_parts, axis=-1)
# Project the raw decoder input to a consistent feature dimension.
projected_decoder_input = self.decoder_input_projection(
raw_decoder_input)
logger.debug(f"Projected decoder input shape: "
f"{projected_decoder_input.shape}")
# --- 4. Attention-based Fusion (Encoder-Decoder Interaction) ---
cross_attention_output = self.cross_attention(
[projected_decoder_input, encoder_sequences], training=training)
cross_attention_processed = self.attention_processing_grn(
cross_attention_output, training=training)
# First residual connection within the decoder block.
if self.use_residuals and self.decoder_add_norm is not None:
decoder_context_with_attention = self.decoder_add_norm[0](
[projected_decoder_input, cross_attention_processed])
decoder_context_with_attention = self.decoder_add_norm[1](
decoder_context_with_attention)
else:
# If no residual, the processed attention output becomes the context.
decoder_context_with_attention = cross_attention_processed
hierarchical_att_output = self.hierarchical_attention(
[decoder_context_with_attention, decoder_context_with_attention],
training=training
)
memory_attention_output = self.memory_augmented_attention(
hierarchical_att_output, training=training
)
# --- 5. Final Feature Combination for Decoding ---
final_features = self.multi_resolution_attention_fusion(
memory_attention_output, training=training
)
if self.use_residuals and self.final_add_norm:
# The `residual_base` must have the same dimension as `final_features`
residual_base = self.residual_dense(
decoder_context_with_attention)
final_features = self.final_add_norm[0](
[final_features, residual_base])
final_features = self.final_add_norm[1](final_features)
logger.debug(f"Shape after final fusion: {final_features.shape}")
# Collapse the time dimension to get a single vector per sample.
final_features_for_decode = aggregate_time_window_output(
final_features, self.final_agg
)
logger.debug(f"Final features for decoder shape: "
f"{final_features_for_decode.shape}")
return final_features_for_decode
[docs]
def split_outputs(
self,
predictions_combined: Tensor,
decoded_outputs_for_mean: Tensor
) -> Tuple[Tensor, Tensor, Tensor, Tensor]:
r"""
Separate the **combined output tensor** into individual
subsidence and groundwater‑level (GWL) components and return
both the *final* and *mean* predictions needed for the two loss
terms used in PIHALNet (data loss and physics/PDE loss).
The method supports two output shapes:
* **Quantile mode**
``(B, H, Q, C)`` where *Q* is the number of quantiles and
*C* = ``output_subsidence_dim + output_gwl_dim``.
* **Deterministic mode**
``(B, H, C)`` when quantiles are disabled.
Parameters
----------
predictions_combined : Tensor
Network output after the
:class:`~fusionlab.nn.pinn.QuantileDistributionModeling`
stage. Shape is ``(B, H, C)`` or ``(B, H, Q, C)``.
decoded_outputs_for_mean : Tensor
Decoder output *before* quantile distribution, used to
compute the PDE residual. Shape is ``(B, H, C)``.
training : bool, optional
*Inherited from the calling context.* Present only in
TensorFlow graph mode; not used explicitly here.
Returns
-------
s_pred_final : Tensor
Subsidence predictions ready for the data‑fidelity loss.
Shape matches ``predictions_combined`` minus the *C* split.
gwl_pred_final : Tensor
GWL predictions ready for the data‑fidelity loss.
s_pred_mean_for_pde : Tensor
Mean (deterministic) subsidence predictions used when
computing physics‑based derivatives.
gwl_pred_mean_for_pde : Tensor
Mean GWL predictions for the PDE residual term.
Notes
-----
* Mean predictions are extracted *only* from
``decoded_outputs_for_mean`` because applying the quantile
mapping first would break the differentiability required for
spatial–temporal derivatives.
* When TensorFlow executes in graph mode and the rank of
*predictions_combined* is dynamic, the function falls back to
:pyfunc:`tf.rank` for shape inspection.
Examples
--------
>>> outputs = model(...) # forward pass
>>> s_final, gwl_final, s_mean, gwl_mean = (
... model.split_outputs(
... predictions_combined=outputs["pred"],
... decoded_outputs_for_mean=outputs["dec_mean"],
... )
... )
>>> s_final.shape
TensorShape([32, 24, 3]) # e.g. B=32, H=24, Q=3
>>> gwl_mean.shape
TensorShape([32, 24, 1]) # deterministic mean
See Also
--------
fusionlab.nn.pinn.QuantileDistributionModeling :
Layer that adds the quantile dimension.
fusionlab.nn.pinn.PIHALNet.run_halnet_core :
Produces ``decoded_outputs_for_mean``.
"""
# --- 1. Extract Mean Predictions (for PDE Loss) ---
# These come from the decoder output *before* quantile distribution.
# This provides a stable point forecast for derivative calculation.
s_pred_mean_for_pde = decoded_outputs_for_mean[
..., :self.output_subsidence_dim
]
gwl_pred_mean_for_pde = decoded_outputs_for_mean[
..., self.output_subsidence_dim:
]
# --- 2. Extract Final Predictions (for Data Loss) ---
# These may or may not include a quantile dimension.
# We check the tensor's rank to decide how to slice.
# Keras may return a known static rank during build time,
# or we can use tf.rank for dynamic graph execution.
if self.quantiles and hasattr(
predictions_combined, 'shape') and len(
predictions_combined.shape) == 4:
# Case: Quantiles are present.
# Shape is (Batch, Horizon, NumQuantiles, CombinedOutputDim)
s_pred_final = predictions_combined[
..., :self.output_subsidence_dim
]
gwl_pred_final = predictions_combined[
..., self.output_subsidence_dim:
]
elif (
hasattr(predictions_combined, 'shape')
and len(predictions_combined.shape) == 3
):
# Case: No quantiles. Shape is (Batch, Horizon, CombinedOutputDim)
s_pred_final = predictions_combined[
..., :self.output_subsidence_dim
]
gwl_pred_final = predictions_combined[
..., self.output_subsidence_dim:
]
else:
# This case handles dynamic shapes during graph execution
# and acts as a fallback.
if self.quantiles and tf_rank(predictions_combined) == 4:
s_pred_final = predictions_combined[..., :self.output_subsidence_dim]
gwl_pred_final = predictions_combined[..., self.output_subsidence_dim:]
elif tf_rank(predictions_combined) == 3:
s_pred_final = predictions_combined[..., :self.output_subsidence_dim]
gwl_pred_final = predictions_combined[..., self.output_subsidence_dim:]
else:
# This case should ideally not be reached if QDM is consistent
raise ValueError(
f"Unexpected shape from QuantileDistributionModeling: "
f"Rank is {tf_rank(predictions_combined)}"
)
return (s_pred_final, gwl_pred_final,
s_pred_mean_for_pde, gwl_pred_mean_for_pde)
[docs]
@tf_autograph.experimental.do_not_convert
def call(
self,
inputs: Dict[str, Optional[Tensor]],
training: bool = False
) -> Dict[str, Tensor]:
r"""
Forward pass for :class:`~fusionlab.nn.pinn.PIHALNet`.
This method orchestrates the full **physics‑informed** workflow:
1. Validate and split the input dictionary into static, dynamic,
future, and coordinate tensors.
2. Run the HALNet encoder–decoder core to obtain a latent
representation.
3. Produce mean forecasts with the multi‑horizon decoder.
4. Optionally expand those means into quantile predictions.
5. Separate combined outputs into subsidence and GWL streams for
both data‑loss and physics‑loss branches.
6. Compute the consolidation PDE residual on the mean series.
7. Return a dictionary ready for the model’s composite loss.
Parameters
----------
inputs : dict[str, Tensor]
Dictionary containing at least the following keys
(created by :pyfunc:`fusionlab.nn.pinn.process_pinn_inputs`):
* ``'coords'`` – tensor ``(B, H, 3)`` with
*(t, x, y)* coordinates.
* ``'static_features'`` – tensor ``(B, S)``.
* ``'dynamic_features'`` – tensor ``(B, T_past, D)``.
* ``'future_features'`` – tensor ``(B, H, F)``.
training : bool, default=False
Standard Keras flag indicating training or inference mode.
Returns
-------
dict[str, Tensor]
* ``'subs_pred'`` – subsidence predictions, shape
``(B, H, Q, O_s)`` or ``(B, H, O_s)``.
* ``'gwl_pred'`` – GWL predictions, same layout for *O_g*.
* ``'pde_residual'`` – physics residual,
shape ``(B, H, 1)`` (all zeros if *H = 1*).
Notes
-----
* Quantile outputs are produced only when the model’s
``quantiles`` attribute is not *None*.
* The PDE residual is based on a discrete‑time consolidation
equation evaluated with finite differences; therefore a
forecast horizon > 1 is required.
Examples
--------
>>> out = pihalnet(
... {
... "coords": coords,
... "static_features": s,
... "dynamic_features": d,
... "future_features": f,
... },
... training=True,
... )
>>> out["subs_pred"].shape
TensorShape([32, 24, 3, 1]) # B=32, H=24, Q=3, O_s=1
>>> out["pde_residual"].shape
TensorShape([32, 24, 1])
See Also
--------
fusionlab.nn.pinn.PIHALNet.run_halnet_core :
Feature‑extraction backbone called internally.
fusionlab.nn.pinn.PIHALNet.split_outputs :
Helper that separates subsidence and GWL channels.
"""
# 1. Unpack and Validate Inputs
# - Process and Validate All Inputs ---
# The `process_pinn_inputs` helper unpacks the input dict and
# isolates the coordinate tensors for later use.
logger.debug("PIHALNet call: Processing PINN inputs.")
t, x, y, static_features, dynamic_features, future_features = \
process_pinn_inputs(inputs, mode='auto')
# basic tensors checks.
check_inputs(
dynamic_inputs= dynamic_features,
static_inputs= static_features,
future_inputs= future_features,
dynamic_input_dim= self.dynamic_input_dim,
static_input_dim = self.static_input_dim,
future_input_dim= self.future_input_dim,
forecast_horizon= self.forecast_horizon,
verbose=0 # Set to >0 for detailed logging from checks
)
# `validate_model_inputs` can provide a secondary, more detailed
# check on the unpacked feature tensors.
static_p, dynamic_p, future_p = validate_model_inputs(
inputs=[static_features, dynamic_features, future_features],
static_input_dim=self.static_input_dim,
dynamic_input_dim=self.dynamic_input_dim,
future_covariate_dim=self.future_input_dim,
#forecast_horizon=self.forecast_horizon,
mode='strict',
verbose=0 # Set to 1 for more detailed logging from validator
)
logger.debug(
"Input shapes after validation:"
f" S={getattr(static_p, 'shape', 'None')}, "
f"D={getattr(dynamic_p, 'shape', 'None')},"
f" F={getattr(future_p, 'shape', 'None')}"
)
# *** Validate future_p shape based on mode ***
if self.mode == 'tft_like':
expected_future_span = self.max_window_size + self.forecast_horizon
else: # pihal_like
expected_future_span = self.forecast_horizon
actual_future_span = tf_shape(future_p)[1]
expected_span_tensor = tf_convert_to_tensor(
expected_future_span, dtype=actual_future_span.dtype)
tf_assert_equal(
actual_future_span, expected_span_tensor,
message=(
f"Incorrect 'future_features' tensor length for "
f"mode='{self.mode}'. Expected time dimension of "
f"{expected_future_span}, but got {actual_future_span}."
)
)
# --- 2. Run Core Data-Driven Feature Extraction ---
# This self-contained method performs the complex feature engineering
# using LSTMs and attention mechanisms.
logger.debug("Running HALNet core for feature extraction.")
final_features_for_decode = self.run_halnet_core(
static_input=static_p,
dynamic_input=dynamic_p,
future_input=future_p,
training=training
)
logger.debug(
f"Shape of features for decoder: {final_features_for_decode.shape}"
)
# --- 3. Generate Predictions ---
# Get mean predictions (for PDE) from the multi-horizon decoder
decoded_outputs = self.multi_decoder(
final_features_for_decode, training=training
)
logger.debug(f"Shape of decoded outputs (means): {decoded_outputs.shape}")
# Get final predictions (potentially with quantiles, for data loss)
predictions_final_targets = decoded_outputs
if self.quantiles is not None:
predictions_final_targets = self.quantile_distribution_modeling(
decoded_outputs, training=training
)
logger.debug(
f"Shape of final quantile outputs: {predictions_final_targets.shape}"
)
# --- 4. Split and Organize Outputs ---
# Use helper to separate subsidence and GWL predictions for both
# data loss (quantiles) and physics loss (mean).
(s_pred_final, gwl_pred_final,
s_pred_mean_for_pde, gwl_pred_mean_for_pde) = self.split_outputs(
predictions_combined=predictions_final_targets,
decoded_outputs_for_mean=decoded_outputs
)
# --- 5. Calculate Physics Residual ---
# Let's verify the shape and slice if needed
# (this is a patch, not the ideal fix)
if t.shape[1] != self.forecast_horizon:
# This indicates a data pipeline issue, but we can patch it here
# by assuming the last `forecast_horizon` steps of `t` are the correct ones.
# This assumption may be incorrect depending on your sequence prep.
t_for_pde = t[:, -self.forecast_horizon:, :]
# A better approach is to fix the data pipeline so `inputs['coords']`
# has the shape (batch, forecast_horizon, 3) from the start.
else:
t_for_pde = t
# The PDE residual is calculated on the mean predictions using
# finite differences, which is suitable for sequence outputs.
# This does NOT require a GradientTape in the call method.
logger.debug("Computing PDE residual from mean predictions.")
if self.forecast_horizon > 1:
pde_residual = compute_consolidation_residual(
s_pred=s_pred_mean_for_pde,
h_pred=gwl_pred_mean_for_pde,
time_steps=t_for_pde, # `t` holds the forecast time sequence
C=self.get_pinn_coefficient_C()
)
else:
# Cannot compute discrete time derivative with a single point
logger.warning(
"Forecast horizon is 1, cannot compute finite-difference "
"based PDE residual. Returning zeros."
)
pde_residual = tf_zeros_like(s_pred_mean_for_pde)
logger.debug(f"Shape of PDE residual: {pde_residual.shape}")
# --- 6. Return All Components for Loss Calculation ---
return {
"subs_pred": s_pred_final,
"gwl_pred": gwl_pred_final,
"pde_residual": pde_residual,
}
[docs]
def compile(
self,
optimizer,
loss,
metrics=None,
loss_weights=None,
lambda_pde: float = 1.0,
**kwargs,
):
r"""
Configure PIHALNet for training with a **composite PINN loss**.
The total loss optimised during :pyfunc:`train_step`
is
.. math::
L_\text{total} \;=\;
\sum_i w_i\,L_{\text{data},i}
+ \lambda_\text{pde}\,L_\text{pde}
where *i* indexes the outputs (subsidence, GWL).
Parameters
----------
optimizer : tf.keras.optimizers.Optimizer | str
Optimiser instance or Keras alias (e.g., ``"adam"``).
loss : dict
Mapping ``{"subs_pred": loss_fn, "gwl_pred": loss_fn}``.
Each value can be a Keras loss object or a string such
as ``"mse"``.
metrics : dict, optional
Mapping from output keys to a list of Keras metric objects
(or their aliases) that will be tracked during training and
evaluation.
loss_weights : dict, optional
Scalar weights :math:`w_i` for each *data* loss term.
Defaults to 1 for every output.
lambda_pde : float, default=1.0
Weight applied to :math:`L_\text{pde}` (physics residual).
**kwargs
Additional keywords forwarded to
:pyfunc:`tf.keras.Model.compile`.
Notes
-----
* The physics‑residual term is added manually in
:pyfunc:`train_step`; therefore ``lambda_pde`` is stored as an
attribute rather than passed to ``loss_weights``.
"""
# Call the parent's compile method. It will handle the setup of
# losses, metrics, and weights for our named outputs.
super().compile(
optimizer=optimizer,
loss=loss,
metrics=metrics,
loss_weights=loss_weights,
**kwargs,
)
# Store the PINN-specific loss weight
self.lambda_pde = lambda_pde
[docs]
def train_step(self, data: Tuple[Dict, Dict]) -> Dict[str, Tensor]:
r"""
Single optimisation step implementing the composite loss.
The procedure is:
1. Forward pass → ``self.call`` to obtain
``{"subs_pred", "gwl_pred", "pde_residual"}``.
2. Compute data‑fidelity loss via
:pyfunc:`tf.keras.Model.compute_loss`.
3. Compute physics residual loss
:math:`L_\text{pde} = \langle r^2\rangle` where
``r = outputs["pde_residual"]``.
4. Form
:math:`L_\text{total}=L_\text{data} +
\lambda_\text{pde}\,L_\text{pde}` and back‑propagate.
5. Update Keras metrics and return a results dictionary.
Parameters
----------
data : tuple(dict, dict)
Tuple ``(inputs, targets)`` produced by the data pipeline.
Returns
-------
dict[str, Tensor]
Includes user‑defined metrics plus ``"total_loss"``,
``"data_loss"``, and ``"physics_loss"``.
Raises
------
ValueError
If *data* is not a two‑element tuple of dictionaries.
"""
# Unpack the data. Keras provides it as a tuple of (inputs, targets).
# We expect both to be dictionaries.
if not isinstance(data, tuple) or len(data) != 2:
raise ValueError(
"Expected data to be a tuple of (inputs_dict, targets_dict)."
)
inputs, targets = data
if isinstance (targets, dict):
# For consistency, map targets if users explicitely
# provide 'subsidence', and 'gwl' as keys .
targets = rename_dict_keys(
targets.copy(),
param_to_rename={
"subsidence": "subs_pred",
"gwl": "gwl_pred"
}
)
# Open a GradientTape to record operations
# for automatic differentiation.
with GradientTape() as tape:
# 1. Forward pass to get model outputs
# The `call` method returns a dict:
# {'subs_pred', 'gwl_pred', 'pde_residual'}
outputs = self(inputs, training=True)
# Structure predictions to match the
# 'loss' dictionary from compile()
y_pred_for_loss = {
'subs_pred': outputs['subs_pred'],
'gwl_pred': outputs['gwl_pred']
}
# 2. Calculate Data Fidelity Loss (L_data)
# Keras's self.compute_loss handles the dictionary of losses,
# targets, predictions, and loss_weights automatically.
data_loss = self.compute_loss(
x=inputs, y=targets, y_pred=y_pred_for_loss
)
# 3. Calculate Physics Residual Loss (L_pde)
# We penalize the mean of the squared residual to force it to zero.
pde_residual = outputs['pde_residual']
loss_pde = tf_reduce_mean(tf_square(pde_residual))
# 4. Combine losses to form the Total Loss
# This is the final scalar loss that will be differentiated.
total_loss = data_loss + self.lambda_pde * loss_pde
# 5. Compute and Apply Gradients
# This includes the model's layers and the learnable `log_C_coefficient`.
trainable_vars = self.trainable_variables
gradients = tape.gradient(total_loss, trainable_vars)
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# 6. Update and Return Metrics
# Update the metrics passed in compile()
# (e.g., 'mae', 'rmse' for each output)
self.compiled_metrics.update_state(targets, y_pred_for_loss)
# Build a dictionary of results to be displayed by Keras.
results = {m.name: m.result() for m in self.metrics}
results.update({
"total_loss": total_loss, # The main loss we optimized
"data_loss": data_loss, # The part of the loss from data
"physics_loss": loss_pde, # The part of the loss from physics
})
return results
def _normalize_C_descriptor(
self,
raw: Union[LearnableC, FixedC, DisabledC, str, float, None]
):
"""
Internal helper: turn the user‐passed `raw` into exactly one of
our three classes (LearnableC, FixedC, DisabledC).
Raises
------
ValueError
If `raw` is an unrecognized string or negative float.
TypeError
If `raw` is not one of the expected types.
"""
# 1) Already one of our classes?
if isinstance(raw, (LearnableC, FixedC, DisabledC)):
return raw
# 2) If user passed a bare float or int: treat as FixedC(value=raw)
if isinstance(raw, (float, int)):
if raw < 0:
raise ValueError(
"Numeric pinn_coefficient_C must"
f" be non‐negative, got {raw}"
)
# Nonzero means 'fixed to that value'
return FixedC(value=float(raw))
# 3) If user passed a string, allow the legacy
# values "learnable" or "fixed"
if isinstance(raw, str):
low = raw.strip().lower()
if low == "learnable":
# Default initial value = 0.01 is built into LearnableC
return LearnableC(initial_value=0.01)
if low == "fixed":
# Default fixed value = 1.0
return FixedC(value=1.0)
if low in ("none", "disabled", "off"):
return DisabledC()
raise ValueError(
f"Unrecognized pinn_coefficient_C string: '{raw}'. "
"Expected 'learnable', 'fixed', 'none', or use"
" a LearnableC/FixedC/DisabledC instance."
)
# 4) If user passed None, treat as DisabledC()
if raw is None:
return DisabledC()
raise TypeError(
f"pinn_coefficient_C must be LearnableC, FixedC, DisabledC, "
f"str, float, or None; got {type(raw).__name__}."
)
def _build_pinn_components(self):
"""
Instantiates components required for the physics‐informed module.
Specifically, sets up how we obtain C:
- If LearnableC, create a trainable variable log_C.
- If FixedC, store a lambda that returns a fixed tf.constant(value).
- If DisabledC, store a lambda that returns tf.constant(1.0).
"""
# Check which descriptor we have:
desc = self.pinn_coefficient_C_config
if isinstance(desc, LearnableC):
# We learn log(C) so that C = exp(log_C) is always > 0
self.log_C_coefficient = self.add_weight(
name="log_pinn_coefficient_C",
shape=(), # scalar
initializer=Constant(tf_log(desc.initial_value)),
trainable=True
)
self._get_C = lambda: tf_exp(self.log_C_coefficient)
elif isinstance(desc, FixedC):
# Fixed value, non‐trainable
val = desc.value
self._get_C = lambda: tf_constant(val, dtype=tf_float32)
elif isinstance(desc, DisabledC):
# Physics disabled => C internally 1.0 but not used if lambda_pde==0
# in compile()
self._get_C = lambda: tf_constant(1.0, dtype=tf_float32)
else:
# Should never happen if _normalize_C_descriptor is correct
raise RuntimeError(
"Internal error: pinn_coefficient_C_config is not a recognized type."
)
[docs]
def get_pinn_coefficient_C(self) -> Tensor:
"""Returns the physical coefficient C."""
return self._get_C()
def _build_halnet_layers(self):
"""Instantiates all layers for the HALNet architecture.
This method sets up all necessary components, including conditional
Variable Selection Networks (VSNs) and the core layers for the
encoder-decoder structure like LSTMs and attention mechanisms.
"""
# --- 1. Variable Selection Networks (Conditional) ---
# These layers are created only if `self.use_vsn` is True.
# They serve to select the most relevant input features and embed
# them into a common feature space.
if self.use_vsn:
if self.static_input_dim > 0:
self.static_vsn = VariableSelectionNetwork(
num_inputs=self.static_input_dim,
units=self.vsn_units,
dropout_rate=self.dropout_rate,
name="static_vsn"
)
self.static_vsn_grn = GatedResidualNetwork(
units=self.hidden_units,
dropout_rate=self.dropout_rate,
name="static_vsn_grn"
)
else:
self.static_vsn, self.static_vsn_grn = None, None
if self.dynamic_input_dim > 0:
self.dynamic_vsn = VariableSelectionNetwork(
num_inputs=self.dynamic_input_dim,
units=self.vsn_units,
dropout_rate=self.dropout_rate,
use_time_distributed=True,
name="dynamic_vsn"
)
self.dynamic_vsn_grn = GatedResidualNetwork(
units=self.embed_dim,
dropout_rate=self.dropout_rate,
name="dynamic_vsn_grn"
)
else:
self.dynamic_vsn, self.dynamic_vsn_grn = None, None
if self.future_input_dim > 0:
self.future_vsn = VariableSelectionNetwork(
num_inputs=self.future_input_dim,
units=self.vsn_units,
dropout_rate=self.dropout_rate,
use_time_distributed=True,
name="future_vsn"
)
self.future_vsn_grn = GatedResidualNetwork(
units=self.embed_dim,
dropout_rate=self.dropout_rate,
name="future_vsn_grn"
)
else:
self.future_vsn, self.future_vsn_grn = None, None
else:
# If not using VSNs, ensure all related attributes are None.
self.static_vsn, self.static_vsn_grn = None, None
self.dynamic_vsn, self.dynamic_vsn_grn = None, None
self.future_vsn, self.future_vsn_grn = None, None
# --- 2. Shared & Non-VSN Path Layers ---
# These layers are essential for processing attention outputs or
# initial features if VSNs are disabled.
# This GRN is used to process static features (if not using VSN)
# and to refine the output of the cross-attention layer. Its
# output dimension is set to `attention_units` for consistency
# within the attention block.
self.attention_processing_grn = GatedResidualNetwork(
units=self.attention_units,
dropout_rate=self.dropout_rate,
activation=self.activation_fn_str,
name="attention_processing_grn"
)
# This layer projects the combined decoder context into a
# consistent feature space (`attention_units`) before it's used
# in attention mechanisms and residual connections.
self.decoder_input_projection = Dense(
self.attention_units,
activation=self.activation_fn_str,
name="decoder_input_projection"
)
# These layers are only created if VSN is NOT used.
if not self.use_vsn:
if self.static_input_dim > 0:
self.static_dense = Dense(
self.hidden_units, activation=self.activation_fn_str
)
# This GRN is specifically for the non-VSN static path. Its
# dimensionality matches the static context (`hidden_units`).
self.grn_static_non_vsn = GatedResidualNetwork(
units=self.hidden_units,
dropout_rate=self.dropout_rate,
activation=self.activation_fn_str,
name="grn_static_non_vsn"
)
else:
self.static_dense = None
self.grn_static_non_vsn = None
# Create dense layers for dynamic and future features
# for non-VSN path
self.dynamic_dense = Dense(self.embed_dim)
self.future_dense = Dense(self.embed_dim)
else:
self.static_dense =None
self.grn_static_non_vsn = None
self.dynamic_dense =None
self.future_dense = None
# --- 3. Core Architectural Layers ---
# self.positional_encoding = PositionalEncoding()
# *** FIX: Create two separate instances of PositionalEncoding ***
self.encoder_positional_encoding = PositionalEncoding(
name="encoder_pos_encoding")
self.decoder_positional_encoding = PositionalEncoding(
name="decoder_pos_encoding")
self.multi_scale_lstm = MultiScaleLSTM(
lstm_units=self.lstm_units,
scales=self.scales,
return_sequences=True # Critical for the encoder path
)
self.hierarchical_attention = HierarchicalAttention(
units=self.attention_units,
num_heads=self.num_heads
)
self.cross_attention = CrossAttention(
units=self.attention_units,
num_heads=self.num_heads
)
self.memory_augmented_attention = MemoryAugmentedAttention(
units=self.attention_units,
memory_size=self.memory_size,
num_heads=self.num_heads
)
self.multi_resolution_attention_fusion = \
MultiResolutionAttentionFusion(
units=self.attention_units,
num_heads=self.num_heads
)
self.dynamic_time_window = DynamicTimeWindow(
max_window_size=self.max_window_size
)
self.multi_decoder = MultiDecoder(
output_dim=self._combined_output_target_dim,
num_horizons=self.forecast_horizon
)
self.quantile_distribution_modeling = QuantileDistributionModeling(
quantiles=self.quantiles,
output_dim=self._combined_output_target_dim
)
# --- 4. Layers for Residual Connections (Conditional) ---
# Instantiate Add and LayerNormalization layers here to avoid
# re-creation inside the `call` method, which is incompatible
# with tf.function.
if self.use_residuals:
self.residual_dense = Dense(self.attention_units)
# Layers for the first residual connection in the decoder
self.decoder_add_norm = [Add(), LayerNormalization()]
# Layers for the final residual connection
self.final_add_norm = [Add(), LayerNormalization()]
else:
self.residual_dense = None
self.decoder_add_norm = None
self.final_add_norm = None
[docs]
def get_config(self):
config = super().get_config()
base_config = {
'static_input_dim': self.static_input_dim,
'dynamic_input_dim': self.dynamic_input_dim,
'future_input_dim': self.future_input_dim,
'output_subsidence_dim': self.output_subsidence_dim,
'output_gwl_dim': self.output_gwl_dim,
'embed_dim': self.embed_dim,
'hidden_units': self.hidden_units,
'lstm_units': self.lstm_units,
'attention_units': self.attention_units,
'num_heads': self.num_heads,
'dropout_rate': self.dropout_rate,
'forecast_horizon': self.forecast_horizon,
'quantiles': self.quantiles,
'max_window_size': self.max_window_size,
'memory_size': self.memory_size,
'scales': self.scales,
'multi_scale_agg': self.multi_scale_agg_mode,
'final_agg': self.final_agg,
'activation': self.activation_fn_str,
'use_residuals': self.use_residuals,
'use_batch_norm': self.use_batch_norm,
'pinn_coefficient_C': self.pinn_coefficient_C_config,
'use_vsn': self.use_vsn,
'vsn_units': self.vsn_units,
'name': self.name,
'mode': self.mode,
'pde_mode': self.pde_modes_active,
'gw_flow_coeffs': self.gw_flow_coeffs_config,
}
config.update(base_config)
return config
[docs]
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
PiHALNet.__doc__+=r"""\n
The architecture can operate in two modes for its physical coefficient “C”:
1. **Parameter Specification:** Use a user-supplied constant value.
2. **Parameter Discovery:** Treat the coefficient as trainable (default),
learning log(C) during training to ensure positivity.
PIHALNet’s total loss is a weighted sum of the data fidelity loss on
subsidence/GWL predictions and a physics residual loss (PDE loss).
Formulation
~~~~~~~~~~~~
Given inputs :math:`\mathbf{x}_{\text{static}}`, :math:`\mathbf{x}_{\text{dyn}}`,
and (optionally) :math:`\mathbf{x}_{\text{fut}}`, PIHALNet produces multi-horizon
predictions::
:math:`\hat{s}[t+h],\; \hat{h}[t+h] \quad (h=1,\dots,H),`
for subsidence :math:`s` and GWL :math:`h`. The data loss
(:math:`L_{\text{data}}`) is::
.. math::
L_{\text{data}} = \sum_{h=1}^H
\bigl\{ \ell\bigl( \hat{s}[t+h], s[t+h]\bigr)
+ \ell\bigl( \hat{h}[t+h], h[t+h]\bigr) \bigr\},
where :math:`\ell` is typically MSE or MAE. The PDE residual loss
(:math:`L_{\text{pde}}`) for Terzaghi’s consolidation equation is::
.. math::
L_{\text{pde}} =
\frac{1}{H} \sum_{h=1}^H \bigl\| C \, \Delta s[t+h]
- \frac{\partial h}{\partial t}[t+h] \bigr\|^2,
computed via finite differences on the sequence of mean predictions.
The total loss is::
.. math::
L_{\text{total}} = L_{\text{data}}
+ \lambda_{\text{pde}} \, L_{\text{pde}},
where :math:`\lambda_{\text{pde}}` is a user-defined weight.
Parameters
----------
static_input_dim : int
Dimensionality of static (time-invariant) feature vector. Must be
:math:`\geq 0`. If zero, static inputs are omitted.
dynamic_input_dim : int
Dimensionality of the historical dynamic feature vector at each time
step. Must be :math:`\geq 1`.
future_input_dim : int
Dimensionality of known future covariates at each forecast step. Must
be :math:`\geq 0`. If zero, no future covariates are used.
output_subsidence_dim : int, default 1
Number of simultaneous subsidence targets (usually 1). Must be
:math:`\geq 1`.
output_gwl_dim : int, default 1
Number of simultaneous groundwater-level targets (usually 1). Must be
:math:`\geq 1`.
embed_dim : int, default 32
Size of the embedding after initial feature processing (VSN/GRN).
Controls hidden dimension for attention and LSTM inputs.
hidden_units : int, default 64
Number of hidden units in the Gated Residual Networks (GRNs) and
Dense layers. Must be :math:`\geq 1`.
lstm_units : int, default 64
Number of units in each LSTM cell. Must be :math:`\geq 1`.
For multi-scale LSTM, this is the base size at each scale.
attention_units : int, default 32
Number of units in multi-head attention and Hierarchical Attention
layers. Must be :math:`\geq 1`.
num_heads : int, default 4
Number of attention heads in all multi-head attention modules.
Must be :math:`\geq 1`.
dropout_rate : float, default 0.1
Dropout rate applied in various layers (VSN GRNs, attention heads,
final MLP). Must be in :math:`[0.0, 1.0]`.
forecast_horizon : int, default 1
Number of future time steps to predict. Must be :math:`\geq 1`.
Multi-horizon predictions are produced for
:math:`h=1,\dots,\text{forecast_horizon}`.
quantiles : list of float, optional
If provided, PIHALNet will output quantile forecasts at each horizon.
Each quantile dimension produces an additional branch. If *None*, only
mean predictions are used for PDE residual (physics) loss.
max_window_size : int, default 10
Maximum time-window length for DynamicTimeWindow layer. Must be
:math:`\geq 1`. Controls the longest subsequence of historical dynamic
features used at each decoding step.
memory_size : int, default 100
Size of the memory bank for MemoryAugmentedAttention. Must be
:math:`\geq 1`.
scales : list of int or “auto”, optional
Scales used in MultiScaleLSTM. If “auto”, scales are chosen
automatically based on forecast_horizon. Otherwise, each scale must
divide the forecast_horizon. Example: :math:`[1, 3, 6]` for a 6-step
horizon.
multi_scale_agg : {“last”, “average”, “flatten”, “auto”}, default “last”
Aggregation method for multi-scale outputs:
- “last”: take last time-step output from each scale.
- “average”: average outputs over time.
- “flatten”: concatenate outputs over time.
- “auto”: choose “last” by default.
final_agg : {“last”, “average”, “flatten”}, default “last”
Aggregation method after DynamicTimeWindow:
- “last”: use final time-step.
- “average”: average over windows.
- “flatten”: flatten all window outputs.
activation : str or callable, default “relu”
Activation function for all GRNs, Dense layers, and VSNs.
If a string, must be one of Keras built-ins (e.g. “relu”, “gelu”).
use_residuals : bool, default True
If True, apply a residual connection via a Dense layer to embeddings.
use_batch_norm : bool, default False
If True, apply LayerNormalization after each Dense/GRN block.
pde_mode : str or list of str or None, default “consolidation”
Determines which PDE component(s) to include in the physics loss:
- “consolidation”: solve Terzaghi’s consolidation only (1-D vertical).
- “gw_flow”: solve coupled groundwater flow (reserved for future release).
- “both”: include both consolidation and gw_flow (reserved).
- “none”: disable physics loss entirely.
If a list is provided, only those modes are active.
“consolidation” is enforced by default for this release.
pinn_coefficient_C : str, float, or None, default “learnable”
Configuration for consolidation coefficient :math:`C`:
- “learnable”: estimate :math:`\log(C)` as a trainable scalar.
- float (:math:`>0`): use this fixed constant.
- None: disable physics entirely (:math:`C=1` but unused).
gw_flow_coeffs : dict or None, default None
Dictionary of groundwater-flow coefficients:
- “K”: hydraulic conductivity (:math:`>0`), default “learnable”.
- “Ss”: specific storage (:math:`>0`), default “learnable”.
- “Q”: source/sink term, default 0.0.
Only used if “gw_flow” is in pde_mode.
use_vsn : bool, default True
If True, apply VariableSelectionNetwork blocks to static, dynamic,
and future inputs. If False, skip VSN and use simple dense projections.
vsn_units : int or None, default None
Output dimension of each VSN block. If None, defaults to hidden_units.
name : str, default “PIHALNet”
Keras model name.
**kwargs
Additional keyword arguments forwarded to tf.keras.Model initializer.
Attributes
----------
static_vsn : VariableSelectionNetwork or None
VSN block for static features (if use_vsn=True and static_input_dim>0).
Otherwise None.
dynamic_vsn : VariableSelectionNetwork or None
VSN block for dynamic features (if use_vsn=True).
future_vsn : VariableSelectionNetwork or None
VSN block for future features (if use_vsn=True and future_input_dim>0).
multi_modal_embedding : MultiModalEmbedding or None
Fallback embedding layer if VSNs are disabled or incomplete.
multi_scale_lstm : MultiScaleLSTM
LSTM module that operates at multiple temporal scales.
hierarchical_attention : HierarchicalAttention
Performs hierarchical attention over dynamic/future features.
cross_attention : CrossAttention
Cross-attends dynamic features with fused embeddings.
memory_augmented_attention : MemoryAugmentedAttention
Attention mechanism with an external memory bank.
dynamic_time_window : DynamicTimeWindow
Splits attention-fused features into overlapping windows.
multi_decoder : MultiDecoder
Produces final multi-horizon mean forecasts for combined targets.
quantile_distribution_modeling : QuantileDistributionModeling or None
If quantiles is not None, applies quantile modeling over decoder outputs.
positional_encoding : PositionalEncoding
Adds positional embeddings to fused features.
learned_normalization : LearnedNormalization
Optional normalization layer applied to raw inputs or VSN outputs.
log_C_coefficient : tf.Variable or None
If pinn_coefficient_C == “learnable”, stores :math:`\log(C)`. Otherwise None.
log_K_gwflow_var : tf.Variable or None
Trainable log(K) for groundwater flow, if enabled.
log_Ss_gwflow_var : tf.Variable or None
Trainable log(Ss) for groundwater flow, if enabled.
Q_gwflow_var : tf.Variable or None
Trainable or fixed Q term for groundwater flow, if enabled.
Methods
-------
call(inputs, training=False) -> dict[str, tf.Tensor]
Forward pass computing predictions and PDE residual:
1. Process inputs via process_pinn_inputs.
2. Validate tensor shapes (via validate_model_inputs).
3. Extract features with build_halnet_layers and attention/LSTM.
4. Decode multi-horizon mean predictions via multi_decoder.
5. If quantiles is provided, produce quantile outputs.
6. Split outputs into subs_pred, gwl_pred, plus mean for PDE.
7. Compute PDE residual via compute_consolidation_residual.
Returns a dict containing:
- “subs_pred”: subsidence forecasts (with quantiles if requested).
- “gwl_pred”: GWL forecasts (with quantiles if requested).
- “pde_residual”: tensor of physics residuals.
compile(optimizer, loss, metrics=None, loss_weights=None, lambda_pde=1.0, **kwargs)
Configures the model for training with a composite PINN loss.
Args:
optimizer : Keras optimizer instance (e.g. Adam).
loss : dict mapping output names (“subs_pred”, “gwl_pred”) to Keras loss
functions or string identifiers (e.g. “mse”).
metrics : dict mapping output names to lists of metrics to track.
loss_weights : dict mapping output names to scalar weights for data loss.
lambda_pde : float, weight for physics residual loss. Defaults to 1.0.
**kwargs : Additional args for tf.keras.Model.compile.
train_step(data: tuple) -> dict[str, tf.Tensor]
Custom training step to handle the composite PINN loss.
1. Unpack (inputs_dict, targets_dict) from data.
2. Forward pass with self(inputs, training=True).
3. Extract subs_pred, gwl_pred for data loss.
4. Compute data loss via self.compute_loss(x, y, y_pred).
5. Compute physics loss: :math:`\mathrm{MSE}(\text{pde_residual})`.
6. Total loss = data_loss + lambda_pde * physics_loss.
7. Compute/apply gradients via optimizer.
8. Update compiled metrics.
Returns a dict of results:
- “loss” (total PINN loss), “data_loss”, “physics_loss”,
plus any compiled metrics (e.g. “subs_mae”, “gwl_mae”).
get_pinn_coefficient_C() -> tf.Tensor or None
Returns the positive consolidation coefficient :math:`C`:
- If “learnable”, returns :math:`\exp(\log_C_coefficient)`.
- If fixed float was provided, returns that constant tensor.
- If disabled, returns :math:`1.0` if only consolidation is active, else None.
get_K_gwflow(), get_Ss_gwflow(), get_Q_gwflow() -> tf.Tensor or None
Return positive hydraulic conductivity :math:`K`, specific storage
:math:`S_s`, and source/sink term :math:`Q` for groundwater flow PDE,
if “gw_flow” mode is active. If not, return None.
get_config() -> dict
Returns a dict of all initialization arguments (static, dynamic, future
dims; PINN coefficients; architectural HPs). Enables model saving/loading
via tf.keras.models.clone_model.
from_config(config: dict, custom_objects=None) -> PIHALNet
Reconstructs a PIHALNet instance from get_config() output.
run_halnet_core(static_input, dynamic_input, future_input, training) -> tf.Tensor
Executes the core data-driven feature pipeline:
- Applies VSNs (if enabled) or Dense to each input block.
- Aligns future features via align_temporal_dimensions.
- Concatenates dynamic + future embeddings (or uses MME if no VSN).
- Applies positional encoding and optional residual connection.
- Runs MultiScaleLSTM, hierarchical/cross/memory-attention, and fusion.
- Returns final features to feed into MultiDecoder.
split_outputs(predictions_combined, decoded_outputs_for_mean) -> tuple
Separates combined predictions into subsidence and GWL components:
- predictions_combined: may include a quantile dimension (Rank 4) or be
Rank 3 if only mean forecasts. Splits along last axis into subsidence
and GWL for data loss.
- decoded_outputs_for_mean: always Rank 3 (Batch, Horizon, CombinedDim)
before quantile modeling. Splits into s_pred_mean_for_pde and
gwl_pred_mean_for_pde for physics residual calculation.
Notes
-----
- If quantiles is provided, final outputs shape is
(batch_size, horizon, num_quantiles, combined_output_dim).
Otherwise shape is (batch_size, horizon, combined_output_dim).
- Consolidation residual uses finite differences along horizon steps
to approximate :math:`\partial h / \partial t` and :math:`\Delta s`.
- Groundwater-flow PDE is reserved for a future release (“gw_flow” mode).
- VariableSelectionNetworks (VSNs) refine feature selection. If
use_vsn=False, simple Dense projections are used instead.
- MultiScaleLSTM can process temporal patterns at different resolutions.
Scales must divide the forecast horizon if not “auto”.
Examples
--------
# 1) Instantiate PIHALNet for point forecasts (no quantiles):
>>> from fusionlab.nn.pinn.models import PiHALNet
>>> model = PiHALNet(
... static_input_dim=5,
... dynamic_input_dim=3,
... future_input_dim=2,
... output_subsidence_dim=1,
... output_gwl_dim=1,
... embed_dim=32,
... hidden_units=64,
... lstm_units=64,
... attention_units=32,
... num_heads=4,
... dropout_rate=0.1,
... forecast_horizon=1,
... quantiles=None,
... max_window_size=10,
... memory_size=100,
... scales='auto',
... multi_scale_agg='last',
... final_agg='last',
... activation='relu',
... use_residuals=True,
... use_batch_norm=False,
... pde_mode='consolidation',
... pinn_coefficient_C='learnable',
... gw_flow_coeffs=None,
... use_vsn=True,
... vsn_units=None,
... )
>>> model.compile(
... optimizer='adam',
... loss={'subs_pred': 'mse', 'gwl_pred': 'mse'},
... metrics={'subs_pred': ['mae'], 'gwl_pred': ['mae']},
... loss_weights={'subs_pred': 1.0, 'gwl_pred': 0.8},
... lambda_pde=0.5,
... )
>>> import numpy as np
>>> inputs = {
... 'coords': np.zeros((2, 2), dtype='float32'),
... 'static_features': np.zeros((2, 5), dtype='float32'),
... 'dynamic_features': np.zeros((2, 1, 3), dtype='float32'),
... 'future_features': np.zeros((2, 1, 2), dtype='float32'),
... }
>>> targets = {
... 'subs_pred': np.zeros((2, 1, 1), dtype='float32'),
... 'gwl_pred': np.zeros((2, 1, 1), dtype='float32'),
... }
>>> outputs = model(inputs, training=False)
>>> print(outputs['subs_pred'].shape, outputs['gwl_pred'].shape)
(2, 1, 1) (2, 1, 1)
# 2) Instantiate with quantile forecasting (e.g., [0.1, 0.5, 0.9]):
>>> model_q = PIHALNet(
... static_input_dim=5,
... dynamic_input_dim=3,
... future_input_dim=2,
... output_subsidence_dim=1,
... output_gwl_dim=1,
... embed_dim=32,
... hidden_units=64,
... lstm_units=64,
... attention_units=32,
... num_heads=4,
... dropout_rate=0.1,
... forecast_horizon=3,
... quantiles=[0.1, 0.5, 0.9],
... max_window_size=10,
... memory_size=100,
... scales=[1, 3],
... multi_scale_agg='average',
... final_agg='flatten',
... activation='gelu',
... use_residuals=True,
... use_batch_norm=True,
... pde_mode='consolidation',
... pinn_coefficient_C=0.02, # fixed C = 0.02
... gw_flow_coeffs={'K': 1e-4, 'Ss': 1e-5, 'Q': 0.0},
... use_vsn=False,
... vsn_units=None,
... )
>>> model_q.compile(
... optimizer='adam',
... loss={'subs_pred': 'mse', 'gwl_pred': 'mse'},
... metrics={'subs_pred': ['mae'], 'gwl_pred': ['mae']},
... loss_weights={'subs_pred': 1.0, 'gwl_pred': 0.8},
... lambda_pde=1.0,
... )
>>> outputs_q = model_q(inputs, training=False)
>>> # Since quantiles=3, final outputs have shape (2, 3, 3, 2):
>>> print(outputs_q['subs_pred'].shape, outputs_q['gwl_pred'].shape)
(2, 3, 3, 1) (2, 3, 3, 1)
See Also
--------
fusionlab.nn.pinn.tuning.PIHALTuner
Hyperparameter tuner specifically built for PIHALNet.
fusionlab.nn.pinn.utils.process_pinn_inputs
Preprocessing of nested input dict to tensors.
fusionlab.nn.pinn._tensor_validation.validate_model_inputs
Ensures dynamic/static/future tensors match declared dims.
References
----------
.. [1] Raissi, M., Perdikaris, P., & Karniadakis, G.E. (2019).
*Physics-informed neural networks: A deep learning framework
for solving forward and inverse problems involving nonlinear
partial differential equations*. Journal of Computational Physics,
378, 686–707.
.. [2] Karniadakis, G.E., Kevrekidis, I.G., Lu, L., Perdikaris, P.,
Wang, S., & Yang, L. (2021). *Physics-informed machine learning*.
Nature Reviews Physics, 3(6), 422–440.
.. [3] Heng, M.H., Chen, W., & Smith, E.C. (2022). *Joint modeling of
land subsidence and groundwater levels with PINNs*. Environmental
Modelling & Software, 150, 105347.
"""