# -*- coding: utf-8 -*-
# Author: LKouadio <etanoyau@gmail.com>
# License: BSD-3-Clause
from __future__ import annotations
from numbers import Integral, Real
from typing import Optional, Union, Dict, List, Tuple
from ..._fusionlog import fusionlog, OncePerMessageFilter
from ...api.docs import DocstringComponents, _halnet_core_params
from ...compat.sklearn import validate_params, Interval, StrOptions
from ...utils.deps_utils import ensure_pkg
from ...utils.generic_utils import rename_dict_keys
from ...params import (
LearnableK, LearnableSs, LearnableQ, LearnableC,
FixedC, DisabledC
)
from .. import KERAS_BACKEND, KERAS_DEPS, dependency_message
from .._base_attentive import BaseAttentive
if KERAS_BACKEND:
from .._tensor_validation import check_inputs, validate_model_inputs
from .op import process_pinn_inputs
from .utils import process_pde_modes, extract_txy_in, _get_coords
from ..comp_utils import resolve_gw_coeffs, normalize_C_descriptor
LSTM = KERAS_DEPS.LSTM
Dense = KERAS_DEPS.Dense
LayerNormalization = KERAS_DEPS.LayerNormalization
Sequential =KERAS_DEPS.Sequential
InputLayer =KERAS_DEPS.InputLayer
Model= KERAS_DEPS.Model
Tensor=KERAS_DEPS.Tensor
Variable =KERAS_DEPS.Variable
Add =KERAS_DEPS.Add
Constant =KERAS_DEPS.Constant
GradientTape =KERAS_DEPS.GradientTape
tf_zeros_like= KERAS_DEPS.zeros_like
tf_zeros =KERAS_DEPS.zeros
tf_reduce_mean =KERAS_DEPS.reduce_mean
tf_square =KERAS_DEPS.square
tf_constant =KERAS_DEPS.constant
tf_log = KERAS_DEPS.log
tf_expand_dims = KERAS_DEPS.expand_dims
tf_tile = KERAS_DEPS.tile
tf_concat = KERAS_DEPS.concat
tf_shape = KERAS_DEPS.shape
tf_float32=KERAS_DEPS.float32
tf_exp =KERAS_DEPS.exp
tf_rank =KERAS_DEPS.rank
tf_assert_equal = KERAS_DEPS.assert_equal
tf_convert_to_tensor =KERAS_DEPS.convert_to_tensor
register_keras_serializable = KERAS_DEPS.register_keras_serializable
deserialize_keras_object= KERAS_DEPS.deserialize_keras_object
tf_autograph=KERAS_DEPS.autograph
tf_autograph.set_verbosity(0)
DEP_MSG = dependency_message('nn.pinn.models')
logger = fusionlog().get_fusionlab_logger(__name__)
logger.addFilter(OncePerMessageFilter())
_param_docs = DocstringComponents.from_nested_components(
base=DocstringComponents(_halnet_core_params)
)
__all__ = ["TransFlowSubsNet"]
[docs]
@register_keras_serializable(
'fusionlab.nn.pinn', name="TransFlowSubsNet")
class TransFlowSubsNet(BaseAttentive):
[docs]
@validate_params({
'output_subsidence_dim': [Interval(Integral,1, None, closed="left")],
'output_gwl_dim': [Interval(Integral,1, None, closed="left"),],
"pde_mode": [
StrOptions({'consolidation', 'gw_flow', 'both', 'none'}),
'array-like', None
],
"pinn_coefficient_C": [
str, Real, None, StrOptions({"learnable", "fixed"}),
LearnableC, FixedC, DisabledC
],
"K": [Real, None, StrOptions({"learnable", "fixed"}),LearnableK],
"Ss": [Real, None, StrOptions({"learnable", "fixed"}),LearnableSs],
"Q": [Real, None, StrOptions({"learnable", "fixed"}),LearnableQ],
"gw_flow_coeffs": [dict, None], }
)
@ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG)
def __init__(
self,
static_input_dim: int,
dynamic_input_dim: int,
future_input_dim: int,
output_subsidence_dim: int = 1,
output_gwl_dim: int = 1,
embed_dim: int = 32,
hidden_units: int = 64,
lstm_units: int = 64,
attention_units: int = 32,
num_heads: int = 4,
dropout_rate: float = 0.1,
forecast_horizon: int = 1,
quantiles: Optional[List[float]] = None,
max_window_size: int = 10,
memory_size: int = 100,
scales: Optional[List[int]] = None,
multi_scale_agg: str = 'last',
final_agg: str = 'last',
activation: str = 'relu',
use_residuals: bool = True,
use_batch_norm: bool = False,
pde_mode: Union[str, List[str]] = 'both',
K: Union[str, float, LearnableK
] = LearnableK(initial_value=1e-4),
Ss: Union[float, LearnableSs, str
] = LearnableSs(initial_value =1e-5),
Q: Union[float, LearnableQ, str
] = LearnableQ(initial_value =0.0),
pinn_coefficient_C: Union[
LearnableC, FixedC, DisabledC, str, float, None
] = LearnableC(initial_value=0.01),
gw_flow_coeffs: Optional[Dict[str, Union[str, float, None]]] = None,
use_vsn: bool = True,
vsn_units: Optional[int] = None,
mode: Optional[str]=None,
objective: Optional[str]=None,
attention_levels:Optional[Union[str, List[str]]]=None,
architecture_config: Optional[Dict] = None,
name: str = "TransFlowSubsNet",
**kwargs
):
# The total output dimension for the base data-driven model
# is the sum of the subsidence and GWL dimensions.
self._combined_output_dim = (
output_subsidence_dim + output_gwl_dim
)
super().__init__(
static_input_dim=static_input_dim,
dynamic_input_dim=dynamic_input_dim,
future_input_dim=future_input_dim,
output_dim=self._combined_output_dim,
forecast_horizon=forecast_horizon,
mode=mode,
quantiles=quantiles,
embed_dim=embed_dim,
hidden_units=hidden_units,
lstm_units=lstm_units,
attention_units=attention_units,
num_heads=num_heads,
dropout_rate=dropout_rate,
max_window_size=max_window_size,
memory_size=memory_size,
scales=scales,
multi_scale_agg=multi_scale_agg,
final_agg=final_agg,
activation=activation,
use_residuals=use_residuals,
use_vsn=use_vsn,
use_batch_norm =use_batch_norm,
vsn_units=vsn_units,
attention_levels =attention_levels,
objective=objective,
architecture_config=architecture_config,
name=name,
**kwargs
)
self.output_subsidence_dim = output_subsidence_dim
self.output_gwl_dim = output_gwl_dim
self.pde_modes_active = process_pde_modes(pde_mode)
self.pinn_coefficient_C =pinn_coefficient_C
self.pinn_coefficient_C_config = normalize_C_descriptor(
pinn_coefficient_C
)
self.gw_flow_coeffs = gw_flow_coeffs
K, Ss, Q = resolve_gw_coeffs(
gw_flow_coeffs=self.gw_flow_coeffs,
K=K, Ss=Ss, Q=Q,
)
logger.info(f"Initialized with K={K}, Ss={Ss}, Q={Q}")
self.K_config = K
self.Ss_config = Ss
self.Q_config = Q
self._init_coordinate_corrections()
self._build_pinn_components()
def _init_coordinate_corrections(
self,
gwl_units: Union [int, None] = None,
subs_units: Union [int, None] = None,
hidden: Tuple[int, int] = (32, 16),
act: str = "gelu",
) -> None:
r"""
Build two compact MLPs that transform coordinates into
additive corrections for the model's head and subsidence
predictions.
Parameters
----------
gwl_units : int | None, default ``self.output_gwl_dim``
Output width of the groundwater‐head branch. Pass *None* to
reuse ``self.output_gwl_dim``.
subs_units : int | None, default ``self.output_subsidence_dim``
Output width of the subsidence branch. Pass *None* to reuse
``self.output_subsidence_dim``.
hidden : tuple[int, int], default ``(32, 16)``
Widths of the two hidden dense layers *shared by both* MLPs.
act : str, default ``"gelu"``
Activation applied after each hidden layer.
Notes
-----
Let the prediction network output
.. math::
h^{\text{net}}(t,x,y),\qquad s^{\text{net}}(t,x,y).
This helper adds two learnable corrections
.. math::
\Delta h = f_{\theta_h}(t,x,y), \qquad
\Delta s = f_{\theta_s}(t,x,y),
where :math:`f_{\theta_*}` are shallow MLPs.
The final quantities used in the PDE terms become
.. math::
h = h^{\text{net}} + \Delta h, \qquad
s = s^{\text{net}} + \Delta s.
The two branches are exposed on the instance as
``self.coord_mlp`` and ``self.subs_coord_mlp`` and are trained
jointly with the rest of the network.
The models are stored on the instance as
``self.coord_mlp`` and ``self.subs_coord_mlp``. They are **not**
compiled; they inherit the parent model’s optimizer and are trained
end-to-end inside :py:meth:`train_step`.
"""
# fallback to instance-level defaults
gwl_units = gwl_units or self.output_gwl_dim
subs_units = subs_units or self.output_subsidence_dim
def _branch(out_units: int, name: str) -> Sequential:
return Sequential(
[
InputLayer(input_shape=(None, 3)),
Dense(hidden[0], activation=act),
Dense(hidden[1], activation=act),
Dense(out_units),
],
name=name,
)
# Ground-water head and subsidence correction heads
self.coord_mlp = _branch(gwl_units, "coord_mlp")
self.subs_coord_mlp = _branch(subs_units, "subs_coord_mlp")
def _build_C_components(self):
"""
Instantiates components required for the physics‐informed module.
Specifically, sets up how we obtain C:
- If LearnableC, create a trainable variable log_C.
- If FixedC, store a lambda that returns a fixed tf.constant(value).
- If DisabledC, store a lambda that returns tf.constant(1.0).
"""
# Check which descriptor we have:
desc = self.pinn_coefficient_C_config
if isinstance(desc, LearnableC):
# We learn log(C) so that C = exp(log_C) is always > 0
self.log_C_coefficient = self.add_weight(
name="log_pinn_coefficient_C",
shape=(), # scalar
initializer=Constant(tf_log(desc.initial_value)),
trainable=True
)
self._get_C = lambda: tf_exp(self.log_C_coefficient)
elif isinstance(desc, FixedC):
# Fixed value, non‐trainable
val = desc.value
self._get_C = lambda: tf_constant(val, dtype=tf_float32)
elif isinstance(desc, DisabledC):
# Physics disabled => C internally 1.0
# but not used if lambda_cons==0 in compile
self._get_C = lambda: tf_constant(1.0, dtype=tf_float32)
else:
# Should never happen if _normalize_C_descriptor is correct
raise RuntimeError(
"Internal error: pinn_coefficient_C_config"
" is not a recognized type."
)
[docs]
def get_pinn_coefficient_C(self) -> Tensor:
"""Returns the physical coefficient C."""
return self._get_C()
def _build_pinn_components(self):
"""
Instantiates trainable/fixed physical coefficients using the
correct Keras API to ensure they are tracked by the model.
"""
# This method is already correct because it uses self.add_weight.
# It creates `self.log_C_coefficient` if learnable.
self._build_C_components()
# This getter now correctly accesses the learnable variable.
self.C = self.get_pinn_coefficient_C()
# Create K, Ss, and Q using self.add_weight ---
# Handle Hydraulic Conductivity (K)
if isinstance(self.K_config, LearnableK):
self.log_K = self.add_weight(
name="log_param_K",
shape=(),
initializer=Constant(
tf_log(self.K_config.initial_value)
),
trainable=True
)
self.K = tf_exp(self.log_K)
else: # Fixed value
self.K = tf_constant(float(self.K_config), dtype=tf_float32)
# Handle Specific Storage (Ss)
if isinstance(self.Ss_config, LearnableSs):
self.log_Ss = self.add_weight(
name="log_param_Ss",
shape=(),
initializer=Constant(
tf_log(self.Ss_config.initial_value)
),
trainable=True
)
self.Ss = tf_exp(self.log_Ss)
else: # Fixed value
self.Ss = tf_constant(float(self.Ss_config), dtype=tf_float32)
# Handle Source/Sink Term (Q)
if isinstance(self.Q_config, LearnableQ):
self.Q = self.add_weight(
name="param_Q",
shape=(),
initializer=Constant(
self.Q_config.initial_value
),
trainable=True
)
else: # Fixed value
self.Q = tf_constant(float(self.Q_config), dtype=tf_float32)
[docs]
@tf_autograph.experimental.do_not_convert
def call(
self, inputs: Dict[str, Optional[Tensor]],
training: bool = False
):
r"""
Single forward sweep mixing data and physics paths.
The routine
1. extracts :tmath:`t,x,y` and covariate tensors from *inputs*;
2. runs sanity checks on dimensionality;
3. feeds the validated features through the inherited
encoder–decoder stack; and
4. splits the decoder output into mean and final
predictions that will later enter the data‐ and
PDE‐loss terms.
Returns
-------
dict
``{"subs_pred": s, "gwl_pred": h,
"subs_pred_mean": \bar s, "gwl_pred_mean": \bar h}``
with shapes
.. math::
s,\;h &\in \mathbb R^{B\times H\times d},\\
\bar s,\;\bar h &\in \mathbb R^{B\times H\times d}.
Here *B* is the batch size, *H* the forecast horizon and
*d* each target's width.
Notes
-----
* All coordinate tensors are **not** differentiated here;
their gradients are taken in :py:meth:`train_step`.
* The method remains side‐effect free: no weights are updated,
no losses are added. It purely produces tensors needed by the
custom training loop that follows.
"""
# --- 1. Unpack and Validate All Inputs ---
# The `process_pinn_inputs` helper unpacks the input dict and
# isolates the coordinate tensors for later use. It is assumed
# that the `inputs` dict contains all necessary keys.
logger.debug("TransFlowSubsNet call: Unpacking and validating inputs.")
(t, x, y, static_features,
dynamic_features, future_features) = process_pinn_inputs(
inputs, mode='auto'
)
# basic tensors checks.
check_inputs(
dynamic_inputs= dynamic_features,
static_inputs= static_features,
future_inputs= future_features,
dynamic_input_dim= self.dynamic_input_dim,
static_input_dim = self.static_input_dim,
future_input_dim= self.future_input_dim,
forecast_horizon= self.forecast_horizon,
verbose=0 # Set to >0 for detailed logging from checks
)
logger.debug(
"Input shapes after validation:"
f" S={getattr(static_features, 'shape', 'None')}, "
f"D={getattr(dynamic_features, 'shape', 'None')},"
f" F={getattr(future_features, 'shape', 'None')}"
)
# The `validate_model_inputs` provides a detailed check on the
# unpacked feature tensors to ensure they match model expectations.
static_p, dynamic_p, future_p = validate_model_inputs(
inputs=[static_features, dynamic_features, future_features],
static_input_dim=self.static_input_dim,
dynamic_input_dim=self.dynamic_input_dim,
future_covariate_dim=self.future_input_dim,
mode='strict',
verbose= 0
)
# Validate future_p shape based on mode
if self.mode == 'tft_like':
expected_future_span = self.max_window_size + self.forecast_horizon
else: # pihal_like
expected_future_span = self.forecast_horizon
actual_future_span = tf_shape(future_p)[1]
expected_span_tensor = tf_convert_to_tensor(
expected_future_span, dtype=actual_future_span.dtype)
tf_assert_equal(
actual_future_span, expected_span_tensor,
message=(
f"Incorrect 'future_features' tensor length for "
f"mode='{self.mode}'. Expected time dimension of "
f"{expected_future_span}, but got {actual_future_span}."
)
)
# --- 2. Run Core Data-Driven Feature Extraction ---
# This method performs the complex feature engineering using LSTMs
# and attention mechanisms from the BaseAttentive class.
logger.debug("Running data-driven core for feature extraction.")
final_features_for_decode = self.run_encoder_decoder_core(
static_input=static_p,
dynamic_input=dynamic_p,
future_input=future_p,
training=training
)
# --- 3. Generate Predictions ---
# Get mean predictions (for PDE) from the multi-horizon decoder.
decoded_means = self.multi_decoder(
final_features_for_decode, training=training
)
# Get final predictions (potentially with quantiles) for data loss.
final_predictions = decoded_means
if self.quantiles is not None:
final_predictions = self.quantile_distribution_modeling(
decoded_means, training=training
)
# --- 4. Split and Organize Outputs ---
# Separate the combined output tensor into subsidence and GWL streams.
(s_pred_final, gwl_pred_final,
s_pred_mean, gwl_pred_mean) = self.split_outputs(
predictions_combined=final_predictions,
decoded_outputs_for_mean=decoded_means
)
logger.debug("Computing PDE residuals from mean predictions.")
# --- 5. Return All Predictions ---
# The call method now returns all necessary prediction tensors.
# The train_step will be responsible for using these to compute
# the final composite loss.
# Return All Components for the Loss Function ---
return {
"subs_pred": s_pred_final,
"gwl_pred": gwl_pred_final,
"subs_pred_mean": s_pred_mean,
"gwl_pred_mean": gwl_pred_mean,
}
[docs]
def compute_physics_loss(
self, inputs: Dict[str, Tensor]
) -> Tuple[Tensor, Tensor]:
"""
Computes the physics-based loss terms for both consolidation
and groundwater flow.
"""
# This method uses its own tape to calculate the gradients
# required for the PDE residuals.
with GradientTape(persistent=True) as tape:
# Watch the coordinate tensor and the model's predictions.
coords = inputs['coords']
tape.watch(coords)
# Re-run the forward pass *within this tape's context*
# to get predictions that are differentiable wrt coords.
predictions = self(inputs, training=True)
s_pred_mean = predictions['subs_pred_mean']
h_pred_mean = predictions['gwl_pred_mean']
# Explicitly watch the prediction tensors too
tape.watch(s_pred_mean)
tape.watch(h_pred_mean)
# Unpack coordinates for differentiation
t, x, y = coords[..., 0:1], coords[..., 1:2], coords[..., 2:3]
# --- First-Order Derivatives ---
ds_dt = tape.gradient(s_pred_mean, t)
dh_dt = tape.gradient(h_pred_mean, t)
dh_dx = tape.gradient(h_pred_mean, x)
dh_dy = tape.gradient(h_pred_mean, y)
# --- Second-Order Derivatives ---
d2h_dx2 = tape.gradient(dh_dx, x)
d2h_dy2 = tape.gradient(dh_dy, y)
# Clean up the persistent tape
del tape
# Validate gradients
if any(g is None for g in [ds_dt, dh_dt, d2h_dx2, d2h_dy2]):
raise ValueError("Failed to compute one or more PDE gradients.")
# Assemble residuals using stateless helpers
cons_res = self._compute_consolidation_residual(ds_dt, d2h_dx2, d2h_dy2)
gw_res = self._compute_gw_flow_residual(dh_dt, d2h_dx2, d2h_dy2)
# Calculate the loss for each residual
loss_cons = tf_reduce_mean(tf_square(cons_res))
loss_gw = tf_reduce_mean(tf_square(gw_res))
return loss_cons, loss_gw
[docs]
def train_step(self, data):
"""
One optimization step uniting data and physics terms.
Uses a single :class:`tf.GradientTape` to obtain all
first- and second-order coordinate derivatives required
by the groundwater-flow and consolidation PDEs.
Data loss: any Keras loss supplied in :py:meth:`compile`.
PDE losses:
.. math::
S_s \,\partial_t h
\;-\; K (\partial_{xx} h + \partial_{yy} h) - Q &= 0 \\
\partial_t s
\;-\; C (\partial_{xx} h + \partial_{yy} h) &= 0
The final objective is
:math:`\mathcal L = L_\text{data}
+ \lambda_c L_\text{cons} + \lambda_g L_\text{gw}`.
On return the method updates built-in metrics and
provides a dict with total, data and PDE losses.
"""
inputs, targets = data
if isinstance (targets, dict):
# For consistency, map targets if users explicitely
# provide 'subsidence', and 'gwl' as keys .
targets = rename_dict_keys(
targets.copy(),
param_to_rename={
"subsidence": "subs_pred",
"gwl": "gwl_pred"
}
)
with GradientTape(persistent=True) as tape:
# The tape must watch the input coordinates to compute
# gradients of the model's output with respect to them.
coords = _get_coords(inputs)
# coords = inputs['coords']
t, x, y = extract_txy_in(coords)
tape.watch([t, x, y,])
# --- FORWARD PASS & DATA LOSS ---
outputs = self(inputs, training=True)
# Same t,x,y that the tape watches → gradients exist
coords_flat = tf_concat([t, x, y], axis=-1) # [B,T,3]
mlp_corr = self.coord_mlp(coords_flat, training=True)
s_corr = self.subs_coord_mlp(coords_flat, training=True)
# inject the correction into the head that feeds the PDE
h_pred_mean = outputs["gwl_pred_mean"] + mlp_corr
s_pred_mean = outputs["subs_pred_mean"] + s_corr
# save a copy for the data loss dictionary
outputs["gwl_pred_mean"] = h_pred_mean
outputs["subs_pred_mean"] = s_pred_mean
y_pred_for_loss = {
'subs_pred': outputs['subs_pred'],
'gwl_pred': outputs['gwl_pred']
}
data_loss = self.compiled_loss(
y_true=targets, y_pred=y_pred_for_loss,
regularization_losses=self.losses
)
# --- PDE RESIDUAL CALCULATION ---
s_pred_mean = outputs['subs_pred_mean']
# h_pred_mean = outputs['gwl_pred_mean']
tape.watch([s_pred_mean , h_pred_mean])
# --- First-order derivatives ---
ds_dt = tape.gradient(s_pred_mean, t)
dh_dt = tape.gradient(h_pred_mean, t)
dh_dx = tape.gradient(h_pred_mean, x)
dh_dy = tape.gradient(h_pred_mean, y)
# --- Second-order derivatives ---
d2h_dx2 = tape.gradient(dh_dx, x)
d2h_dy2 = tape.gradient(dh_dy, y)
# Validate that all necessary gradients were computed.
if any(g is None for g in [ds_dt, dh_dt, d2h_dx2, d2h_dy2]):
raise ValueError(
"One or more PDE gradients are missing; "
"check t, x, y in the forward graph and "
"ensure they influence the model's predictions."
)
# Assemble residuals using the stateless helpers.
cons_res = self._compute_consolidation_residual(
ds_dt, d2h_dx2, d2h_dy2)
gw_res = self._compute_gw_flow_residual(
dh_dt, d2h_dx2, d2h_dy2)
# --- COMPOSITE LOSS ---
loss_cons = tf_reduce_mean(tf_square(cons_res))
loss_gw = tf_reduce_mean(tf_square(gw_res))
# physics_loss = (+ self.lambda_cons * loss_cons
# + self.lambda_gw * loss_gw)
total_loss = (
data_loss
+ self.lambda_cons * loss_cons
+ self.lambda_gw * loss_gw
)
# --- APPLY GRADIENTS ---
trainable_vars = self.trainable_variables
grads = tape.gradient(total_loss, trainable_vars)
del tape # free memory
self.optimizer.apply_gradients(zip(grads, trainable_vars))
# --- METRICS & RETURN ---
self.compiled_metrics.update_state(targets, y_pred_for_loss)
results = {m.name: m.result() for m in self.metrics}
results.update({
"total_loss": total_loss,
"data_loss": data_loss,
"physics_loss": (
self.lambda_cons * loss_cons + self.lambda_gw * loss_gw),
"consolidation_loss": loss_cons,
"gw_flow_loss": loss_gw,
})
return results
def _compute_consolidation_residual(
self, ds_dt, d2h_dx2, d2h_dy2):
"""
Residual of the consolidation balance.
.. math::
R_c = \partial_t s - C(\partial_{xx} h + \partial_{yy} h)
"""
if 'consolidation' not in self.pde_modes_active:
return tf_zeros_like(ds_dt)
return ds_dt - self.C * (d2h_dx2 + d2h_dy2)
def _compute_gw_flow_residual(
self, dh_dt, d2h_dx2, d2h_dy2):
"""
Residual of the transient groundwater flow.
.. math::
R_g = S_s \,\partial_t h
- K(\partial_{xx} h + \partial_{yy} h) - Q
"""
if 'gw_flow' not in self.pde_modes_active:
return tf_zeros_like(dh_dt)
return (self.Ss * dh_dt) - self.K * (d2h_dx2 + d2h_dy2) - self.Q
[docs]
def split_outputs(
self,
predictions_combined: Tensor,
decoded_outputs_for_mean: Tensor
) -> Tuple[Tensor, Tensor, Tensor, Tensor]:
r"""
Separate the **combined output tensor** into individual
subsidence and groundwater‑level (GWL) components and return
both the *final* and *mean* predictions needed for the two loss
terms used in TransFlowSubsNet (data loss and physics/PDE loss).
The method supports two output shapes:
* **Quantile mode**
``(B, H, Q, C)`` where *Q* is the number of quantiles and
*C* = ``output_subsidence_dim + output_gwl_dim``.
* **Deterministic mode**
``(B, H, C)`` when quantiles are disabled.
Parameters
----------
predictions_combined : Tensor
Network output after the
:class:`~fusionlab.nn.pinn.QuantileDistributionModeling`
stage. Shape is ``(B, H, C)`` or ``(B, H, Q, C)``.
decoded_outputs_for_mean : Tensor
Decoder output *before* quantile distribution, used to
compute the PDE residual. Shape is ``(B, H, C)``.
training : bool, optional
*Inherited from the calling context.* Present only in
TensorFlow graph mode; not used explicitly here.
Returns
-------
s_pred_final : Tensor
Subsidence predictions ready for the data‑fidelity loss.
Shape matches ``predictions_combined`` minus the *C* split.
gwl_pred_final : Tensor
GWL predictions ready for the data‑fidelity loss.
s_pred_mean_for_pde : Tensor
Mean (deterministic) subsidence predictions used when
computing physics‑based derivatives.
gwl_pred_mean_for_pde : Tensor
Mean GWL predictions for the PDE residual term.
Notes
-----
* Mean predictions are extracted *only* from
``decoded_outputs_for_mean`` because applying the quantile
mapping first would break the differentiability required for
spatial–temporal derivatives.
* When TensorFlow executes in graph mode and the rank of
*predictions_combined* is dynamic, the function falls back to
:pyfunc:`tf.rank` for shape inspection.
Examples
--------
>>> outputs = model(...) # forward pass
>>> s_final, gwl_final, s_mean, gwl_mean = (
... model.split_outputs(
... predictions_combined=outputs["pred"],
... decoded_outputs_for_mean=outputs["dec_mean"],
... )
... )
>>> s_final.shape
TensorShape([32, 24, 3]) # e.g. B=32, H=24, Q=3
>>> gwl_mean.shape
TensorShape([32, 24, 1]) # deterministic mean
See Also
--------
fusionlab.nn.pinn.QuantileDistributionModeling :
Layer that adds the quantile dimension.
fusionlab.nn.pinn.PiHALNet.run_halnet_core :
Produces ``decoded_outputs_for_mean``.
"""
# --- 1. Extract Mean Predictions (for PDE Loss) ---
# These come from the decoder output *before* quantile distribution.
# This provides a stable point forecast for derivative calculation.
s_pred_mean_for_pde = decoded_outputs_for_mean[
..., :self.output_subsidence_dim
]
gwl_pred_mean_for_pde = decoded_outputs_for_mean[
..., self.output_subsidence_dim:
]
# --- 2. Extract Final Predictions (for Data Loss) ---
# These may or may not include a quantile dimension.
# We check the tensor's rank to decide how to slice.
# Keras may return a known static rank during build time,
# or we can use tf.rank for dynamic graph execution.
if self.quantiles and hasattr(
predictions_combined, 'shape') and len(
predictions_combined.shape) == 4:
# Case: Quantiles are present.
# Shape is (Batch, Horizon, NumQuantiles, CombinedOutputDim)
s_pred_final = predictions_combined[
..., :self.output_subsidence_dim
]
gwl_pred_final = predictions_combined[
..., self.output_subsidence_dim:
]
elif (
hasattr(predictions_combined, 'shape')
and len(predictions_combined.shape) == 3
):
# Case: No quantiles. Shape is (Batch, Horizon, CombinedOutputDim)
s_pred_final = predictions_combined[
..., :self.output_subsidence_dim
]
gwl_pred_final = predictions_combined[
..., self.output_subsidence_dim:
]
else:
# This case handles dynamic shapes during graph execution
# and acts as a fallback.
if self.quantiles and tf_rank(predictions_combined) == 4:
s_pred_final = predictions_combined[..., :self.output_subsidence_dim]
gwl_pred_final = predictions_combined[..., self.output_subsidence_dim:]
elif tf_rank(predictions_combined) == 3:
s_pred_final = predictions_combined[..., :self.output_subsidence_dim]
gwl_pred_final = predictions_combined[..., self.output_subsidence_dim:]
else:
# This case should ideally not be reached if QDM is consistent
raise ValueError(
f"Unexpected shape from QuantileDistributionModeling: "
f"Rank is {tf_rank(predictions_combined)}"
)
return (s_pred_final, gwl_pred_final,
s_pred_mean_for_pde, gwl_pred_mean_for_pde)
def _split_outputs(
self,
predictions_combined: Tensor,
decoded_outputs_for_mean: Tensor
) -> Tuple[Tensor, Tensor, Tensor, Tensor]:
"""Splits combined model outputs into subsidence and GWL tensors.
This helper function takes the stacked output tensors from the
final layers and separates them into their respective parts for
subsidence and groundwater level, for both the final (potentially
quantile) predictions and the mean predictions.
Args:
predictions_combined: The final output tensor from the model,
which may include a quantile dimension. Shape is either
(B, H, O_s + O_g) or (B, H, O_s + O_g, Q).
decoded_outputs_for_mean: The mean predictions from the
decoder, used for PDE calculations. Shape is
(B, H, O_s + O_g).
Returns:
A tuple containing four tensors:
(s_pred_final, gwl_pred_final, s_pred_mean, gwl_pred_mean)
"""
# The feature dimension is always the one before the quantiles,
# or the last one if no quantiles are present.
# feature_axis = -2 if self.quantiles is not None else -1
# Split the final predictions (for data loss)
s_pred_final = predictions_combined[
..., :self.output_subsidence_dim, :]
gwl_pred_final = predictions_combined[
..., self.output_subsidence_dim:, :]
# Split the mean predictions (for physics loss)
s_pred_mean_for_pde = decoded_outputs_for_mean[
..., :self.output_subsidence_dim]
gwl_pred_mean_for_pde = decoded_outputs_for_mean[
..., self.output_subsidence_dim:]
return (s_pred_final, gwl_pred_final,
s_pred_mean_for_pde, gwl_pred_mean_for_pde)
[docs]
def compile(
self,
lambda_cons: float = 1.0,
lambda_gw: float = 1.0,
**kwargs
):
"""Compiles the model with composite loss weights.
This method extends the default Keras `compile` method to
accept weights for the physics-based loss components.
Args:
lambda_cons (float, optional): The weight for the
consolidation PDE residual loss. Defaults to 1.0.
lambda_gw (float, optional): The weight for the
groundwater flow PDE residual loss. Defaults to 1.0.
**kwargs: Standard arguments for `tf.keras.Model.compile`,
such as `optimizer`, `loss`, and `metrics`.
"""
super().compile(**kwargs)
# Store the weights for the physics loss components
self.lambda_cons = lambda_cons
self.lambda_gw = lambda_gw
[docs]
def get_config(self) -> dict:
"""Returns the full configuration of the model.
This method serializes the model's configuration by
combining the configuration of the parent `BaseAttentive`
class with the parameters specific to this PINN subclass.
Returns
-------
dict
A dictionary containing all the necessary parameters to
reconstruct the model.
"""
# Get the configuration from the base class, which includes
# all the data-driven architectural parameters.
base_config = super().get_config()
# Add the parameters that are unique to this subclass.
pinn_config = {
"output_subsidence_dim": self.output_subsidence_dim,
"output_gwl_dim": self.output_gwl_dim,
"pde_mode": self.pde_modes_active,
"pinn_coefficient_C": self.pinn_coefficient_C ,
"gw_flow_coeffs": self.gw_flow_coeffs,
"K": self.K_config,
"Ss": self.Ss_config,
"Q": self.Q_config,
}
base_config.update(pinn_config)
return base_config
[docs]
@classmethod
def from_config(cls, config: dict, custom_objects=None):
"""Reconstructs a model instance from its configuration.
Args:
config (dict): Configuration dictionary from `get_config`.
custom_objects (dict, optional): Unused here, as custom
objects are expected to be registered with Keras.
Returns
-------
TransFlowSubsNet
A new instance of the model.
"""
# The configuration dictionary contains all the arguments
# needed by the __init__ method. Keras will handle the
# deserialization of registered custom objects like LearnableC.
if 'output_dim' in config:
config.pop('output_dim')
# Re-create model and nested learnable parameters.
if custom_objects is None:
# ensure all helper classes are available to the deserializer
custom_objects = {
"LearnableK": LearnableK,
"LearnableSs": LearnableSs,
"LearnableQ": LearnableQ,
"LearnableC": LearnableC,
"FixedC": FixedC,
"DisabledC": DisabledC,
}
# keys that may be nested serialisable dicts
for key in ("K", "Ss", "Q", "pinn_coefficient_C"):
obj = config.get(key)
if isinstance(obj, dict) and "class_name" in obj:
# turn the JSON blob back into a live object
config[key] = deserialize_keras_object(obj, custom_objects)
return cls(**config)
# ------------------------------------ docstring-------------------------------
TransFlowSubsNet.__doc__ = r"""
Transient Ground-Water–Driven Subsidence Network
TransFlowSubsNet fuses deep-learning encoder–decoder with two
physics losses so that the network **learns** a forecast **and**
honours the governing PDEs at once.
* **Consolidation** loss forces surface settlement :math:`s`
to balance the Laplacian of hydraulic head :math:`h`.
* **Transient ground-water flow** loss constrains head
to obey the diffusivity equation with source/sink term.
Both terms vanish when *pde_mode* switches them off.
See :ref:`User Guide <user_guide_transflowsubsnet>` for a walkthrough.
Parameters
----------
{params.base.static_input_dim}
{params.base.dynamic_input_dim}
{params.base.future_input_dim}
output_subsidence_dim : int, default 1
How many subsidence series are produced at each horizon
step. A multi-well scenario with *n* Digital Leveling
benchmarks would use ``n``.
output_gwl_dim : int, default 1
How many head series are produced. Use >1 for multi-aquifer
or multi-well settings.
forecast_horizon : int, default 1
Horizon length :math:`H`. The decoder emits :math:`H`
steps; the physics terms are evaluated for every emitted
step.
quantiles : list[float] | None, default None
Optional list of quantile levels; enables the
Quantile-Distribution head.
{params.base.embed_dim}
{params.base.hidden_units}
{params.base.lstm_units}
{params.base.attention_units}
{params.base.num_heads}
{params.base.dropout_rate}
{params.base.max_window_size}
{params.base.memory_size}
{params.base.scales}
{params.base.multi_scale_agg}
{params.base.final_agg}
{params.base.activation}
{params.base.use_residuals}
{params.base.use_batch_norm}
{params.base.use_vsn}
{params.base.vsn_units}
pde_mode : {{'consolidation', 'gw_flow', 'both', 'none'}}, \
default ``'both'``
Select which PDE residuals participate in the loss:
┌─────────────────┬────────────────────────────────────────┐
│ 'consolidation' │ only :math:`s`–balance term │
│ 'gw_flow' │ only flow equation for :math:`h` │
│ 'both' │ both residuals (recommended) │
│ 'none' │ pure data-driven; behaves like HAL-Net │
└─────────────────┴────────────────────────────────────────┘
K, Ss, Q : float | str | Learnable*, defaults 1e-4, 1e-5, 0.
Hydraulic conductivity :math:`K`, specific storage
:math:`S_s`, and volumetric source/sink :math:`Q`.
Accepted forms:
* **float / int** → fixed numeric.
* ``'learnable'`` → wrap into the corresponding
:class:`LearnableK` / *Ss* / *Q*. Initial seed is taken
from the numeric value given *in the same call* or falls
back to 1e-4 / 1e-5 / 0.
* ``'fixed'`` → force numeric even if
*param_status='learnable'* in
:func:`resolve_gw_coeffs`.
* **Learnable* instance** → forwarded unchanged.
pinn_coefficient_C : float | str | LearnableC | FixedC | DisabledC, \
default ``LearnableC(0.01)``
Coefficient in the consolidation PDE:
.. math:: \partial_t s - C\,\nabla^2 h = 0
* ``float`` – fixed.
* ``'learnable'`` or :class:`LearnableC` – optimised in log-space
to keep :math:`C>0`.
* :class:`DisabledC` – disables consolidation regardless of
*pde_mode*.
gw_flow_coeffs : dict | None, default None
Convenience container overriding *K/Ss/Q* in one go, e.g. ::
gw_flow_coeffs = {{'K': 'learnable',
'Ss': 1e-6,
'Q': 'fixed'}}
Dict entries win over the individual keyword arguments.
mode : {{'pihal_like', 'tft_like'}}, default ``None``
Routing for *future_features*:
* **pihal_like** – decoder gets all :math:`H` rows, encoder none.
* **tft_like** – first *max_window_size* rows to encoder,
next :math:`H` rows to decoder, matching the original
Temporal Fusion Transformer. ``None`` inherits
BaseAttentive default ('tft_like').
objective : {{'hybrid', 'transformer'}}, default ``'hybrid'``
Selects the backbone architecture that processes dynamic-past
and (optionally) known-future covariates before the decoding stage.
* ``'hybrid'`` – **Multi-scale LSTM -> Transformer**.
The encoder first extracts multi-resolution temporal features
with a stack of LSTMs (one per *scale*), then refines these
features with hierarchical/cross attention blocks.
This configuration balances the strong sequence-memory capability
of recurrent networks with the global-context modelling power of
Transformers and is recommended for most tabular time-series data.
* ``'transformer'`` – **Pure Transformer**.
Bypasses the LSTM stack and feeds the embeddings directly into the
attention encoder, resulting in a lightweight, fully self-attention
model. Choose this if your data exhibit long-range dependencies
for which an LSTM adds little benefit, or when you need faster
training/inference at the cost of some short-term pattern capture.
In future release:
Shortcut for common loss presets. Should be recognised:
* ``'nse'`` – Nash–Sutcliffe model-efficiency score.
* ``'rmse'`` – root-mean-square error.
When *None* we will supply losses via :py:meth:`compile`.
attention_levels : str | list[str] | None
Which hierarchical attention outputs are returned when the
model is called with ``training=False``. Use ``'all'`` or a
subset such as ``['scale', 'cross']`` for interpretability.
name : str, default ``"TransFlowSubsNet"``
Model scope as registered in Keras.
**kwargs
Forwarded verbatim to :class:`tf.keras.Model`.
Notes
-----
Physics loss is added **outside** the Keras loss container inside
``train_step``; compile with ``lambda_cons`` and ``lambda_gw`` to
scale them. When any parameter is *learnable* its
:pyattr:`tf.Variable` automatically appears in ``model.trainable_variables``.
See Also
--------
fusionlab.nn.models.HALNet
Purely data-driven encoder–decoder (no physics terms).
fusionlab.nn.pinn.models.PIHALNet
Physics-informed HAL-Net that couples consolidation PDEs
and adds an anomaly module.
fusionlab.nn.pinn.models.PiTGWFlow
Stand-alone PINN that solves 2-D / 3-D transient groundwater-flow
equations without subsidence coupling.
Examples
--------
>>> model = TransFlowSubsNet(
... static_input_dim=3, dynamic_input_dim=8, future_input_dim=4,
... output_subsidence_dim=1, output_gwl_dim=1,
... K='learnable', Ss=1e-5, Q='fixed',
... pde_mode='both', scales=[1, 3], multi_scale_agg='concat'
... )
>>> batch = {{
... "static_features": tf.zeros([8, 3]),
... "dynamic_features": tf.zeros([8, 12, 8]),
... "future_features": tf.zeros([8, 6, 4]),
... "coords": tf.zeros([8, 6, 3]),
... }}
>>> pred = model(batch, training=False)
>>> list(pred)
['subs_pred', 'gwl_pred', 'subs_pred_mean', 'gwl_pred_mean']
""".format (params =_param_docs)