Source code for fusionlab.nn.pinn._gw_models

# -*- coding: utf-8 -*-
# Author: LKouadio <etanoyau@gmail.com>
# License: BSD-3-Clause

""" Physics-informed for Groundwater flow modeling."""

from __future__ import annotations
from typing import Optional, Union, Dict,List, Tuple, Any  

from ..._fusionlog import fusionlog 
from ...api.property import NNLearner
from ...core.handlers import columns_manager 
from ...params import (
    LearnableK, LearnableSs, LearnableQ, resolve_physical_param
)
from ...utils.deps_utils import ensure_pkg 

from .. import KERAS_DEPS, KERAS_BACKEND, dependency_message
from .._tensor_validation import validate_tensors
from ..comp_utils import resolve_gw_coeffs 
from .utils import extract_txy_in 

Tensor = KERAS_DEPS.Tensor 
GradientTape =KERAS_DEPS.GradientTape
Dense =KERAS_DEPS.Dense 
Model =KERAS_DEPS.Model 
InputLayer= KERAS_DEPS.InputLayer
Sequential =KERAS_DEPS.Sequential 
Adam =KERAS_DEPS.Adam

tf_concat =KERAS_DEPS.concat
tf_reduce_mean=KERAS_DEPS.reduce_mean
tf_square =KERAS_DEPS.square
tf_shape =KERAS_DEPS.shape 
tf_reshape = KERAS_DEPS.reshape 
    

DEP_MSG = dependency_message('nn.pinn') 

logger = fusionlog().get_fusionlab_logger(__name__)

__all__=['PiTGWFlow']


[docs] class PiTGWFlow(Model, NNLearner): """Physics-Informed Transient Groundwater Flow. A self-contained Physics-Informed Neural Network (PINN) designed to solve the 2D/3D transient groundwater flow equation. This model leverages a simple Multi-Layer Perceptron (MLP) to approximate the hydraulic head :math:`h` as a continuous function of time and space, :math:`h = h(t, x, y)`. The model is trained by minimizing the residual of the governing PDE at a set of collocation points, rather than by fitting to observed data. The core of the model is its custom training and evaluation steps, which compute the necessary spatial and temporal derivatives to enforce the physical law. The governing equation solved by this PINN is: .. math:: S_s \\frac{\partial h}{\partial t} - K \\left( \\frac{\partial^2 h}{\partial x^2} + \\frac{\partial^2 h}{\partial y^2} \\right) - Q = 0 Where :math:`K` is the hydraulic conductivity, :math:`S_s` is the specific storage, and :math:`Q` is a source/sink term. These physical parameters can be set as fixed constants or as trainable variables. Parameters ---------- hidden_units : int or list of int, optional Defines the architecture of the internal MLP. - If an **int**, three hidden layers of that size are created. - If a **list**, its first three values are used as the sizes for the three hidden layers. Defaults to ``[32, 32, 32]``. activation : str, default 'tanh' The activation function to use in the hidden layers of the MLP. 'tanh' is often recommended for PINNs. learning_rate : float, default 1e-3 The learning rate for the Adam optimizer used to train the network weights and any learnable physical parameters. K : float or LearnableK, default 1.0 The hydraulic conductivity. Can be provided as a fixed Python float or as a ``LearnableK`` instance to be optimized during training. Ss : float or LearnableSs, default 1e-4 The specific storage. Can be provided as a fixed Python float or as a ``LearnableSs`` instance to be optimized during training. Q : float or LearnableQ, default 0.0 The volumetric source/sink term. Can be provided as a fixed Python float or as a ``LearnableQ`` instance to be optimized during training. gw_coeffs_config : dict, optional A dictionary to conveniently configure physical parameters. If provided, its values will supersede the individual ``K``, ``Ss``, and ``Q`` arguments. For example:: gw_coeffs_config = {'K': LearnableK(0.5), 'Ss': 1e-5} name : str, default 'PiTGWFlow' The name of the Keras model. **kwargs Additional keyword arguments passed to the parent ``tf.keras.Model`` constructor. Notes ----- This model is a pure physics solver, meaning its loss function is derived entirely from the PDE residual. It does not perform data fitting in the traditional sense. Consequently, the ``y_true`` component of a dataset provided to ``fit()`` or ``evaluate()`` is ignored. The model implements custom ``train_step`` and ``test_step`` methods. Within these steps, ``tf.GradientTape`` is used to calculate the first and second-order derivatives of the network's output (:math:`h`) with respect to its inputs (:math:`t, x, y`). This is necessary to compute the PDE residual, which forms the loss. When a physical parameter is made learnable (e.g., by passing ``K=LearnableK(...)``), it is automatically added to the model's list of trainable variables and is updated via backpropagation to minimize the physics loss. See Also -------- fusionlab.nn.pinn.TransFlowSubsNet : A more complex, hybrid data-physics model that couples groundwater flow with land subsidence and is trained on both physical laws and observed data. Examples -------- >>> import tensorflow as tf >>> from fusionlab.nn.pinn import PiTGWFlow >>> from fusionlab.params import LearnableK, LearnableSs ... >>> # 1. Instantiate the model with a learnable K and a fixed Ss >>> model = PiTGWFlow( ... hidden_units=[64, 64, 64], ... K=LearnableK(initial_value=0.5), ... Ss=1e-5, ... Q=0.0 ... ) ... >>> # 2. Create a dataset of collocation points (t, x, y) >>> n_points = 1000 >>> coords = { ... "t": tf.random.uniform((n_points, 1)), ... "x": tf.random.uniform((n_points, 1)), ... "y": tf.random.uniform((n_points, 1)), ... } >>> # Dummy targets are needed for the Keras API, but are ignored >>> dummy_y = tf.zeros((n_points, 1)) >>> dataset = tf.data.Dataset.from_tensor_slices((coords, dummy_y)).batch(32) ... >>> # 3. Compile and train the model >>> # The optimizer is taken from the model instance. >>> model.compile() >>> history = model.fit(dataset, epochs=10) ... >>> # The history will contain the physics-based loss >>> print(list(history.history.keys())) ['pde_loss'] """
[docs] @ensure_pkg(KERAS_BACKEND or "keras", extra=DEP_MSG) def __init__( self, hidden_units: Optional[Union[int, List[int]]] = None, activation: str = "tanh", learning_rate: float = 1e-3, K: Union[float, LearnableK] = 1.0, Ss: Union[float, LearnableSs] = 1e-4, Q: Union[float, LearnableQ] = 0.0, gw_coeffs_config : Dict[str, Any]=None, name: str = "PiTGWFlow", **kwargs ): super().__init__(name=name, **kwargs) # --- Store configuration and physical parameters --- if hidden_units is None: hidden_units = [32, 32, 32] self.hidden_units_config = hidden_units if isinstance(self.hidden_units_config, (float, int)): self.hidden_units = [int(self.hidden_units_config)] * 3 else: temp_units = columns_manager(self.hidden_units_config) self.hidden_units = (temp_units * 3)[:3] self.activation = activation self.learning_rate = learning_rate self.gw_coeffs_config = gw_coeffs_config K, Ss, Q = resolve_gw_coeffs( gw_flow_coeffs= gw_coeffs_config, K=K, Ss =Ss, Q =Q, param_status="fixed", ) # collected the fixed values even Learnable parameters # The learnable param should be create with resolve_physical_param self.K_config = K self.Ss_config = Ss self.Q_config = Q # Create tf.Variables for learnable parameters self.K = resolve_physical_param( self.K_config, name="param_K", status="learnable") self.Ss = resolve_physical_param( self.Ss_config, name="param_Ss", status="learnable") self.Q = resolve_physical_param( self.Q_config, name="param_Q", status="learnable") # --- Build the neural network (MLP) --- layers = [InputLayer(input_shape=(3,))] for units, name in zip (self.hidden_units, ['K', 'Ss', 'Q']): layers.append(Dense(units, activation=self.activation, name=f"param_{name}")) layers.append(Dense(1, activation="linear", name="h_pred")) self.coord_mlp = Sequential(layers, name="GWFlow_MLP") # Optimizer self.optimizer = Adam(learning_rate=self.learning_rate)
[docs] def call( self, inputs: Union[Tensor, Dict[str, Tensor]], training: bool = False ) -> Tensor: """Performs the forward pass of the network. This method maps the input spatio-temporal coordinates :math:`(t, x, y)` to the predicted hydraulic head :math:`h`. It is designed to be flexible, accepting coordinates in several formats. Parameters ---------- inputs : tf.Tensor or dict The input coordinates. Supported formats include: - A dictionary with keys ``'t'``, ``'x'``, and ``'y'``, where each value is a tensor of shape ``(batch, time_steps, 1)``. - A single concatenated tensor of shape ``(batch, time_steps, 3)``. training : bool, optional Indicates whether the model is in training mode. This is passed to internal Keras layers. Defaults to ``False``. Returns ------- tf.Tensor A tensor of shape ``(batch, time_steps, 1)`` containing the predicted hydraulic head :math:`h` at each input coordinate. """ t_coords, x_coords, y_coords = extract_txy_in(inputs, verbose=0) validate_tensors(t_coords, x_coords, y_coords, last_dim=1, check_N=True) coords_tensor = tf_concat([t_coords, x_coords, y_coords], axis=-1) original_shape = tf_shape(coords_tensor) num_features = original_shape[-1] # Reshape from (batch, time, 3) to (batch * time, 3) for MLP reshaped_coords = tf_reshape(coords_tensor, [-1, num_features]) h_pred_flat = self.coord_mlp(reshaped_coords, training=training) # Reshape output back to (batch, time, 1) output_shape = [original_shape[0], original_shape[1], 1] return tf_reshape(h_pred_flat, output_shape)
[docs] def train_step( self, data: Tuple[Dict[str, Tensor], Any] ) -> Dict[str, Tensor]: """Defines the logic for a single optimization step. This method overrides the default Keras training behavior to implement the physics-informed loss. In each step, it computes the residual of the governing PDE and uses it as the loss to update all trainable variables, including both the network weights and any learnable physical parameters. The process involves: 1. Computing derivatives of the output :math:`h` with respect to inputs :math:`(t, x, y)` using ``tf.GradientTape``. 2. Assembling the PDE residual using these derivatives. 3. Calculating the mean squared error of the residual. 4. Applying gradients to update the model's variables. Parameters ---------- data : tuple A tuple of ``(inputs, targets)`` as provided by a ``tf.data.Dataset``. The ``inputs`` are a dictionary of coordinate tensors, while the ``targets`` are ignored as the loss is unsupervised. Returns ------- dict A dictionary mapping the metric name ``'pde_loss'`` to its scalar value for this training step. """ """Custom training step to minimize the PDE residual.""" coords_batch, y_true = data # y_true is unused, for compatibility with GradientTape(persistent=True) as tape: # Extract coordinates and watch them for gradient computation t, x, y = extract_txy_in(coords_batch) tape.watch([t, x, y]) # Combine coordinates for the forward pass coords_for_model = tf_concat([t, x, y], axis=-1) # --- FORWARD PASS --- # h_pred must be computed inside the tape's context h_pred = self(coords_for_model, training=True) tape.watch(h_pred) # --- PDE RESIDUAL CALCULATION --- # First-order derivatives dh_dt = tape.gradient(h_pred, t) dh_dx = tape.gradient(h_pred, x) dh_dy = tape.gradient(h_pred, y) # Second-order derivatives d2h_dx2 = tape.gradient(dh_dx, x) d2h_dy2 = tape.gradient(dh_dy, y) # Validate gradients if any(g is None for g in [dh_dt, d2h_dx2, d2h_dy2]): raise ValueError( "Failed to compute one or more PDE gradients. " "Ensure t, x, y inputs influence the model output." ) # Assemble the PDE residual laplacian_h = d2h_dx2 + d2h_dy2 residual = ( self.Ss.get_value() * dh_dt) - ( self.K.get_value() * laplacian_h) - self.Q.get_value() # The loss is the mean of the squared residuals. pde_loss = tf_reduce_mean(tf_square(residual)) # --- APPLY GRADIENTS --- trainable_vars = self.trainable_variables grads = tape.gradient(pde_loss, trainable_vars) del tape # Clean up the persistent tape self.optimizer.apply_gradients(zip(grads, trainable_vars)) # --- METRICS & RETURN --- if self.compiled_metrics: self.compiled_metrics.update_state(y_true, residual) results = {"pde_loss": pde_loss} for m in self.metrics: results[m.name] = m.result() return results
[docs] def test_step( self, data: Tuple[Dict[str, Tensor], Any] ) -> Dict[str, Tensor]: """Defines the logic for a single evaluation step. This method overrides the default Keras evaluation behavior to report the model's physics fidelity. It calculates the PDE residual on the validation or test data but does not perform any weight updates. This is crucial for assessing how well the model adheres to the physical laws on unseen collocation points. Parameters ---------- data : tuple A tuple of ``(inputs, targets)`` from the validation or test dataset. The ``inputs`` are a dictionary of coordinate tensors; the ``targets`` are ignored. Returns ------- dict A dictionary mapping the metric name ``'pde_loss'`` to its scalar value for the evaluation step. """ coords_batch, y_true = data with GradientTape(persistent=True) as tape: t, x, y = extract_txy_in(coords_batch) tape.watch([t, x, y]) coords_for_model = tf_concat([t, x, y], axis=-1) h_pred = self(coords_for_model, training=False) tape.watch(h_pred) dh_dt = tape.gradient(h_pred, t) dh_dx = tape.gradient(h_pred, x) dh_dy = tape.gradient(h_pred, y) d2h_dx2 = tape.gradient(dh_dx, x) d2h_dy2 = tape.gradient(dh_dy, y) del tape if any(g is None for g in [dh_dt, d2h_dx2, d2h_dy2]): raise ValueError("Failed to compute gradients during validation.") laplacian_h = d2h_dx2 + d2h_dy2 residual = (self.Ss.get_value() * dh_dt) - ( self.K.get_value() * laplacian_h) - self.Q.get_value() pde_loss = tf_reduce_mean(tf_square(residual)) if self.compiled_metrics: self.compiled_metrics.update_state(y_true, residual) results = {"pde_loss": pde_loss} for m in self.metrics: results[m.name] = m.result() return results
[docs] def get_config(self) -> dict: """Returns the serializable configuration of the model. This method allows the model to be saved and re-instantiated without losing its architectural and parameter setup. It captures all initialization arguments, including the configuration of the MLP and the physical parameters. Returns ------- dict A dictionary containing all constructor parameters needed to recreate the model instance. Learnable parameter objects (e.g., ``LearnableK``) are serialized as part of this config. """ base_config = super().get_config() config = { "hidden_units": self.hidden_units_config, "activation": self.activation, "learning_rate": self.learning_rate, "K": self.K_config, "Ss": self.Ss_config, "Q": self.Q_config, "gw_coeffs_config": self.gw_coeffs_config } base_config.update(config) return base_config
[docs] @classmethod def from_config(cls, config: dict, custom_objects=None) -> "PiTGWFlow": """Creates a model instance from its configuration dictionary. This class method is the counterpart to ``get_config`` and is used by Keras's loading utilities to reconstruct a model from its saved configuration. Parameters ---------- config : dict The configuration dictionary, typically generated by ``get_config()``. custom_objects : dict, optional A dictionary mapping names to custom classes or functions. Keras uses this to deserialize custom objects like ``LearnableK``. Returns ------- PiTGWFlow A new instance of the ``PiTGWFlow`` model. """ return cls(**config)